In [None]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
# ConfigParser.read() silently ignores files it cannot open and returns the list of
# files it actually parsed; fail loudly here rather than with a bare KeyError: 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

# Export the AWS credentials as environment variables
# (presumably picked up by the S3A connector used for the s3a:// paths below — confirm).
os.environ.update({
    name: config['AWS'][name]
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
})

In [None]:
# Build (or reuse) the Spark session; the hadoop-aws package supplies the S3A
# filesystem used by the s3a:// input/output paths.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Source bucket (read) and destination bucket (write) for the ETL.
input_data, output_data = "s3a://udacity-dend/", "s3a://dend-4th-project/"

In [None]:
# Glob for the song JSON files; the A/B/C prefix limits the read to a subset of song_data/.
song_data = input_data + "song_data/A/B/C/*.json"

In [None]:
# Load every song JSON file matched by the glob into a DataFrame.
df = spark.read.format("json").load(song_data)

In [None]:
# Inspect the inferred schema, then preview the first five rows.
df.printSchema()
df.show(n=5)

In [None]:
# extract columns to create songs table (one row per distinct combination of columns)
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").distinct()

# write songs table to parquet files partitioned by year and artist.
# mode("overwrite") keeps the cell re-runnable: the default save mode ("error")
# fails as soon as the output path already exists.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + 'songs_table/')
)

songs_table.limit(5).toPandas()

In [None]:
# extract columns to create artists table, renaming the artist_* columns
artists_table = df.select(
    "artist_id",
    col("artist_name").alias("name"),
    col("artist_location").alias("location"),
    col("artist_latitude").alias("latitude"),
    col("artist_longitude").alias("longitude"),
).distinct()

# write artists table to parquet files; overwrite so a re-run does not fail on the
# existing output path (default save mode is "error").
artists_table.write.mode("overwrite").parquet(output_data + 'artists_table/')

artists_table.limit(5).toPandas()

In [None]:
# Glob for the event-log JSON files nested two directories deep under log_data/.
log_data = "{}log_data/*/*/*.json".format(input_data)

# Load the log events (this rebinds `df` from the song data to the log data).
df = spark.read.format("json").load(log_data)

In [None]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

# extract columns for users table.
# If a user's `level` changes between plays, distinct() over all columns would emit
# one row per level and user_id would no longer be unique. Keep only each user's
# most recent record (highest `ts`) instead.
latest_first = Window.partitionBy("userId").orderBy(col("ts").desc())
users_table = (
    df.withColumn("_row", F.row_number().over(latest_first))
    .filter(col("_row") == 1)
    .select(
        col("userId").alias("user_id"),
        col("firstName").alias("first_name"),
        col("lastName").alias("last_name"),
        "gender",
        "level",
    )
)

# write users table to parquet files; overwrite so the cell can be re-run.
users_table.write.mode("overwrite").parquet(output_data + 'users_table/')

users_table.limit(5).toPandas()

In [None]:
# create timestamp column from the original `ts` column (epoch milliseconds).
# Casting seconds-as-double directly to timestamp keeps the millisecond part, which
# from_unixtime() dropped (it casts its input to whole seconds).
df = df.withColumn("ts_timestamp", (col("ts") / 1000).cast("timestamp"))

# extract columns to create time table: one row per distinct play timestamp,
# broken out into calendar parts.
time_table = (
    df.select(col("ts_timestamp").alias("start_time"))
    .distinct()
    .select(
        "start_time",
        hour("start_time").alias("hour"),
        dayofmonth("start_time").alias("day"),
        weekofyear("start_time").alias("week"),
        month("start_time").alias("month"),
        year("start_time").alias("year"),
        # "EEEE" formats the full day name
        date_format("start_time", "EEEE").alias("weekday"),
    )
)

# write time table to parquet files partitioned by year and month;
# overwrite so the cell can be re-run.
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + 'time_table/')

time_table.limit(5).toPandas()

In [None]:
# read in song data to use for songplays table. The raw song JSON is used (rather than
# songs_table) because it carries artist_name, needed to tell apart songs that share a title.
song_df = spark.read.json(song_data).select("song_id", "title", "artist_id", "artist_name", "duration")

# Match each play on title, artist name and duration: matching on title alone can pair
# one play with several songs.
song_match = (
    (df.song == song_df.title)
    & (df.artist == song_df.artist_name)
    & (df.length == song_df.duration)
)

# extract columns from joined song and log datasets to create songplays table.
# songplay_id is assigned AFTER the join so every output row gets its own id.
songplays_table = (
    df.join(song_df, song_match)
    .withColumn("monthPlay", month(col("ts_timestamp")))
    .withColumn("yearPlay", year(col("ts_timestamp")))
    .withColumn("songplay_id", F.monotonically_increasing_id())
    .select(
        "songplay_id",
        col("ts_timestamp").alias("start_time"),
        col("monthPlay"),
        col("yearPlay"),
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        "location",
        col("userAgent").alias("user_agent"),
    )
)

# write songplays table to parquet files partitioned by year and month;
# overwrite so the cell can be re-run.
songplays_table.write.mode("overwrite").partitionBy("yearPlay", "monthPlay").parquet(output_data + 'songplays_table/')

songplays_table.limit(5).toPandas()