In [2]:
# Import libraries
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F 
from pyspark.sql.window import Window

import numpy as np
import pandas as pd
import os
import glob

In [3]:
# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [73]:
# Spark and pandas settings
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
spark.sparkContext.setLogLevel('ERROR')
pd.set_option('display.max_colwidth', 500)

## Import

In [5]:
def get_files(filepath):
    """Browses the root directory of JSON data files.
    
    Args:
        filepath (str): The filepath to the JSON data source.
        
    Returns:
        A list of all JSON files in our data source.
    """    
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

### Logs

In [138]:
logs = spark.read.json('data/log_data/*-events.json')
logs.printSchema()
logs.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



8056

In [139]:
# Add songplay_id and start_time
w =  Window.partitionBy(logs.userId).orderBy('ts')
logs = logs \
    .withColumn('songplay_id', F.row_number().over(w)) \
    .withColumn('start_time', (F.col('ts') / 1000.0).cast(TimestampType()))
logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [141]:
logs.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,songplay_id,start_time
0,Mariza,Logged In,Maia,F,0,Burke,239.3073,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,143,Vielas De Alfama,200,1541313299796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",51,1,2018-11-04 07:34:59.796
1,Alejandro Sanz,Logged In,Maia,F,1,Burke,230.24281,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,143,Non E_ Per Te_ Per Me,200,1541313538796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",51,2,2018-11-04 07:38:58.796
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540677000000.0,230,,200,1541514106796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",51,3,2018-11-06 15:21:46.796


### Songs

In [142]:
songs = spark.read.json('data/song_data/*/*/*/*.json')
songs.printSchema()
songs.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



71

## Transform staging data into star schema using spark

In [107]:
logs.createOrReplaceTempView('logs')
songs.createOrReplaceTempView('songs')

### Users_table

In [108]:
users_table = spark.sql("""
    SELECT DISTINCT 
        CAST(A.userId AS int) AS user_id
        , A.firstName AS first_name
        , A.lastName AS last_name
        , A.gender
        , A.level
    FROM logs AS A
    INNER JOIN (
        SELECT userId, MAX(ts) AS tsLast
        FROM logs
        GROUP BY userId
    ) AS B
        ON A.userId = B.userId
        AND A.ts = B.tsLast
    WHERE A.userId IS NOT NULL;
""")

In [143]:
users_table.printSchema()
users_table.count()
users_table.show(5)

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     15|      Lily|     Koch|     F| paid|
|      8|    Kaylee|  Summers|     F| free|
|     17|  Makinley|    Jones|     F| free|
|     82|     Avery| Martinez|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



### Songs_table

In [112]:
songs_table = spark.sql("""
    SELECT song_id
        , MIN(title) AS title
        , MIN(artist_id) AS artist_id
        , MIN(year) AS year
        , MIN(duration) AS duration
    FROM songs
    GROUP BY song_id;
""")

In [144]:
songs_table.printSchema()
songs_table.count()
songs_table.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



### artists_table

In [116]:
artists_table = spark.sql("""
    SELECT artist_id
        , MIN(artist_name) AS name
        , MIN(artist_location) AS location
        , MIN(artist_latitude) AS latitude
        , MIN(artist_longitude) AS longitude
    FROM songs
    GROUP BY artist_id;
""")

In [145]:
artists_table.printSchema()
artists_table.count()
artists_table.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |    null|     null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington| 38.8991|  -77.029|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



### time_table

In [121]:
time_table = logs \
    .withColumn('hour', F.hour(F.col('start_time'))) \
    .withColumn('day', F.dayofmonth(F.col('start_time'))) \
    .withColumn('week', F.weekofyear(F.col('start_time'))) \
    .withColumn('month', F.month(F.col('start_time'))) \
    .withColumn('year', F.year(F.col('start_time'))) \
    .withColumn('weekday', F.dayofweek(F.col('start_time'))) \
    .select('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday') \
    .dropDuplicates()

In [146]:
time_table.printSchema()
time_table.count()
time_table.show(5)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 11:31:...|  11| 15|  46|   11|2018|      5|
|2018-11-21 03:18:...|   3| 21|  47|   11|2018|      4|
|2018-11-22 00:51:...|   0| 22|  47|   11|2018|      5|
|2018-11-14 17:19:...|  17| 14|  46|   11|2018|      4|
|2018-11-28 17:06:...|  17| 28|  48|   11|2018|      4|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



### songplays_table

In [126]:
songplays_table = spark.sql("""
    SELECT A.songplay_id
        , A.start_time 
        , CAST(A.userId AS int) AS user_id
        , A.level
        , B.song_id
        , B.artist_id
        , A.sessionId AS session_id
        , A.location
        , A.userAgent AS user_agent
    FROM logs AS A
    INNER JOIN songs AS B
        ON A.song = B.title
        AND A.artist = B.artist_name;
""")

In [147]:
songplays_table.printSchema()
songplays_table.count()
songplays_table.show(5)

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|        405|2018-11-21 22:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+--------------------+-------+-----+------------------