In [1]:
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
config['AWS']['AWS_SECRET_ACCESS_KEY']

'HFN31qN/q6IsCL6g0DXk2+pcpnJ+5Du3rYgjit28'

In [5]:
spark = SparkSession \
        .builder \
        .appName("spark-datalake") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [9]:
input_data = "s3a://udacity-dend/"
output_data ="files/"

In [7]:
song_data = input_data + 'song_data/A/A/A/*.json' 
song_data

's3a://udacity-dend/song_data/A/A/A/*.json'

In [8]:
df = spark.read.json(song_data)

In [10]:
songs_columns = ['song_id','title','artist_id','year','duration']
songs_table = df.selectExpr(*songs_columns)

In [13]:
songs_table.toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),ARTC1LV1187B9A4858,1972,301.40036
1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,ARA23XO1187B9AF18F,0,192.522
2,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),ARSVTNL1187B992A91,2001,129.85424
3,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,AR73AIO1187B9AD57B,2005,118.07302
4,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,ARXQBR11187B98A2CC,1985,821.05424
5,SORRNOC12AB017F52B,The Last Beat Of My Heart (b-side),ARSZ7L31187FB4E610,2004,337.81506
6,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
7,SOAPERH12A58A787DC,The One And Only (Edited),ARZ5H0P1187B98A1DD,0,230.42567
8,SOSMJFC12A8C13DE0C,Is That All There Is?,AR1KTV21187B9ACD72,0,343.87546
9,SOOVHYF12A8C134892,I'll Be Waiting,ARCLYBR1187FB53913,1989,304.56118


In [14]:
songs_table.write.partitionBy(['year','artist_id']).parquet("{}/songs.parquet".format(output_data),mode="overwrite")

In [17]:
artists_columns = ['artist_id','artist_name as name','artist_location as location','artist_latitude as latitude','artist_longitude as longitude']
artists_table = df.selectExpr(*artists_columns)
artists_table.createOrReplaceTempView('artists')

In [50]:
spark.sql("select artist_id, \
                  name, \
                  case \
                      when trim(location) = '' \
                      then Null \
                      else location \
                      end as location, \
                  latitude,longitude \
                  from artists").toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARTC1LV1187B9A4858,The Bonzo Dog Band,"Goldsmith's College, Lewisham, Lo",51.4536,-0.01802
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956
2,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
3,AR73AIO1187B9AD57B,Western Addiction,"San Francisco, CA",37.77916,-122.42005
4,ARXQBR11187B98A2CC,Frankie Goes To Hollywood,"Liverpool, England",,
5,ARSZ7L31187FB4E610,Devotchka,"Denver, CO",39.74001,-104.99226
6,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
7,ARZ5H0P1187B98A1DD,Snoop Dogg,"Long Beach, CA",33.76672,-118.1924
8,AR1KTV21187B9ACD72,Cristina,California - LA,34.05349,-118.24532
9,ARCLYBR1187FB53913,Neal Schon,"San Mateo, CA",37.54703,-122.31483


In [49]:
artists_table = spark.sql("select artist_id, \
                  name, \
                  case \
                      when trim(location) = '' \
                      then Null \
                      else location \
                      end as location, \
                  latitude,longitude \
                  from artists")

In [51]:
artists_table.write.parquet("{}/artists.parquet".format(output_data),mode="overwrite")

In [52]:
log_data = input_data+'log_data/2018/11/*.json'

In [53]:
df = spark.read.json(log_data)

In [54]:
df = df.filter(df.page =='NextSong')

In [55]:
users_columns = ['userId as user_id','firstName as first_name','lastName as last_name','gender','level']
users_table = df.selectExpr(*users_columns).distinct()

In [58]:
users_table.toPandas().head(10)

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free
5,23,Morris,Gilmore,M,free
6,75,Joseph,Gutierrez,M,free
7,16,Rylan,George,M,paid
8,2,Jizelle,Benjamin,F,free
9,3,Isaac,Valdez,M,free


In [59]:
users_table.write.parquet('{}/users.parquet'.format(output_data),mode="overwrite")

In [60]:
df.createOrReplaceTempView("user_logs")

time_table = spark.sql("select \
                        t.timestamp as start_time, \
                        hour(t.timestamp)       as hour, \
                        day(t.timestamp)        as day, \
                        weekofyear(t.timestamp) as week, \
                        month(t.timestamp)      as month, \
                        year(t.timestamp)       as year, \
                        dayofweek(t.timestamp)  as weekday \
                        from \
                        (select from_unixtime(ts/1000) as timestamp from user_logs group by timestamp) as t"
                        )

