In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


In [2]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files it actually parsed. Fail fast here instead of with a bare KeyError: 'AWS'
# on the lines below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found or unreadable; it must provide an [AWS] section")

# Export the AWS credentials as environment variables (presumably so the s3a://
# reads further down can find them — confirm against the hadoop-aws version used).
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build (or reuse) the Spark session. The hadoop-aws package provides the
# s3a:// filesystem used by the reads below.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [4]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as TimeStamp


In [5]:
# Explicit schema for the song JSON files. It pins the column types and avoids a
# schema-inference pass over the input.
songSchema = R([
    Fld(field_name, field_type)
    for field_name, field_type in (
        ("num_songs", Int()),
        ("artist_id", Str()),
        ("artist_latitude", Dbl()),
        ("artist_longitude", Dbl()),
        ("artist_location", Str()),
        ("artist_name", Str()),
        ("song_id", Str()),
        ("title", Str()),
        ("duration", Dbl()),
        ("year", Int()),
    )
])

In [6]:
import boto3

In [7]:
# Peek at the first eleven keys under song_data/ to confirm the input layout.
s3 = boto3.resource(
    's3',
    region_name='us-west-2',
    aws_access_key_id=config['AWS']['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=config['AWS']['AWS_SECRET_ACCESS_KEY'],
)
sampleDbBucket = s3.Bucket("udacity-dend")
counter = 0
for counter, obj in enumerate(sampleDbBucket.objects.filter(Prefix="song_data"), start=1):
    print(obj)
    if counter > 10:  # stop once eleven objects have been printed
        break

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')


In [8]:
%%time
# Only the song_data/A/A/A subset is read, to keep iteration fast. Point this at
# "s3a://udacity-dend/song_data/*/*/*/*" to process the full dataset.
SONG_DATA_PATH = "s3a://udacity-dend/song_data/A/A/A/*"
df_song = spark.read.format("json").load(SONG_DATA_PATH, schema=songSchema)


CPU times: user 4.11 ms, sys: 514 µs, total: 4.63 ms
Wall time: 5.41 s


In [9]:
# Confirm the explicit schema was applied, then preview a handful of rows.
df_song.printSchema()
df_song.show(n=5)

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)

+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|     artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+---------+----+
|        1|ARTC1LV1187B9A4858|        51.4536|        -0.01802|Goldsmith's Colle...|  The 

In [10]:
# Peek at the first eleven keys under log_data/ (one events file per day).
counter = 0
for counter, obj in enumerate(sampleDbBucket.objects.filter(Prefix="log_data"), start=1):
    print(obj)
    if counter > 10:  # stop once eleven objects have been printed
        break

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')


In [11]:
%%time
df_log = spark.read.json("s3a://udacity-dend/log_data/2018/11/*")  # schema inferred from the JSON


CPU times: user 4.78 ms, sys: 578 µs, total: 5.36 ms
Wall time: 19.7 s


In [12]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime

# `ts` holds epoch milliseconds (1542241826796 -> 2018-11-15 00:30:26.796 in the
# preview output), hence the division by 1000. datetime.fromtimestamp converts
# using the local timezone of the process running the UDF.
# `ts` is nullable: a null is passed through as null instead of raising
# TypeError (None / 1000.0), which would fail the whole Spark job.
get_timestamp = F.udf(
    lambda x: None if x is None else datetime.fromtimestamp(x / 1000.0),
    T.TimestampType(),
)
df_log = df_log.withColumn("listening_date", get_timestamp(df_log.ts))
df_log.limit(5).toPandas()
    
    

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,listening_date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,2018-11-15 01:57:51.796
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12,2018-11-15 03:29:37.796


In [35]:
# Create a date-only column (no time of day) from the epoch-millisecond `ts`
# column. Despite its name, "listening_datetime" holds a DateType value, so the
# UDF returns a date rather than a full datetime.
# A null `ts` yields null instead of raising TypeError inside the UDF.
get_datetime = F.udf(
    lambda x: None if x is None else datetime.fromtimestamp(x / 1000.0).date(),
    T.DateType(),
)
df_log = df_log.withColumn("listening_datetime", get_datetime(df_log.ts))
df_log.limit(5).toPandas()
    

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,listening_date,listening_datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,2018-11-15 01:57:51.796,2018-11-15
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12,2018-11-15 03:29:37.796,2018-11-15


