In [29]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import types as t

In [30]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [31]:
spark = create_spark_session()

## Process Song Data

In [32]:
input_data = 'test_data/'

In [33]:
song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')

# read song data file
df = spark.read.json(song_data)

In [34]:
df.toPandas().head(2)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0


### Get artists and songs tables

songs - songs in music database
`song_id, title, artist_id, year, duration`
artists - artists in music database
`artist_id, name, location, lattitude, longitude`

In [35]:
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').drop_duplicates(['song_id'])
artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')\
                .withColumnRenamed('artist_name', 'name')\
             .withColumnRenamed('artist_location', 'location')\
             .withColumnRenamed('artist_latitude', 'latitude')\
             .withColumnRenamed('artist_longitude', 'longitude').drop_duplicates(['artist_id'])


In [36]:
df.select('song_id', 'title', 'artist_id', 'year', 'duration').count()

71

In [37]:
artists_table

DataFrame[artist_id: string, name: string, location: string, latitude: double, longitude: double]

In [38]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [39]:
artists_table.show(5)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |    null|     null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington| 38.8991|  -77.029|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



### Songs table files are partitioned by year and then artist.

In [40]:
output_data = 'output'

In [41]:
songs_table.write.partitionBy('year', 'artist_id').mode("overwrite").save(output_data + "/songs_data/songs_table.parquet")

### Save Parquet files for artists

In [42]:
artists_parquet_dir = os.path.join('output', 'artists_data/artists_table.parquet')
artists_table.write.mode('overwrite').save(artists_parquet_dir)

## Process Log data

In [43]:
log_data = os.path.join(input_data, 'log-data')

# read log data file
df = spark.read.json(log_data)

In [44]:
df.toPandas().head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [45]:
df = df.filter(col('page')=='NextSong')

### Extract columns for users table, users - users in the app
`user_id, first_name, last_name, gender, level`

In [46]:
users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level')\
.withColumnRenamed('userId', 'user_id')\
.withColumnRenamed('firstName', 'first_name')\
.withColumnRenamed('lastName', 'last_name')\
.distinct()

In [47]:
users_table.count()

104

In [48]:
users_parquet_dir = os.path.join(output_data, 'users_data/users_table.parquet')
users_table.write.mode('overwrite').save(users_parquet_dir)

In [49]:
from pyspark.sql.functions import *

In [50]:
get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts/1000), t.TimestampType())
df = df.withColumn('timestamp', get_timestamp('ts'))

In [51]:
df.select('timestamp', 'ts').show(5)

+--------------------+-------------+
|           timestamp|           ts|
+--------------------+-------------+
|2018-11-15 00:30:...|1542241826796|
|2018-11-15 00:41:...|1542242481796|
|2018-11-15 00:45:...|1542242741796|
|2018-11-15 03:44:...|1542253449796|
|2018-11-15 05:48:...|1542260935796|
+--------------------+-------------+
only showing top 5 rows



In [52]:
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S'), t.StringType())
df = df.withColumn('datetime', get_datetime('ts'))

In [53]:
df.select('datetime', 'timestamp').toPandas().head()

Unnamed: 0,datetime,timestamp
0,2018-11-15 00:30:26,2018-11-15 00:30:26.796
1,2018-11-15 00:41:21,2018-11-15 00:41:21.796
2,2018-11-15 00:45:41,2018-11-15 00:45:41.796
3,2018-11-15 03:44:09,2018-11-15 03:44:09.796
4,2018-11-15 05:48:55,2018-11-15 05:48:55.796


In [54]:
from pyspark.sql.functions import year, month, dayofmonth, weekofyear

In [55]:
df2 = df.withColumn("hour", hour(col('timestamp')))\
    .withColumn("day", dayofmonth(col("timestamp")))\
    .withColumn("week", weekofyear(col("timestamp")))\
    .withColumn("month", month(col("timestamp")))\
    .withColumn("year", year(col("timestamp")))

