### Develop code for etl.py script

In [47]:
# etl.py
#
# PROGRAMMER: Brian Pederson
# DATE CREATED: 03/10/2020
# PURPOSE: Script to implement ETL processes to populate 'data lake' tables for Data Engineering Project 3.
#
# Included functions:
#     get_input_args    - function to process input arguments specific to train.py 
#     process_song_data - extract/insert data for songs and artists dimensions from source song json files
#     process log_data  - extract/insert data for users and time dimensions; songplays fact from source log json files
#     main              - main function performs ETL
#   


In [1]:
import os
import time  
import configparser
import sys
import argparse  
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, from_unixtime, to_timestamp
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, LongType, DateType, TimestampType

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    """
    Utility function to create a Spark session
    """
    
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [4]:
def process_song_data(spark, input_data, output_data, input_song_pattern):
    """
    Extract data for song and artist dimensions from source song json files then insert into parquet files
    Parameters:
      spark - Spark session
      input_data - filepath to source json files
      output_data - filepath to target parquet files
      input_song_pattern - file pattern for input song files
    """   

    # get filepath to song data file
    song_data = input_data + input_song_pattern   # 'song_data/*/*/*/*.json'   # 'song_data/A/A/*/*.json'   # 'song_data/A/A/A/*.json' 
    print(song_data)  # temp debug
     
    # this is not necessary but useful as a code sample for future reference 
    songSchema = StructType([StructField("artist_id", StringType()),
                             StructField("artist_latitude", DoubleType()),
                             StructField("artist_location", StringType()),
                             StructField("artist_longitude", DoubleType()),
                             StructField("artist_name", StringType()),
                             StructField("duration", DoubleType()),
                             StructField("num_songs", IntegerType()),
                             StructField("song_id", StringType()),
                             StructField("title", StringType()),
                             StructField("year", IntegerType())
                            ])   

    # read song/artist source data file
    dfSongSource = spark.read.json(song_data, schema=songSchema)
    
    # extract columns to create songs dataframe (pre songs table)
    # use Spark SQL query to create songs dataframe (proto songs table)
    dfSongSource.createOrReplaceTempView("staging_songs")

    songs_table = spark.sql(
    """
    SELECT song_id, 
           MIN(title) AS title, 
           MIN(artist_id) AS artist_id, 
           MIN(year) AS year, 
           MIN(duration) AS duration
      FROM staging_songs
     GROUP BY song_id
    """)   
    
    # add unknown dummy row to songs dataframe
    unknownSongRow = spark.createDataFrame([('***UNKNOWN_SONG***', '***Unknown Song***', '***UNKNOWN_ARTIST***', 0, 0)])
    songs_table = songs_table.union(unknownSongRow)
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy("year", "artist_id").format("parquet").mode("overwrite").save(output_data + "songs.parquet")

    # extract columns to create artists table
    # use Spark SQL query to create artists dataframe (proto artists table)
    dfSongSource.createOrReplaceTempView("staging_songs")

    artists_table = spark.sql(
    """
    SELECT artist_id,
           MIN(artist_name) AS name,
           MIN(artist_location) AS location,
           MIN(artist_latitude) AS latitude,
           MIN(artist_longitude) AS longitude
      FROM staging_songs
     GROUP BY artist_id
    """)  
    
    # add unknown dummy row to artists dataframe
    unknownArtistRow = spark.createDataFrame([('***UNKNOWN_ARTIST***', '*** Unknown Artist ***', '', 0.0, 0.0)])
    artists_table = artists_table.union(unknownArtistRow)
    
    # write artists table to parquet files
    artists_table.write.format("parquet").mode("overwrite").save(output_data + "artists.parquet")

    # extract columns to create song keys file dataframe (proto song_keys table)
    song_keys_table = dfSongSource.select(["song_id", "title", "duration", "artist_id", "artist_name"]).dropDuplicates()

    ## add unknown dummy row to artists dataframe
    #unknownSongKeyRow = spark.createDataFrame( \
    #                       [('***UNKNOWN_SONG***', '*** Unknown Song ***', 0.0, '***UNKNOWN_ARTIST***', '*** Unknown Artist ***')])
    #song_keys_table = song_keys_table.union(unknownSongKeyRow)

    # write song keys table to parquet files
    song_keys_table.write.format("parquet").mode("overwrite").save(output_data + "song_keys.parquet")
    

