# Explorating and Prototyping

This notebook should be used for Exploration and Prototyping the solution before moving to the cloud

In [1]:
# importing pyspark in order to find spark on my local computer
import findspark
findspark.init()

In [63]:
# importing all necessary packages
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType, DoubleType
from pyspark.sql import DataFrame

## 1. Reading the data

In [3]:
# creating a SparkSession
spark = SparkSession.builder.appName('DEND_Project_4_DL_Spark').getOrCreate()

In [4]:
# Setting the paths where the files are located (both log and sond are in JSON)
path_log = './data/log-data'
path_song = './data/song_data'

In [5]:
# reading the log-data into a spark dataframe
df_log = spark.read.json(path_log)

In [6]:
# checking the first 5 rows in a understandble way (using pandas for it)
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [7]:
# checking the number of observations present in log-data
df_log.count()

8056

In [8]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [22]:
# same process for song data
df_song = spark.read.json(path_song)

AnalysisException: Unable to infer schema for JSON. It must be specified manually.;

As the error message reports, the schema must be manually declared using spark types and structure.

In [40]:
schema = StructType([StructField('num_songs', IntegerType()),
                     StructField('artist_id', StringType()),
                     StructField('artist_latitude', FloatType()),
                     StructField('artist_longitude', FloatType()),
                     StructField('artist_location', StringType()),
                     StructField('artist_name', StringType()),
                     StructField('song_id', StringType()),
                     StructField('title', StringType()),
                     StructField('duration', DoubleType()),
                     StructField('year', IntegerType())
                     ])

In [41]:
# reading the song-data properly
df_song = spark.read.option("recursiveFileLookup","true").json(path_song, schema = schema)

In [42]:
# checking the dataset
df_song.limit(5).toPandas()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,511.16363,0
1,1,AREBBGV1187FB523D2,,,"Houston, TX",Mike Jones (Featuring CJ_ Mello & Lil' Bran),SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),173.66159,0
2,1,ARMAC4T1187FB3FA4C,40.826241,-74.47995,"Morris Plains, NJ",The Dillinger Escape Plan,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,207.77751,2004
3,1,ARPBNLO1187FB3D52F,40.71455,-74.007118,"New York, NY",Tiny Tim,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281,2000
4,1,ARDNS031187B9924F0,32.67828,-83.222954,Georgia,Tim Wilson,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,186.48771,2005


In [43]:
# checking the dataset
df_song.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)



In [45]:
# checking the number of observations present in song-data
df_song.count()

71

**Everything looks great in reading phase now!**

## 2. Creating the Fact Table

In [28]:
# casting ts to a datetime format
df_log = df_log.withColumn('ts', F.to_timestamp(df_log.ts / 1000))

In [83]:
columns= ['ts as start_time', 
          'userId as user_id', 
          'level', 
          'song_id', 
          'artist_id',
          'sessionId as session_id', 
          'location',
          'userAgent as user_agent']

condition = [df_log.song == df_song.title, df_log.length == df_song.duration, df_log.artist == df_song.artist_name]

songplay = df_log.join(df_song, on = condition, how = 'left_outer').selectExpr(*columns)

In [84]:
songplay.show()

+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|2018-11-14 22:30:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-14 22:41:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-14 22:45:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-14 23:57:...|      9| free|   null|     null|       563|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|
|2018-11-15 01:29:...|     12| free|   null|     null|       521|New York-Newark-J...|Mozilla/5.0 (Wind...|
|2018-11-15 01:44:...|     61| free|   null|     null|       597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|
|2018-11-15 01:44:...|     6