# Project 4 - Data Lake on S3

In [4]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, FloatType as Flt, TimestampType
import pyspark.sql.functions as F
import os
import configparser
import boto3
import pandas as pd

# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
ACCESS_KEY = config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET_KEY = config.get('AWS','AWS_SECRET_ACCESS_KEY')

# Create spark session with hadoop-aws package

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [11]:
hadoopConf = spark._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', ACCESS_KEY)
hadoopConf.set('fs.s3a.secret.key', SECRET_KEY)
hadoopConf.set('fs.s3a.endpoint', 's3-us-west-2.amazonaws.com')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

# Load data from S3

In [12]:
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=ACCESS_KEY,
                       aws_secret_access_key=SECRET_KEY
                   )

In [13]:
print('Display files from log-data')
Bucket = "project4-data-lake-s3"
LogBucket =  s3.Bucket(Bucket)
log_data_files = [filename.key for filename in LogBucket.objects.filter(Prefix='log-data')]
log_data_files[:10]

Display files from log-data


['log-data/2018-11-01-events.json',
 'log-data/2018-11-02-events.json',
 'log-data/2018-11-03-events.json',
 'log-data/2018-11-04-events.json',
 'log-data/2018-11-05-events.json',
 'log-data/2018-11-06-events.json',
 'log-data/2018-11-07-events.json',
 'log-data/2018-11-08-events.json',
 'log-data/2018-11-09-events.json',
 'log-data/2018-11-10-events.json']

In [14]:
# get filepath to song data file
BucketPath = "s3a://" + Bucket
SongBucketPath = BucketPath + "/song-data"
song_data = SongBucketPath + "/*/*/*/*.json"
Parquet = BucketPath + "/parquet/"

In [15]:
SongDataSchema = R([
    Fld("num_songs", Int()),
    Fld("artist_id", Str()),
    Fld("artist_latitude", Flt()),
    Fld("artist_longitude", Flt()),
    Fld("artist_location", Str()),
    Fld("artist_name", Str()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("duration", Flt()),
    Fld("year", Int())
])

In [16]:
# read Song data file
dfSongData = spark.read.json(song_data, schema = SongDataSchema)

In [17]:
dfSongData.printSchema()
dfSongData.show(5)

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)

+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|  artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|           null|            null|                 |Montserrat Caball...|

In [10]:
# extract columns to create songs table
songs_table = dfSongData.select("song_id", "title", "artist_id", "year", "duration")

In [11]:
# write songs table to parquet files partitioned by year and artist
songs_table.show(5)
songs_table.printSchema()
songs_table.write.mode("Overwrite").parquet(Parquet + "/songs")

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16364|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48772|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: float (nullable = true)



In [12]:
# extract columns to create artists table
artists_table = dfSongData.select("artist_id", col("artist_name").alias("name"), col("artist_location").alias("location"), col("artist_latitude").alias("latitude"), col("artist_longitude").alias("longitude"))

In [13]:
# write artists table to parquet files
artists_table.show(5)
artists_table.printSchema()
artists_table.write.mode("Overwrite").parquet(Parquet + "/artists")

+------------------+--------------------+-----------------+--------+---------+
|         artist_id|                name|         location|latitude|longitude|
+------------------+--------------------+-----------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |    null|     null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|40.82624|-74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|     New York, NY|40.71455|-74.00712|
|ARDNS031187B9924F0|          Tim Wilson|          Georgia|32.67828|-83.22295|
+------------------+--------------------+-----------------+--------+---------+
only showing top 5 rows

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)



In [14]:
print('Display files from song-data')
SongBucket =  s3.Bucket("project4-data-lake-s3")
song_data_files = [filename.key for filename in SongBucket.objects.filter(Prefix='song-data')]
song_data_files[:10]

Display files from song-data


['song-data/.DS_Store',
 'song-data/A/.DS_Store',
 'song-data/A/A/.DS_Store',
 'song-data/A/A/A/TRAAAAW128F429D538.json',
 'song-data/A/A/A/TRAAABD128F429CF47.json',
 'song-data/A/A/A/TRAAADZ128F9348C2E.json',
 'song-data/A/A/A/TRAAAEF128F4273421.json',
 'song-data/A/A/A/TRAAAFD128F92F423A.json',
 'song-data/A/A/A/TRAAAMO128F1481E7F.json',
 'song-data/A/A/A/TRAAAMQ128F1460CD3.json']

In [15]:
BucketPath = "s3a://" + Bucket
LogBucketPath = BucketPath + "/log-data"
log_data = LogBucketPath + "/*.json"

