In [1]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

### Getting credentials

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

### Creating Spark session

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

### Reading from S3 Bucket

##### Song Data file sample

```
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
```

In [4]:
song_data = 's3a://udacity-data-lake-project-2187/ExtractedData/song_data/*/*/*/*.json'

In [5]:
song_schema = StructType([
    StructField("num_songs", IntegerType(), True),
    StructField("artist_id", StringType(), False),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("song_id", StringType(), False),
    StructField("title", StringType(), False),
    StructField("duration", FloatType(), True),
    StructField("year", IntegerType(), False)
])

In [6]:
df_song = spark.read.json(song_data, schema = song_schema)
df_song.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)



##### Creating the *Songs* Table

In [7]:
songs_table = df_song.select('song_id',
                        col('title').alias('song_title'), 
                        'artist_id', 
                        'year', 
                        'duration')\
                            .dropDuplicates()

songs_table.limit(5).toPandas()

Unnamed: 0,song_id,song_title,artist_id,year,duration
0,SOQVMXR12A81C21483,Salt In NYC,ARKULSX1187FB45F84,0,424.123627
1,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.702393
2,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.931793
3,SOZHPGD12A8C1394FE,Baby Come To Me,AR9AWNF1187B9AB0B4,0,236.930161
4,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.606079


##### Creating the *Artists* Table

In [8]:
artists_table = df_song.select('artist_id',
                        'artist_name', 
                        'artist_location', 
                        'artist_latitude', 
                        'artist_longitude')\
                            .dropDuplicates()

artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
3,AR47JEX1187B995D81,SUE THOMPSON,"Nevada, MO",37.83721,-94.35868
4,ARHHO3O1187B989413,Bob Azzam,,,


#### Reading Log Data from S3 

In [9]:
log_data = 's3a://udacity-data-lake-project-2187/ExtractedData/log_data/*.json'

