In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import IntegerType
from zipfile import ZipFile

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
#with ZipFile('data/log-data.zip') as zipObj:
#    zipObj.extractall('data/log-data')
#with ZipFile('data/song-data.zip') as zipObj:
#    zipObj.extractall('data/song-data')

In [3]:
log_data = spark.read.json('data/log-data/')

In [4]:
log_data.count()

8056

In [5]:
log_data.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [6]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
log_data.describe().show()

+-------+------------------+----------+---------+------+------------------+--------+------------------+-----+--------------------+------+-------+--------------------+------------------+--------------------+------------------+--------------------+--------------------+-----------------+
|summary|            artist|      auth|firstName|gender|     itemInSession|lastName|            length|level|            location|method|   page|        registration|         sessionId|                song|            status|                  ts|           userAgent|           userId|
+-------+------------------+----------+---------+------+------------------+--------+------------------+-----+--------------------+------+-------+--------------------+------------------+--------------------+------------------+--------------------+--------------------+-----------------+
|  count|              6820|      8056|     7770|  7770|              8056|    7770|              6820| 8056|                7770|  8056|   80

In [4]:
song_data = spark.read.json('data/song-data/song_data/*/*/*')

In [8]:
song_data.count()

71

In [9]:
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
song_data.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [5]:
songs_table = song_data.select(["song_id",
                                "title",
                                "artist_id",
                                "year",
                                "duration"])

In [7]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [9]:
songs_table.describe().show()

