In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import NullType
from pyspark.sql.types import TimestampType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import LongType
from pyspark.sql.types import DateType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import pyspark.sql.functions as F

import os
import configparser

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Load AWS credentials from the local config file and export them as
# environment variables; a later cell reads them back when configuring S3 access.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed as soon as parsing is done.
with open('../../aws/dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# configparser values are already strings, so no extra formatting is needed.
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
# Create (or reuse) the Spark session for this notebook. The hadoop-aws package
# is requested so that the S3A filesystem classes are available.
spark = (
    SparkSession.builder
    .appName("sparkify_etl_pipeline")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Use this if using localhost to be able to access S3 bucket
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# The S3A connector reads its credentials from fs.s3a.access.key and
# fs.s3a.secret.key. The awsAccessKeyId / awsSecretAccessKey suffixes belong to
# the legacy s3/s3n connectors and are ignored under the fs.s3a prefix.
# os.environ[...] is used instead of os.getenv so a missing credential fails
# here with a clear KeyError rather than passing None into the JVM.
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

In [None]:
# Using smaller subset for experimenting
# Reads the JSON files matching the glob (two directory levels under
# song_data/A) into a single DataFrame.
songData = spark.read.json("../../data/song_data/A/*/*/*.json")

In [None]:
# Using smaller subset for experimenting
# NOTE(review): this points spark.read.json at a .tar.gz archive rather than
# plain JSON files. The `_corrupt_record` column dropped in a later cell is
# presumably a side effect of that -- confirm, or extract the archive first.
logData = spark.read.json("../../data/log_data.tar.gz")

In [None]:
# Inspect the schema of the song dataset.
songData.printSchema()

In [None]:
# Inspect the schema of the log dataset.
logData.printSchema()

In [None]:
# Discard the `_corrupt_record` column before further processing, then
# print the resulting schema to confirm it is gone.
log_df = logData.drop('_corrupt_record')
log_df.printSchema()

In [None]:
# Keep only the events whose page is 'NextSong'; every table below is built from these rows.
log_df = log_df.where(F.col("page") == "NextSong")

# Create time table
#### start_time, hour, day, week, month, year, weekday

In [None]:
# Start the time table from the raw `ts` column of the log events.
time_df = log_df.select("ts")

In [None]:
# Derive start_time from `ts` (divided by 1000 before conversion, presumably
# epoch milliseconds -> seconds), then break it into its calendar parts.
time_df = (
    time_df
    .withColumn("start_time", F.to_timestamp(F.col("ts") / 1000))
    .withColumn("hour", F.hour("start_time"))
    .withColumn("day", F.dayofmonth("start_time"))
    .withColumn("week", F.weekofyear("start_time"))
    .withColumn("month", F.month("start_time"))
    .withColumn("year", F.year("start_time"))
    .withColumn("weekday", F.dayofweek("start_time"))
)

In [None]:
# The raw `ts` value is no longer needed once start_time exists; drop it and
# collapse identical rows so each distinct timestamp appears once.
time_df = time_df.drop("ts").dropDuplicates()

In [None]:
# Preview five rows of the time table.
time_df.show(5)

In [None]:
# Check the column types of the time table.
time_df.printSchema()

In [None]:
# Won't work for notebook on local machine
# Persist the time table as Parquet, partitioned on disk by year, week and
# hour; any existing output at the target path is overwritten.
(
    time_df.write
    .partitionBy("year", "week", "hour")
    .mode("overwrite")
    .parquet("time.parquet")
)

# Create User table
#### user_id, first_name, last_name, gender, level

In [None]:
# List the available log columns to pick the user fields from.
log_df.columns

In [None]:
# Keep the user attributes plus `ts`, which is used to order each user's events.
user_df = log_df.select(
    "userId",
    "firstName",
    "lastName",
    "gender",
    "level",
    "ts",
)

In [None]:
# Peek at a single row of the selected user columns.
user_df.show(1)

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import column

In [None]:
# Create window to get last entry for user.
# Events are ordered newest-first within each userId, so row_number() == 1
# (applied in the following cells) selects the user's most recent event and
# therefore their current `level`. Ascending order would pick the oldest one.
w1 = Window.partitionBy("userId").orderBy(F.desc("ts"))

In [None]:
# Number each user's rows according to window w1 (partitioned by userId, ordered by ts).
user_df = user_df.withColumn("row", F.row_number().over(w1))

In [None]:
# Keep only the first-ranked row per user, then discard the helper columns.
user_df = user_df.where(F.col("row") == 1).drop("row", "ts")

In [None]:
# Switch the camelCase log column names to the snake_case names of the user table.
user_df = (
    user_df
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("firstName", "first_name")
    .withColumnRenamed("lastName", "last_name")
)

In [None]:
# Preview five rows of the user table.
user_df.show(5)

# Create Songs Table
##### song_id, title, artist_id, year, duration

In [None]:
# List the song dataset columns to pick the song fields from.
songData.columns

In [None]:
# Project the song dataset down to the five columns of the songs table.
song_df = songData.select(
    "song_id",
    "title",
    "artist_id",
    "year",
    "duration",
)

In [None]:
# Remove rows that are exact duplicates across all selected song columns.
song_df = song_df.dropDuplicates()

In [None]:
# Preview five rows of the songs table.
song_df.show(5)

# Create Artist Table
##### artist_id, name, location, latitude, longitude

In [None]:
# Project the song dataset down to the artist-related columns.
artist_df = songData.select(
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
)

In [None]:
# Preview five rows of the raw artist columns.
artist_df.show(5)

In [None]:
@udf
def parseArtistLocUDF(line):
    """Return the artist location, or None when it is missing or is HTML markup.

    Args:
        line: Raw `artist_location` value; None when the source value is null.

    Returns:
        `line` unchanged, or None (SQL NULL in the resulting column) when
        `line` is None or begins with an `<a href` anchor tag.
    """
    import re

    # Null column values arrive as None; re.search would raise TypeError on them.
    if line is None:
        return None
    # Raw string so that `\s` is the regex whitespace class rather than a
    # (deprecated) unknown string escape.
    if re.search(r"^<a\shref", line) is not None:
        # A plain None becomes SQL NULL. NullType() is a schema type object,
        # not a null value, and must not be returned as data.
        return None
    return line

In [None]:
# Remove exact duplicate artist rows, then pass artist_location through parseArtistLocUDF.
artist_df = artist_df.dropDuplicates().withColumn("artist_location", parseArtistLocUDF("artist_location"))

In [None]:
# Rename artist_name to name, as listed in the artist table columns.
artist_df = artist_df.withColumnRenamed("artist_name", "name")

In [None]:
# Preview five rows of the cleaned artist table.
artist_df.show(5)

# Create SongPlays Table
##### songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [None]:
# Re-check the log schema before selecting the songplay columns.
log_df.printSchema()

In [None]:
# Re-check the song schema to see which columns can be matched against the log.
songData.printSchema()

In [None]:
# Pick the log columns needed for songplays. `song` and `artist` are carried
# along only as join keys for resolving song_id / artist_id further down.
songplay_df = log_df.select("ts", "userId", "level", "sessionId", "location", "userAgent", "song", "artist")

In [None]:
# Replace `ts` in place with a timestamp (the value is divided by 1000 before conversion).
songplay_df = songplay_df.withColumn(
    "ts",
    F.to_timestamp(F.col("ts") / 1000),
)

In [None]:
# Rename the log columns to the snake_case names used by the songplays table.
songplay_df = (
    songplay_df
    .withColumnRenamed("ts", "start_time")
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("sessionId", "session_id")
    .withColumnRenamed("userAgent", "user_agent")
)

In [None]:
# Add a surrogate key. monotonically_increasing_id() produces unique,
# increasing 64-bit ids, but they are not guaranteed to be consecutive.
songplay_df = songplay_df.withColumn("songplay_id", F.monotonically_increasing_id())

In [None]:
# Check the songplay columns and types before joining.
songplay_df.printSchema()

In [None]:
# need to add song_id, artist_id from song and artist tables
# Register the three DataFrames as temp views so they can be joined with Spark SQL.
songplay_df.createOrReplaceTempView("songplay_table")
song_df.createOrReplaceTempView("song_table")
artist_df.createOrReplaceTempView("artist_table")

In [None]:
# Build an (artist_id, song_id, name, title) lookup used to resolve log events
# to songs and artists.
# DISTINCT is required: artist_table is only de-duplicated across all of its
# columns, so one artist_id can appear in several rows (e.g. same name with
# different location or coordinates). Without DISTINCT each such row would
# repeat the lookup entry and, downstream, every songplay that matches it.
artist_song_df = spark.sql("""
    SELECT DISTINCT a.artist_id, song_id, name, title
    FROM artist_table AS a
    JOIN song_table AS s ON a.artist_id = s.artist_id
""")

In [None]:
# Expose the artist/song lookup to Spark SQL for the final join.
artist_song_df.createOrReplaceTempView("artist_song_table")

In [None]:
# Build the final songplays table. The LEFT JOIN keeps every play; song_id and
# artist_id are NULL when the lookup has no row with the same title and
# artist name.
songplay_df = spark.sql("""
    SELECT songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
    FROM songplay_table AS sp
    LEFT JOIN artist_song_table AS ast ON sp.song = ast.title AND sp.artist = ast.name
""")

In [None]:
# Confirm the final songplays schema.
songplay_df.printSchema()

In [None]:
# Preview five rows of the final songplays table.
songplay_df.show(5)