In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, hour, weekofyear, date_format, dayofyear, dayofweek
from pyspark.sql.types import DecimalType, ShortType, StringType, IntegerType, LongType, TimestampType
from pyspark.sql.types import StructField, StructType
import pandas as pd

In [2]:
# Read the AWS credentials from the local 'dl.cfg' file and export them as
# environment variables of the current process.
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_section = config['DEFAULT']
for credential_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[credential_key] = aws_section[credential_key]

In [3]:
# Build (or reuse) the SparkSession, the entry point to all Spark utilities.
# The hadoop-aws package is requested through spark.jars.packages, so the
# first call can take a few seconds while packages are resolved.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [4]:
# Input locations: raw JSON files, read recursively by the cells below
song_data = "./data/song_data"
log_data = "./data/log-data"

# Output locations: one parquet directory per table
out_songplay_path = "./data/out/songplays"
out_time_path = "./data/out/time"
out_user_path = "./data/out/users"
out_song_path = "./data/out/songs"
out_artist_path = "./data/out/artists"

# Explore song data
Basic Steps:
1. Load JSON with explicit schema
2. Extract data for songs and artists tables
3. Clean the data

## Load Song Data

In [5]:
# Explicit schema for the song JSON files. '__corrupted' is the column that
# receives records Spark could not parse (see columnNameOfCorruptRecord
# below); every field is declared nullable at this stage.
_song_fields = [
    ('__corrupted', StringType()),
    ('song_id', StringType()),
    ('title', StringType()),
    ('artist_id', StringType()),
    ('year', ShortType()),
    ('duration', DecimalType(10,5)),
    ('artist_name', StringType()),
    ('artist_location', StringType()),
    ('artist_latitude', DecimalType(9,6)),
    ('artist_longitude', DecimalType(9,6)),
]
schema_parse_song = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _song_fields]
)

# PERMISSIVE mode keeps malformed records (routing them to '__corrupted')
# instead of failing the whole read.
df_parsed_song = spark.read.json(
    song_data,
    schema=schema_parse_song,
    recursiveFileLookup=True,
    mode='PERMISSIVE',
    columnNameOfCorruptRecord='__corrupted',
)

In [6]:
# Isolate the records that failed to parse and cache them, since the frame is
# both counted and sampled below.
# NOTE(review): persist() is presumably also what allows a query that touches
# only the corrupt-record column (Spark >= 2.3 rejects that on an uncached
# JSON scan) - confirm before removing it.
df_song_corrupted = df_parsed_song.where("__corrupted is NOT NULL").persist()

n_song_corrupted = df_song_corrupted.count()
print("Number of corrupted records: {}".format(n_song_corrupted))
df_song_corrupted.limit(5).toPandas()

Number of corrupted records: 0


Unnamed: 0,__corrupted,song_id,title,artist_id,year,duration,artist_name,artist_location,artist_latitude,artist_longitude


In [7]:
# Keep the successfully parsed records, drop the helper column and cache the
# result: it feeds the songs, artists and songplays tables.
df_song_typed = (
    df_parsed_song
    .where("__corrupted is NULL")
    .drop("__corrupted")
    .persist()
)

n_song_typed = df_song_typed.count()
print("Number of correct records: {}".format(n_song_typed))
df_song_typed.printSchema()
df_song_typed.limit(5).toPandas()

