In [57]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://sparkify-data-lake-tables/"

In [None]:
!pip install pyspark

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq

In [32]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [33]:

def create_spark_session():
    """returns: Spark Session object
    
    Description: Factory function that creates Spark Session objects.
    
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [38]:
song_data = input_data + 'song_data/*/*/*/*.json'
df = spark.read.json(song_data)

In [39]:
df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


In [40]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [41]:
df.createOrReplaceTempView('song_data_table')

In [42]:

# extract columns to create songs table
songs_table = spark.sql("""
                                SELECT DISTINCT artist_id, song_id, title, year, artist_name, duration
                                FROM song_data_table
                                WHERE song_id IS NOT NULL
                            """)

In [47]:
songs_table.limit(5).toPandas()

Unnamed: 0,artist_id,song_id,title,year,artist_name,duration
0,ARNF6401187FB57032,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994,Sophie B. Hawkins,305.162
1,AR7SMBG1187B9B9066,SOBCOSW12A8C13D398,Rumba De Barcelona,0,Los Manolos,218.38322
2,ARPFHN61187FB575F6,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0,Lupe Fiasco,279.97995
3,ARD0S291187B9B7BF5,SOMJBYD12A6D4F8557,Keepin It Real (Skit),0,Rated R,114.78159
4,AR36F9J1187FB406F1,SOBKWDJ12A8C13B2F3,Wild Rose (Back 2 Basics Mix),0,Bombay Rockers,230.71302


In [44]:
songs_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)



In [45]:
# extract columns to create artists table
artists_table = spark.sql("""
                             SELECT artist_name, artist_id, artist_location, artist_latitude, artist_longitude
                             FROM song_data_table
                             WHERE artist_id IS NOT NULL
                          """)


In [46]:
artists_table.limit(5).toPandas()

Unnamed: 0,artist_name,artist_id,artist_location,artist_latitude,artist_longitude
0,Montserrat Caballé;Placido Domingo;Vicente Sar...,ARDR4AC1187FB371A1,,,
1,Mike Jones (Featuring CJ_ Mello & Lil' Bran),AREBBGV1187FB523D2,"Houston, TX",,
2,The Dillinger Escape Plan,ARMAC4T1187FB3FA4C,"Morris Plains, NJ",40.82624,-74.47995
3,Tiny Tim,ARPBNLO1187FB3D52F,"New York, NY",40.71455,-74.00712
4,Sophie B. Hawkins,ARNF6401187FB57032,"New York, NY [Manhattan]",40.79086,-73.96644


In [48]:
artists_table.printSchema()

root
 |-- artist_name: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [61]:
log_data = input_data + 'log_data/*/*/*.json'

# read log data file
log_df = spark.read.json(log_data)


In [62]:
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [63]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [64]:
log_df.createOrReplaceTempView('log_data_table')

In [65]:
# filter by actions for song plays
log_df = spark.sql("""
                    SELECT * FROM log_data_table
                    WHERE page = 'NextSong'
                   """)

In [66]:
# extract columns for users table    
users_table = spark.sql("""
                            SELECT DISTINCT firstName, lastName, gender, userId 
                            FROM log_data_table
                            WHERE userId IS NOT NULL
                        """)

In [67]:
users_table.limit(5).toPandas()

Unnamed: 0,firstName,lastName,gender,userId
0,Anabelle,Simpson,F,69
1,Sylvie,Cruz,F,10
2,Harper,Barrett,M,42
3,Cecilia,Owens,F,6
4,Jaleah,Hayes,F,70


In [68]:
users_table.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- userId: string (nullable = true)



In [69]:
ts_df = spark.sql("""SELECT *,to_timestamp(ts/1000) as start_time
                            FROM log_data_table""")
ts_df.createOrReplaceTempView('timestamp_df')

# extract columns to create time table
time_table = spark.sql("""SELECT 
                            start_time,
                            hour(start_time) as hour,
                            dayofmonth(start_time) as day,
                            weekofyear(start_time) as week,
                            month(start_time) as month,
                            year(start_time) as year,
                            dayofweek(start_time) as weekday
                            FROM timestamp_df                          
                        """)


In [70]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [71]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,5
1,2018-11-15 00:41:21.796,0,15,46,11,2018,5
2,2018-11-15 00:45:41.796,0,15,46,11,2018,5
3,2018-11-15 01:57:51.796,1,15,46,11,2018,5
4,2018-11-15 03:29:37.796,3,15,46,11,2018,5


In [72]:
songs_table.createOrReplaceTempView('song_table')

In [73]:
# extract columns from joined song and log datasets to create songplays table 
songplays_df = spark.sql("""SELECT 
                                monotonically_increasing_id() as songplay_id, 
                                log_data_table.userId as user_id,
                                to_timestamp(log_data_table.ts/1000) as start_time,
                                month(to_timestamp(log_data_table.ts/1000)) as month,
                                year(to_timestamp(log_data_table.ts/1000)) as year,
                                log_data_table.level as level,
                                song_table.song_id as song_id,
                                song_table.artist_id as artist_id,
                                log_data_table.sessionId as session_id,
                                log_data_table.location as location,
                                log_data_table.userAgent as user_agent
                            FROM song_table 
                            JOIN log_data_table ON song_table.artist_name = log_data_table.artist 
                                                    AND song_table.title = log_data_table.song
                        """)


In [74]:
songplays_df.limit(5).toPandas()

Unnamed: 0,songplay_id,user_id,start_time,month,year,level,song_id,artist_id,session_id,location,user_agent
0,0,15,2018-11-21 21:56:47.796,11,2018,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [75]:
songplays_df.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- user_id: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [None]:
# Time Consuming operations of writing data to S3 Bucket

# Some insights about the cause:
# https://stackoverflow.com/questions/42822483/extremely-slow-s3-write-times-from-emr-spark

In [76]:

# write artists table to parquet files
artists_table.limit(100).write.mode('overwrite').parquet(output_data + "artists_table/")

In [77]:
# write songs table to parquet files partitioned by year and artist
songs_table.limit(10).write.mode('overwrite').partitionBy(["year","artist_id"]).parquet(output_data + "song_table_v2/")

In [78]:
# write users table to parquet files
users_table.limit(10).write.mode('overwrite').parquet(output_data + "users_table/")

In [79]:
# write time table to parquet files partitioned by year and month
time_table.limit(10).write.mode('overwrite').partitionBy(["year","month"]).parquet(output_data + "time_table/")

In [80]:

# write songplays table to parquet files partitioned by year and month
songplays_df.limit(10).write.mode('overwrite').partitionBy(["year","month"]).parquet(output_data + "songplay/")