## Importing Libraries

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import configparser
import zipfile
import os
import glob
from pyspark.sql.functions import *

## Loading AWS Credentials as Environment Variable

In [2]:
config = configparser.ConfigParser()

config.read('dl-Copy1.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']

os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

## Initializing Spark Session

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [None]:
spark.sparkContext.getConf().getAll()

In [4]:
input_path = 's3a://udacity-dend/'
#s3a://udacity-dend/song_data
output_path = 's3a://barakbucket/'

In [5]:
log_path = 'log-data/2018/11/2018-11-03-events.json'
log_data_path = os.path.join(input_path, log_path)
log_data_path

's3a://udacity-dend/log-data/2018/11/2018-11-03-events.json'

In [6]:
song_data = 'song_data/A/A/A/*.json'
song_data_path = os.path.join(input_path, song_data)
song_data_path

's3a://udacity-dend/song_data/A/A/A/*.json'

## Reading Sample Song and Log Data

In [22]:
## Using Sample Zip file locally
path = 'data/log-data.zip'

with zipfile.ZipFile(path,"r") as zip_ref:
    zip_ref.extractall("data/2018-11-23-events")

df_songlog = spark.read.json('data/2018-11-23-events/*.json')

In [6]:
path2 = 'data/song-data.zip'

with zipfile.ZipFile(path2,'r') as zip_ref2:
    zip_ref2.extractall('data/song_data')
df_songdata = spark.read.json('data/song_data/A/A/A/*.json')

In [7]:
## Using S3 data
df_songlog = spark.read.json(log_data_path)
df_songdata = spark.read.json(song_data_path)

## Infering and Viewing Schema

In [8]:
df_songlog.printSchema()
df_songlog.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



111

In [9]:
df_songdata.printSchema()
df_songdata.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



24

## Viewing sample data with Pandas dataframe

In [9]:
df_songlog.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [27]:
df_songdata.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARKFYS91187B98E58F,,,,Jeff And Sheri Easter,267.7024,1,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),0
1,AR10USD1187B99F3F1,,"Burlington, Ontario, Canada",,Tweeterfriendly Music,189.57016,1,SOHKNRJ12A6701D1F8,Drop of Rain,0
2,ARGSJW91187B9B1D6B,35.21962,North Carolina,-80.01955,JennyAnyKind,218.77506,1,SOQHXMF12AB0182363,Young Boy Blues,0
3,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969
4,AR7G5I41187FB4CE6C,,"London, England",,Adam Ant,233.40363,1,SONHOTT12A8C13493C,Something Girls,1982


## Dimention Tables
### Creating a View

In [10]:
df_songlog.createOrReplaceTempView('song_log')
df_songdata.createOrReplaceTempView('song_data')

## Users table

In [32]:
users_table = spark.sql("""
                    SELECT DISTINCT
                        userId,
                        firstName,
                        lastName,
                        gender,
                        level
                        FROM song_log           
""").limit(5).toPandas()

users_table

Unnamed: 0,userId,firstName,lastName,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free


## Songs table

In [34]:
songs_table = spark.sql("""
                SELECT DISTINCT
                    song_id,
                    title,
                    artist_id,
                    year,
                    duration
                    FROM song_data
""").limit(5).toPandas()

songs_table

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
1,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
2,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
3,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506
4,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608


## Artists table

In [39]:
artists_table = spark.sql("""SELECT DISTINCT
                         artist_id,
                         artist_name,
                         artist_location,
                         artist_latitude,
                         artist_longitude
                         FROM song_data
""").limit(5).toPandas()

artists_table

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARD0S291187B9B7BF5,Rated R,Ohio,,
1,ARKRRTF1187B9984DA,Sonora Santanera,,,
2,ARXR32B1187FB57099,Gob,,,
3,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
4,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892


## Time table

### Cast timestamp to date type

In [None]:
dfa = df_songlog.withColumn('date_type', to_timestamp(df_songlog.ts/1000).cast('date'))

dfa.limit(5).toPandas()

### Create updated temp view and required time table

In [35]:
dfa.createOrReplaceTempView('song_log')

time_table = spark.sql("""
                SELECT DISTINCT
                    date_type AS start_time,
                    hour(date_type) AS hour,
                    day(date_type) AS day,
                    weekofyear(date_type) AS week,
                    month(date_type) AS month,
                    year(date_type) AS year,
                    date_format(date_type, "EEEE") AS weekday
                FROM song_log
                """)
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-08,0,8,45,11,2018,Thursday
1,2018-11-05,0,5,45,11,2018,Monday
2,2018-11-02,0,2,44,11,2018,Friday
3,2018-11-06,0,6,45,11,2018,Tuesday
4,2018-11-09,0,9,45,11,2018,Friday


## Facts Table
### Songplay Table

In [39]:
songplay_table = spark.sql("""
                    SELECT DISTINCT
                        monotonically_increasing_id() AS songplay_id,
                        L.date_type AS start_time,
                        L.userId,
                        L.level,
                        S.song_id,
                        S.artist_id,
                        sessionId,
                        L.location,
                        L.userAgent
                        FROM song_log AS L
                            JOIN song_data AS S
                                ON (S.artist_name = L.artist) AND (S.title = L.song) AND (S.duration = L.length)
                        WHERE page='NextSong'     
""").limit(5).toPandas()
songplay_table

Unnamed: 0,songplay_id,start_time,userId,level,song_id,artist_id,sessionId,location,userAgent
