In [None]:
%%spark

In [None]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col,monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [None]:
def create_spark_session():
    """
    Description: Returns a spark session (the existing one if there is one, otherwise a new one) configured with the hadoop-aws package.

    Output:
        the spark session
    """
    session_builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return session_builder.getOrCreate()

In [None]:
def process_song_data(spark, input_data, output_data):
    """
    Description: Loads the song JSON files found under input_data, derives the songs and artists tables from them and saves both tables as parquet under output_data.

    Input:
        spark       :   the spark session
        input_data  :   the root location below which the song data files live
        output_data :   the root location where the parquet tables are written
    """
    # load every song file matching the nested directory layout
    raw_songs = spark.read.json(input_data + 'song_data/*/*/*/*.json')

    # songs table: keep rows that have a song_id, one row per song_id
    song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
    songs_table = (
        raw_songs
        .select(*song_columns)
        .where(col("song_id").isNotNull())
        .dropDuplicates(['song_id'])
    )

    # songs are stored partitioned by year and artist, replacing earlier output
    songs_writer = songs_table.write.mode('overwrite').partitionBy("year", "artist_id")
    songs_writer.parquet(output_data + 'songs_table/')

    # artists table: one row per artist_id, rows without an artist_id removed
    artist_columns = [
        'artist_id',
        'artist_name',
        'artist_location',
        'artist_latitude',
        'artist_longitude',
    ]
    artists_table = (
        raw_songs
        .select(*artist_columns)
        .dropDuplicates(['artist_id'])
        .where(col("artist_id").isNotNull())
    )

    # artists are stored unpartitioned, replacing earlier output
    artists_writer = artists_table.write.mode("overwrite")
    artists_writer.parquet(output_data + 'artists_table/')


In [None]:
def process_log_data(spark, input_data, output_data):
    """
    Description: Reads log data from an S3 bucket, creates the users, time and songplays tables and writes them in parquet format back to an S3 bucket.

    Input:
        spark       :  the spark session
        input_data  :  the root location holding the log data files (the song data files are read from the same root for the songplays join)
        output_data :  the path to where the output files will be stored after processing
    """
    # get filepath to log data file
    log_data = input_data + 'log_data/*/*/*.json'

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == "NextSong")

    # extract columns for users table (one row per userId)
    users_table = df['userId', 'firstName', 'lastName', 'gender', 'level'].dropDuplicates(['userId'])

    # write users table to parquet files
    users_table.write.mode('overwrite').parquet(output_data + 'users_table/')

    # create timestamp column from original timestamp column
    # (ts divided by 1000 and truncated, kept as a string; none of the tables
    # written below select this column)
    get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
    df = df.withColumn('timestamp', get_timestamp(df.ts))

    # create datetime column from original timestamp column
    # (string form of the datetime, built from ts / 1000)
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
    df = df.withColumn("datetime", get_datetime(df.ts))

    # extract columns to create time table
    time_table = df.select(
        col('datetime').alias('start_time'),
        hour('datetime').alias('hour'),
        dayofmonth('datetime').alias('day'),
        weekofyear('datetime').alias('week'),
        month('datetime').alias('month'),
        year('datetime').alias('year')
    )
    time_table = time_table.dropDuplicates(['start_time'])

    # write time table to parquet files partitioned by year and month
    time_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + 'time_table/')

    # read in song data to use for songplays table; the path has to be built
    # here because the one built in process_song_data is local to that function
    song_data = input_data + 'song_data/*/*/*/*.json'
    song_df = spark.read.json(song_data)

    # extract columns from joined song and log datasets to create songplays table
    # NOTE(review): matching on title alone can pair a play with a same-titled
    # song by a different artist - confirm whether the artist name (and
    # duration) should also be part of the join condition.
    df = df.join(song_df, song_df.title == df.song)
    df = df.withColumn("songplay_id", monotonically_increasing_id())

    songplays_table = df.select(
        col('songplay_id').alias('songplay_id'),
        col('ts').alias('ts'),
        col('userId').alias('user_id'),
        col('level').alias('level'),
        col('song_id').alias('song_id'),
        col('artist_id').alias('artist_id'),
        col('sessionId').alias('session_id'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
        # year and month are both taken from the play's datetime so the two
        # partition columns describe the same instant; a bare 'year' column
        # here would be the year field of the joined song data instead
        year('datetime').alias('year'),
        month('datetime').alias('month')
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + 'songplays_table/')

In [None]:
def main():
    """
    Description: Runs the full pipeline: obtains the spark session, then processes the song data followed by the log data.
    """
    source_root = "s3a://udacity-dend/"
    target_root = "s3a://udacity-datalake-ouput/"
    session = create_spark_session()

    # song data first, then log data
    process_song_data(session, source_root, target_root)
    process_log_data(session, source_root, target_root)


# run the pipeline when this code is executed as the main module
if __name__ == "__main__":
    main()

In [None]:
# create the spark session that the process_* calls in the following cells receive
spark = create_spark_session()

In [None]:
# root location the song_data/ and log_data/ paths are built from
input_data = "s3a://udacity-dend/"
# root location the parquet tables are written under
output_data = "s3a://udacity-datalake-ouput/"

In [None]:
# run the song-data step (songs and artists tables)
process_song_data(spark, input_data, output_data)

In [None]:
# run the log-data step (users, time and songplays tables)
process_log_data(spark, input_data, output_data)