In [68]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType


In [24]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('IAM_USER', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('IAM_USER', 'AWS_SECRET_ACCESS_KEY')

In [25]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [26]:
spark

In [6]:
song_data='data/song_data/*/*/*/*.json'
song_schema = StructType([
        StructField('num_songs', IntegerType()),
        StructField('artist_id', StringType()),
        StructField('artist_latitude', DoubleType()),
        StructField('artist_longitude', DoubleType()),
        StructField('artist_location', StringType()),
        StructField('artist_name', StringType()),
        StructField('song_id', StringType()),
        StructField('title', StringType()),
        StructField('duration', DoubleType()),
        StructField('year', IntegerType())
    ])
df = spark.read.json(song_data, schema=song_schema)
df.show(5)

+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|     artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|           null|            null|                    |Montserrat Caball...|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|   0|
|        1|AREBBGV1187FB523D2|           null|            null|         Houston, TX|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|
|        1|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|   Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|
|        1|ARPBNLO1187FB3D52F|    

In [7]:
df.count()

71

Schema for Song Play Analysis

Using the song and log datasets, you'll need to create a star schema optimized for queries on song play analysis. This includes the following tables.
Fact Table

    songplays - records in log data associated with song plays i.e. records with page NextSong
        songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

Dimension Tables

    users - users in the app
        user_id, first_name, last_name, gender, level
    songs - songs in music database
        song_id, title, artist_id, year, duration
    artists - artists in music database
        artist_id, name, location, lattitude, longitude
    time - timestamps of records in songplays broken down into specific units
        start_time, hour, day, week, month, year, weekday



In [8]:
songs_columns = ['song_id', 'title', 'artist_id', 'year' ,'duration']
df_songs = df.select(songs_columns)
print(df_songs.count())
df_songs.show(5)

71
+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [9]:
df_songs.drop_duplicates().count()

71

In [10]:
df_songs.createOrReplaceTempView('songs_sql')

In [11]:
songs_with_year_zero = spark.sql('''
SELECT year, count(year) FROM songs_sql
GROUP BY year HAVING year = 0
''').show()

+----+-----------+
|year|count(year)|
+----+-----------+
|   0|         43|
+----+-----------+



In [12]:
df_songs.write.partitionBy('year', 'artist_id').parquet('parquet/songs/')

In [13]:
parquet_songs = spark.read.parquet('parquet/songs')
parquet_songs.show(5)
print(parquet_songs.count())

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows

71


In [14]:

df.createOrReplaceTempView('parquet_db')
artists_data = spark.sql('''
    SELECT DISTINCT artist_id, artist_name as name, artist_location as location, 
           artist_latitude as latitude, artist_longitude as longitude
    FROM parquet_db
''')
artists_data.show(5)
#df.selectExpr(artist_columns)

+------------------+------------+---------------+--------+----------+
|         artist_id|        name|       location|latitude| longitude|
+------------------+------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|    Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|    null|      null|
|AR0IAWL1187B9A96D0|Danilo Perez|         Panama|  8.4177| -80.11278|
|ARMBR4Y1187B9990EB|David Martin|California - SF|37.77916|-122.42005|
|ARD0S291187B9B7BF5|     Rated R|           Ohio|    null|      null|
+------------------+------------+---------------+--------+----------+
only showing top 5 rows



In [10]:
artist_columns = ['artist_id',
                  'artist_name as name',
                  'artist_location as location', 
                  'artist_latitude as latitude',
                  'artist_longitude as longitude']

df.selectExpr(artist_columns).dropDuplicates().show(5)

+------------------+---------------+---------------+--------+----------+
|         artist_id|           name|       location|latitude| longitude|
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|            Gob|               |    null|      null|
|AROGWRA122988FEE45|Christos Dantis|               |    null|      null|
|ARBGXIG122988F409D|     Steel Rain|California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)| -13.442|  -41.9952|
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows



In [None]:
artists_data.write.parquet('parquet/artists/')

LOG data

In [27]:
log_data = 'data/logs_data/*.json'
# two partitions in s3 datas (year and month). But no paertitions here.
df_log = spark.read.json(log_data)
df_log.take(3)


