In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format

In [2]:
# Load AWS credentials from the local `dl.cfg` file and export them as
# environment variables for the AWS/S3 client used by Spark.
# NOTE(review): assumes dl.cfg has an [AWS] section with both keys below.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it actually parsed. Fail fast with a clear message instead
# of the opaque `KeyError: 'AWS'` that the lookups below would raise.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Root S3 prefix holding the raw song_data/ and log_data/ JSON files.
input_data = "s3a://udacity-dend/"
# Destination bucket (currently unused in this notebook):
# output_data = "s3a://sparkify-bucket2/"

In [4]:
# Create (or reuse) the Spark session. The hadoop-aws package is pulled in
# so that s3a:// paths can be read.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

# Songs dataset — build the songs and artists dimension tables

In [5]:
# Only the song_data/A/A/A prefix is read, to keep the run small.
# To use local files instead: './data/song_data/*/*/*/*.json'
song_data = f"{input_data}song_data/A/A/A/*.json"
song_df = spark.read.json(song_data)
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [6]:
# Songs dimension: distinct (song_id, title, artist_id, year, duration) rows
# taken from the song metadata.
songs_table = song_df.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()

# Equivalent query using Spark SQL instead of the DataFrame API.
# The view must be registered with createOrReplaceTempView; DataFrame has
# no createOrReplace method.
#     song_df.createOrReplaceTempView("song_df")
#     songs_table = spark.sql("""
#                                SELECT DISTINCT song_id, title, artist_id, year, duration
#                                FROM song_df
#                             """)
# Adding `WHERE song_id IS NOT NULL` would additionally drop rows without a
# song_id, which the DataFrame version above does not do.

songs_table.show(1, vertical=True)

-RECORD 0-------------------------
 song_id   | SODZYPO12A8C13A91E   
 title     | Burn My Body (Alb... 
 artist_id | AR1C2IX1187B99BF74   
 year      | 0                    
 duration  | 177.99791            
only showing top 1 row



In [8]:
# Artists dimension: distinct combinations of the artist attributes found
# in the song metadata.
artists_table = (
    song_df
    .select(
        "artist_id",
        "artist_name",
        "artist_location",
        "artist_latitude",
        "artist_longitude",
    )
    .distinct()
)
artists_table.show(1, vertical=True)

-RECORD 0------------------------------
 artist_id        | ARC1IHZ1187FB4E920 
 artist_name      | Jamie Cullum       
 artist_location  |                    
 artist_latitude  | null               
 artist_longitude | null               
only showing top 1 row



# Log dataset — build the users and time dimension tables

In [9]:
# Event logs. To use local data instead: './data/log-data/*.json'
log_data = f"{input_data}log_data/*/*/*.json"
log_df = spark.read.json(log_data)
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
# User-defined functions that turn the epoch-millisecond `ts` field into
# string timestamp / date columns.

def _format_epoch_ms(ms, fmt):
    """Format an epoch-milliseconds value with `fmt` in the worker's local timezone.

    Returns None when `ms` is None. `ts` is nullable in the log schema, and
    dividing None by 1000 would raise TypeError inside the UDF and fail the
    whole Spark job.
    """
    if ms is None:
        return None
    return datetime.fromtimestamp(ms / 1000).strftime(fmt)

# Full timestamp string, e.g. '2018-11-15 16:37:27'
get_timestamp = udf(lambda x: _format_epoch_ms(x, '%Y-%m-%d %H:%M:%S'))
log_df = log_df.withColumn('start_time', get_timestamp(log_df.ts))

# Calendar date only, e.g. '2018-11-15'
get_date = udf(lambda x: _format_epoch_ms(x, '%Y-%m-%d'))
log_df = log_df.withColumn('date', get_date(log_df.ts))

log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- date: string (nullable = true)



In [11]:
# Users dimension: distinct user attribute combinations seen in the logs.
# NOTE(review): a user appearing with more than one `level` yields one row
# per level.
users_table = (
    log_df
    .select('userId', 'firstName', 'lastName', 'gender', 'level')
    .distinct()
)
users_table.show(1, vertical=True)

-RECORD 0--------------
 userId    | 57        
 firstName | Katherine 
 lastName  | Gay       
 gender    | F         
 level     | free      
only showing top 1 row



In [12]:
# Time dimension: each distinct start_time broken into calendar parts.
# `weekday` is the abbreviated day name from the 'E' pattern, e.g. 'Thu'.
time_table = log_df.select(
    col('start_time'),
    year(col('start_time')).alias('year'),
    month(col('start_time')).alias('month'),
    weekofyear(col('start_time')).alias('week'),
    date_format(col('start_time'), 'E').alias('weekday'),
    dayofmonth(col('start_time')).alias('day'),
    hour(col('start_time')).alias('hour'),
).distinct()
time_table.show(1, vertical=True)

-RECORD 0-------------------------
 start_time | 2018-11-15 16:37:27 
 year       | 2018                
 month      | 11                  
 week       | 46                  
 weekday    | Thu                 
 day        | 15                  
 hour       | 16                  
only showing top 1 row



# Songplays fact table — join songs, logs and time

In [13]:
# Expose the DataFrames to Spark SQL under the names the songplays query uses.
for view_name, frame in (('song_df', song_df), ('log_df', log_df), ('time_table', time_table)):
    frame.createOrReplaceTempView(view_name)

In [22]:
# Songplays fact table: log events matched to song metadata on artist name,
# song title and duration, plus year/month from the time dimension.
# SELECT DISTINCT already returns unique rows across all selected columns,
# so no extra .dropDuplicates() pass is needed.
# NOTE(review): `s.duration = l.length` is an exact floating-point match.
# The 0-row result below is presumably because only the song_data/A/A/A
# subset is loaded — confirm on the full dataset.
songplays_table = spark.sql("""
    SELECT DISTINCT
        t.start_time,
        t.year AS year,
        t.month AS month,
        l.userId,
        l.level,
        s.song_id,
        s.artist_id,
        l.sessionid,
        s.artist_location,
        l.useragent
    FROM song_df s
    JOIN log_df l
        ON s.artist_name = l.artist
        AND s.title = l.song
        AND s.duration = l.length
    JOIN time_table t
        ON t.start_time = l.start_time
""")

In [23]:
songplays_table.show(n=1, vertical=True)  # peek at one joined row

(0 rows)



In [24]:
# Column names and types of the songplays fact table.
songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionid: long (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- useragent: string (nullable = true)