Number of correct records: 71
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: short (nullable = true)
 |-- duration: decimal(10,5) (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: decimal(9,6) (nullable = true)
 |-- artist_longitude: decimal(9,6) (nullable = true)



Unnamed: 0,song_id,title,artist_id,year,duration,artist_name,artist_location,artist_latitude,artist_longitude
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281,Tiny Tim,"New York, NY",40.71455,-74.00712
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771,Tim Wilson,Georgia,32.67828,-83.22295


### Processing songs
songs table: (song_id, title, artist_id, year, duration)

In [8]:
# Project the song-level columns out of the parsed song data
df_songs = df_song_typed.select('song_id', 'title', 'artist_id', 'year', 'duration')
df_songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: short (nullable = true)
 |-- duration: decimal(10,5) (nullable = true)



In [9]:
# Clean the songs table.
# Every column is required, so rows holding a NULL in any column or an empty
# string in a key/text column are removed first.
df_songs = (
    df_songs
    .dropna(how='any')
    .filter("song_id != '' AND title != '' AND artist_id != ''")
)

# Cap the string lengths: ids at 50 characters, title at 256.
# Column.substr(start, length) is 1-based and runs inside Spark, which avoids
# shipping every row to a Python worker as the previous lambda-based UDFs did.
df_songs = (
    df_songs
    .withColumn('song_id', col('song_id').substr(1, 50))
    .withColumn('artist_id', col('artist_id').substr(1, 50))
    .withColumn('title', col('title').substr(1, 256))
)

# Remove rows that are identical in every column
df_songs = df_songs.dropDuplicates()

In [10]:
# Rebuild the frame under a schema in which every column is declared
# NOT NULL; verifySchema=True makes Spark check each row against it.
_songs_columns = (
    ('song_id', StringType()),
    ('title', StringType()),
    ('artist_id', StringType()),
    ('year', ShortType()),
    ('duration', DecimalType(10,5)),
)
songs_schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in _songs_columns
])
songs_final = spark.createDataFrame(df_songs.rdd, schema=songs_schema, verifySchema=True)

n_songs = songs_final.count()
print("Number of records: {}".format(n_songs))
songs_final.printSchema()
songs_final.limit(5).toPandas()

Number of records: 71
root
 |-- song_id: string (nullable = false)
 |-- title: string (nullable = false)
 |-- artist_id: string (nullable = false)
 |-- year: short (nullable = false)
 |-- duration: decimal(10,5) (nullable = false)



Unnamed: 0,song_id,title,artist_id,year,duration
0,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),ARNTLGG11E2835DDB9,0,266.39628
2,SOVYKGO12AB0187199,Crazy Mixed Up World,ARH4Z031187B9A71F2,1961,156.39465
3,SOPEGZN12AB0181B3D,Get Your Head Stuck On Your Neck,AREDL271187FB40F44,0,45.66159
4,SOBKWDJ12A8C13B2F3,Wild Rose (Back 2 Basics Mix),AR36F9J1187FB406F1,0,230.71302


### Processing artists:
artists table: (artist_id, name, location, lattitude, longitude)

In [11]:
# Project the artist-level columns, renaming them to the artists-table names
# as (source column, target column) pairs. 'lattitude' is spelt as in the
# target schema used throughout this notebook.
_artist_columns = [
    ('artist_id', 'artist_id'),
    ('artist_name', 'name'),
    ('artist_location', 'location'),
    ('artist_latitude', 'lattitude'),
    ('artist_longitude', 'longitude'),
]
df_artists = df_song_typed.select(
    [col(source_name).alias(target_name) for source_name, target_name in _artist_columns]
)
df_artists.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)



In [12]:
# Clean the artists table.
# Only artist_id and name are required: rows where either is NULL or an empty
# string are removed. location / lattitude / longitude may stay NULL.
df_artists = (
    df_artists
    .dropna(how='any', subset=['artist_id', 'name'])
    .filter("artist_id != '' AND name != ''")
)

# Cap the string lengths: location and name at 256 characters, artist_id at 50.
# Column.substr(start, length) is 1-based and returns NULL for a NULL input.
# The previous udf(lambda x: x[:256]) raised a TypeError as soon as a NULL
# 'location' reached it, because that column is never null-filtered.
df_artists = (
    df_artists
    .withColumn('location', col('location').substr(1, 256))
    .withColumn('name', col('name').substr(1, 256))
    .withColumn('artist_id', col('artist_id').substr(1, 50))
)

# Remove rows that are identical in every column
df_artists = df_artists.dropDuplicates()