In [5]:
def process_log_data(spark, input_data, output_data, input_log_pattern, mode):
    """
    Extract data for users, time dimensions and songplays fact from source log json files then insert into parquet files
    Parameters:
      spark - Spark session
      input_data - filepath to source json files
      output_data - filepath to target parquet files
      input_log_pattern - file pattern for input log files
    """   
    
    # get filepath to log data file
    log_data = input_data + input_log_pattern  #'log_data/*/*/*.json'  # 'log_data/*/*/2018-11-01-events.json'
    print(log_data)  # temp debug
    
    # read log data file
    dfLogSource = spark.read.json(log_data)

    # filter log data by actions for song plays
    dfLogSource = dfLogSource.where(dfLogSource.page == 'NextSong')   # filter dataframe to only include rows with page == 'NextSong'

    # convert userID from string to integer
    dfLogSource = dfLogSource.withColumn('userID', expr("cast(userID as int)"))

    # extract columns for users table
    # use Spark SQL query to create users dataframe (proto users table)
    dfLogSource.createOrReplaceTempView("staging_log")

    users_table = spark.sql(
    """
    SELECT userID AS user_id,
           MIN(firstName) AS first_name,
           MIN(lastName) AS last_name,
           MIN(gender) AS gender,
           MIN(level) AS level
      FROM staging_log
     GROUP BY userID
    """)    

    # write users table to parquet files
    users_table.write.format("parquet").mode("overwrite").save(output_data + "users.parquet")

    # create timestamp column start_time from original timestamp column ts NOT using udf()
    # note that this method has a side effect of stripping microseconds from start_time timestamp
    dfLogSource = dfLogSource.withColumn("start_time", to_timestamp(from_unixtime(dfLogSource.ts/1000)))

    # extract columns to create time table
    # use Spark SQL query to create time dataframe (pre time table)
    dfLogSource.createOrReplaceTempView("staging_log")

    time_table = spark.sql(
    """
    SELECT DISTINCT start_time,
                    hour(start_time)       AS hour,
                    day(start_time)        AS day,
                    weekofyear(start_time) AS week,
                    month(start_time)      AS month,
                    year(start_time)       AS year,
                    dayofweek(start_time)  AS weekday
      FROM staging_log
     WHERE 1=1
    """)

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year", "month").format("parquet").mode("overwrite").save(output_data + "time.parquet")

    # enhance log source to include synthetic primary key songplay_id
    get_songplay_id = udf(lambda x, y, z: (f"{x:06}.{y:06}.{z}"), StringType())

    dfLogSource = dfLogSource.withColumn("songplay_id", get_songplay_id(dfLogSource.userID, dfLogSource.sessionId, dfLogSource.ts) )
                     
    # enhance log source to include hour and year to facilitate partitioning below
    dfLogSource = dfLogSource.withColumn("year", year(dfLogSource.start_time)) \
                             .withColumn("month", month(dfLogSource.start_time))

    # read in song keys data to use for songplays table
    song_keys_table = spark.read.parquet(output_data + "song_keys.parquet")

    # register temp view for song_keys utility table
    song_keys_table.createOrReplaceTempView("song_keys")

    # extract columns from joined song, artists and log datasets to create songplays table
    # use Spark SQL query to create songplays dataframe (proto songplays table)
    dfLogSource.createOrReplaceTempView("staging_log")

    songplays_table = spark.sql(
    """
    SELECT e.songplay_id,
           e.start_time,
           e.year,
           e.month,
           e.userID as user_id,
           e.level,
           COALESCE(s.song_id, '***UNKNOWN_SONG***') as song_id,
           COALESCE(s.artist_id, '***UNKNOWN_ARTIST***') as artist_id,
           e.sessionid as session_id,
           e.location,
           e.useragent as user_agent
      FROM staging_log e
      LEFT OUTER JOIN song_keys s ON e.song = s.title AND e.artist = s.artist_name and e.length = s.duration
     WHERE e.page = 'NextSong'
    """)  

    # write songplays table to parquet files partitioned by year and month
    # note the mode can vary between overwrite and append based on command line parm
    songplays_table.write.partitionBy("year", "month").format("parquet").mode(mode).save(output_data + "songplays.parquet")


In [6]:
def get_input_args():
    """
    Retrieves and parses the 5 optional arguments provided by the user.

    Mandatory command line arguments:
      None
    Optional command line arguments:
      1. Path for input bucket/directory.
      2. File pattern for input song files.
      3. File pattern for input log files.
      4. Path for output bucket/directory.
      5. Mode for songplays fact table (overwrite vs append)

    This function returns these arguments as an ArgumentParser object.
    Parameters:
      None - using argparse module to create & store command line arguments
    Returns:
      parser.namespace - data structure that stores the command line arguments object
    """
    # Create Parse using ArgumentParser
    parser = argparse.ArgumentParser()
    parser.prog = 'etl.py'
    parser.description = "Performs ETL functions for Sparkify Data Lake."
    
    # Argument 1
    parser.add_argument('--input_path', type = str, default="s3a://udacity-dend/", 
               help = "Path for input bucket or directory (e.g. 's3a://udacity-dend/')")
    # Argument 2
    parser.add_argument('--input_song_pattern', type = str, default='song_data/*/*/*/*.json',
               help = "File pattern for song json files (e.g. 'song_data/*/*/*/*.json', 'song_data/A/A/*/*.json', 'song_data/A/A/A/*.json')")    
    # Argument 3
    parser.add_argument('--input_log_pattern', type = str, default='log_data/*/*/*.json',
               help = "File pattern for log json files (e.g. 'log_data/*/*/*.json', 'log_data/*/*/2018-11-0*.json', 'log_data/*/*/2018-11-01-events.json')")     
    # Argument 4
    parser.add_argument('--output_path', type = str, required=True, # default="analytics/",
               help = "Path for output bucket or directory (e.g. 's3a://sparkify-bp/analytics/', 'analytics/')")    
    # Argument 5:
    parser.add_argument('--mode', type = str, choices = ['overwrite', 'append'], default="overwrite",
               help = "Mode to update songplays fact table ('overwrite', 'append')")

    # Note: this will perform system exit if argument is malformed or imcomplete
    in_args = parser.parse_args()

    # return parsed argument collection
    return in_args 


In [4]:
# remove soon
spark = create_spark_session()

In [5]:
# remove soon
input_data = "s3a://udacity-dend/"
#output_data = "s3a://sparkify-bp/analytics/"
output_data = "analytics2/"

#process_song_data(spark, input_data, output_data)    
#process_log_data(spark, input_data, output_data)