In [13]:
# Inspect the inferred log schema (plus derived columns), then preview a few rows.
df_log.printSchema()
df_log.show(n=5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- listening_date: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|

Dimension Tables  
users - users in the app  
user_id, first_name, last_name, gender, level  
songs - songs in music database  
song_id, title, artist_id, year, duration  
artists - artists in music database  
artist_id, name, location, latitude, longitude  
time - timestamps of records in songplays broken down into specific units  
start_time, hour, day, week, month, year, weekday

In [33]:
# Preview only song-play events; other page types (e.g. Home) are not plays.
df_log.filter("page == 'NextSong'").show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|      listening_date|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|  

In [14]:
# Register both inputs so the spark.sql queries below can refer to them by name.
for view_name, frame in (("df_log", df_log), ("df_song", df_song)):
    frame.createOrReplaceTempView(view_name)


In [15]:
#users - users in the app
#user_id, first_name, last_name, gender, level
# If a user's level differs between events, SELECT DISTINCT over all five
# columns emits several rows for the same userId. Keep only each user's most
# recent song-play event (highest ts) so userId is unique in the dimension.
# Null/empty user ids are excluded.
dim_user = spark.sql("""
SELECT userId,
       firstName,
       lastName,
       gender,
       level
  FROM (
        SELECT userId,
               firstName,
               lastName,
               gender,
               level,
               ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
          FROM df_log AS e
         WHERE e.page = 'NextSong'
           AND e.userId IS NOT NULL
           AND e.userId <> ''
       ) AS u
 WHERE u.rn = 1
""")



In [16]:
#songs - songs in music database  
#song_id, title, artist_id, year, duration  
# One row per distinct song record; the query text lives in its own variable so
# it can be inspected or reused.
song_query = """
 SELECT DISTINCT song_id, 
                title,
                artist_id, 
                year, 
                duration
        FROM df_song
"""
dim_song = spark.sql(song_query)

In [17]:
#artists - artists in music database
#artist_id, name, location, latitude, longitude
# The same artist_id can appear in several song records. If those records
# disagree on name/location/coordinates, SELECT DISTINCT over all columns keeps
# every variant. Keep one row per artist_id (ordered by name) so the dimension
# key is unique, and drop records without an artist_id.
dim_artist = spark.sql("""
SELECT artist_id,
       artist_name,
       artist_location,
       artist_latitude,
       artist_longitude
  FROM (
        SELECT artist_id,
               artist_name,
               artist_location,
               artist_latitude,
               artist_longitude,
               ROW_NUMBER() OVER (PARTITION BY artist_id ORDER BY artist_name) AS rn
          FROM df_song
         WHERE artist_id IS NOT NULL
       ) AS a
 WHERE a.rn = 1
""")

In [18]:
#time - timestamps of records in songplays broken down into specific units
#start_time, hour, day, week, month, year, weekday
# Every derived column is a pure function of listening_date, so DISTINCT yields
# one row per timestamp. Null timestamps are excluded. Otherwise they would form
# a row with a null key, and partitioning by year/month on write would put it in
# Spark's null/default partition directory.
# dayofweek() follows Spark's convention: 1 = Sunday ... 7 = Saturday.
dim_time = spark.sql("""
SELECT DISTINCT listening_date              AS t_start_time,
                hour(listening_date)        AS t_hourofday,
                dayofmonth(listening_date)  AS t_daynuminmonth,
                weekofyear(listening_date)  AS t_weeknuminyear,
                month(listening_date)       AS t_monthnuminyear,
                year(listening_date)        AS t_yearnuminyear,
                dayofweek(listening_date)   AS t_daynuminweek
  FROM df_log AS s
 WHERE s.page = 'NextSong'
   AND s.listening_date IS NOT NULL
""")


In [24]:
#songplays - records in log data associated with song plays i.e. records with page NextSong
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
dim_time.createOrReplaceTempView("dim_time")
# A log event identifies the song it played only by artist name, song title and
# track length, so all three must match a song record. Joining on the artist
# alone pairs each play with every song by that artist and assigns wrong song_ids.
# Year/month (the output partition columns) are derived directly from the event
# timestamp. The original dim_time join only dropped rows with a null timestamp,
# and that filter is kept explicitly.
# songplay_id is assigned after DISTINCT so that duplicates never receive ids.
# The ids are unique but not consecutive.
songplays = spark.sql("""
SELECT monotonically_increasing_id() AS sp_songplay_id,
       p.*
  FROM (
        SELECT DISTINCT e.listening_date         AS t_start_time,
                        e.userId                 AS u_user_id,
                        e.level                  AS u_level,
                        s.song_id                AS s_song_id,
                        s.artist_id              AS a_artist_id,
                        e.sessionId              AS sp_session_id,
                        e.location               AS sp_location,
                        e.userAgent              AS sp_user_agent,
                        year(e.listening_date)   AS t_yearnuminyear,
                        month(e.listening_date)  AS t_monthnuminyear
          FROM df_log AS e
          JOIN df_song AS s
            ON e.artist = s.artist_name
           AND e.song   = s.title
           AND e.length = s.duration
         WHERE e.page = 'NextSong'
           AND e.listening_date IS NOT NULL
       ) AS p
""")

Each of the five tables is written as Parquet files to a separate analytics directory on S3 (in this notebook, the local path `data/analytics/`). Each table has its own folder within that directory. Songs table files are partitioned by year and then artist. Time table files are partitioned by year and month. Songplays table files are partitioned by year and month.

In [21]:
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') fails when the output directory already exists.
dim_song.write.mode('overwrite').partitionBy('year', 'artist_id').parquet('data/analytics/dim_song.pq')

In [25]:
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') fails when the output directory already exists.
dim_time.write.mode('overwrite').partitionBy('t_yearnuminyear', 't_monthnuminyear').parquet('data/analytics/dim_time.pq')


In [26]:
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') fails when the output directory already exists.
songplays.write.mode('overwrite').partitionBy('t_yearnuminyear', 't_monthnuminyear').parquet('data/analytics/songplays.pq')

In [27]:
# Unpartitioned dimension tables. mode('overwrite') keeps the cell re-runnable:
# the default save mode ('errorifexists') fails when the output already exists.
dim_artist.write.mode('overwrite').parquet('data/analytics/dim_artist.pq')
dim_user.write.mode('overwrite').parquet('data/analytics/dim_user.pq')


In [38]:
# Read the table's base directory so Spark runs partition discovery and restores
# the `year` and `artist_id` partition columns. Globbing into the leaf
# directories ('dim_song.pq/*/*') reads only the data files, so those columns
# are lost.
song_parquet = spark.read.parquet('data/analytics/dim_song.pq')