In [13]:
# Rebuild the frame under a schema that declares artist_id and name NOT NULL
# (the remaining columns stay nullable); verifySchema=True makes Spark check
# each row against it.
_artists_columns = (
    ('artist_id', StringType(), False),
    ('name', StringType(), False),
    ('location', StringType(), True),
    ('lattitude', DecimalType(9,6), True),
    ('longitude', DecimalType(9,6), True),
)
artists_schema = StructType([
    StructField(column_name, column_type, nullable=is_nullable)
    for column_name, column_type, is_nullable in _artists_columns
])
artists_final = spark.createDataFrame(df_artists.rdd, schema=artists_schema, verifySchema=True)

n_artists = artists_final.count()
print("Number of records: {}".format(n_artists))
artists_final.printSchema()
artists_final.limit(5).toPandas()

Number of records: 69
root
 |-- artist_id: string (nullable = false)
 |-- name: string (nullable = false)
 |-- location: string (nullable = true)
 |-- lattitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)



Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARNNKDK1187B98BBD5,Jinx,Zagreb Croatia,45.80726,15.9676
1,ARAJPHH1187FB5566A,The Shangri-Las,"Queens, NY",40.7038,-73.83168
2,ARXR32B1187FB57099,Gob,,,
3,AROGWRA122988FEE45,Christos Dantis,,,
4,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",51.50632,-0.12714


### Save songs, artists to parquet files

In [14]:
# Songs are partitioned on disk by year and artist_id; artists are written
# unpartitioned. Existing output at either path is overwritten.
songs_final.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(out_song_path)
artists_final.write.mode('overwrite').parquet(out_artist_path)

# Explore log data
Basic Steps:
1. Load JSON with explicit schema
2. Extract data for users, time, songplay tables
3. Clean the data

### Load Data
The following fields are needed:
songplays: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

users: user_id, first_name, last_name, gender, level

time: start_time, hour, day, week, month, year, weekday

In [15]:
# First pass with schema inference, only to see which fields the logs contain
log_reader = spark.read.option("recursiveFileLookup", True)
df_raw = log_reader.json(log_data)
df_raw.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [16]:
# Explicit schema restricted to the log fields needed downstream.
# '__corrupted' receives records Spark could not parse (see
# columnNameOfCorruptRecord below); every field is nullable at this stage.
_log_fields = [
    ('__corrupted', StringType()),
    ('artist', StringType()),
    ('firstName', StringType()),
    ('gender', StringType()),
    ('lastName', StringType()),
    ('level', StringType()),
    ('location', StringType()),
    ('page', StringType()),
    ('sessionId', IntegerType()),
    ('song', StringType()),
    ('ts', LongType()),
    ('userAgent', StringType()),
    ('userId', StringType()),
]
schema_parse_log = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _log_fields]
)

# PERMISSIVE mode keeps malformed records (routing them to '__corrupted')
# instead of failing the whole read.
df_log_parsed = spark.read.json(
    log_data,
    schema=schema_parse_log,
    recursiveFileLookup=True,
    mode='PERMISSIVE',
    columnNameOfCorruptRecord='__corrupted',
)

n_log_parsed = df_log_parsed.count()
print("Number of records: {}".format(n_log_parsed))

Number of records: 8056


In [17]:
# Isolate the log records that failed to parse and cache them, since the frame
# is both counted and sampled below.
# NOTE(review): persist() is presumably also what allows a query that touches
# only the corrupt-record column (Spark >= 2.3 rejects that on an uncached
# JSON scan) - confirm before removing it.
df_log_corrupted = df_log_parsed.where("__corrupted is NOT NULL").persist()

n_log_corrupted = df_log_corrupted.count()
print("Number of corrupted records: {}".format(n_log_corrupted))
df_log_corrupted.limit(5).toPandas()

Number of corrupted records: 0


Unnamed: 0,__corrupted,artist,firstName,gender,lastName,level,location,page,sessionId,song,ts,userAgent,userId


In [18]:
# Keep the successfully parsed log records, drop the helper column and cache
# the result: it feeds the users, time and songplays tables.
df_log_typed = (
    df_log_parsed
    .where("__corrupted is NULL")
    .drop("__corrupted")
    .persist()
)

