In [137]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import *
import os
import zipfile

In [56]:
with zipfile.ZipFile(os.getcwd() + '/log-data.zip', 'r') as zip_ref:
    zip_ref.extractall(os.getcwd() + '/log-data')

In [7]:
with zipfile.ZipFile(os.getcwd() + '/song-data.zip', 'r') as zip_ref:
    zip_ref.extractall(os.getcwd() + '/song-data')

In [12]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [38]:
song_data_path = os.getcwd() + '/song-data/song_data/*/*/*/*.json'

In [39]:
df = spark.read.json(song_data_path)

In [98]:
df.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [190]:
log_data_path = os.getcwd() + '/log-data/*.json'
df_log = spark.read.json(log_data_path)

In [192]:
song_df = df.select('song_id', 'title', 'artist_id', 'year', 'duration')\
    .dropDuplicates()

In [76]:
artist_df = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')\
    .dropDuplicates()\
    .withColumnRenamed('artist_name', 'name')\
    .withColumnRenamed('artist_location', 'location')\
    .withColumnRenamed('artist_latitude', 'latitude')\
    .withColumnRenamed('artist_longitude', 'longitude')

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [193]:
bigint_from_ms = udf(lambda x: str(int(x/1000)))

df_log = df_log.withColumn('ts_ms', bigint_from_ms(df_log.ts))

df_log = df_log.withColumn('timestamp', from_unixtime(df_log.ts_ms))

time_df = df_log.withColumn('hour', hour(df_log.timestamp))\
    .withColumn('day', dayofmonth(df_log.timestamp))\
    .withColumn('week', weekofyear(df_log.timestamp))\
    .withColumn('month', month(df_log.timestamp))\
    .withColumn('year', year(df_log.timestamp))\
    .withColumn('weekday', dayofweek(df_log.timestamp))\
    .withColumnRenamed('timestamp', 'start_time')\
    .select('start_time', 'hour', 'day', 'week', 'year', 'weekday')\
    .show(5)

+-------------------+----+---+----+----+-------+
|         start_time|hour|day|week|year|weekday|
+-------------------+----+---+----+----+-------+
|2018-11-15 00:30:26|   0| 15|  46|2018|      5|
|2018-11-15 00:41:21|   0| 15|  46|2018|      5|
|2018-11-15 00:45:41|   0| 15|  46|2018|      5|
|2018-11-15 01:57:51|   1| 15|  46|2018|      5|
|2018-11-15 03:29:37|   3| 15|  46|2018|      5|
+-------------------+----+---+----+----+-------+
only showing top 5 rows



In [197]:
songplay_df = df_log.filter(df_log.page == 'NextSong')\
    .join(df, (df_log.song == df.title) & (df_log.artist == df.artist_name), how='left')\
    .withColumn("songplay_id", monotonically_increasing_id())\
    .select('songplay_id', 'timestamp', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')\
    .withColumnRenamed('timestamp', 'start_time')\
    .withColumnRenamed('userId', 'user_id')\
    .withColumnRenamed('sessionId', 'session_id')\
    .withColumnRenamed('userAgent', 'user_agent')

+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|songplay_id|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|          0|2018-11-15 00:30:26|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          1|2018-11-15 00:41:21|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          2|2018-11-15 00:45:41|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          3|2018-11-15 03:44:09|     61| free|   null|     null|       597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
only showing top 4 rows



In [129]:
max_ts_user_df = df_log.groupBy('userId') \
    .max('ts')\
    .withColumnRenamed('max(ts)', 'max_ts')\
    .withColumnRenamed('userId', 'user_id')

mask = (df_log.userId == max_ts_user_df.user_id) & (df_log.ts == max_ts_user_df.max_ts)

user_df = df_log.join(max_ts_user_df, mask, how='inner')\
    .select('userId', 'firstName', 'lastName', 'gender', 'level')\
    .withColumnRenamed('userId', 'user_id')\
    .withColumnRenamed('firstName', 'first_name')\
    .withColumnRenamed('lastName', 'last_name')

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     84|   Shakira|     Hunt|     F| free|
|     65|     Amiya| Davidson|     F| paid|
|     59|      Lily|   Cooper|     F| free|
|     40|    Tucker| Garrison|     M| free|
|     76|    Jayden|    Duffy|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows

