In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, DoubleType, TimestampType

# Read AWS credentials from the local 'dl.cfg' file and export them as
# environment variables. The file is opened in a context manager so the
# handle is closed as soon as parsing finishes (the previous bare open()
# left it open for the lifetime of the kernel).
config = configparser.ConfigParser()
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

# Input globs for the raw JSON files and the output bucket prefix used by
# every write in this notebook.
song_data = "s3a://udacity-dend/" + "song_data/*/*/*/*.json"
output_data = "s3a://udacitylake1/"
log_data = "s3a://udacity-dend/" + "log_data/*/*/*.json"

In [None]:
def create_spark_session():
    """Return the active SparkSession, creating one if none exists.

    The session is named "Data Wrangling with Spark SQL" and requests the
    hadoop-aws 2.7.0 package through `spark.jars.packages`
    (presumably so that the s3a:// paths used in this notebook resolve).
    """
    builder = SparkSession.builder.appName("Data Wrangling with Spark SQL")
    builder = builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


spark = create_spark_session()

In [None]:
# Explicit schema for the raw song JSON records (name, type, nullable).
songdata_schema = (
    StructType()
    .add("artist_id", StringType(), True)
    .add("artist_latitude", DoubleType(), True)
    .add("artist_location", StringType(), True)
    .add("artist_longitude", DoubleType(), True)
    .add("artist_name", StringType(), True)
    .add("duration", DoubleType(), True)
    .add("num_songs", IntegerType(), True)
    .add("song_id", StringType(), False)
    .add("title", StringType(), False)
    .add("year", IntegerType(), True)
)

# Load every song file matched by the glob using the schema above.
df = spark.read.schema(songdata_schema).json(song_data)
df.printSchema()

### Processing Song Data

In [None]:
# Songs dimension: one projection of the raw song records.
# Columns: song_id, title, artist_id, year, duration
songs_table = df.select(
    col("song_id"),
    col("title"),
    col("artist_id"),
    col("year"),
    col("duration"),
)
songs_table.show(5)

In [None]:
# Persist the songs table as parquet, partitioned by year and artist_id;
# any existing output at this path is overwritten.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + "songs_table.parquet")
)

In [None]:
# Artists dimension: artists in the music database.
# Columns: artist_id, name, location, latitude, longitude
# Rows are de-duplicated across all five columns.
artists_table = df.selectExpr(
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as latitude",
    "artist_longitude as longitude",
).distinct()
artists_table.show(5)

In [None]:
# Persist the artists table as parquet, replacing any previous output.
artists_table.write.mode("overwrite").parquet(
    output_data + "artists_table.parquet"
)

In [None]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from raw song JSON and write them as parquet.

    Args:
        spark: SparkSession used to read the input files.
        input_data: Path prefix that is concatenated directly with
            "song_data/*/*/*/*.json" (so it must already end with a separator).
        output_data: Path prefix that is concatenated directly with the
            output directory names.

    Returns:
        None. Writes "songs_table.parquet" (partitioned by year and artist_id)
        and "artists_table.parquet" under `output_data`, overwriting existing
        output.
    """
    # Glob matching every song file under the input prefix.
    song_path = input_data + "song_data/*/*/*/*.json"

    # Explicit schema for the raw song records (name, type, nullable).
    schema = (
        StructType()
        .add("artist_id", StringType(), True)
        .add("artist_latitude", DoubleType(), True)
        .add("artist_location", StringType(), True)
        .add("artist_longitude", DoubleType(), True)
        .add("artist_name", StringType(), True)
        .add("duration", DoubleType(), True)
        .add("num_songs", IntegerType(), True)
        .add("song_id", StringType(), False)
        .add("title", StringType(), False)
        .add("year", IntegerType(), True)
    )

    raw_songs = spark.read.schema(schema).json(song_path)

    # Songs table: projection of the raw records, partitioned on disk.
    songs = raw_songs.select(
        col("song_id"), col("title"), col("artist_id"), col("year"), col("duration")
    )
    (
        songs.write
        .mode("overwrite")
        .partitionBy("year", "artist_id")
        .parquet(output_data + "songs_table.parquet")
    )

    # Artists table: renamed artist columns, de-duplicated across all columns.
    artists = raw_songs.select(
        col("artist_id"),
        col("artist_name").alias("name"),
        col("artist_location").alias("location"),
        col("artist_latitude").alias("latitude"),
        col("artist_longitude").alias("longitude"),
    ).distinct()
    artists.write.mode("overwrite").parquet(output_data + "artists_table.parquet")


### Processing Log Data

In [None]:
# Explicit schema for the raw event-log JSON records (name, type, nullable).
logdata_schema = (
    StructType()
    .add("artist", StringType(), True)
    .add("auth", StringType(), False)
    .add("firstName", StringType(), True)
    .add("gender", StringType(), True)
    .add("itemInSession", IntegerType(), False)
    .add("lastName", StringType(), True)
    .add("length", DoubleType(), True)
    .add("level", StringType(), False)
    .add("location", StringType(), True)
    .add("method", StringType(), False)
    .add("page", StringType(), False)
    .add("registration", DoubleType(), True)
    .add("sessionId", IntegerType(), False)
    .add("song", StringType(), True)
    .add("status", IntegerType(), False)
    .add("ts", DoubleType(), False)
    .add("userAgent", StringType(), True)
    .add("userId", StringType(), True)
)

# Load every log file matched by the glob using the schema above.
log_df = spark.read.schema(logdata_schema).json(log_data)
log_df.printSchema()

In [None]:
# Keep only the events whose page is "NextSong" (i.e. actual song plays).
log_df = log_df.where(log_df["page"] == "NextSong")

In [None]:
# Users dimension: users in the app.
# Columns: user_id, first_name, last_name, gender, level
# Rows are de-duplicated across all five columns.
users_table = log_df.selectExpr(
    "userId as user_id",
    "firstName as first_name",
    "lastName as last_name",
    "gender",
    "level",
).distinct()

In [None]:
# Persist the users table as parquet, replacing any previous output.
users_table.write.mode("overwrite").parquet(
    output_data + "users_table.parquet"
)

In [None]:
# create timestamp and datetime column
def _ms_to_datetime(epoch_ms):
    """Convert an epoch value in milliseconds to a datetime, passing None through.

    Spark hands null column values to Python UDFs as None; without this guard
    the division would raise TypeError and fail the whole job.
    """
    if epoch_ms is None:
        return None
    return datetime.fromtimestamp(epoch_ms / 1000.0)


def _ms_to_datetime_seconds(epoch_ms):
    """Same as _ms_to_datetime but with the sub-second part dropped."""
    moment = _ms_to_datetime(epoch_ms)
    return None if moment is None else moment.replace(microsecond=0)


# UDF producing the full-precision event timestamp from the `ts` column.
get_timestamp = udf(_ms_to_datetime, TimestampType())
# UDF producing the event time truncated to whole seconds.
get_datetime = udf(_ms_to_datetime_seconds, TimestampType())

log_df = log_df.withColumn("timestamp", get_timestamp("ts"))
log_df = log_df.withColumn("start_time", get_datetime("ts"))

In [None]:
# Time dimension: timestamps of song plays broken down into calendar units.
# Columns: start_time, hour, day, week, month, year, weekday
# Each unit is computed from start_time; duplicate rows are removed.
time_table = log_df.select(
    "start_time",
    hour("start_time").alias("hour"),
    dayofmonth("start_time").alias("day"),
    weekofyear("start_time").alias("week"),
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    dayofweek("start_time").alias("weekday"),
).distinct()

In [None]:
# Persist the time table as parquet, partitioned by year and month;
# any existing output at this path is overwritten.
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "time_table.parquet")
)

In [None]:
# Reload the songs table written earlier; it feeds the songplays join.
song_df = spark.read.format("parquet").load(output_data + "songs_table.parquet")

In [None]:
# Reload the artists table written earlier; it feeds the songplays join.
artists_table = spark.read.format("parquet").load(output_data + "artists_table.parquet")

In [None]:
# Combine songs with their artists (full outer join on artist_id) and keep
# only the columns needed to match log events to songs.
songs_with_artists = song_df.join(artists_table, on="artist_id", how="full")
song_artist = songs_with_artists.select(
    "artist_id", "name", "title", "duration", "song_id"
)
song_artist.show(5)

In [None]:
# extract columns from joined song and log datasets to create songplays table
# songplays - records in log data associated with song plays i.e. records with page NextSong
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#
# song_artist exposes artist_id, name, title, duration and song_id, so the
# log columns must be matched against title / name / duration. (The previous
# condition referenced song_artist.song / .artist / .length, which do not
# exist and raised AttributeError.)
songplays_table = (
    log_df
    .join(
        song_artist,
        [
            log_df.song == song_artist.title,
            log_df.artist == song_artist.name,
            log_df.length == song_artist.duration,
        ],
        # left join: keep every song play even when no catalogue match exists
        "left",
    )
    # year / month are the partition columns; deriving them from start_time
    # yields the same values time_table holds, without an extra join.
    .withColumn("year", year("start_time"))
    .withColumn("month", month("start_time"))
    .selectExpr(
        "start_time",
        "userId as user_id",
        "level",
        "song_id",
        "artist_id",
        "sessionId as session_id",
        "location",
        "userAgent as user_agent",
        "year",
        "month",
    )
    # ids are unique but not consecutive
    .withColumn("songplay_id", monotonically_increasing_id())
)
songplays_table.show(5)

In [None]:
# Persist the songplays table as parquet, partitioned by year and month;
# any existing output at this path is overwritten.
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "songplays_table.parquet")
)