# Sparkify - Data Lake with Spark
The purpose of this notebook is to test the ETL logic locally, on the sample data in `data/`, before running it on AWS EMR.

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number

In [1]:
# Load AWS credentials from the local dl.cfg file ([KEYS] section) and
# export them as environment variables for the S3 connectors.
config = configparser.ConfigParser()
config.read('dl.cfg')

for key_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[key_name] = config['KEYS'][key_name]

In [1]:
# Start (or reuse) a Spark session with the hadoop-aws package on the
# classpath, then configure the s3n filesystem with the credentials from dl.cfg.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"),
    ("fs.s3n.awsAccessKeyId", config['KEYS']['AWS_ACCESS_KEY_ID']),
    ("fs.s3n.awsSecretAccessKey", config['KEYS']['AWS_SECRET_ACCESS_KEY']),
):
    hadoop_conf.set(conf_key, conf_value)

In [1]:
# Input/output roots (local sample data here; presumably replaced with S3
# locations when run on EMR).
input_data = 'data/'
output_data = 'output/'

# Glob patterns for the raw JSON inputs.
song_data = f'{input_data}song_data/*/*/*/*.json'
log_data = f'{input_data}log_data/*.json'

## Song Data

In [1]:
# Read the raw song JSON files. `song_data` (the song glob) comes from the
# paths cell above, so the path is defined in exactly one place.
df = spark.read.json(song_data)

In [1]:
# looking at the schema inferred from the raw song JSON files
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [1]:
# peek at the first five raw song records (returns a list of Row objects)
df.head(5)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

### Song Table

In [1]:
# project the song-level columns for the songs dimension table
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')

In [1]:
# dropping songs in case of duplicates: keep one row per song_id
songs_table = songs_table.dropDuplicates(['song_id'])

In [1]:
# write songs table to parquet files partitioned by year and artist.
# mode('overwrite') lets the notebook be re-run; the default mode
# ('errorifexists') fails once output/songs/ already exists.
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(output_data + 'songs/')

### Artist Table

In [1]:
# project the artist-level columns for the artists dimension table
artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')

In [1]:
# dropping artists in case of duplicates: keep one row per artist_id
artists_table = artists_table.dropDuplicates(['artist_id'])

In [1]:
# write artists table to parquet files.
# mode('overwrite') lets the notebook be re-run; the default mode
# ('errorifexists') fails once output/artists/ already exists.
artists_table.write.mode('overwrite').parquet(output_data + 'artists/')

## Log Data

In [1]:
# Read the raw event-log JSON files. `log_data` (the log glob) comes from the
# paths cell above; this replaces the garbled `log_data = log_data = ...`.
df = spark.read.json(log_data)

In [1]:
# looking at the schema inferred from the raw (not yet filtered) log JSON files
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [1]:
# peek at the first five raw log events (returns a list of Row objects)
df.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [1]:
# keep only song-play events, i.e. rows whose page is 'NextSong'
df = df.where(col('page') == 'NextSong')

### User Table

In [1]:
# project the user-level columns for the users dimension table
users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level')

In [1]:
# dropping users in case of duplicates.
# `level` is recorded per event and may change over time (e.g. free -> paid),
# so a plain drop_duplicates would keep an arbitrary row per user. Instead keep
# each user's most recent NextSong event (highest `ts`). Built from `df`
# because users_table above has no `ts` column to order by.
latest_first = Window.partitionBy('userId').orderBy(col('ts').desc())
users_table = (
    df.withColumn('_recency_rank', row_number().over(latest_first))
    .filter(col('_recency_rank') == 1)
    .select('userId', 'firstName', 'lastName', 'gender', 'level')
)

In [1]:
# write users table to parquet files.
# mode('overwrite') lets the notebook be re-run; the default mode
# ('errorifexists') fails once output/users/ already exists.
users_table.write.mode('overwrite').parquet(output_data + 'users/')

### Time Table

In [1]:
# create timestamp column from original timestamp column
def format_epoch_millis(ts):
    """Convert an epoch timestamp in milliseconds to a 'YYYY-mm-dd HH:MM:SS' string.

    Uses ``datetime.fromtimestamp``, i.e. the local timezone of the Python
    worker. Returns None for a null ``ts`` (the column is nullable) instead of
    raising TypeError and failing the whole Spark job.
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')


# Default UDF return type (string), as before.
get_timestamp = udf(format_epoch_millis)

In [1]:
df = df.withColumn('start_date', get_timestamp(col('ts')))  # ms epoch -> 'YYYY-mm-dd HH:MM:SS' string

In [1]:
# Add the play month to df; it is reused by the songplays table later
df = df.withColumn('month', month(col('start_date')))

In [1]:
# extract columns to create time table.
# start_date has one-second resolution, so several events may share the same
# value; dedup so the time dimension has one row per start_date.
time_table = (
    df.select('start_date',
              hour('start_date').alias('hour'),
              dayofmonth('start_date').alias('day'),
              weekofyear('start_date').alias('week'),
              month('start_date').alias('month'),
              year('start_date').alias('year'),
              dayofweek('start_date').alias('weekday'))  # Spark: 1 = Sunday ... 7 = Saturday
    .dropDuplicates(['start_date'])
)

In [1]:
# write time table to parquet files partitioned by year and month.
# mode('overwrite') lets the notebook be re-run; the default mode
# ('errorifexists') fails once output/time/ already exists.
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(output_data + 'time/')

### Songplays Table

In [1]:
# read in song data to use for songplays table.
# Use the same 'songs/' path the songs write used; the previous '/songs'
# suffix produced 'output//songs'.
songs_df = spark.read.parquet(output_data + 'songs/')

In [1]:
# looking at the schema of the songs table read back from parquet; the
# partition columns (year, artist_id) are appended at the end
songs_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [1]:
# Joining tables together.
# Songs are deduplicated only by song_id, so titles are not guaranteed unique.
# A title-only join can pair one play with several songs and duplicate it.
# Matching the play length to the song duration narrows this to the actual song.
# NOTE(review): exact float equality; both values come from the raw JSON
# doubles, but confirm the match count on the full dataset.
df = df.join(
    songs_df,
    (df.song == songs_df.title) & (df.length == songs_df.duration),
)

In [1]:
# extract columns from joined song and log datasets to create songplays table.
# `year` must be the year of the play, derived from start_date. The `year`
# column the join brought in from songs_df is the song's release year (often 0).
# Using it would make the year/month partitioning mix release year with play month.
songplays_table = df.select(
    'start_date', 'userId', 'level', 'song_id', 'artist_id',
    'location', 'userAgent',
    year('start_date').alias('year'),
    'month',
)

In [1]:
# Append a songplay_id column; monotonically_increasing_id gives unique
# (but, per the Spark docs, not consecutive) 64-bit ids
songplays_table = songplays_table.select('*', monotonically_increasing_id().alias('songplay_id'))

In [1]:
# write songplays table to parquet files partitioned by year and month.
# mode('overwrite') lets the notebook be re-run; the default mode
# ('errorifexists') fails once output/songplay/ already exists.
songplays_table.write.mode('overwrite').partitionBy('year', 'month').parquet(output_data + 'songplay/')