n_log_typed = df_log_typed.count()
print("Number of correct records: {}".format(n_log_typed))
df_log_typed.printSchema()
df_log_typed.limit(5).toPandas()

Number of correct records: 8056
root
 |-- artist: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- page: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,firstName,gender,lastName,level,location,page,sessionId,song,ts,userAgent,userId
0,Harmonia,Ryan,M,Smith,free,"San Jose-Sunnyvale-Santa Clara, CA",NextSong,583,Sehr kosmisch,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Ryan,M,Smith,free,"San Jose-Sunnyvale-Santa Clara, CA",NextSong,583,The Big Gundown,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Ryan,M,Smith,free,"San Jose-Sunnyvale-Santa Clara, CA",NextSong,583,Marry Me,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Wyatt,M,Scott,free,"Eureka-Arcata-Fortuna, CA",Home,563,,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Austin,M,Rosales,free,"New York-Newark-Jersey City, NY-NJ-PA",Home,521,,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


### Processing users
users: user_id, first_name, last_name, gender, level

In [19]:
# Project the user-level columns, renaming the camelCase log fields to the
# snake_case names of the users table as (source column, target column) pairs.
_user_columns = [
    ('userId', 'user_id'),
    ('firstName', 'first_name'),
    ('lastName', 'last_name'),
    ('gender', 'gender'),
    ('level', 'level'),
]
df_users = df_log_typed.select(
    [col(source_name).alias(target_name) for source_name, target_name in _user_columns]
)
df_users.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [20]:
# Clean the users table.
# user_id, first_name and last_name are required: rows where any of them is
# NULL or an empty string are removed. gender and level may stay NULL.
df_users = (
    df_users
    .dropna(how='any', subset=['user_id', 'first_name', 'last_name'])
    .filter("user_id != '' AND first_name != '' AND last_name != ''")
)

# Truncate user_id, first_name and last_name to 50 characters.
# Column.substr(start, length) is 1-based and runs inside Spark, which avoids
# shipping every row to a Python worker as the previous UDF did.
df_users = (
    df_users
    .withColumn('user_id', col('user_id').substr(1, 50))
    .withColumn('first_name', col('first_name').substr(1, 50))
    .withColumn('last_name', col('last_name').substr(1, 50))
)

# Convert user_id to Integer; a failed cast yields NULL, and those rows are dropped
df_users = df_users.withColumn('user_id', col('user_id').cast(IntegerType()))
df_users = df_users.dropna(how='any', subset=['user_id'])

# Drop duplicates on user_id only, because one user_id may appear with 2 levels.
# In a distributed setting the ordering is not guaranteed, so which of the
# levels survives for such a user is arbitrary.
df_users = df_users.dropDuplicates(subset=['user_id'])

In [21]:
# Rebuild the frame under a schema that declares user_id, first_name and
# last_name NOT NULL (gender and level stay nullable); verifySchema=True makes
# Spark check each row against it.
_users_columns = (
    ('user_id', IntegerType(), False),
    ('first_name', StringType(), False),
    ('last_name', StringType(), False),
    ('gender', StringType(), True),
    ('level', StringType(), True),
)
users_schema = StructType([
    StructField(column_name, column_type, nullable=is_nullable)
    for column_name, column_type, is_nullable in _users_columns
])
users_final = spark.createDataFrame(df_users.rdd, schema=users_schema, verifySchema=True)

n_users = users_final.count()
print("Number of records: {}".format(n_users))
users_final.printSchema()
users_final.limit(5).toPandas()

Number of records: 97
root
 |-- user_id: integer (nullable = false)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



Unnamed: 0,user_id,first_name,last_name,gender,level
0,85,Kinsley,Young,F,paid
1,65,Amiya,Davidson,F,paid
2,53,Celeste,Williams,F,free
3,78,Chloe,Roth,F,free
4,34,Evelin,Ayala,F,free


