In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
import pandas as pd
from zipfile import ZipFile
from pyspark.sql.types import * 

In [2]:
# Parse dl.cfg; the values of its [AWS] section are consumed by a later cell.
# NOTE(review): ConfigParser.read() returns the list of files it managed to parse
# and skips unreadable ones rather than raising, so the echoed list should be
# checked to contain 'dl.cfg'.
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [4]:
# Export the credentials stored in the [AWS] section of dl.cfg as environment
# variables (presumably so the s3a:// reads and writes below can authenticate).
aws_section = config['AWS']
os.environ.update(
    AWS_ACCESS_KEY_ID=aws_section['AWS_ACCESS_KEY_ID'],
    AWS_SECRET_ACCESS_KEY=aws_section['AWS_SECRET_ACCESS_KEY'],
)

In [5]:
def create_spark_session():
    """Return a SparkSession, reusing an already running one when available.

    The ``hadoop-aws`` package is requested through ``spark.jars.packages``;
    the notebook's ``s3a://`` paths presumably depend on it.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [6]:
# single session object shared by every read and write in the notebook
spark = create_spark_session()

## Processing song data

#### Songs table

In [7]:
# S3 locations (s3a:// scheme): raw JSON is read from below input_data and the
# parquet tables are written below output_data.
# Both end with '/' because later cells build paths by plain string concatenation.
input_data = 's3a://udacity-dend/'
output_data = 's3a://udacity-data-lake-proj/'

In [8]:
# glob for every .json file located three directory levels below song_data/
song_data = input_data + 'song_data/*/*/*/*.json'

In [None]:
# read the song data files into a DataFrame
df = spark.read.json(song_data)
# printSchema is a method: without the call parentheses nothing is printed
df.printSchema()

In [13]:
# songs table: the distinct combinations of the song attributes listed below
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = df.select(song_columns).dropDuplicates()
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [14]:
# persist the songs table as parquet, partitioned by year and then artist_id,
# replacing whatever was written there before
(songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(output_data + 'songs'))

#### Artists table

In [21]:
# artists table: the distinct combinations of the artist attributes listed below
artist_columns = [
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
]
artists_table = df.select(artist_columns).dropDuplicates()
artists_table.show(5)

+------------------+--------------------+-----------------+---------------+----------------+
|         artist_id|         artist_name|  artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+-----------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|       40.82624|       -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|     New York, NY|       40.71455|       -74.00712|
|ARDNS031187B9924F0|          Tim Wilson|          Georgia|       32.67828|       -83.22295|
+------------------+--------------------+-----------------+---------------+----------------+
only showing top 5 rows



In [22]:
# persist the artists table as (unpartitioned) parquet, replacing earlier output
artists_table.write.mode('overwrite').parquet(output_data + 'artists')

## Processing log data

In [5]:
# location of the log data: the 'log-data/' prefix directly below input_data
log_data = input_data + 'log-data/'

In [43]:
# load the raw event logs; this rebinds df, which held the song data until now
df = spark.read.format('json').load(log_data)

In [45]:
# inspect the inferred schema and the first five rows of the raw log data
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [46]:
# keep only song-play events (page == 'NextSong') and drop exact duplicate rows
is_song_play = col('page') == 'NextSong'
df = df.where(is_song_play).dropDuplicates()

#### Users table

In [29]:
# users table: the distinct combinations of the user attributes listed below
user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
users_table = df.select(user_columns).dropDuplicates()
users_table.show(3)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
+------+---------+--------+------+-----+
only showing top 3 rows



In [30]:
# persist the users table as (unpartitioned) parquet, replacing earlier output
users_table.write.mode('overwrite').parquet(output_data + 'users')

#### Time table

In [13]:
# peek at the raw ts values before they are converted into a timestamp column
df.select('ts').show(3)

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
+-------------+
only showing top 3 rows



In [47]:
# create timestamp column from original timestamp column
def _ms_to_datetime(ms):
    """Convert an epoch value in milliseconds to a naive UTC datetime.

    Returns None for a null input, so a missing ts produces a null start_time
    instead of raising inside the UDF and failing the whole Spark job.
    """
    if ms is None:
        return None
    return datetime.utcfromtimestamp(int(ms) / 1000)


get_timestamp = udf(_ms_to_datetime, TimestampType())
df = df.withColumn('start_time', get_timestamp('ts'))

In [17]:
# check the converted start_time values
df.select('start_time').show(3)

+--------------------+
|          start_time|
+--------------------+
|2018-11-15 00:30:...|
|2018-11-15 00:41:...|
|2018-11-15 00:45:...|
+--------------------+
only showing top 3 rows



In [48]:
# time table: ts and start_time plus the calendar parts derived from start_time,
# one row per distinct combination
start = col('start_time')
time_table = df.select(
    'ts',
    'start_time',
    hour(start).alias('hour'),
    dayofmonth(start).alias('day'),
    weekofyear(start).alias('week'),
    month(start).alias('month'),
    year(start).alias('year'),
    dayofweek(start).alias('weekday'),
).dropDuplicates()

time_table.show(3)

+-------------+--------------------+----+---+----+-----+----+-------+
|           ts|          start_time|hour|day|week|month|year|weekday|
+-------------+--------------------+----+---+----+-----+----+-------+
|1542279962796|2018-11-15 11:06:...|  11| 15|  46|   11|2018|      5|
|1542299805796|2018-11-15 16:36:...|  16| 15|  46|   11|2018|      5|
|1542765178796|2018-11-21 01:52:...|   1| 21|  47|   11|2018|      4|
+-------------+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



In [49]:
# persist the time table as parquet, partitioned by year and then month,
# replacing whatever was written there before
(time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(output_data + 'time'))

#### Songplay table

In [52]:
# confirm that the log DataFrame now carries the derived start_time column
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [63]:
# reload the song data (df now holds the log events) for the songplays join
song_df = spark.read.json(input_data + 'song_data/*/*/*')

In [64]:
# inspect the inferred schema of the reloaded song data
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [65]:
# sample a few rows of the reloaded song data
song_df.show(3)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
+------------------+---------------+----------------

In [67]:
# extract columns from joined song and log datasets to create songplays table
# Match on song title AND artist name: a title alone is not unique, so joining on
# it only would attach the song_id/artist_id of same-titled songs by other artists.
song_match = (df.song == song_df.title) & (df.artist == song_df.artist_name)
songplays_table = (
    df.join(song_df, song_match, how='inner')
    .select(
        col('start_time'),
        col('userId').alias('user_id'),
        'level',
        'song_id',
        'artist_id',
        col('sessionId').alias('session_id'),
        'location',
        col('userAgent').alias('user_agent'),
    )
    # de-duplicate BEFORE assigning ids: once every row carries a unique
    # songplay_id no two rows are equal and drop_duplicates() cannot remove any
    .drop_duplicates()
    .select(monotonically_increasing_id().alias('songplay_id'), '*')
)

In [70]:
# add year and month columns to use to partition by in the next step
# They are derived straight from start_time rather than by joining back to
# time_table: same values, no extra shuffle, and the cell can be re-run safely
# (withColumn replaces an existing column, whereas the join made 'year' and
# 'month' ambiguous on a second run).
songplays_table = songplays_table \
    .withColumn('year', year('start_time')) \
    .withColumn('month', month('start_time'))

In [71]:
# sample a few rows of the songplays table, including the year/month columns
songplays_table.show(3)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          1|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|          2|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
+-----------+--------------------+-------+-----+------------------+------------------+--------

In [72]:
# verify the column names and types of the songplays table before writing it
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [73]:
# persist the songplays table as parquet, partitioned by year and then month,
# replacing whatever was written there before
(songplays_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(output_data + 'songplays'))