In [59]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail fast with a clear message instead of the
# later, less obvious NoSectionError raised by config.get().
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

# Export the credentials as environment variables so the AWS/S3 connector
# loaded in the Spark session can pick them up.
os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [60]:
def create_spark_session():
    """Return a SparkSession with the hadoop-aws connector on the classpath.

    Uses ``getOrCreate``, so an already-active session is returned instead of
    a new one being started (e.g. when this cell is re-run).
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [61]:
# Create (or reuse, via getOrCreate) the SparkSession used by every cell below.
spark = create_spark_session()

In [62]:
# Load every song JSON file (nested three directories deep) into one DataFrame;
# the schema is inferred from the files.
df = spark.read.format("json").load("/home/workspace/data/song_data/*/*/*/*.json")

In [63]:
# Songs dimension: distinct song rows, written as parquet partitioned by
# year and artist_id.
songs_table = (df
               .select("song_id", "title", "artist_id", "year", "duration")
               .dropDuplicates())
(songs_table.write
            .partitionBy("year", "artist_id")
            .mode('overwrite')
            .parquet(os.path.join("data", "songs")))

In [64]:
# Artists dimension: one row per artist_id.
# The song files repeat the artist attributes on every track, and nothing in
# this pipeline guarantees they are identical across an artist's tracks.
# Deduplicating on the whole row would therefore allow several rows per
# artist_id, so deduplicate on the key instead. Which variant survives is
# not specified by Spark.
artists_table = (df.selectExpr("artist_id        as artist_id",
                               "artist_name      as name",
                               "artist_location  as location",
                               "artist_latitude  as latitude",
                               "artist_longitude as longitude")
                 .dropDuplicates(["artist_id"]))
artists_table.write.mode('overwrite').parquet(os.path.join("data", "artists"))

In [65]:
# Load the event logs and keep only rows whose page is 'NextSong'
# (presumably the song-play events used by the tables below).
df = (spark.read
           .json("/home/workspace/data/log_data/*.json")
           .where(col("page") == "NextSong"))

In [66]:
# Users dimension: one row per user_id.
# `level` is a per-event column and may differ between a user's events, so
# deduplicating whole rows can emit several rows for one user. Keep each
# user's most recent event (highest `ts`) so user_id is unique and `level`
# reflects the latest value seen.
users_table = (df.selectExpr("*",
                             "row_number() over (partition by userId order by ts desc) as _recency_rank")
                 .filter("_recency_rank = 1")
                 .selectExpr("userId    as user_id",
                             "firstName as first_name",
                             "lastName  as last_name",
                             "gender",
                             "level"))
users_table.write.mode('overwrite').parquet(os.path.join("data", 'users'))

In [67]:
# Derive a real TimestampType column from `ts` (epoch milliseconds).
# Casting a number to timestamp interprets it as seconds since the epoch,
# hence the division by 1000. This replaces a Python UDF that:
#   - returned a string (the udf default return type),
#   - formatted it in the worker's local time zone, and
#   - raised TypeError on a null `ts`.
# The native expression also avoids Python serialization overhead.
# NOTE: later cells rely on `df` carrying this `timestamp` column.
df = df.withColumn("timestamp", (col("ts") / 1000).cast("timestamp"))
time_table = (df.selectExpr("timestamp             as start_time",
                            "hour(timestamp)       as hour",
                            "day(timestamp)        as day",
                            "weekofyear(timestamp) as week",
                            "month(timestamp)      as month",
                            "year(timestamp)       as year",
                            "weekday(timestamp)    as weekday")
                .dropDuplicates())
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(os.path.join("data", "time"))

In [69]:
# Songplays fact table: one row per NextSong log event, enriched with
# song_id/artist_id when the played track can be found in the song data.
#
# The log events identify a track by song title and artist *name*, while the
# songs table only carries the artist *id*. Attach the artist name from the
# artists table so that each join key compares like with like. Comparing the
# log's artist name to artist_id, as before, could never match.
song_df = spark.read.parquet(os.path.join("data", 'songs'))
# dropDuplicates keeps (artist_id, artist_name) pairs unique so the join
# below cannot fan out one song into several identical rows.
artist_names = (spark.read.parquet(os.path.join("data", "artists"))
                .selectExpr("artist_id", "name as artist_name")
                .dropDuplicates())
song_artist_df = song_df.join(artist_names, on="artist_id", how="inner")

# Left join: keep every play event even when no matching song is found
# (song_id / artist_id are then null). Relies on `df` already having the
# `timestamp` column added in the time-table cell.
songplays_table = df.join(song_artist_df,
                          (df.song     == song_artist_df.title)
                          & (df.artist == song_artist_df.artist_name),
                          "left").selectExpr("monotonically_increasing_id() as songplay_id",
                                             "timestamp                     as start_time",
                                             "userId                        as user_id",
                                             "level",
                                             "song_id",
                                             "artist_id",
                                             "sessionId                     as session_id",
                                             "location",
                                             "userAgent                     as user_agent",
                                             "month(timestamp)              as month",
                                             "year(timestamp)               as year")
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(os.path.join("data", "songplays"))