In [47]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from pyspark.sql import types as t
from pyspark.sql import functions as F




In [None]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']



In [48]:
spark = SparkSession \
        .builder \
        .appName("Udacity P4 Aleaume") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [None]:
spark.sparkContext.getConf().getAll()

In [49]:
spark

# Handling Song Data 

In [50]:
path = "data/song_data/*/*/*/*.json"

song_data_schema = StructType([
    StructField("artist_id",StringType(),False),\
    StructField("artist_latitude",StringType(),True),\
    StructField("artist_longitude",StringType(), True),\
    StructField("artist_location", StringType(), True),\
    StructField("artist_name",StringType(), False),\
    StructField("song_id",StringType(),False),\
    StructField("title",StringType(), False),\
    StructField("duration",DoubleType(), False),\
    StructField("year",IntegerType(), False),\
    
])

df = spark.read.json(path, song_data_schema)

In [51]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)



In [52]:
# extract columns to create songs table

songs_table= df.select(["song_id","title","artist_id","year","duration"])

In [53]:
songs_table.count()

71

In [54]:
songs_table_distinct = songs_table.distinct()
songs_table_distinct.count()

71

In [55]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



In [56]:
# write songs table to parquet files partitioned by year and artist

songs_table.write.option("header",True) \
           .partitionBy("year","artist_id") \
           .mode("overwrite") \
           .parquet("data/output/songs/")

In [57]:
# extract columns to create artists table
artists_table = df.select(df.artist_id, \
                              col("artist_name").alias("name"),\
                              col("artist_location").alias("location"), \
                              col("artist_latitude").alias("latitude"), \
                              col("artist_longitude").alias("longitude"))

In [60]:
artists_table.count()

71

In [61]:
artists_table_distinct = artists_table.distinct()
artists_table_distinct.count()

69

In [62]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [63]:
artists_table.write.save("data/output/artists/", format="parquet", header=True)

# Handling Log Data

In [87]:
# get filepath to log data file

#Test
log_data = "data/log_data/*.json"

#PROD
#log_data = "data/log_data/*/*/*.json"

In [88]:
# read log data file
log_data_schema = StructType([
    StructField("artist",StringType(),False),\
    StructField("auth",StringType(),True),\
    StructField("firstName",StringType(), True),\
    StructField("gender", StringType(), True),\
    StructField("itemInSession",LongType(), False),\
    StructField("lastName",StringType(),True),\
    StructField("length",DoubleType(), False),\
    StructField("level",StringType(), True),\
    StructField("location",StringType(), True),\
    StructField("method",StringType(), True),\
    StructField("page",StringType(), False), \
    StructField("registration",DoubleType(), True), \
    StructField("sessionId",LongType(), True), \
    StructField("song",StringType(), False), \
    StructField("status",LongType(), True), \
    StructField("ts",LongType(), False), \
    StructField("userAgent",StringType(), True), \
    StructField("userId",StringType(), True), \

])

df = spark.read.json(log_data, log_data_schema)


In [89]:
df.printSchema()
df.count()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



8056

In [90]:
# filter by actions for song plays (page = NextSong)

df = df.select("*").where(df.page == "NextSong").distinct()
df.count()

6820

In [92]:
 # extract columns for users table 

users_table = df.select(["userId","firstName","lastName","gender","level"]).distinct()

users_table = users_table.withColumnRenamed("userId", "user_id") \
                         .withColumnRenamed("firstName","first_name") \
                         .withColumnRenamed("lastName","last_name")
users_table.count()

104

In [69]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [75]:
users_table_distinct = users_table.distinct()
users_table_distinct.count()

104

In [76]:
# write users table to parquet files
users_table.write.save("data/output/users/", format="parquet", header=True)

In [77]:
df_pd = df.toPandas()
df_pd.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [94]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000), t.TimestampType())
df = df.withColumn("start_time", get_timestamp(df.ts))

df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [95]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: from_unixtime(x), t.DateType())

df = df.withColumn("date_time", to_date(df.start_time))
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date_time: date (nullable = true)



In [96]:
df_pd = df.toPandas()
df_pd.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time,date_time
0,Fat Joe,Logged In,Kate,F,21,Harrell,241.34485,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,605,Safe 2 Say [The Incredible] (Album Version - A...,200,1542296032796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-15 15:33:52.796,2018-11-15
1,Linkin Park,Logged In,Kate,F,33,Harrell,259.86567,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,605,My December,200,1542299023796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97,2018-11-15 16:23:43.796,2018-11-15
2,The Saturdays,Logged In,Chloe,F,20,Cuevas,176.95302,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540941000000.0,630,If This Is Love,200,1542318319796,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,49,2018-11-15 21:45:19.796,2018-11-15
3,Wim Mertens,Logged In,Aleena,F,71,Kirby,240.79628,paid,"Waterloo-Cedar Falls, IA",PUT,NextSong,1541023000000.0,619,Naviamente,200,1542321121796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,44,2018-11-15 22:32:01.796,2018-11-15
4,The Avett Brothers,Logged In,Mohammad,M,1,Rodriguez,271.0722,paid,"Sacramento--Roseville--Arden-Arcade, CA",PUT,NextSong,1540512000000.0,744,The Perfect Space,200,1542786093796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",88,2018-11-21 07:41:33.796,2018-11-21


In [97]:
# extract columns to create time table
df = df.withColumn("hour", hour(col("start_time"))) \
    .withColumn("day", dayofmonth(col("start_time"))) \
    .withColumn("week", weekofyear(col("start_time"))) \
    .withColumn("month", month(col("start_time"))) \
    .withColumn("year", year(col("start_time"))) \
    .withColumn("weekday", date_format(col("start_time"),"EEEE"))
    
time_table = df.select("start_time", "hour", "day", "week", "month", "year", "weekday")
    

In [98]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [99]:
# write time table to parquet files partitioned by year and month
time_table.write.option("header",True) \
           .partitionBy("year","month") \
           .mode("overwrite") \
           .parquet("data/output/time/")

In [100]:
# read in song data to use for songplays table
song_df = spark.read.parquet("data/output/songs")


In [101]:
song_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [102]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df, (df.song == song_df.title) & (df.length == song_df.duration), "inner") \
                    .join(time_table,(df.start_time == time_table.start_time), "inner") \
                    .select(df.start_time, col("userId").alias("user_id"), df.level, \
                            song_df.song_id, song_df.artist_id, col("sessionId").alias("session_id"), \
                            df.location, col("userAgent").alias("user_agent"), \
                            time_table.year, time_table.month) \
                    .withColumn("songplay_id",F.monotonically_increasing_id())

songplays_table = songplays_table.select("songplay_id","user_id","level","song_id","artist_id","session_id","location","user_agent", "year", "month")

#HELP/SOURCE: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.join.html+

In [103]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [104]:

# write songplays table to parquet files partitioned by year and month
songplays_table.write.option("header",True) \
           .partitionBy("year","month") \
           .mode("overwrite") \
           .parquet("data/output/songplays/")

# Path adaptation

In [138]:
input_data = "s3a://udacity-dend/" 
song_data = input_data+"song_data/*/*/*/*.json"

print(song_data)

s3a://udacity-dend/song_data/*/*/*/*.json