In [7]:
def main():
    """
    Main function contains core logic for ETL.
    Parameters: see parameters of get_input_args function.
    """
    
    in_args = get_input_args()      
    print(in_args)    # temp debug 

    start_time = time.time()   
    
    spark = create_spark_session()
    
    process_song_data(spark, in_args.input_path, in_args.output_path, in_args.input_song_pattern)    
    process_log_data(spark, in_args.input_path, in_args.output_path, in_args.input_log_pattern, in_args.mode)
    
    print("** Total Elapsed Runtime: " + time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)) )    


#### Run the proto script

In [9]:
# Simulate running script with command line arguments
sys.argv = ['etl.py', 
            '--input_song_pattern', 'song_data/A/*/*/*.json', '--input_log_pattern', 'log_data/*/*/2018-11-1*.json',
            '--output_path', 'analytics/', '--mode', 'overwrite']
#            '--output_path', 's3a://sparkify-bp/analytics/', '--mode', 'overwrite']

if __name__ == "__main__":
    main()  

Namespace(input_log_pattern='log_data/*/*/2018-11-1*.json', input_path='s3a://udacity-dend/', input_song_pattern='song_data/A/*/*/*.json', mode='overwrite', output_path='analytics/')
s3a://udacity-dend/song_data/A/*/*/*.json
s3a://udacity-dend/log_data/*/*/2018-11-1*.json
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|             14896|
|   mean|              null|
| stddev|              null|
|    min|SOAAAQN12AB01856D3|
|    max|SOZZZON12A8C139ED5|
+-------+------------------+

** Total Elapsed Runtime: 02:18:19


### Develop individual components for inclusion in script

#### Work the song/artist source 

In [6]:
# get filepath to song data file
song_data = input_data + 'song_data/A/D/*/*.json'   # 'song_data/A/A/A/*.json'   # 'song_data/*/*/*/*.json'

# this is not necessary but useful as a code sample for future reference 
songSchema = StructType([StructField("artist_id", StringType()),
                         StructField("artist_latitude", DoubleType()),
                         StructField("artist_location", StringType()),
                         StructField("artist_longitude", DoubleType()),
                         StructField("artist_name", StringType()),
                         StructField("duration", DoubleType()),
                         StructField("num_songs", IntegerType()),
                         StructField("song_id", StringType()),
                         StructField("title", StringType()),
                         StructField("year", IntegerType())
                        ])   

# read song data file
dfSongSource = spark.read.json(song_data, schema=songSchema)