### Processing time
time: start_time, hour, day, week, month, year, weekday

In [22]:
# Distinct, non-null event timestamps from the logs
df_time = df_log_typed.select('ts').dropna().dropDuplicates()


def _ms_to_datetime(x):
    """Convert an epoch value in milliseconds to a Python datetime."""
    return datetime.fromtimestamp(x/1000.0)


# UDF wrapper; the songplays cell calls get_timestamp as well
get_timestamp = udf(_ms_to_datetime, TimestampType())

# Replace the raw 'ts' column by 'start_time', then derive the calendar parts.
# Note that 'day' is the day of the YEAR (dayofyear) and 'weekday' comes from
# dayofweek, which numbers the days 1 (Sunday) to 7 (Saturday).
start_time_col = col('start_time')
df_time = (
    df_time
    .withColumn('start_time', get_timestamp(col('ts')))
    .drop('ts')
    .withColumn('hour', hour(start_time_col))
    .withColumn('day', dayofyear(start_time_col))
    .withColumn('week', weekofyear(start_time_col))
    .withColumn('month', month(start_time_col))
    .withColumn('year', year(start_time_col))
    .withColumn('weekday', dayofweek(start_time_col))
)
df_time.printSchema()
df_time.limit(5).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-21 11:13:42.796,11,325,47,11,2018,4
1,2018-11-21 12:01:05.796,12,325,47,11,2018,4
2,2018-11-21 19:34:01.796,19,325,47,11,2018,4
3,2018-11-21 21:44:26.796,21,325,47,11,2018,4
4,2018-11-14 04:37:52.796,4,318,46,11,2018,4


In [23]:
# Rebuild the frame under a schema in which every column is declared
# NOT NULL; verifySchema=True makes Spark check each row against it.
_time_part_names = ('hour', 'day', 'week', 'month', 'year', 'weekday')
time_schema = StructType(
    [StructField('start_time', TimestampType(), nullable=False)]
    + [StructField(part_name, IntegerType(), nullable=False) for part_name in _time_part_names]
)
time_final = spark.createDataFrame(df_time.rdd, schema=time_schema, verifySchema=True)

n_time = time_final.count()
print("Number of records: {}".format(n_time))
time_final.printSchema()
time_final.limit(5).toPandas()

Number of records: 8023
root
 |-- start_time: timestamp (nullable = false)
 |-- hour: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- week: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- weekday: integer (nullable = false)



Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-21 11:13:42.796,11,325,47,11,2018,4
1,2018-11-21 12:01:05.796,12,325,47,11,2018,4
2,2018-11-21 19:34:01.796,19,325,47,11,2018,4
3,2018-11-21 21:44:26.796,21,325,47,11,2018,4
4,2018-11-14 04:37:52.796,4,318,46,11,2018,4


### Processing songplays
songplays: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent (plus year and month, which are added so the output can be partitioned by them)

* songplay_id is auto-generated
* song_id and artist_id are not present in the log data; they MUST be obtained by joining the log events with the song data on artist name and song title

In [24]:
from pyspark.sql.functions import monotonically_increasing_id

# Keep only the 'NextSong' events and tag each one with an auto-generated id
# (monotonically_increasing_id gives unique, increasing, but not consecutive
# values). 'artist', 'song' and 'ts' are carried along for the join and the
# timestamp conversion below.
df_log_sub = (
    df_log_typed
    .filter("page == 'NextSong'")
    .select(monotonically_increasing_id().alias('songplay_id'),
            col('userId').alias('user_id'),
            col('level'),
            col('sessionId').alias('session_id'),
            col('location'),
            col('userAgent').alias('user_agent'),
            col('artist'),
            col('song'),
            col('ts'))
)

# Song-side lookup, with title / artist_name renamed to match the log columns
df_song_sub = df_song_typed.select(col('song_id'),
                                   col('title').alias('song'),
                                   col('artist_id'),
                                   col('artist_name').alias('artist'))

