In [1]:
import pandas as pd
import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import boto3
import configparser

## Setup Spark

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')
udacity_bucket_name = "udacity-dend"
aws_region_name = config.get("AWS", "REGION")
aws_access_key_id = config.get("AWS", "ACCESS_KEY_ID")
aws_access_key_secret = config.get("AWS", "SECRET_ACCESS_KEY")
s3 = boto3.resource(
    "s3", region_name=aws_region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_access_key_secret
)
bucket = s3.Bucket(udacity_bucket_name)
spark = SparkSession.builder \
    .master("local") \
    .appName("udacity_dend_p4") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("mapreduce.input.fileinputformat.input.dir.recursive", True) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_access_key_secret) \
    .config("spark.hadoop.fs.s3a.connection.maximum", "100000") \
    .getOrCreate()

spark

### Read Log Data

Reading all log data for analysis the data is filter by page = "NextSong" since that is the only data that is used in the Data Warehouse.

In [3]:
df_events = spark.read.json("data/input/log-data/*.json")
df_events.limit(5)

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11...",26
The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11...",26
Train,Logged In,Ryan,M,2,Smith,205.45261,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11...",26
,Logged In,Wyatt,M,0,Scott,,free,Eureka-Arcata-For...,GET,Home,1540872073796.0,563,,200,1542247071796,Mozilla/5.0 (Wind...,9
,Logged In,Austin,M,0,Rosales,,free,New York-Newark-J...,GET,Home,1541059521796.0,521,,200,1542252577796,Mozilla/5.0 (Wind...,12


#### Data Schema

In [4]:
df_events.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



#### Check for null data

In [5]:
print("Null checking:")
display(df_events.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_events.columns]).toPandas())
print("Nan checking:")
display(df_events.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_events.columns]).toPandas())

Null checking:


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,1236,0,286,286,0,286,1236,0,286,0,0,286,0,1236,0,0,286,0


Nan checking:


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


#### Check for nan data

### Find page that are related to playing songs

In [6]:
df_events.groupBy(["page"]).count().orderBy(F.col("count").desc())

page,count
NextSong,6820
Home,806
Login,92
Logout,90
Downgrade,60
Settings,56
Help,47
About,36
Upgrade,21
Save Settings,10


#### Remove all non song playing records

