**Schema for Song Play Analysis**

Using the song and log datasets, you'll need to create a star schema optimized for queries on song play analysis. This includes the following tables.

**Fact Table**

+ songplays - records with page NextSong
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

**Dimension Tables**
+ users - user_id, first_name, last_name, gender, level
+ songs - song_id, title, artist_id, year, duration
+ artists - artist_id, name, location, lattitude, longitude
+ time - start_time, hour, day, week, month, year, weekday

In [1]:
import pandas as pd
pd.set_option('max_colwidth', 200)

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("sparkify_etl") \
    .getOrCreate()

In [3]:
# create spark context
sc = spark.sparkContext

In [4]:
spark

In [5]:
# need to replace with S3 eventually
path1 = 's3/input/log-data.zip'
path2 = 's3/input/song-data.zip'

In [6]:
from zipfile import ZipFile
# Create a ZipFile Object and load sample.zip in it
with ZipFile(path1, 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('s3/input/log-data')

In [7]:
with ZipFile(path2, 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('s3/input/song-data')

In [8]:
#log_path = 'data/log-data/*'
#log_df =spark.read.json(log_path)
#log_df.printSchema()

## Song Data

In [9]:
# get filepath to song data file
song_data = 's3/input/song-data/*/*/*/*'


In [10]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
song_schema = R([Fld("artist", Str()),
                     Fld("artist_id", Str()),
                     Fld("artist_latitude", Dbl()),
                     Fld("artist_location", Str()),
                     Fld("artist_longitude", Dbl()),
                     Fld("artist_name", Str()),
                     Fld("duration", Dbl()),
                     Fld("num_songs", Int()),
                     Fld("song_id", Str()),
                     Fld("title", Str()),
                     Fld("year", Int()),  
                     ])

In [11]:
# read song data file
df = spark.read.json(song_data, song_schema)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



### Song Table

In [12]:
# extract columns to create songs table
cols = ['song_id','title', 'artist_id', 'year','duration']
songs_table = df.select(cols)
#print(songs_table.count())
songs_table = songs_table.drop_duplicates()
#print(songs_table.count())
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOHUOAP12A8AE488E9,Floating,ARD842G1187B997376,1987,491.12771
1,SOKEJEJ12A8C13E0D0,The Urgency (LP Version),ARC43071187B990240,0,245.21098
2,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
3,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
4,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159


In [13]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



In [14]:
# write songs table to parquet files partitioned by year and artist
songs_table = songs_table.repartition('year','artist_id')
#songs_table.write.parquet("s3/output/songs.parquet")

### Artist Table

In [15]:
cols = ['artist_id','artist_name as name', 'artist_location as location', 'artist_latitude as latitude','artist_longitude as longitude']
artist_table = df.selectExpr(cols)
print(artist_table.count())
artist_table = artist_table.drop_duplicates()
print(artist_table.count())
artist_table.limit(5).toPandas()

71
69


Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARXR32B1187FB57099,Gob,,,
2,AROGWRA122988FEE45,Christos Dantis,,,
3,ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952


In [16]:
# write songs table to parquet files partitioned by year and artist
#artist_table.write.parquet("s3/output/artist.parquet")

## Log Data

In [17]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
new_schema = R([Fld("artist", Str()),
                     Fld("auth", Str()),
                     Fld("firstName", Str()),
                     Fld("gender", Str()),
                     Fld("itemInSession", Int()),
                     Fld("lastName", Str()),
                     Fld("length", Dbl()),
                     Fld("level", Str()),
                     Fld("location", Str()),
                     Fld("method", Str()),
                     Fld("page", Str()),
                     Fld("registration", Str()),
                     Fld("sessionId", Int()),
                     Fld("song", Str()),
                     Fld("status", Int()),
                     Fld("ts", Str()),
                     Fld("userAgent", Str()),
                     Fld("userId", Str(), False)    
                     ])

In [18]:
# get filepath to log data file
log_data = 's3/input/log-data/*'

# read log data file
df = spark.read.json(log_data, new_schema)
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872073796.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541059521796.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0,12


In [19]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')
#df.limit(3).toPandas()

### Users Table

In [20]:
# extract columns for users table    
cols = ['userId as user_id','firstName as first_name', 'lastName as last_name', 'gender', 'level']
users_table = df.selectExpr(cols)
users_table = users_table.drop_duplicates()
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free


In [21]:
# write users table to parquet files
# users_table.write.parquet("s3/output/users.parquet")

### Time Table

In [22]:
# create timestamp column from original timestamp column
from pyspark.sql.functions import udf, col, to_timestamp, from_unixtime
#from pyspark.sql.types import StringType

def parse_time(line : str) -> str:
    return(line[0:-3])


parse_time_udf = udf(lambda epoch: parse_time(epoch), Str())
df = df.withColumn("start_time", to_timestamp(from_unixtime(parse_time_udf(col("ts")))))
df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26,2018-11-15 00:30:26


In [23]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [24]:
from pyspark.sql.functions import to_date
parse_time_udf = udf(lambda epoch: parse_time(epoch), Str())
df = df.withColumn("date", to_date(from_unixtime(parse_time_udf(col("ts")))))
df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time,date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26,2018-11-15 00:30:26,2018-11-15


In [25]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date: date (nullable = true)



In [26]:
# extract columns to create time table
from pyspark.sql.functions import hour, year, month, dayofmonth, weekofyear, date_format

time_table = df.select('start_time')
time_table = time_table.withColumn('hour', hour(time_table.start_time))
time_table = time_table.withColumn('day', dayofmonth(time_table.start_time))
time_table = time_table.withColumn('week', weekofyear(time_table.start_time))
time_table = time_table.withColumn('month', month(time_table.start_time))
time_table = time_table.withColumn('year', year(time_table.start_time))
time_table = time_table.withColumn("weekday", date_format(df.start_time, "EEEE"))
time_table = time_table.drop_duplicates()
time_table.limit(1).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 21:21:37,21,15,46,11,2018,Thursday


In [27]:
time_table = time_table.repartition('year','month')

### Songplays Table

In [28]:
# read in song data to use for songplays table
song_df = spark.read.json(song_data)

In [29]:
# extract columns from joined song and log datasets to create songplays table 
df.createOrReplaceTempView("e")
song_df.createOrReplaceTempView("song_stage")

songplays_table = spark.sql("""
SELECT
    CAST(e.userId || e.sessionId || e.itemInSession as bigint) as songplay_id,
    e.ts start_time,
    CAST(e.userId as int) as user_id,
    e.level as level,
    st.song_id,
    st.artist_id,
    CAST(e.itemInSession as int) as session_id,
    e.location as location,
    e.userAgent as user_agent
FROM e
LEFT JOIN (select distinct title, artist_name, artist_id, duration, song_id from song_stage) as st
    ON ( e.song = st.title and e.artist = st.artist_name and e.length = st.duration)
ORDER BY start_time ASC
""")


In [30]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = true)
 |-- start_time: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [31]:
songplays_table = songplays_table.withColumn("start_time", to_timestamp(from_unixtime(parse_time_udf(col("start_time")))))
songplays_table.limit(3).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,81391,2018-11-01 21:01:46,8,free,,,1,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
1,81393,2018-11-01 21:05:52,8,free,,,3,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
2,81394,2018-11-01 21:08:16,8,free,,,4,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""


In [34]:
songplays_table = songplays_table.repartition( year(songplays_table.start_time),month(songplays_table.start_time))