# join tables: only plays whose (artist, song) pair exists in the song data survive
df_songplay = df_log_sub.join(df_song_sub, on=['artist', 'song'], how='inner')
df_songplay = df_songplay.select(['songplay_id', 'ts', 'user_id', 'level', 'song_id',
                                  'artist_id', 'session_id', 'location', 'user_agent'])

# drop nulls and empty strings in the required columns
df_songplay = df_songplay.dropna(how='any', subset=['user_id', 'song_id', 'artist_id', 'ts'])
df_songplay = df_songplay.filter("user_id != '' AND song_id != '' AND artist_id != ''")

# convert the raw 'ts' value to a timestamp with the get_timestamp UDF
# defined in the time-table cell, then drop the raw column
df_songplay = df_songplay.withColumn('start_time', get_timestamp(df_songplay.ts))
df_songplay = df_songplay.drop('ts')

# convert user_id to Integer; a failed cast yields NULL, and those rows are dropped
df_songplay = df_songplay.withColumn('user_id', df_songplay.user_id.cast(IntegerType()))
df_songplay = df_songplay.dropna(how='any', subset=['user_id'])

# truncate long strings to 256 characters.
# Column.substr(start, length) is 1-based and returns NULL for a NULL input.
# The previous udf(lambda x: x[:256]) raised a TypeError on NULL values, and
# neither 'location' nor 'user_agent' is null-filtered above.
df_songplay = df_songplay.withColumn('location', col('location').substr(1, 256))
df_songplay = df_songplay.withColumn('user_agent', col('user_agent').substr(1, 256))

# drop duplicates on all columns except for 'songplay_id' since it's auto-generated
df_songplay = df_songplay.dropDuplicates(subset=['start_time', 'user_id', 'level', 'song_id',
                                                 'artist_id', 'session_id', 'location',
                                                 'user_agent'])

# join with time table to get year and month (used as partition columns on write)
df_songplay = df_songplay.join(time_final, on=['start_time'], how='inner')
df_songplay = df_songplay.select(['songplay_id', 'start_time', 'year', 'month', 'user_id',
                                  'level', 'song_id', 'artist_id', 'session_id', 'location',
                                  'user_agent'])

df_songplay.printSchema()
df_songplay.limit(5).toPandas()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



Unnamed: 0,songplay_id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,882,2018-11-21 22:56:47.796,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [25]:
# Rebuild the frame under a schema that declares the id, time and key columns
# NOT NULL (level, session_id, location and user_agent stay nullable);
# verifySchema=True makes Spark check each row against it.
_songplay_columns = (
    ('songplay_id', LongType(), False),
    ('start_time', TimestampType(), False),
    ('year', IntegerType(), False),
    ('month', IntegerType(), False),
    ('user_id', IntegerType(), False),
    ('level', StringType(), True),
    ('song_id', StringType(), False),
    ('artist_id', StringType(), False),
    ('session_id', IntegerType(), True),
    ('location', StringType(), True),
    ('user_agent', StringType(), True),
)
songplay_schema = StructType([
    StructField(column_name, column_type, nullable=is_nullable)
    for column_name, column_type, is_nullable in _songplay_columns
])
songplay_final = spark.createDataFrame(df_songplay.rdd, schema=songplay_schema, verifySchema=True)

n_songplays = songplay_final.count()
print("Number of records: {}".format(n_songplays))
songplay_final.printSchema()
songplay_final.limit(5).toPandas()

Number of records: 1
root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- user_id: integer (nullable = false)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = false)
 |-- artist_id: string (nullable = false)
 |-- session_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



Unnamed: 0,songplay_id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,882,2018-11-21 22:56:47.796,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


### Save users, time, songplays to parquet files

In [26]:
# Users are written unpartitioned; time and songplays are partitioned on disk
# by year and month. Existing output at each path is overwritten.
users_final.write.mode('overwrite').parquet(out_user_path)
time_final.write.mode('overwrite').partitionBy('year', 'month').parquet(out_time_path)
songplay_final.write.mode('overwrite').partitionBy('year', 'month').parquet(out_songplay_path)