In [10]:
log_schema = StructType([
    StructField("artist", StringType(), False),
    StructField("auth", StringType(), True),
    StructField("firstName", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("itemInSession", IntegerType(), True),
    StructField("lastName", StringType(), True),
    StructField("length", FloatType(), True),
    StructField("level", StringType(), True),
    StructField("location", StringType(), True),
    StructField("method", StringType(), True),
    StructField("page", StringType(), True),
    StructField("registration", FloatType(), True),
    StructField("sessionId", IntegerType(), True),
    StructField("song", StringType(), True),
    StructField("status", IntegerType(), True),
    StructField("ts", StringType(), False),
    StructField("userAgent", StringType(), True),
    StructField("userId", StringType(), False)
])

In [11]:
df_log = spark.read.json(log_data, schema = log_schema)
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
## Get unique page values to filter
df_log.select('page').toPandas()['page'].unique()

array(['NextSong', 'Home', 'About', 'Login', 'Upgrade', 'Downgrade',
       'Help', 'Logout', 'Settings', 'Submit Downgrade', 'Submit Upgrade',
       'Save Settings', 'Error'], dtype=object)

In [13]:
df_log = df_log.select('*').where(df_log.page == 'NextSong')

##### Creating *Users* Table

In [14]:
users_table = df_log.select('userId', 
                        'firstName', 
                        'lastName', 
                        'gender', 
                        'level')\
                            .dropDuplicates()

users_table.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,57,Katherine,Gay,F,free
1,84,Shakira,Hunt,F,free
2,22,Sean,Wilson,F,free
3,52,Theodore,Smith,M,free
4,80,Tegan,Levine,F,paid


#### Timestamp/datetime creation

In [15]:
pd.Timestamp(1542241826796)

Timestamp('1970-01-01 00:25:42.241826796')

In [16]:
def getTimeStamp(timestamp):
    return pd.Timestamp(int(timestamp))

In [17]:
# create timestamp column from original timestamp column
get_timestamp = udf(getTimeStamp, TimestampType())

df_log = df_log.withColumn('start_time', get_timestamp(df_log.ts))

In [18]:
def getDateTime(timestamp):
    return datetime.fromtimestamp(int(timestamp)/1e3)

In [19]:
# create datetime column from original timestamp column
get_date_time = udf(getDateTime, DateType())

df_log = df_log.withColumn('date_time', get_date_time(df_log.ts))

In [20]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date_time: date (nullable = true)



##### Creating *time* Table

In [21]:
time_table =  df_log.select('start_time').dropDuplicates()\
                .withColumn("hour", hour(col('start_time')))\
                .withColumn("day", dayofmonth(col('start_time')))\
                .withColumn("week", weekofyear(col('start_time')))\
                .withColumn("month", month(col('start_time')))\
                .withColumn("year", year(col('start_time')))\
                .withColumn("weekday", dayofweek(col('start_time')))

In [22]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1970-01-01 00:25:42.298096,0,1,1,1,1970,5
1,1970-01-01 00:25:42.300662,0,1,1,1,1970,5
2,1970-01-01 00:25:42.300805,0,1,1,1,1970,5
3,1970-01-01 00:25:42.836433,0,1,1,1,1970,5
4,1970-01-01 00:25:43.406580,0,1,1,1,1970,5


#### Join song_data and log_data

In [23]:
df_log = df_log.selectExpr('artist as artist_name', 'song as song_title','*')
df_log.printSchema()

root
 |-- artist_name: string (nullable = true)
 |-- song_title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date_time: date (nullable = true)



In [24]:
df_song = df_song.selectExpr('title as song_title', '*')
df_song.printSchema()

root
 |-- song_title: string (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)



In [25]:
df_song_log = df_log.join(df_song, on=['song_title', 'artist_name'], how = 'outer')
df_song_log.printSchema()

root
 |-- song_title: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date_time: date (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = tr

In [26]:
df_song_log.select('*').limit(5).toPandas()

Unnamed: 0,song_title,artist_name,artist,auth,firstName,gender,itemInSession,lastName,length,level,...,date_time,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,song_id,title,duration,year
0,Begging For Mercy,Bullet For My Valentine,Bullet For My Valentine,Logged In,Kate,F,92,Harrell,235.650162,paid,...,2018-11-07,,,,,,,,,
1,Death On Two Legs (Dedicated To....) (Live) (1...,Queen,Queen,Logged In,Lily,F,5,Koch,212.166077,paid,...,2018-11-26,,,,,,,,,
2,Do You Call My Name,Ra,Ra,Logged In,Tegan,F,17,Levine,319.947296,paid,...,2018-11-15,,,,,,,,,
3,Gone Going,Black Eyed Peas,Black Eyed Peas,Logged In,Kinsley,F,3,Young,193.880356,paid,...,2018-11-23,,,,,,,,,
4,Harbor,Vienna Teng,Vienna Teng,Logged In,Jacqueline,F,16,Lynch,263.784027,paid,...,2018-11-08,,,,,,,,,


##### Creating *songplays* Table

In [27]:
songplays = df_song_log.select('start_time', 
                               'userId', 
                               'level', 
                               'song_id', 
                               'artist_id', 
                               'sessionId', 
                               'userAgent', 
                               'location', 
                               'year')\
                                .withColumn("month", month(col('start_time')))\
                                    .dropDuplicates()

songplays = songplays.withColumn('songplay_id', monotonically_increasing_id())
songplays.limit(5).toPandas()

Unnamed: 0,start_time,userId,level,song_id,artist_id,sessionId,userAgent,location,year,month,songplay_id
0,1970-01-01 00:25:41.675489,72,paid,,,117,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,"Detroit-Warren-Dearborn, MI",,1,0
1,1970-01-01 00:25:43.449690,24,paid,,,984,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...","Lake Havasu City-Kingman, AZ",,1,1
2,1970-01-01 00:25:43.584688,49,paid,,,1096,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,"San Francisco-Oakland-Hayward, CA",,1,2
3,1970-01-01 00:25:43.037982,88,paid,,,888,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...","Sacramento--Roseville--Arden-Arcade, CA",,1,3
4,1970-01-01 00:25:43.597383,16,paid,,,1076,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...","Birmingham-Hoover, AL",,1,4