In [16]:
LogDataSchema = R([
    Fld("artist",Str()),
    Fld("auth",Str()),
    Fld("firstName",Str()),
    Fld("gender",Str()),
    Fld("itemInSession",Int()),
    Fld("lastName",Str()),
    Fld("length", Flt()),
    Fld("level", Str()),
    Fld("location", Str()),
    Fld("method", Str()),
    Fld("page", Str()),
    Fld("registration", Flt()),
    Fld("sessionId", Int()),
    Fld("song", Str()),
    Fld("status", Int()),
    Fld("ts", Str()),
    Fld("userAgent", Str()),
    Fld("userId", Str())
])

In [17]:
# read log data file
dfLogData = spark.read.json(log_data, schema = LogDataSchema)

In [18]:
dfLogData.printSchema()
dfLogData.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

In [19]:
# filter by actions for song plays
dfLogData = dfLogData.select('*').where(dfLogData.page == 'NextSong')
dfLogData.printSchema()
dfLogData.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|    

In [20]:
# extract columns for users table    
users_table = dfLogData.select(col("userId").alias("user_id"), col("firstName").alias("first_name"), col("lastName").alias("last_name"), "gender", "level")

In [21]:
# write users table to parquet files
users_table.show(5)
users_table.write.mode("Overwrite").parquet(Parquet + "/users")

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [22]:
# create datetime column from original timestamp column
#dfLogData = dfLogData.withColumn("ts", F.to_timestamp(F.from_unixtime((col("ts") / 1000) , 'yyyy-MM-dd HH:mm:ss.SSS')).cast("Timestamp"))

In [23]:
dfLogData.printSchema()
dfLogData.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|    

In [24]:
# convert ts to double
dfLogData = dfLogData.withColumn("ts",dfLogData.ts.cast(Dbl()))
dfLogData.printSchema()
dfLogData.show(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: double (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-------------+---------+-------------+------+-----------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|  length|level|            loc

In [25]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: pd.Timestamp(x*1000000), TimestampType())
dfLogData = dfLogData.withColumn("start_time", get_timestamp("ts"))
dfLogData.printSchema()
dfLogData.show(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: double (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)

+--------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-------------+---------+-------------+------+-----------------+--------------------+------+--------------------+
|  artist|     auth|firs

In [26]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: pd.Timestamp(x*1000000), Date())
dfLogData = dfLogData.withColumn("start_date", get_datetime("ts"))
dfLogData.printSchema()
dfLogData.show(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: double (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- start_date: date (nullable = true)

+--------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-------------+---------+-------------+------+-----------------+--------------------+------+------

In [27]:
# extract columns to create time table
time_table = dfLogData.select("start_time")\
                        .withColumn("hour", F.hour(col("start_time")))\
                        .withColumn("day", F.dayofmonth(col("start_time")))\
                        .withColumn("week", F.weekofyear(col("start_time")))\
                        .withColumn("month", F.month(col("start_time")))\
                        .withColumn("year", F.year(col("start_time")))\
                        .withColumn("weekday", F.dayofweek(col("start_time")))

In [28]:
# write time table to parquet files partitioned by year and month
time_table.show(5)
time_table.printSchema()
time_table.write.mode("Overwrite").parquet(Parquet + "/time")

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [29]:
# read in song data to use for songplays table // reset df
dfSongData = spark.read.json(song_data, schema = SongDataSchema)

In [30]:
dfSongData.printSchema()
dfLogData.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (n

In [31]:
# format for join
dfSongData = dfSongData.selectExpr('title as song_title', '*')
dfLogData = dfLogData.selectExpr('artist as artist_name', 'song as song_title','*')

In [32]:
dfSongData.printSchema()
dfSongData.show(1)
dfLogData.printSchema()
dfLogData.show(1)

root
 |-- song_title: string (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)

+--------------------+---------+------------------+---------------+----------------+---------------+--------------------+------------------+--------------------+---------+----+
|          song_title|num_songs|         artist_id|artist_latitude|artist_longitude|artist_location|         artist_name|           song_id|               title| duration|year|
+--------------------+---------+------------------+---------------+----------------+---------------+--------------------+------------------+--------------------+---------+----+
|So

In [33]:
All = dfLogData.join(dfSongData, on=['song_title', 'artist_name'], how = 'outer')

In [None]:
All.show(5)
All.printSchema()

In [None]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = All.select("start_time", col("userId").alias("user_id"), "level", "song_id", "artist_id", col("SessionId").alias("session_id"), "artist_location", col("userAgent").alias("user_agent"))

In [None]:
# write songplays table to parquet files partitioned by year and month
songplays_table.show(5)
songplays_table.printSchema()
songplays_table.write.mode("Overwrite").parquet(Parquet + "/songplays")

In [None]:
All.filter(col("year").isNotNull()).show()

In [None]:
songplays_table.filter(col("artist_id").isNotNull()).show()