In [7]:
df_staging_events = df_events.where(df_events.page=="NextSong")
df_staging_events.limit(5)

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11...",26
The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11...",26
Train,Logged In,Ryan,M,2,Smith,205.45261,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11...",26
Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,Houston-The Woodl...,PUT,NextSong,1540492941796.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Mac...",61
Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Best Of Both Worl...,200,1542260935796,"""Mozilla/5.0 (Mac...",80


In [8]:
df_staging_events.where(df_staging_events.userId == "29")
df_staging_events.limit(50)

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11...",26
The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11...",26
Train,Logged In,Ryan,M,2,Smith,205.45261,free,San Jose-Sunnyval...,PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11...",26
Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,Houston-The Woodl...,PUT,NextSong,1540492941796.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Mac...",61
Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Best Of Both Worl...,200,1542260935796,"""Mozilla/5.0 (Mac...",80
Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Call Me If You Ne...,200,1542261224796,"""Mozilla/5.0 (Mac...",80
Edward Sharpe & T...,Logged In,Tegan,F,4,Levine,306.31138,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Mac...",80
Usher featuring w...,Logged In,Tegan,F,5,Levine,395.72853,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Mac...",80
Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Mac...",80
Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,Portland-South Po...,PUT,NextSong,1540794356796.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Mac...",80


### users dimension table

In [9]:
df_user = df_staging_events.select(
    "userId", "firstName", "lastName", "gender", "level"
).orderBy(F.col("ts").desc())
df_user.limit(30)

userId,firstName,lastName,gender,level
5,Elijah,Davis,M,free
16,Rylan,George,M,paid
16,Rylan,George,M,paid
16,Rylan,George,M,paid
16,Rylan,George,M,paid
49,Chloe,Cuevas,F,paid
49,Chloe,Cuevas,F,paid
16,Rylan,George,M,paid
49,Chloe,Cuevas,F,paid
16,Rylan,George,M,paid


In [10]:
df_user = df_user.dropDuplicates(("userId", "firstName", "lastName", "gender"))
df_user.limit(20)

userId,firstName,lastName,gender,level
52,Theodore,Smith,M,free
48,Marina,Sutton,F,free
47,Kimber,Norris,F,free
25,Jayden,Graves,M,paid
87,Dustin,Lee,M,free
65,Amiya,Davidson,F,paid
10,Sylvie,Cruz,F,free
84,Shakira,Hunt,F,free
71,Ayleen,Wise,F,free
43,Jahiem,Miles,M,free


#### Check for null data

In [11]:
print("Null checking:")
display(df_user.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_user.columns]).toPandas())
print("Nan checking:")
display(df_user.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_user.columns]).toPandas())

Null checking:


Unnamed: 0,userId,firstName,lastName,gender,level
0,0,0,0,0,0


Nan checking:


Unnamed: 0,userId,firstName,lastName,gender,level
0,0,0,0,0,0


#### Check Data Types

In [12]:
df_user.printSchema()

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



#### Check for duplicates

In [13]:
df_user.groupBy("userId").count().orderBy(F.col("count").desc()).limit(10)

userId,count
51,1
7,1
15,1
54,1
101,1
11,1
69,1
29,1
42,1
87,1


In [14]:
df_user = df_user.dropDuplicates()
df_user.groupBy("userId").count().orderBy(F.col("count").desc()).limit(10)

userId,count
51,1
7,1
15,1
54,1
101,1
11,1
69,1
29,1
42,1
87,1


In [15]:
df_user.where(df_user.userId == 15).limit(5)

userId,firstName,lastName,gender,level
15,Lily,Koch,F,paid


### time dimension table

In [16]:
df_time = df_staging_events.select("ts")
df_time.limit(5)

ts
1542241826796
1542242481796
1542242741796
1542253449796
1542260935796


#### Check for null data

In [17]:
print("Null checking:")
display(df_time.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_time.columns]).toPandas())
print("Nan checking:")
display(df_time.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_time.columns]).toPandas())

Null checking:


Unnamed: 0,ts
0,0


Nan checking:


Unnamed: 0,ts
0,0


#### Check for duplicates

In [18]:
df_time.groupBy("ts").count().orderBy(F.col("count").desc()).limit(10)

ts,count
1542308104796,2
1543339730796,2
1542171216796,2
1543435163796,2
1543069787796,2
1542984111796,2
1543422975796,2
1542795222796,1
1542798065796,1
1542825241796,1


### songs staging table

In [19]:
df_song_staging = spark.read.json("data/input/song_data/*/*/*")
df_song_staging.limit(5)

artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
ARDR4AC1187FB371A1,,,,Montserrat Caball...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fing...,0
AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featu...,173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (...,0
ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Esc...,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to S...,2004
ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand ...,2000
ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife I...,2005


#### Check for null data

In [20]:
print("Null checking:")
display(df_song_staging.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_song_staging.columns]).toPandas())
print("Nan checking:")
display(df_song_staging.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_song_staging.columns]).toPandas())

Null checking:


Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,0,40,0,40,0,0,0,0,0,0


Nan checking:


Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,0,0,0,0,0,0,0,0,0,0


#### Check Data Types

In [21]:
df_song_staging.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### songs dimension table

In [63]:
df_song = df_song_staging.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()
df_song.limit(5)

song_id,title,artist_id,year,duration
SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
SOTTDKS12AB018D69B,It Wont Be Christmas,ARMBR4Y1187B9990EB,0,241.47546
SOBBUGU12A8C13E95D,Setting Fire to S...,ARMAC4T1187FB3FA4C,2004,207.77751
SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
SONYPOM12A8C13B2D7,I Think My Wife I...,ARDNS031187B9924F0,2005,186.48771


#### Check for null data

In [64]:
print("Null checking:")
display(df_song.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_song.columns]).toPandas())
print("Nan checking:")
display(df_song.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_song.columns]).toPandas())

Null checking:


Unnamed: 0,song_id,title,artist_id,year,duration
0,0,0,0,0,0


Nan checking:


Unnamed: 0,song_id,title,artist_id,year,duration
0,0,0,0,0,0


#### Check Data Types

In [65]:
df_song.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



#### Check for duplicates

In [66]:
df_song.groupBy("song_id").count().orderBy(F.col("count").desc())

song_id,count
SOGOSOV12AF72A285E,1
SOMZWCG12A8C13C480,1
SOUPIRU12A6D4FA1E1,1
SOXVLOJ12AB0189215,1
SOWTBJW12AC468AC6E,1
SOBONFF12A6D4F84D8,1
SOPVXLX12A8C1402D5,1
SOAOIBZ12AB01815BE,1
SOBKWDJ12A8C13B2F3,1
SONSKXP12A8C13A2C9,1


### artists dimension table

In [58]:
df_artist = df_song_staging.select(
    df_song_staging.artist_id,
    df_song_staging.artist_name.alias('name'),
    df_song_staging.artist_location.alias('location'),
    df_song_staging.artist_latitude.alias('latitude'),
    df_song_staging.artist_longitude.alias('longitude')    
).dropDuplicates()
df_artist.limit(5)


artist_id,name,location,latitude,longitude
ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
ARXR32B1187FB57099,Gob,,,
AROGWRA122988FEE45,Christos Dantis,,,
ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952


#### Check for null data

In [59]:
print("Null checking:")
display(df_artist.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_artist.columns]).toPandas())
print("Nan checking:")
display(df_artist.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df_artist.columns]).toPandas())

Null checking:


Unnamed: 0,artist_id,name,location,latitude,longitude
0,0,0,0,38,38


Nan checking:


Unnamed: 0,artist_id,name,location,latitude,longitude
0,0,0,0,0,0


#### Check Data Types

In [60]:
df_artist.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



#### Check for duplicates

In [61]:
df_artist.groupBy("artist_id").count().orderBy(F.col("count").desc())

artist_id,count
AR9AWNF1187B9AB0B4,1
AR0IAWL1187B9A96D0,1
AREDL271187FB40F44,1
AR0RCMP1187FB3F427,1
ARI3BMM1187FB4255E,1
AR7SMBG1187B9B9066,1
ARMAC4T1187FB3FA4C,1
ARNTLGG11E2835DDB9,1
ARKRRTF1187B9984DA,1
AR051KA1187B98B2FF,1
