# Data Lake with Spark Project

## Setup

In [47]:
import configparser
import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as ST, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int

# get config settings
config = configparser.ConfigParser()
config.read('dl.cfg')

# make environment variables out of ID and secret key
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [2]:
# initialize Spark session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
# Extract .zip files for debugging
# Instructions derived from here: https://thispointer.com/python-how-to-unzip-a-file-extract-single-multiple-or-all-files-from-a-zip-archive/

from zipfile import ZipFile

with ZipFile('./data/song-data.zip', 'r') as zipObj:
    zipObj.extractall('./data')

with ZipFile('./data/log-data.zip', 'r') as zipObj:
    zipObj.extractall('./data')

In [4]:
# define I/O variables
input_data = "s3a://udacity-dend/"
output_data = 's3a://udacity-dend-cseal/data-lake/'

## Process Raw Song Files

### Make Song Dimension Table
- Scratchwork for process_song_file function in etl.py

In [5]:
# Infer schema first
song_input_data = "data/song_data/A/A/A/*.json"
song_data = spark.read.json(song_input_data)

In [11]:
song_data.printSchema()
print("Number of files: {}".format(song_data.count()))
song_data.show(5, truncate=False)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

Number of files: 11
+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+-------------------------------------------+----+
|artist_id         |artist_latitude|artist_location            |artist_longitude|artist_name          |duration |num_songs|song_id           |title                                      |year|
+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+---------------------------------

In [13]:
# this looks mostly correct, except we probably can use the 32-bit Integer type for year and num_songs
# let's load the data with a defined schema
song_schema = ST([
    Fld('artist_id', Str(), nullable=True),
    Fld('artist_latitude', Dbl(), nullable=True),
    Fld('artist_location', Str(), nullable=True),
    Fld('artist_longitude', Dbl(), nullable=True),
    Fld('artist_name', Str(), nullable=True),
    Fld('duration', Dbl(), nullable=True),
    Fld('num_songs', Int(), nullable=True),
    Fld('song_id', Str(), nullable=False),
    Fld('title', Str(), nullable=True),
    Fld('year', Int(), nullable=True)
])
song_data = spark.read.json(song_input_data, schema=song_schema)
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [15]:
# define the song table
songs_table = song_data.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()
print(songs_table.toPandas().head())

              song_id                              title           artist_id  \
0  SONHOTT12A8C13493C                    Something Girls  AR7G5I41187FB4CE6C   
1  SOHKNRJ12A6701D1F8                       Drop of Rain  AR10USD1187B99F3F1   
2  SOCIWDW12A8C13D406                          Soul Deep  ARMJAGH1187FB546F3   
3  SOUDSGM12AC9618304  Insatiable (Instrumental Version)  ARNTLGG11E2835DDB9   
4  SOQHXMF12AB0182363                    Young Boy Blues  ARGSJW91187B9B1D6B   

   year   duration  
0  1982  233.40363  
1     0  189.57016  
2  1969  148.03546  
3     0  266.39628  
4     0  218.77506  


In [16]:
# partition song table by year and artist_id
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(os.path.join(output_data, 'songs'))

### Make artists dimension table

In [17]:
artists_table = song_data.selectExpr( 
    ['artist_id', 'artist_name AS name', 'artist_location AS location', 
     'artist_latitude AS latititude', 'artist_longitude AS longitude']
).dropDuplicates()
artists_table.printSchema()
print(artists_table.toPandas().head())

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latititude: double (nullable = true)
 |-- longitude: double (nullable = true)

            artist_id              name         location  latititude  \
0  ARXR32B1187FB57099               Gob                          NaN   
1  ARGSJW91187B9B1D6B      JennyAnyKind   North Carolina    35.21962   
2  ARKRRTF1187B9984DA  Sonora Santanera                          NaN   
3  ARD7TVE1187B99BFB1            Casual  California - LA         NaN   
4  AR8ZCNI1187B9A069B  Planet P Project                          NaN   

   longitude  
0        NaN  
1  -80.01955  
2        NaN  
3        NaN  
4        NaN  


In [None]:
# print artists table to s3 parquet files
artists_table.write.mode('overwrite').parquet(os.path.join(output_data, 'artists'))

## Process Raw Log Files

In [64]:
# get filepath to log data file
# log_data = os.path.join(input_data, 'log_data/*/*/*.json')
log_data = "./data/log_data/*.json" # for debugging

# read log data file
df = spark.read.json(log_data)

# filter by actions for song plays
df = df.where(df.page == 'NextSong')

In [49]:
df.printSchema()
print(df.toPandas().head())

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

        artist       auth firstName gender  itemInSession  lastName  \
0     Harmonia  Logged In      Ryan      M              0     Smith   
1  The Prodigy  Logged In      Ryan      M              1     Smith   
2        Train  Logged In      Ryan      M              2     Smith   
3  Sony Wonder  Log

### Make Users Dimension Table