dfSongSource.printSchema()
dfSongSource.describe("artist_name").show() 
dfSongSource.limit(4).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+-------+-----------+
|summary|artist_name|
+-------+-----------+
|  count|        585|
|   mean|     2002.0|
| stddev|        NaN|
|    min|  2 Bit Pie|
|    max|        Zia|
+-------+-----------+



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDSWIE1187FB39056,,"Boston, Massachusettes",,Gang Starr Featuring Jeru The Damaja And Lil Dap,244.32281,1,SOIVSQZ12A6D4F68BF,I'm The Man (Explicit) (Feat. Jeru The Damaja ...,1992
1,ARAPIIU1187B98FAC9,28.02232,"Winter Haven, FL",-81.73295,Gram Parsons,376.18893,1,SOIAZUN12A58A81065,Encore Medley: Bony Moronie/Forty Days/Almost ...,0
2,ARU1K2U1187FB48529,39.76691,"Indianapolis, IN",-86.14996,Freddie Hubbard,384.9922,1,SONOOQK12A58533A67,D Minor Mint (Rudy Van Gelder 24Bit Mastering)...,0
3,ARGIABO1187FB4B3B5,35.32689,"Robbinsville, NC",-83.80489,Ronnie Milsap,171.25832,1,SONFXQX12AB01872A4,Up To Zion (High Key-Premiere Performance Plus...,0


##### Use Spark dataframe to construct songs table

In [10]:
# extract columns to create songs dataframe (proto songs table)
songs_table = dfSongSource.select(["song_id", "title", "artist_id", "year", "duration"]).dropDuplicates()

# alternate syntax
#songs_table = dfSongSource['song_id', 'title', 'artist_id', 'year', 'duration'].dropDuplicates()

songs_table.describe("song_id").show()
songs_table.show(4)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               585|
|   mean|              null|
| stddev|              null|
|    min|SOAAHZO12A67AE1265|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOUOVIB12AF729C93A|               Alone|ART9VTZ1187FB48DDC|1972|329.35138|
|SOLJRHX12AC4687C8D|          Number One|ARN0JNC1187FB38B22|   0|118.72608|
|SOXXQHZ12A6D4F6E5D|Toccata And Fugue...|ARS7F8J1187B9AFE31|   0|279.61424|
|SOGWCWD12AB017FC95|Money Make The Wo...|ARRXG5Y1187B9AA016|   0|254.09261|
+------------------+--------------------+------------------+----+---------+
only showing top 4 rows



##### Use Spark SQL to construct songs table

In [11]:
# use Spark SQL query to create songs dataframe (proto songs table) 
dfSongSource.createOrReplaceTempView("staging_songs") 

songs_table = spark.sql(
"""
SELECT song_id, MIN(title) AS title, MIN(artist_id) AS artist_id, MIN(year) AS year, MIN(duration) AS duration
  FROM staging_songs
 WHERE 1=1
 GROUP BY song_id
""")

songs_table.describe("song_id").show() 
songs_table.show(4)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               585|
|   mean|              null|
| stddev|              null|
|    min|SOAAHZO12A67AE1265|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAZQJP12A8C13BD71|That Night (Wah-C...|ARZHCKQ1187B9BA5BA|   0|418.48118|
|SODPHBK12A6D4FA4DE|Misadventures of ...|ARHGQUW1187FB3905C|2007|175.77751|
|SOSGPGG12A8C136117|Waltz in A minor_...|ARRYQNN121318C55B3|   0|311.84934|
|SOWZOPT12A67AD7271|    Y Si La Historia|AR4XVA51187FB39FEC|   0|290.11546|
+------------------+--------------------+------------------+----+---------+
only showing top 4 rows



##### Add unknown dummy row to songs dataframe

In [12]:
# add unknown dummy row to songs dataframe
unknownSongRow = spark.createDataFrame([('***UNKNOWN_SONG***', '***Unknown Song***', '***UNKNOWN_ARTIST***', 0, 0)])
songs_table = songs_table.union(unknownSongRow)

In [13]:
songs_table.printSchema()
songs_table.describe("song_id").show() 
songs_table.show(4)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               586|
|   mean|              null|
| stddev|              null|
|    min|***UNKNOWN_SONG***|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAZQJP12A8C13BD71|That Night (Wah-C...|ARZHCKQ1187B9BA5BA|   0|418.48118|
|SODPHBK12A6D4FA4DE|Misadventures of ...|ARHGQUW1187FB3905C|2007|175.77751|
|SOSGPGG12A8C136117|Waltz in A minor_...|ARRYQNN121318C55B3|   0|311.84934|
|SOWZOPT12A67AD7271|    Y Si La Historia|AR4XVA51187FB39FEC|   0|290.11546|
+--------

In [14]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_id").format("parquet").mode("overwrite").save(output_data + "songs.parquet")

# alternate syntax:
#songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'), 'overwrite')

In [15]:
# read what's in the songs table
dfCheckSongs = spark.read.parquet(output_data + "songs.parquet")

dfCheckSongs.printSchema()
dfCheckSongs.describe("song_id").show() 
dfCheckSongs.limit(4).toPandas()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               586|
|   mean|              null|
| stddev|              null|
|    min|***UNKNOWN_SONG***|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+



Unnamed: 0,song_id,title,duration,year,artist_id
0,SOMBEOP12AB018A0F7,Bust At You [featuring Baby_ Scarface & Tony S...,277.002,2002,AR2XI6G1187B9AE7DA
1,SOIAZUN12A58A81065,Encore Medley: Bony Moronie/Forty Days/Almost ...,376.18893,0,ARAPIIU1187B98FAC9
2,SONOOQK12A58533A67,D Minor Mint (Rudy Van Gelder 24Bit Mastering)...,384.9922,0,ARU1K2U1187FB48529
3,SONFXQX12AB01872A4,Up To Zion (High Key-Premiere Performance Plus...,171.25832,0,ARGIABO1187FB4B3B5


##### Use Spark dataframe to construct artists table

In [16]:
# extract columns to create artists dataframe (proto artists table)
# don't use this because the source data contains multiple artist alias names. need to group by artist_id and easier to do that with SQL
artists_table = dfSongSource.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]) 
#artists_table.show(4)

artists_table = artists_table.withColumnRenamed("artist_name", "name") \
                             .withColumnRenamed("artist_location", "location") \
                             .withColumnRenamed("artist_latitude", "latitude") \
                             .withColumnRenamed("artist_longitude", "longitude") \
                             .dropDuplicates()

artists_table.describe("artist_id").show() 
artists_table.show(4)

+-------+------------------+
|summary|         artist_id|
+-------+------------------+
|  count|               574|
|   mean|              null|
| stddev|              null|
|    min|AR00FVC1187FB5BE3E|
|    max|ARZZXT51187FB4627E|
+-------+------------------+

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARTNWGK122C86756BE|El Pollito De Cal...|                    |    null|     null|
|ARYEBQC1187FB5ACD9|The Devin Townsen...|                    |    null|     null|
|ARMC5JL1187B98C9ED|The Most Serene R...|Milton, Ontario, ...|43.51103|-79.88389|
|ARZB3YT1187B9912DD|      Wayman Tisdale|      Fort Worth, TX|    null|     null|
+------------------+--------------------+--------------------+--------+---------+
only showing top 4 rows



##### Use Spark SQL to construct artists table

In [17]:
# use Spark SQL query to create artists dataframe (proto artists table) 
dfSongSource.createOrReplaceTempView("staging_songs") 

artists_table = spark.sql(
"""
SELECT artist_id, 
       MIN(artist_name) AS name, 
       MIN(artist_location) AS location, 
       MIN(artist_latitude) AS latitude, 
       MIN(artist_longitude) AS longitude
  FROM staging_songs
 WHERE 1=1
 GROUP BY artist_id  
""")

artists_table.describe("artist_id").show() 
artists_table.show(4)

+-------+------------------+
|summary|         artist_id|
+-------+------------------+
|  count|               571|
|   mean|              null|
| stddev|              null|
|    min|AR00FVC1187FB5BE3E|
|    max|ARZZXT51187FB4627E|
+-------+------------------+

+------------------+------------+---------------+--------+----------+
|         artist_id|        name|       location|latitude| longitude|
+------------------+------------+---------------+--------+----------+
|AR46CAD1187FB4D84B|Bad Religion|Los Angeles, Ca|34.05349|-118.24532|
|ARGGPT11187B98D573|Dressy Bessy|     Denver, CO|39.74001|-104.99226|
|ARPDB5W1187FB430D1|   Tall Firs|               |    null|      null|
|AR5UVIM1187B98A8B9|    Dropline|               |    null|      null|
+------------------+------------+---------------+--------+----------+
only showing top 4 rows



##### Add unknown dummy row to artists dataframe

In [18]:
# add unknown dummy row to artists dataframe
unknownArtistRow = spark.createDataFrame([('***UNKNOWN_ARTIST***', '*** Unknown Artist ***', '', 0.0, 0.0)])
artists_table = artists_table.union(unknownArtistRow)

In [19]:
artists_table.printSchema()
artists_table.describe("artist_id").show() 
artists_table.show(4)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+-------+--------------------+
|summary|           artist_id|
+-------+--------------------+
|  count|                 572|
|   mean|                null|
| stddev|                null|
|    min|***UNKNOWN_ARTIST***|
|    max|  ARZZXT51187FB4627E|
+-------+--------------------+

+------------------+------------+---------------+--------+----------+
|         artist_id|        name|       location|latitude| longitude|
+------------------+------------+---------------+--------+----------+
|AR46CAD1187FB4D84B|Bad Religion|Los Angeles, Ca|34.05349|-118.24532|
|ARGGPT11187B98D573|Dressy Bessy|     Denver, CO|39.74001|-104.99226|
|ARPDB5W1187FB430D1|   Tall Firs|               |    null|      null|
|AR5UVIM1187B98A8B9|    Dropline|               |    null|      null|
+------------------+------

In [20]:
# write artists table to parquet files
artists_table.write.format("parquet").mode("overwrite").save(output_data + "artists.parquet")

In [21]:
# read what's in the artists table
dfCheckArtists = spark.read.parquet(output_data + "artists.parquet")

dfCheckArtists.printSchema()
dfCheckArtists.describe("artist_id").show() 
dfCheckArtists.limit(4).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+-------+--------------------+
|summary|           artist_id|
+-------+--------------------+
|  count|                 572|
|   mean|                null|
| stddev|                null|
|    min|***UNKNOWN_ARTIST***|
|    max|  ARZZXT51187FB4627E|
+-------+--------------------+



Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARJW66R1187FB3C9B3,Mike Oldfield,"Reading, Berkshire, England",,
1,ARM1IMV1187B995003,Unwound,"Tumwater, WA",46.98538,-122.91227
2,ARNE16X1187B9B2DEE,Jimmy Cliff,"St. Catherine, Jamaica",18.04552,-77.02659
3,ARTXGGI1187B9B3D58,Blackalicious,"Davis, CA",38.54666,-121.74477


##### Create song keys xref utility table

In [42]:
# extract columns to create song keys file dataframe (proto song_keys table)
song_keys_table = dfSongSource.select(["song_id", "title", "duration", "artist_id", "artist_name"]).dropDuplicates()

# add unknown dummy row to artists dataframe
unknownSongKeyRow = spark.createDataFrame( \
                       [('***UNKNOWN_SONG***', '*** Unknown Song ***', 0.0, '***UNKNOWN_ARTIST***', '*** Unknown Artist ***')])
song_keys_table = song_keys_table.union(unknownSongKeyRow)

song_keys_table.printSchema()
song_keys_table.describe("song_id").show()
song_keys_table.show(4)

# write song keys table to parquet files 
song_keys_table.write.format("parquet").mode("overwrite").save(output_data + "song_keys.parquet")

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               586|
|   mean|              null|
| stddev|              null|
|    min|***UNKNOWN_SONG***|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+

+------------------+--------------------+---------+------------------+--------------------+
|           song_id|               title| duration|         artist_id|         artist_name|
+------------------+--------------------+---------+------------------+--------------------+
|SOMJTLW12A8C13C687|I'm Too Heavy For...|187.24526|AR6OQMY1187FB485F2|       Charlie Drake|
|SOLNMRI12A6D4F82C3|I Feel So Good (2...|203.12771|ARSTKIT1187B9A37A8|    Richard Thompson|
|SOLWPHG12A8C13C0FC|   Happiness Nuggets|133.11955|ARMWIHP1187B98C7D5|Co

In [43]:
# read what's in the song keys table
dfCheckSongKeys = spark.read.parquet(output_data + "song_keys.parquet")

dfCheckSongKeys.printSchema()
dfCheckSongKeys.describe("song_id").show() 
dfCheckSongKeys.limit(4).toPandas()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               586|
|   mean|              null|
| stddev|              null|
|    min|***UNKNOWN_SONG***|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+



Unnamed: 0,song_id,title,duration,artist_id,artist_name
0,SONFXQX12AB01872A4,Up To Zion (High Key-Premiere Performance Plus...,171.25832,ARGIABO1187FB4B3B5,Ronnie Milsap
1,SOQXUNA12A6D4F9AFB,The One I've Been Waiting For (Diamond Days Al...,252.62975,AROPC1B1187FB567E3,Out Of The Grey
2,SOXUEGX12A8C13C161,Operation: M.O.V.E.,625.05751,AR8JO2B1187B98EBB6,Leftover Crack
3,SOINDRZ12A6701DAF6,Romantic Rights,195.29098,AR4R1FL1187FB55C3B,Death From Above 1979


#### Work the log/event source 

In [7]:
# get filepath to log data file
log_data = input_data + 'log_data/*/*/2018-11-2*.json'  # 'log_data/*/*/*.json'  # 'log_data/*/*/2018-11-01-events.json'

dfLogSource = spark.read.json(log_data)

dfLogSource.printSchema()
dfLogSource.describe("userId").show()
dfLogSource.limit(4).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-------+------------------+
|summary|            userId|
+-------+------------------+
|  count|              3082|
|   mean|57.536967632027256|
| stddev| 28.36084156649609|
|    min|                  |
|    max|                98|
+-------+------------------+



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged Out,,,0,,,free,,PUT,Login,,741,,307,1542760054796,,
1,,Logged In,Theodore,M,1,Smith,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540306000000.0,741,,200,1542760086796,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,52.0
2,,Logged Out,,,0,,,paid,,GET,Home,,774,,200,1542761399796,,
3,,Logged Out,,,1,,,paid,,GET,Home,,774,,200,1542761485796,,


In [8]:
# filter log data by actions for song plays
dfLogSource = dfLogSource.where(dfLogSource.page == 'NextSong')   # filter dataframe to only include rows with page == 'NextSong' 

# alternate syntax: 
#dfLogSource = dfLogSource[dfLogSource.page == 'NextSong']]

dfLogSource.describe("sessionId").show()

+-------+-----------------+
|summary|        sessionId|
+-------+-----------------+
|  count|             2589|
|   mean|824.8651989185014|
| stddev|182.4384717208562|
|    min|               21|
|    max|             1069|
+-------+-----------------+



##### Modify log dataframe to prepare users table

In [9]:
# convert userID from string to integer
#dfLogSource = dfLogSource.withColumn('userID', dfLogSource['userID'].cast(IntegerType()))
dfLogSource = dfLogSource.withColumn('userID', expr("cast(userID as int)"))

dfLogSource.printSchema()
dfLogSource.describe("userID").show()
dfLogSource.limit(4).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userID: integer (nullable = true)

+-------+------------------+
|summary|            userID|
+-------+------------------+
|  count|              2589|
|   mean|57.779451525685595|
| stddev|28.314440166735164|
|    min|                 4|
|    max|               101|
+-------+------------------+



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userID
0,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
1,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
2,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
3,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


##### Use Spark dataframes to construct users table

In [26]:
# extract columns for users table
users_table = dfLogSource.select(['userID', 'firstName', 'lastName', 'gender', 'level']) \
                         .dropDuplicates() \
                         .withColumnRenamed("userID", "user_id") \
                         .withColumnRenamed("firstName", "first_name") \
                         .withColumnRenamed("lastName", "last_name") 

users_table.describe("user_id").show()
users_table.limit(4).toPandas()

+-------+------------------+
|summary|           user_id|
+-------+------------------+
|  count|                78|
|   mean|50.666666666666664|
| stddev|29.231128518394126|
|    min|                 2|
|    max|               101|
+-------+------------------+



Unnamed: 0,user_id,first_name,last_name,gender,level
0,20,Aiden,Ramirez,M,paid
1,73,Jacob,Klein,M,paid
2,47,Kimber,Norris,F,free
3,7,Adelyn,Jordan,F,free


In [27]:
# extract columns for users table
# alternate syntax:
users_table = dfLogSource.select(col('userId').alias('user_id'), \
                                 col('firstName').alias('first_name'), \
                                 col('lastName').alias('last_name'), \
                                 'gender', 'level') \
                         .dropDuplicates()
   
users_table.describe("user_id").show()
users_table.limit(4).toPandas()

+-------+------------------+
|summary|           user_id|
+-------+------------------+
|  count|                78|
|   mean|50.666666666666664|
| stddev|29.231128518394126|
|    min|                 2|
|    max|               101|
+-------+------------------+



Unnamed: 0,user_id,first_name,last_name,gender,level
0,9,Wyatt,Scott,M,free
1,84,Shakira,Hunt,F,free
2,3,Isaac,Valdez,M,free
3,89,Kynnedi,Sanchez,F,free


##### Use Spark SQL to construct users table

In [10]:
# use Spark SQL query to create users dataframe (proto users table) 
dfLogSource.createOrReplaceTempView("staging_log") 

users_table = spark.sql(
"""
SELECT userID AS user_id, 
       MIN(firstName) AS first_name, 
       MIN(lastName) AS last_name, 
       MIN(gender) AS gender, 
       MIN(level) AS level
  FROM staging_log
 GROUP BY userID  
""")

users_table.describe("user_id").show() 
users_table.show(4)

+-------+-----------------+
|summary|          user_id|
+-------+-----------------+
|  count|               82|
|   mean|52.84146341463415|
| stddev|28.77690668445456|
|    min|                4|
|    max|              101|
+-------+-----------------+

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     85|   Kinsley|    Young|     F| paid|
|     65|     Amiya| Davidson|     F| paid|
|     53|   Celeste| Williams|     F| free|
|     78|     Chloe|     Roth|     F| free|
+-------+----------+---------+------+-----+
only showing top 4 rows



In [31]:
# write users table to parquet files
users_table.write.format("parquet").mode("overwrite").save(output_data + "users.parquet")

In [32]:
# read what's in the users table
dfCheckUsers = spark.read.parquet(output_data + "users.parquet")

dfCheckUsers.printSchema()
dfCheckUsers.describe("user_id").show() 
#dfCheckUsers.show(4)
dfCheckUsers.limit(4).toPandas()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+------------------+
|summary|           user_id|
+-------+------------------+
|  count|                82|
|   mean| 52.84146341463415|
| stddev|28.776906684454563|
|    min|                 4|
|    max|               101|
+-------+------------------+



Unnamed: 0,user_id,first_name,last_name,gender,level
0,88,Mohammad,Rodriguez,M,paid
1,53,Celeste,Williams,F,free
2,75,Joseph,Gutierrez,M,free
3,29,Jacqueline,Lynch,F,paid


##### Modify log dataframe to source time table

In [34]:
# create timestamp column start_time from original timestamp column ts using udf()
# alternate syntax:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0), TimestampType())

dfLogSource = dfLogSource.withColumn("start_time", get_timestamp(dfLogSource.ts))

dfLogSource.printSchema()
dfLogSource.describe("start_time").show()
dfLogSource.limit(4).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userID: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userID,start_time
0,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 00:57:58.796
1,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 00:58:41.796
2,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 01:02:05.796
3,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 01:03:13.796


In [11]:
# create timestamp column start_time from original timestamp column ts NOT using udf()
# note that this method has a side effect of stripping microseconds from start_time timestamp
dfLogSource = dfLogSource.withColumn("start_time", to_timestamp(from_unixtime(dfLogSource.ts/1000)))

dfLogSource.printSchema()
dfLogSource.describe("start_time").show()
dfLogSource.limit(4).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userID: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userID,start_time
0,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 00:57:58
1,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 00:58:41
2,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 01:02:05
3,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 01:03:13


##### Use Spark dataframe to construct time table

In [35]:
# extract columns to create time table  
# use Spark dataframe to create time dataframe (pre time table) 
time_table = dfLogSource.select("start_time") \
                        .withColumn("hour",    hour(dfLogSource.start_time)) \
                        .withColumn("day",     dayofmonth(dfLogSource.start_time)) \
                        .withColumn("week",    weekofyear(dfLogSource.start_time)) \
                        .withColumn("month",   month(dfLogSource.start_time)) \
                        .withColumn("year",    year(dfLogSource.start_time)) \
                        .withColumn("weekday", dayofweek(dfLogSource.start_time)) \
                        .dropDuplicates()

time_table.printSchema()
time_table.describe("hour").show()
time_table.limit(10).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------+------------------+
|summary|              hour|
+-------+------------------+
|  count|              1507|
|   mean|13.594558725945587|
| stddev| 5.563784156889746|
|    min|                 0|
|    max|                23|
+-------+------------------+



Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-05 17:00:15,17,5,45,11,2018,2
1,2018-11-09 01:06:45,1,9,45,11,2018,6
2,2018-11-09 19:30:35,19,9,45,11,2018,6
3,2018-11-06 21:40:22,21,6,45,11,2018,3
4,2018-11-02 11:04:31,11,2,44,11,2018,6
5,2018-11-05 10:14:45,10,5,45,11,2018,2
6,2018-11-05 18:07:49,18,5,45,11,2018,2
7,2018-11-09 18:08:26,18,9,45,11,2018,6
8,2018-11-09 18:44:28,18,9,45,11,2018,6
9,2018-11-08 12:00:00,12,8,45,11,2018,5


##### Use Spark SQL to construct time table

In [36]:
# use Spark SQL query to create time dataframe (pre time table) 
dfLogSource.createOrReplaceTempView("staging_log") 

time_table = spark.sql(
"""
SELECT DISTINCT start_time,
                hour(start_time)       AS hour,
                day(start_time)        AS day,
                weekofyear(start_time) AS week,
                month(start_time)      AS month,
                year(start_time)       AS year,
                dayofweek(start_time)  AS weekday
  FROM staging_log
 WHERE 1=1
""")

time_table.printSchema()
time_table.describe("hour").show()
time_table.limit(10).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------+------------------+
|summary|              hour|
+-------+------------------+
|  count|              2584|
|   mean|13.234907120743035|
| stddev| 6.223244283071215|
|    min|                 0|
|    max|                23|
+-------+------------------+



Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-21 05:10:47,5,21,47,11,2018,4
1,2018-11-21 12:10:49,12,21,47,11,2018,4
2,2018-11-28 09:32:21,9,28,48,11,2018,4
3,2018-11-28 10:59:47,10,28,48,11,2018,4
4,2018-11-28 23:48:49,23,28,48,11,2018,4
5,2018-11-20 11:20:31,11,20,47,11,2018,3
6,2018-11-24 19:02:01,19,24,47,11,2018,7
7,2018-11-29 16:31:19,16,29,48,11,2018,5
8,2018-11-27 12:26:36,12,27,48,11,2018,3
9,2018-11-23 12:33:54,12,23,47,11,2018,6


In [37]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").format("parquet").mode("overwrite").save(output_data + "time.parquet")

In [38]:
# read what's in the time table
dfCheckTime = spark.read.parquet(output_data + "time.parquet")

dfCheckTime.printSchema()
dfCheckTime.describe("hour").show() 
dfCheckTime.limit(4).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-------+------------------+
|summary|              hour|
+-------+------------------+
|  count|              2584|
|   mean|13.234907120743035|
| stddev| 6.223244283071213|
|    min|                 0|
|    max|                23|
+-------+------------------+



Unnamed: 0,start_time,hour,day,week,weekday,year,month
0,2018-11-21 03:31:42,3,21,47,4,2018,11
1,2018-11-21 08:25:43,8,21,47,4,2018,11
2,2018-11-21 10:07:55,10,21,47,4,2018,11
3,2018-11-21 21:11:41,21,21,47,4,2018,11


##### Hack songplays

In [56]:
# enhance log source to include synthetic primary key songplay_id
import hashlib

#get_songplay_id = udf(lambda x, y, z: hashlib.md5((str(x) + str(y) + str(z)).encode('utf-8')).hexdigest(), StringType())
#get_songplay_id = udf(lambda x, y, z: (str(x) + str(y) + str(z)), StringType())
get_songplay_id = udf(lambda x, y, z: (f"{x:06}.{y:06}.{z}"), StringType())

dfLogSource = dfLogSource.withColumn("songplay_id", get_songplay_id(dfLogSource.userID, dfLogSource.sessionId, dfLogSource.ts) ) 

dfLogSource.describe("songplay_id").show() 
dfLogSource.limit(4).toPandas()

+-------+--------------------+
|summary|         songplay_id|
+-------+--------------------+
|  count|                2589|
|   mean|                null|
| stddev|                null|
|    min|004.1054.15435111...|
|    max|101.955.154340334...|
+-------+--------------------+



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userID,start_time,songplay_id
0,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 00:57:58,080.774.1542761878796
1,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 00:58:41,097.671.1542761921796
2,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-21 01:02:05,097.671.1542762125796
3,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-21 01:03:13,080.774.1542762193796


In [13]:
dfLogSource.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userID: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [39]:
# enhance log source to include hour and year to facilitate partitioning below
dfLogSource = dfLogSource.withColumn("year",    year(dfLogSource.start_time)) \
                         .withColumn("month",   month(dfLogSource.start_time))


In [27]:
# 1st attempt pt 1

# read in song data to use for songplays table   
songs_table = spark.read.parquet(output_data + "songs.parquet")

# register temp view for songs dimension
songs_table.createOrReplaceTempView("songs") 

# read in artist data to use for songplays table   
artists_table = spark.read.parquet(output_data + "artists.parquet")

# register temp view for artists dimension
artists_table.createOrReplaceTempView("artists") 

In [29]:
# 1st attempt pt 2

# extract columns from joined song, artists and log datasets to create songplays table
# use Spark SQL query to create songplays dataframe (proto songplays table) 
dfLogSource.createOrReplaceTempView("staging_log") 

songplays_table = spark.sql(
"""
SELECT e.start_time,
       e.year,
       e.month,
       e.userID as user_id,
       e.level,
       COALESCE(s.song_id, '***UNKNOWN_SONG***') as song_id,
       COALESCE(a.artist_id, '***UNKNOWN_ARTIST***') as artist_id,
       e.sessionid as session_id,
       e.location,
       e.useragent as user_agent
  FROM staging_log e
  LEFT OUTER JOIN songs s ON e.song = s.title
  LEFT OUTER JOIN artists a ON e.artist = a.name
 WHERE page = 'NextSong'
""")

songplays_table.printSchema()
songplays_table.describe("session_id").show()
songplays_table.limit(10).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = false)
 |-- artist_id: string (nullable = false)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-------+-----------------+
|summary|       session_id|
+-------+-----------------+
|  count|               11|
|   mean|129.0909090909091|
| stddev|43.75032467411995|
|    min|                9|
|    max|              169|
+-------+-----------------+



Unnamed: 0,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-01 21:01:46,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
1,2018-11-01 21:05:52,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
2,2018-11-01 21:08:16,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
3,2018-11-01 21:11:13,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
4,2018-11-01 21:17:33,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
5,2018-11-01 21:24:53,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
6,2018-11-01 21:28:54,2018,11,8,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
7,2018-11-01 21:42:00,2018,11,10,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,9,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
8,2018-11-01 21:52:05,2018,11,26,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,169,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
9,2018-11-01 21:55:25,2018,11,26,free,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,169,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [44]:
# 2nd attempt pt 1

# read in song keys data to use for songplays table   
song_keys_table = spark.read.parquet(output_data + "song_keys.parquet")

song_keys_table.printSchema()
song_keys_table.describe("song_id").show()

# register temp view for song_keys utility table
song_keys_table.createOrReplaceTempView("song_keys") 

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)

+-------+------------------+
|summary|           song_id|
+-------+------------------+
|  count|               586|
|   mean|              null|
| stddev|              null|
|    min|***UNKNOWN_SONG***|
|    max|SOZYUKG12A6D4FB64F|
+-------+------------------+



In [46]:
# 2nd attempt pt 2

# extract columns from joined song, artists and log datasets to create songplays table
# use Spark SQL query to create songplays dataframe (proto songplays table) 
dfLogSource.createOrReplaceTempView("staging_log") 

songplays_table = spark.sql(
"""
SELECT e.start_time,
       e.year,
       e.month,
       e.userID as user_id,
       e.level,
       COALESCE(s.song_id, '***UNKNOWN_SONG***') as song_id,
       COALESCE(s.artist_id, '***UNKNOWN_ARTIST***') as artist_id,
       e.sessionid as session_id,
       e.location,
       e.useragent as user_agent
  FROM staging_log e
  LEFT OUTER JOIN song_keys s ON e.song = s.title AND e.artist = s.artist_name and e.length = s.duration
 WHERE e.page = 'NextSong'
""")

songplays_table.printSchema()
songplays_table.describe("session_id").show()
songplays_table.limit(10).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = false)
 |-- artist_id: string (nullable = false)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-------+-----------------+
|summary|       session_id|
+-------+-----------------+
|  count|             2589|
|   mean|824.8651989185014|
| stddev|182.4384717208562|
|    min|               21|
|    max|             1069|
+-------+-----------------+



Unnamed: 0,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-21 00:57:58,2018,11,80,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,774,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,2018-11-21 00:58:41,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,2018-11-21 01:02:05,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
3,2018-11-21 01:03:13,2018,11,80,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,774,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,2018-11-21 01:05:29,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
5,2018-11-21 01:09:37,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
6,2018-11-21 01:13:22,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
7,2018-11-21 01:16:52,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
8,2018-11-21 01:20:24,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
9,2018-11-21 01:24:03,2018,11,97,paid,***UNKNOWN_SONG***,***UNKNOWN_ARTIST***,671,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [47]:
# write songplays table to parquet files partitioned by year and month
# note that in product code this should have an append option
songplays_table.write.partitionBy("year", "month").format("parquet").mode("overwrite").save(output_data + "songplays.parquet")