In [1]:
import etl

In [2]:
# Fetch AWS credentials through the project's etl helper and start Spark.
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY = etl.setup_aws_credentials()
spark = etl.create_spark_session()

# Pass the credentials to the s3a connector via the Hadoop configuration.
# NOTE(review): `_jsc` is a private PySpark attribute; kept because the
# original cell relies on it.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

# Bucket roots are written without a trailing slash: the reader cells build
# their globs as f'{input_data}/<prefix>/...', so a trailing slash here would
# produce 's3a://udacity-dend//song_data/...'.
input_data = "s3a://udacity-dend"
output_data = "s3a://claudiordgz-udacity-dend"

In [3]:
# Glob matching every song JSON file nested three directories below song_data.
song_data = input_data + '/song_data/*/*/*/*.json'

# Read all matching files into one DataFrame (schema is inferred by Spark).
df = spark.read.json(path=song_data)

# Show a single record to sanity-check the load.
df.limit(1).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,233.22077,1,SOVIYJY12AF72A4B00,The Dead Next Door (Digitally Remastered 99),1983


In [4]:
# Number of rows in the raw song-metadata DataFrame.
df.count()

14896

In [5]:
# Print the schema Spark inferred from the song JSON files.
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [6]:
# Preview two song records as a pandas frame.
df.limit(2).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,233.22077,1,SOVIYJY12AF72A4B00,The Dead Next Door (Digitally Remastered 99),1983
1,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,287.92118,1,SOVYXYL12AF72A3373,Rebel Yell (1999 Digital Remaster),1983


In [7]:
# Glob matching every event-log JSON file nested two directories below log_data.
log_data = input_data + '/log_data/*/*/*.json'

# Read all matching log files into one DataFrame (schema is inferred by Spark).
dfLog = spark.read.json(path=log_data)

# Inspect the inferred schema of the event log.
dfLog.printSchema()

# Preview two log records as a pandas frame.
dfLog.limit(2).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [8]:
# DataFrames are immutable: repartition() returns a new DataFrame, so the
# result must be bound to a name or the call has no effect.
dfLog = dfLog.repartition(6)

# Number of rows in the event log.
dfLog.count()

8056

# Dimension Tables

### users - users in the app
`user_id, first_name, last_name, gender, level`

```
user_table_insert = ("""
    INSERT INTO users (user_id, first_name, last_name, gender, level)
    SELECT DISTINCT(userId) AS user_id,
            firstName       AS first_name,
            lastName        AS last_name,
            gender,
            level
    FROM staging_events
    WHERE user_id IS NOT NULL AND page = 'NextSong';
""")
```

In [12]:
# users dimension: one row per user_id, taken from song-play events only.
# The filter mirrors the reference SQL shown in the markdown for this table
# (user id not null, page = 'NextSong'); it has to run before the projection
# because `page` is not among the selected columns.
# dropDuplicates keeps one arbitrary row per user_id, so `level` is whichever
# value that row happened to carry.
users = dfLog \
         .where("userId IS NOT NULL AND page = 'NextSong'") \
         .selectExpr('userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level') \
         .dropDuplicates(["user_id"])

# Preview one user record.
users.limit(1).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free


In [13]:
# Print the schema of the users dimension.
users.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



### songs - songs in music database
`song_id, title, artist_id, year, duration`

```
song_table_insert = ("""
    INSERT INTO songs (song_id, title, artist_id, year, duration)
    SELECT DISTINCT(song_id) AS song_id,
            title,
            artist_id,
            year,
            duration
    FROM staging_songs
    WHERE song_id IS NOT NULL;
""")

```

In [14]:
# songs dimension: one row per song_id from the song metadata.
# Rows without a song_id are dropped first, matching the reference SQL shown
# in the markdown for this table; otherwise dropDuplicates would keep one
# row with a null key.
songs = df \
         .where('song_id IS NOT NULL') \
         .select('song_id', 'title', 'artist_id', 'year', 'duration') \
         .dropDuplicates(["song_id"])

# Preview one song record.
songs.limit(1).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAONPI12A6D4F8A49,Sea Of Tears,AR9JET41187FB3DE77,0,720.87465


In [15]:
# Print the schema of the songs dimension.
songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



### artists - artists in music database
`artist_id, name, location, latitude, longitude`

```
artist_table_insert = ("""
    INSERT INTO artists (artist_id, name, location, latitude, longitude)
    SELECT DISTINCT(artist_id) AS artist_id,
            artist_name         AS name,
            artist_location     AS location,
            artist_latitude     AS latitude,
            artist_longitude    AS longitude
    FROM staging_songs
    WHERE artist_id IS NOT NULL;
""")

```