[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [28]:
# filter by actions for song plays
df_log = df_log.filter(df_log.page == 'NextSong')
df_log.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [29]:
df_log.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [31]:
# user_id, first_name, last_name, gender, level
users_columns = ['userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level']
users_table = df_log.selectExpr(users_columns)
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [32]:
users_table.write.parquet('parquet/users/')

In [45]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
#df_log.drop(df_log.start_time).collect()
df_log = df_log.withColumn('start_time', get_timestamp('ts'))


In [48]:
df_log.limit(14).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:53:44.796
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:55:56.796
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:01:02.796
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:07:37.796
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:10:33.796


In [None]:
# Is this even needed?
get_datetime = udf(lambda x: datetime.from_timestamp())
df_log.withColumn('', )

Schema for Song Play Analysis

Using the song and log datasets, you'll need to create a star schema optimized for queries on song play analysis. This includes the following tables.
Fact Table

    songplays - records in log data associated with song plays i.e. records with page NextSong
        songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

Dimension Tables

    users - users in the app
        user_id, first_name, last_name, gender, level
    songs - songs in music database
        song_id, title, artist_id, year, duration
    artists - artists in music database
        artist_id, name, location, lattitude, longitude
    time - timestamps of records in songplays broken down into specific units
        start_time, hour, day, week, month, year, weekday



In [53]:
time_table = df_log.select('start_time').dropDuplicates()#.withColumn('hour', hour(df_log.start_time))
# year, month, dayofmonth, hour, weekofyear
time_table.show(5)

+--------------------+
|          start_time|
+--------------------+
|2018-11-21 06:18:...|
|2018-11-21 18:49:...|
|2018-11-14 15:20:...|
|2018-11-05 16:31:...|
|2018-11-13 18:00:...|
+--------------------+
only showing top 5 rows



In [54]:
# See https://stackoverflow.com/questions/38928919/how-to-get-the-weekday-from-day-of-month-using-pyspark
# for using date_format to get weekday. 'u' gives a number (1,2,...), 'E' gives Mon, Tue,..
# Formats based on https://docs.oracle.com/javase/10/docs/api/java/text/SimpleDateFormat.html
time_table = time_table.select('start_time', hour('start_time').alias('hour'),\
                 dayofmonth('start_time').alias('day'), \
                 weekofyear('start_time').alias('week'),\
                 month('start_time').alias('month'),\
                 year('start_time').alias('year'),\
                 date_format('start_time', 'u').alias('weekday')
                 )
time_table.show(3)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-21 06:18:...|   6| 21|  47|   11|2018|      3|
|2018-11-21 18:49:...|  18| 21|  47|   11|2018|      3|
|2018-11-14 15:20:...|  15| 14|  46|   11|2018|      3|
+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



In [55]:
time_table.write.partitionBy('year', 'month').parquet('parquet/time/')

In [56]:
output_data = 'parquet/'
songs_df = spark.read.parquet('{}songs/'.format(output_data))
artists_df = spark.read.parquet('{}artists/'.format(output_data))

In [60]:
songs_and_artists_df = songs_df.join(artists_df,
                                     songs_df.artist_id == artists_df.artist_id)\
                        .drop(songs_df.artist_id)
songs_and_artists_df.limit(5).toPandas()
#songs_and_artists_df.select('artist_id').show(5)

Unnamed: 0,song_id,title,duration,year,artist_id,name,location,latitude,longitude
0,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,207.77751,2004,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
1,SOBZBAZ12A6D4F8742,Spanish Grease,168.25424,1997,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
2,SODUJBS12A8C132150,Wessex Loses a Bride,111.62077,0,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",51.50632,-0.12714
3,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),173.66159,0,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
4,SOHUOAP12A8AE488E9,Floating,491.12771,1987,ARD842G1187B997376,Blue Rodeo,"Toronto, Ontario, Canada",43.64856,-79.38533


In [71]:
# Keep only columns we need in songplays:
# song_id, title, artist_id, name
songs_and_artists_df = songs_and_artists_df.select('song_id', 'title', 'artist_id', 
                                                  col('name').alias('artist_name'))

In [72]:
songs_and_artists_df.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,artist_name
0,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,Tiny Tim
1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,Tim Wilson
2,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,King Curtis
3,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,Jeff And Sheri Easter
4,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,Lupe Fiasco


In [73]:
# Join logs and songs
songs_logs_df = df_log.join(songs_and_artists_df,
                         (df_log.artist==songs_and_artists_df.artist_name)\
                         & (df_log.song==songs_and_artists_df.title))

In [74]:
songs_logs_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,song,status,ts,userAgent,userId,start_time,song_id,title,artist_id,artist_name
0,Elena,Logged In,Lily,F,5,Koch,269.58322,paid,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,...,Setanta matins,200,1542837407796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",15,2018-11-21 21:56:47.796,SOZCTXZ12AB0182364,Setanta matins,AR5KOSW1187FB35FF4,Elena


In [76]:
songplays_table = songs_logs_df.select(monotonically_increasing_id().alias('songplay_id'),\
                                get_timestamp('ts').alias('start_time'),\
                                col('userId').alias('user_id'),\
                                'level', 'song_id', 'artist_id',\
                                col('sessionId').alias('session_id'),\
                                'location', col('userAgent').alias('user_agent'),\
                                month('start_time').alias('month'),year('start_time').alias('year')
                                
                               )

In [77]:
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,month,year
0,8589934592,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018


In [80]:
songplays_table.write.partitionBy('year', 'month')\
        .parquet('{}songplays/'.format(output_data))

In [81]:
# Test spark context
sc = spark.sparkContext