In [61]:
time_table.toPandas().head(10)

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 07:56:18,7,15,46,11,2018,5
1,2018-11-15 16:51:56,16,15,46,11,2018,5
2,2018-11-15 18:31:38,18,15,46,11,2018,5
3,2018-11-14 00:41:15,0,14,46,11,2018,4
4,2018-11-14 00:53:43,0,14,46,11,2018,4
5,2018-11-14 17:30:51,17,14,46,11,2018,4
6,2018-11-14 22:40:13,22,14,46,11,2018,4
7,2018-11-05 09:44:49,9,5,45,11,2018,2
8,2018-11-05 14:52:12,14,5,45,11,2018,2
9,2018-11-05 15:19:50,15,5,45,11,2018,2


In [62]:
time_table.write.partitionBy(['year','month']).parquet("{}/time.parquet".format(output_data),mode="overwrite")

In [63]:
song_df = spark.read.parquet("{}/song.parquet".format(output_data))

song_df.createOrReplaceTempView("songs")

In [64]:
spark.sql("select * from user_logs Limit 10").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [65]:
spark.sql("select * from songs LIMIT 10").toPandas()

Unnamed: 0,song_id,title,duration,year,artist_id
0,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,192.522,0,ARA23XO1187B9AF18F
1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),129.85424,2001,ARSVTNL1187B992A91
2,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),301.40036,1972,ARTC1LV1187B9A4858
3,SORRNOC12AB017F52B,The Last Beat Of My Heart (b-side),337.81506,2004,ARSZ7L31187FB4E610
4,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,118.07302,2005,AR73AIO1187B9AD57B
5,SODZYPO12A8C13A91E,Burn My Body (Album Version),177.99791,0,AR1C2IX1187B99BF74
6,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,821.05424,1985,ARXQBR11187B98A2CC
7,SOERIDA12A6D4F8506,I Want You (Album Version),192.28689,2006,ARBZIN01187FB362CC
8,SOAPERH12A58A787DC,The One And Only (Edited),230.42567,0,ARZ5H0P1187B98A1DD
9,SOSMJFC12A8C13DE0C,Is That All There Is?,343.87546,0,AR1KTV21187B9ACD72


In [66]:
songplays_table = spark.sql("select \
                             monotonically_increasing_id() as songplay_id, \
                             from_unixtime(user_logs.ts/1000) as start_time, \
                             user_logs.userId as user_id, \
                             user_logs.level as level,\
                             songs.song_id as song_id, \
                             songs.artist_id as artist_id, \
                             user_logs.sessionId as session_id, \
                             user_logs.location, \
                             user_logs.userAgent as user_agent\
                             from \
                             user_logs \
                             left join songs \
                             on user_logs.song = songs.title"
                             )

In [67]:
songplays_table.toPandas().head(10)

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,2018-11-15 00:30:26,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,1,2018-11-15 00:41:21,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,2,2018-11-15 00:45:41,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
3,3,2018-11-15 03:44:09,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,4,2018-11-15 05:48:55,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
5,5,2018-11-15 05:53:44,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
6,6,2018-11-15 05:55:56,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
7,7,2018-11-15 06:01:02,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
8,8,2018-11-15 06:07:37,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
9,9,2018-11-15 06:10:33,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


In [68]:
songplays_table = songplays_table.withColumn('year',year(songplays_table.start_time)).withColumn('month',month(songplays_table.start_time))

In [69]:
songplays_table.write.partitionBy(['year','month']).parquet("{}/songplays.parquet".format(output_data),mode="overwrite")

In [70]:
songplays_df = spark.read.parquet("{}/songplays.parquet".format(output_data))

In [71]:
songplays_df.toPandas().head(10)

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,2018-11-15 00:30:26,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
1,1,2018-11-15 00:41:21,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
2,2,2018-11-15 00:45:41,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
3,3,2018-11-15 03:44:09,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
4,4,2018-11-15 05:48:55,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
5,5,2018-11-15 05:53:44,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
6,6,2018-11-15 05:55:56,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
7,7,2018-11-15 06:01:02,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
8,8,2018-11-15 06:07:37,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
9,9,2018-11-15 06:10:33,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