In [29]:
# artists dimension: one row per artist_id, with the artist_* columns renamed
# to name / location / latitude / longitude.
# Rows without an artist_id are dropped first, matching the reference SQL
# shown in the markdown for this table.
# dropDuplicates keeps one arbitrary row per artist_id.
artists = df \
         .where('artist_id IS NOT NULL') \
         .selectExpr('artist_id', 'artist_name as name', 'artist_location as location', 'artist_latitude as latitude', 'artist_longitude as longitude') \
         .dropDuplicates(["artist_id"])

# Preview one artist record.
artists.limit(1).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR0G85S1187FB4D46D,Hey Hey My My,,,


In [30]:
# Print the schema of the artists dimension.
artists.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



### time - timestamps of records in songplays broken down into specific units
`start_time, hour, day, week, month, year, weekday`

```
time_table_insert = ("""
    INSERT INTO time (start_time, hour, day, week, month, year, weekday)
    SELECT  DISTINCT(start_time)                AS start_time,
            EXTRACT(hour FROM start_time)       AS hour,
            EXTRACT(day FROM start_time)        AS day,
            EXTRACT(week FROM start_time)       AS week,
            EXTRACT(month FROM start_time)      AS month,
            EXTRACT(year FROM start_time)       AS year,
            EXTRACT(dayofweek FROM start_time)  as weekday
    FROM songplays;
""")

```

In [31]:
import pyspark.sql.functions as F

# time dimension: one row per distinct start_time of a song play, broken down
# into calendar parts.
# `ts` is divided by 1000 before from_unixtime, i.e. it is treated as epoch
# milliseconds; from_unixtime yields a second-resolution string.
# De-duplication therefore has to happen on start_time, not on ts: two events
# within the same second have different ts values but the same start_time.
time_table = dfLog \
               .where(F.col("page") == "NextSong") \
               .withColumn("start_time", F.from_unixtime(F.col("ts") / 1000)) \
               .dropDuplicates(['start_time']) \
               .withColumn('year', F.year('start_time')) \
               .withColumn('month', F.month('start_time')) \
               .withColumn('week', F.weekofyear('start_time')) \
               .withColumn('weekday', F.dayofweek('start_time')) \
               .withColumn('day', F.dayofmonth('start_time')) \
               .withColumn('hour', F.hour('start_time')) \
               .select("start_time", "year", "month", "week", "weekday", "day", "hour")

# Preview one time record.
time_table.limit(1).toPandas()

Unnamed: 0,start_time,year,month,week,weekday,day,hour
0,2018-11-21 02:13:42,2018,11,47,4,21,2


In [32]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



# Fact Table
### songplays - records in log data associated with song plays, i.e. records with page `NextSong`
`songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent`


```
songplay_table_insert = ("""
    INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    SELECT DISTINCT(log.ts) AS start_time,
            log.userId      AS user_id,
            log.level       AS level,
            song.song_id    AS song_id,
            song.artist_id  AS artist_id,
            log.sessionId   AS session_id,
            log.location    AS location,
            log.userAgent   AS user_agent
    FROM staging_events log
    JOIN staging_songs song 
    ON (log.song = song.title AND log.artist = song.artist_name) 
    WHERE (log.page = 'NextSong');
""")
```

In [34]:
import pyspark.sql.functions as F

# songplays fact table: log events with page 'NextSong' matched to song
# metadata on (song title, artist name).
# start_time, year and month are derived straight from `ts` (treated as epoch
# milliseconds) rather than by joining the time dimension, so the row count
# cannot be multiplied by repeated start_time values on the other side.
# The song metadata also has a `year` column, hence the temporary
# time_year / time_month names that are renamed in the projection.
# distinct() on the projected row mirrors the SELECT DISTINCT of the
# reference SQL: only fully identical rows are collapsed, so plays by
# different users in the same second are kept.
songplays_table = dfLog \
              .join(df, (df.title == dfLog.song) & (df.artist_name == dfLog.artist)) \
              .where(dfLog.page == "NextSong") \
              .withColumn('start_time', F.from_unixtime(dfLog.ts / 1000)) \
              .withColumn('time_year', F.year('start_time')) \
              .withColumn('time_month', F.month('start_time')) \
              .selectExpr('start_time',
                          'time_year as year', 'time_month as month',
                          'userId as user_id',
                          'level', 'song_id', 'artist_id', 'sessionId as session_id',
                          'location', 'userAgent as user_agent'
                         ) \
              .distinct()

# Preview one songplay record.
songplays_table.limit(1).toPandas()

Unnamed: 0,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-05 07:43:22,2018,11,97,paid,SODHZVG12A8C1404DD,ARS5WKC1187B9AC7D1,147,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [35]:
# Number of rows in the songplays fact table.
songplays_table.count()

333

In [36]:
# Print the schema of the songplays fact table.
songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