In [57]:
with_time = df.drop_duplicates(['datetime'])\
.withColumn("hour", hour(col('timestamp')))\
.withColumn("day", dayofmonth(col("timestamp")))\
.withColumn("week", weekofyear(col("timestamp")))\
.withColumn("month", month(col("timestamp")))\
.withColumn("year", year(col("timestamp")))
    
    # extract columns to create time table
time_table = with_time.select('datetime', 'hour', 'day', 'week', 'month', 'year').distinct()

In [58]:
time_table.show()

+-------------------+----+---+----+-----+----+
|           datetime|hour|day|week|month|year|
+-------------------+----+---+----+-----+----+
|2018-11-15 07:56:18|   7| 15|  46|   11|2018|
|2018-11-15 16:51:56|  16| 15|  46|   11|2018|
|2018-11-15 18:31:38|  18| 15|  46|   11|2018|
|2018-11-14 00:41:15|   0| 14|  46|   11|2018|
|2018-11-14 00:53:43|   0| 14|  46|   11|2018|
|2018-11-14 17:30:51|  17| 14|  46|   11|2018|
|2018-11-14 22:40:13|  22| 14|  46|   11|2018|
|2018-11-05 09:44:49|   9|  5|  45|   11|2018|
|2018-11-05 14:52:12|  14|  5|  45|   11|2018|
|2018-11-05 15:19:50|  15|  5|  45|   11|2018|
|2018-11-13 09:17:59|   9| 13|  46|   11|2018|
|2018-11-13 14:29:21|  14| 13|  46|   11|2018|
|2018-11-13 21:18:37|  21| 13|  46|   11|2018|
|2018-11-13 23:43:51|  23| 13|  46|   11|2018|
|2018-11-30 04:27:14|   4| 30|  48|   11|2018|
|2018-11-16 15:24:14|  15| 16|  46|   11|2018|
|2018-11-20 11:06:04|  11| 20|  47|   11|2018|
|2018-11-20 14:21:35|  14| 20|  47|   11|2018|
|2018-11-20 1

In [59]:
time_parquet_dir = os.path.join(output_data, 'time_data/time_table.parquet')
time_table.write.partitionBy(['year', 'month']).mode('overwrite').save(time_parquet_dir)

In [60]:
song_data = spark.read.json(os.path.join(input_data, 'song_data/A/A/A'))

In [61]:
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, timestamp: timestamp, datetime: string]

In [62]:
song_data

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [63]:
import pyspark.sql.functions as F

In [76]:
songplays_table = df.join(song_data, on = 
      (df.song==song_data.title) & 
      (df.artist == song_data.artist_name) & 
      (df.length== song_data.duration))\
      .withColumn("songplay_id", F.monotonically_increasing_id())\
      .select('songplay_id', 'timestamp', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')\
      .withColumnRenamed('timestamp', 'start_time')\
      .withColumnRenamed('userId', 'user_id')\
      .withColumnRenamed('sessionId', 'session_id')\
      .withColumnRenamed('userAgent', 'user_agent')\
      .withColumn("month", month(col("start_time")))\
      .withColumn("year", year(col("start_time")))

In [77]:
songplays_parquet_dir = os.path.join(output_data, 'songplays_data/songplays_table.parquet')
songplays_table.write.partitionBy(['year', 'month']).mode('overwrite').save(songplays_parquet_dir)

In [78]:
songplays_table.show(5)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|month|year|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+



## Example Query

In [89]:
songs = spark.read.parquet(output_data + '/songs_data/songs_table.parquet')    
songs.groupBy('year').count().collect()

[Row(year=2003, count=2),
 Row(year=2007, count=1),
 Row(year=1961, count=1),
 Row(year=1997, count=2),
 Row(year=1994, count=2),
 Row(year=2004, count=4),
 Row(year=1969, count=1),
 Row(year=1982, count=1),
 Row(year=1985, count=1),
 Row(year=1987, count=1),
 Row(year=1972, count=1),
 Row(year=1992, count=1),
 Row(year=2005, count=2),
 Row(year=1984, count=1),
 Row(year=2000, count=2),
 Row(year=1964, count=1),
 Row(year=1986, count=1),
 Row(year=2008, count=1),
 Row(year=1999, count=1),
 Row(year=0, count=43),
 Row(year=1993, count=1)]