In [7]:
from pyspark.sql import SparkSession, functions as F
import os
import configparser

In [2]:
# Load AWS credentials from the local 'credentials' INI file (section
# 'sparkifyuser') and export them as process environment variables.
# NOTE(review): presumably these are picked up by the hadoop-aws S3A connector
# when reading/writing s3a:// paths -- confirm against the Spark configuration.
config = configparser.ConfigParser()

# Open the file in a context manager so the handle is closed as soon as the
# parser has consumed it, instead of being left open for the kernel's lifetime.
with open('credentials') as credentials_file:
    config.read_file(credentials_file)

os.environ["AWS_ACCESS_KEY_ID"] = config.get('sparkifyuser',
                                             'AWS_ACCESS_KEY_ID')
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get('sparkifyuser',
                                                 'AWS_SECRET_ACCESS_KEY')

In [None]:
# S3 (s3a://) bucket locations for the pipeline.
# input_data: bucket holding the source datasets.
# output_data: bucket intended for the generated tables.
input_data = 's3a://udacity-dend'
output_data = 's3a://sparkify-bucket'

In [3]:
# Create (or reuse) the SparkSession, asking Spark to fetch the hadoop-aws
# package so that s3a:// paths can be resolved.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [4]:
# Load every song metadata JSON file found three directory levels below
# ./data/song_data and print the schema Spark inferred for it.
# NOTE(review): this reads from the local ./data directory, not from
# `input_data` -- confirm whether that is intended for this notebook.
song_path = './data/song_data/*/*/*/*.json'
song_data = spark.read.format('json').load(song_path)
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [5]:
# Load all event-log JSON files from ./data/log-data and print the schema
# Spark inferred for them.
log_path = './data/log-data/*.json'
log_data = spark.read.format('json').load(log_path)
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
# Build the time dimension from the log events.
#
# `ts` is divided by 1000 before being handed to from_unixtime, which works in
# seconds. from_unixtime yields a formatted string (the printed schema shows
# start_time as string); the date-part functions below parse that string.
time_data = (
    log_data
    .withColumn('start_time', F.from_unixtime(F.col('ts') / 1000))
    .select('ts', 'start_time')
    .withColumn('year', F.year('start_time'))
    .withColumn('month', F.month('start_time'))
    .withColumn('week', F.weekofyear('start_time'))
    .withColumn('weekday', F.dayofweek('start_time'))
    # 'day' is the day of the month (1-31) so it composes with the 'month' and
    # 'year' columns; dayofyear (1-366) was used here previously.
    .withColumn('day', F.dayofmonth('start_time'))
    .withColumn('hour', F.hour('start_time'))
    # Several events can share a timestamp; keep one row per distinct value.
    .dropDuplicates()
)
time_data.printSchema()

root
 |-- ts: long (nullable = true)
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



In [9]:
# Expose the three DataFrames to Spark SQL under the same names they have in
# Python, replacing any previously registered view of that name.
for view_name, frame in (
    ('song_data', song_data),
    ('log_data', log_data),
    ('time_data', time_data),
):
    frame.createOrReplaceTempView(view_name)

In [10]:
# Songplays fact query: match log events to songs on artist name, song title
# and duration, then attach year/month from the time dimension via `ts`.
# DISTINCT removes duplicate result rows.
songplays_query = """SELECT DISTINCT
                                       l.ts as ts,
                                       t.year as year,
                                       t.month as month,
                                       l.userId as user_id,
                                       l.level as level,
                                       s.song_id as song_id,
                                       s.artist_id as artist_id,
                                       l.sessionId as session_id,
                                       s.artist_location as artist_location,
                                       l.userAgent as user_agent
                                   FROM song_data s
                                   JOIN log_data l
                                       ON s.artist_name = l.artist
                                       AND s.title = l.song
                                       AND s.duration = l.length
                                   JOIN time_data t
                                       ON t.ts = l.ts
                                   """
songplays_table = spark.sql(songplays_query)

In [11]:
# Sanity check: number of rows produced by the songplays join.
songplays_table.count()

1

In [12]:
# Preview a single songplays row, one field per line (vertical layout);
# long values are truncated in the display.
songplays_table.show(1, vertical=True)

-RECORD 0-------------------------------
 ts              | 1542837407796        
 year            | 2018                 
 month           | 11                   
 user_id         | 15                   
 level           | paid                 
 song_id         | SOZCTXZ12AB0182364   
 artist_id       | AR5KOSW1187FB35FF4   
 session_id      | 818                  
 artist_location | Dubai UAE            
 user_agent      | "Mozilla/5.0 (X11... 