+-------+------------------+--------------------+------------------+-----------------+------------------+
|summary|           song_id|               title|         artist_id|             year|          duration|
+-------+------------------+--------------------+------------------+-----------------+------------------+
|  count|                71|                  71|                71|               71|                71|
|   mean|              null|                null|              null|785.9577464788732|239.72967605633804|
| stddev|              null|                null|              null|980.9571191533839|106.56277912134071|
|    min|SOAOIBZ12AB01815BE|A Higher Place (A...|AR051KA1187B98B2FF|                0|          29.54404|
|    max|SOZVMJI12AB01808AF|   ¿Dónde va Chichi?|ARYKCQI1187FB3B18F|             2008|         599.24853|
+-------+------------------+--------------------+------------------+-----------------+------------------+



In [6]:
artist_table = song_data.selectExpr(["artist_id",
                                     "artist_name AS name",
                                     "artist_location AS location",
                                     "artist_latitude AS latitude",
                                     "artist_longitude AS longitude"]).dropDuplicates()

In [35]:
artist_table.orderBy("artist_id").show(5)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR051KA1187B98B2FF|               Wilks|                    |    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|    null|     null|
|AR1Y2PT1187FB5B9CE|         John Wesley|             Brandon|27.94017|-82.32547|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



In [29]:
artist_table.describe().show()

+-------+------------------+---------+--------------+------------------+------------------+
|summary|         artist_id|     name|      location|          latitude|         longitude|
+-------+------------------+---------+--------------+------------------+------------------+
|  count|                69|       69|            69|                31|                31|
|   mean|              null|     null|          null| 36.55297161290323|-73.25123258064515|
| stddev|              null|     null|          null|12.431023413063542| 36.05807592882608|
|    min|AR051KA1187B98B2FF|  40 Grit|              |           -13.442|        -122.42005|
|    max|ARYKCQI1187FB3B18F|lextrical|Zagreb Croatia|          56.27609|           15.9676|
+-------+------------------+---------+--------------+------------------+------------------+



In [41]:
songs_table.write.partitionBy(['year', 'artist_id']).parquet('song/song.parquet')

In [42]:
artist_table.write.parquet('artist/artist.parquet')

In [48]:
#spark.read.parquet('song/song.parquet').show(70)

In [6]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [13]:
log_data.orderBy(['ts'], ascending = False).dropDuplicates(subset=['userId']).show()

+-------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+---------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|             artist|     auth| firstName|gender|itemInSession|lastName|   length|level|            location|method|     page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+---------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|  Dislocation Dance|Logged In|      Maia|     F|            0|   Burke|122.87955| free|Houston-The Woodl...|   PUT| NextSong|1.540676534796E12|      628|With A Smile On Y...|   200|1542673957796|"Mozilla/5.0 (Win...|    51|
|               null|Logged In|    Adelyn|     F|            4|  Jordan|     null| free|Chicago-Nape

In [29]:
users_table = log_datalog_data.orderBy(['ts'], ascending = False)\
                              .selectExpr(["userId as user_id",
                                           "firstName AS first_name",
                                           "lastName AS last_name",
                                           "gender",
                                           "level",
                                           "ts"])\
                              .dropDuplicates(subset=['userId'])\


                                  
users_table = users_table.filter((users_table.user_id.isNotNull()) 
                                 & (users_table.user_id != ''))\
                         .withColumn("user_id", users_table.user_id.cast(IntegerType()))

In [45]:
users_table = log_data.filter((col('userId').isNotNull()) 
                            & (col('userId') != ''))\
                      .orderBy(['ts'], ascending = False)\
                      .selectExpr(["userId as user_id",
                                   "firstName AS first_name",
                                   "lastName AS last_name",
                                   "gender",
                                   "level"])\
                      .dropDuplicates(subset=['user_id'])\
                      .withColumn("user_id", col("user_id").cast(IntegerType()))\
                      .orderBy(['user_id'])
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|      4|    Alivia|  Terrell|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      6|   Cecilia|    Owens|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|      8|    Kaylee|  Summers|     F| free|
|      9|     Wyatt|    Scott|     M| free|
|     10|    Sylvie|     Cruz|     F| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|     M| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     21|   Preston|  Sanders|  

In [16]:
users_table.filter(users_table.user_id == '88').orderBy('ts').count()

312

In [8]:
users_table.write.parquet('user/user.parquet')

In [8]:
log_data2 = log_data.where(log_data.page == 'NextSong')
joined_table = log_data2.join(song_data, 
                             (log_data.song == song_data.title) & (log_data.artist == song_data.artist_name),
                             'left')

In [39]:
#log_data.show(10)

+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12| 

In [9]:
songplays = joined_table.selectExpr(["ts AS start_time",
                                 "userId AS user_id",
                                 "level",
                                 "song_id",
                                 "artist_id",
                                 "sessionId AS session_id",
                                 "location",
                                 "userAgent AS user_agent"])

In [43]:
songplays.count()

6820

In [10]:
import pyspark.sql.functions as F
songplays = songplays.withColumn("start_time", (F.to_timestamp(songplays.start_time/1000))).orderBy(songplays.start_time)

In [11]:
songplays = songplays.withColumn("songplay_id", F.monotonically_increasing_id())

In [61]:
songplays.show(5)

+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+-----------+
|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|songplay_id|
+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+-----------+
|2018-11-01 21:01:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|          0|
|2018-11-01 21:05:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|          1|
|2018-11-01 21:08:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|          2|
|2018-11-01 21:11:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|          3|
|2018-11-01 21:17:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|          4|
+--------------------+-------+-----+----

In [12]:
time = songplays.select(songplays.start_time)

In [13]:
#fields = ['hour', 'day', 'week', 'month', 'year', 'weekday']
time = time.withColumn("hour", F.hour(time.start_time))
time = time.withColumn("day", F.dayofmonth(time.start_time))
time = time.withColumn("week", F.weekofyear(time.start_time))
time = time.withColumn("month", F.month(time.start_time))
time = time.withColumn("year", F.year(time.start_time))
time = time.withColumn("weekday", F.dayofweek(time.start_time))

In [14]:
time.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|      5|
|2018-11-02 01:25:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:30:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:34:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 02:42:...|   2|  2|  44|   11|2018| 

In [15]:
time.write.partitionBy(['year', 'month']).parquet('time/time.parquet')

In [19]:
songplays = songplays.withColumn("month", F.month(songplays.start_time))
songplays = songplays.withColumn("year", F.year(songplays.start_time))
songplays.write.partitionBy(['year', 'month'])\
                  .parquet('songplays/songplays.parquet')