In [25]:
users_table = df.selectExpr( 
    ['INT(userId) AS user_id', 'firstName AS first_name', 'lastName AS last_name', 'gender', 'level']
).dropDuplicates()
print("Row count: {}".format(users_table.count()))
print(users_table.printSchema())
print(users_table.toPandas().head())

Row count: 104
root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

None
   user_id first_name last_name gender level
0       15       Lily      Koch      F  free
1        9      Wyatt     Scott      M  free
2       40     Tucker  Garrison      M  free
3       84    Shakira      Hunt      F  free
4        3      Isaac    Valdez      M  free


In [26]:
users_table.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))

### Make Time Dimension Table

In [50]:
print("ts column is of type {}, so we need to change it to a timestamp".format(df.toPandas()['ts'].dtype))

ts column is of type int64, so we need to change it to a timestamp


In [77]:
# create timestamp column from original timestamp column
from pyspark.sql.types import TimestampType as Tmstp
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0), returnType=Tmstp())
df = df.withColumn("start_time", get_timestamp(df.ts))
df.printSchema()
df.toPandas().head(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796


In [71]:
# extract columns to create time table
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
time_table = df.select('start_time').dropDuplicates() \
    .withColumn('hour', hour(col('start_time'))) \
    .withColumn('day', dayofmonth(col('start_time'))) \
    .withColumn('week', weekofyear(col('start_time'))) \
    .withColumn('month', month(col('start_time'))) \
    .withColumn('year', year(col('start_time'))) \
    .withColumn('weekday', date_format(col("start_time"), "u")) # syntax derived from here: https://sparkbyexamples.com/spark/spark-get-day-of-week-number/
time_table.printSchema()
time_table.toPandas().head()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)



Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-21 06:18:12.796,6,21,47,11,2018,3
1,2018-11-21 18:49:23.796,18,21,47,11,2018,3
2,2018-11-14 15:20:15.796,15,14,46,11,2018,3
3,2018-11-05 16:31:59.796,16,5,45,11,2018,1
4,2018-11-13 18:00:26.796,18,13,46,11,2018,2


In [72]:
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(os.path.join(output_data, 'time'))

### Make Songplay Fact Table

In [83]:
def load_song_jsons(fpath, spark):
    song_schema = ST([
        Fld('artist_id', Str(), nullable=True),
        Fld('artist_latitude', Dbl(), nullable=True),
        Fld('artist_location', Str(), nullable=True),
        Fld('artist_longitude', Dbl(), nullable=True),
        Fld('artist_name', Str(), nullable=True),
        Fld('duration', Dbl(), nullable=True),
        Fld('num_songs', Int(), nullable=True),
        Fld('song_id', Str(), nullable=False),
        Fld('title', Str(), nullable=True),
        Fld('year', Int(), nullable=True)
    ])
    return spark.read.json(fpath, schema=song_schema) 

# read in song data to use for songplays table
song_df = load_song_jsons(song_input_data)
song_df.printSchema()
song_df.limit(1).show()
df.printSchema()
df.toPandas().head(1)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+------------------+---------------+---------------+----------------+--------------------+--------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|         artist_name|duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+--------------------+--------+---------+------------------+--------------------+----+
|ARKFYS91187B98E58F|           null|               |            null|Jeff And Sheri Ea...|267.7024|        1

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796


In [97]:
joined = df.join(song_df, [df.song == song_df.title, df.artist == song_df.artist_name], 'inner').dropDuplicates()
joined.printSchema()
print('There are {} matching rows in songplays in demo dataset'.format(joined.count()))
joined.limit(1).show()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable =

In [98]:
joined.toPandas().head(1)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,Bullet For My Valentine,Logged In,Kate,F,92.0,Harrell,235.65016,paid,"Lansing-East Lansing, MI",PUT,...,,,,,,,,,,


In [100]:
joined.createOrReplaceTempView("joined")

In [112]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql("""
    SELECT monotonically_increasing_id() AS songplay_id,
        j.start_time,
        month(j.start_time) AS month,
        year(j.start_time) AS year,
        j.userId AS user_id,
        j.level,
        j.song_id,
        j.artist_id,
        j.sessionid AS session_id,
        j.location,
        j.useragent AS user_agent
    FROM joined j
    ORDER BY j.start_time
""")
songplays_table.printSchema()
print('There are {} rows in songplays fact table'.format(songplays_table.count()))
songplays_table.limit(1).show()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

There are 6831 rows in songplays fact table
+------------+----------+-----+----+-------+-----+------------------+------------------+----------+--------+----------+
| songplay_id|start_time|month|year|user_id|level|           song_id|         artist_id|session_id|location|user_agent|
+------------+----------+-----+----+-------+-----+------------------+------------------+----------+--------+----------+
|283467841550|      null| null|null|   null| null|SOHKNRJ12A6701D1F8|AR10USD1187B99F3F1|      null|    null|      null|
+------------+----------+-----

In [111]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode('overwrite').partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays'))

KeyboardInterrupt: 