# extract data

In [None]:
# Unpack the raw archives: the log archive is extracted into ./data/log_data/,
# and the song archive is extracted directly under ./data/.
!unzip ./data/log-data.zip -d ./data/log_data/
!unzip ./data/song-data.zip -d ./data/

# import libraries

In [8]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import to_date, to_timestamp

# switch Spark from AWS to a local session

In [6]:
# Disabled for local testing: the triple-quoted string below is never executed.
# It holds the code that reads AWS credentials from dl.cfg and exports them as
# environment variables.
''' # commented out to test locally
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')
'''

In [None]:
# Disabled for local testing: the triple-quoted string below is never executed.
# It holds a variant of create_spark_session that adds the hadoop-aws package
# to the session configuration.
''' # commented out to test locally
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
'''

In [2]:
# local spark session to test locally
def create_spark_session():
    """Build (or reuse) the SparkSession used for local test runs.

    Returns:
        The session produced by ``getOrCreate()`` on a builder whose
        application name is "Sparkify ETL".
    """
    builder = SparkSession.builder.appName("Sparkify ETL")
    return builder.getOrCreate()

# check default load schema for `song_data`

In [35]:
# Inspect the schema Spark infers for the raw song JSON files.
# The session is obtained here so this cell also runs on a fresh kernel;
# create_spark_session() uses getOrCreate(), so an already-active session is reused.
spark = create_spark_session()
song_data = 'data/song_data/*/*/*/*.json'
df = spark.read.json(song_data)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



# function for processing `song_data`

In [5]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the raw song JSON files.

    Reads every file matching ``song_data/*/*/*/*.json`` under ``input_data``
    and writes two de-duplicated parquet datasets under ``output_data``,
    overwriting anything already there:

    * ``songs_table``   - song_id, title, artist_id, year, duration;
      partitioned by year and artist_id.
    * ``artists_table`` - artist_id, name, location, latitude, longitude.

    Args:
        spark: SparkSession used to read the JSON input.
        input_data: Root path that contains the ``song_data`` directory.
        output_data: Root path the parquet tables are written under.
    """
    source_glob = os.path.join(input_data, 'song_data/*/*/*/*.json')
    raw_songs = spark.read.json(source_glob)

    # --- songs -----------------------------------------------------------
    songs_path = os.path.join(output_data, "songs_table")
    songs_table = raw_songs.select(
        "song_id", "title", "artist_id", "year", "duration"
    ).dropDuplicates()
    (songs_table.write
        .mode('overwrite')
        .partitionBy("year", "artist_id")
        .parquet(songs_path))

    # --- artists ---------------------------------------------------------
    # The artist_* source columns are renamed to the shorter table names.
    artists_path = os.path.join(output_data, "artists_table")
    artists_table = raw_songs.select(
        'artist_id',
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude')
    ).dropDuplicates()
    artists_table.write.mode('overwrite').parquet(artists_path)

# check default load schema for `log_data`

In [42]:
# Inspect the schema Spark infers for the raw log JSON files.
# The session is obtained here so this cell also runs on a fresh kernel;
# create_spark_session() uses getOrCreate(), so an already-active session is reused.
spark = create_spark_session()
log_data = 'data/log_data/*.json'
df = spark.read.json(log_data)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# function for processing `log_data`

In [1]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the raw event logs.

    Reads ``log_data/*.json`` under ``input_data``, keeps only rows whose
    ``page`` is ``NextSong``, and writes three de-duplicated parquet datasets
    under ``output_data``, overwriting anything already there:

    * ``users_table``     - user_id, first_name, last_name, gender, level.
    * ``time_table``      - start_time, hour, day, week, month, year, weekday;
      partitioned by year and month.
    * ``songplays_table`` - log events joined to the songs table on song
      title, plus a generated songplay_id; partitioned by year and month.

    The songs table is read back from ``output_data/songs_table``, so it must
    already exist there (process_song_data writes it).

    Args:
        spark: SparkSession used to read the inputs.
        input_data: Root path that contains the ``log_data`` directory.
        output_data: Root path the parquet tables are written under.
    """
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log_data/*.json')

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')

    # extract columns for users table
    users_table = df.select([col('userId').alias('user_id'),
                             col('firstName').alias('first_name'),
                             col('lastName').alias('last_name'),
                             'gender',
                             'level']) \
                    .dropDuplicates()

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, "users_table"),
                              mode='overwrite')

    # create timestamp column from original timestamp column
    # (ts is divided by 1000 first - presumably epoch milliseconds; confirm)
    df = df.withColumn('timestamp', to_timestamp(df.ts/1000))

    # create datetime column from original timestamp column
    # (to_date keeps only the calendar date, the time of day is dropped)
    df = df.withColumn('datetime', to_date(df.timestamp))

    # extract columns to create time table
    # start_time, hour, day, week, month, year, weekday
    # hour must come from the full timestamp: the date-only 'datetime' column
    # has no time component, so hour() on it is always 0.
    time_table = df.select([col('timestamp').alias('start_time'),
                            hour(col('timestamp')).alias('hour'),
                            dayofmonth(col('datetime')).alias('day'),
                            weekofyear(col('datetime')).alias('week'),
                            month(col('datetime')).alias('month'),
                            year(col('datetime')).alias('year'),
                            date_format(col('datetime'), 'E').alias('weekday')]) \
                    .dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.parquet(os.path.join(output_data, "time_table"),
                             mode='overwrite',
                             partitionBy=["year", "month"])

    # read in song data to use for songplays table
    song_df = spark.read.parquet(os.path.join(output_data, 'songs_table'))

    # extract columns from joined song and log datasets to create songplays table
    # songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
    # NOTE(review): the join matches on song title only, so distinct songs that
    # share a title would all match a play - confirm this is intended.
    # songplay_id is generated after de-duplication so every kept row gets its own id.
    songplays_table = df.join(song_df, df.song == song_df.title) \
                        .select(col('timestamp').alias('start_time'),
                                col('userId').alias('user_id'),
                                'level',
                                'song_id',
                                'artist_id',
                                col('sessionId').alias('session_id'),
                                'location',
                                col('userAgent').alias('user_agent'),
                                year('timestamp').alias('year'),
                                month('timestamp').alias('month')) \
                        .dropDuplicates() \
                        .withColumn('songplay_id', monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.parquet(os.path.join(output_data, "songplays_table"),
                                  mode='overwrite',
                                  partitionBy=["year", "month"])

# run ETL pipeline to process data

In [9]:
# Create the Spark session and point the pipeline at local paths.
spark = create_spark_session()
input_data = "data" # local input root; S3 alternative: "s3a://udacity-dend/"
output_data = "test" # parquet tables are written under this directory

# Song data runs first: process_log_data reads songs_table back from output_data.
process_song_data(spark, input_data, output_data)    
process_log_data(spark, input_data, output_data)

# check loading of parquet files and verify table contents

In [12]:
# Read each output table back from parquet, then print its name, schema and
# first five rows.
for table in ['artists_table', 'time_table', 'users_table', 'songs_table', 'songplays_table']:
    print(table)
    table_df = spark.read.parquet(os.path.join(output_data, table))
    table_df.printSchema()
    # limit(5) is applied before toPandas(), so only five rows are converted
    print(table_df.limit(5).toPandas())

artists_table
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

            artist_id                                               name  \
0  ARDR4AC1187FB371A1  Montserrat Caballé;Placido Domingo;Vicente Sar...   
1  ARMAC4T1187FB3FA4C                          The Dillinger Escape Plan   
2  ARNF6401187FB57032                                  Sophie B. Hawkins   
3  AROUOZZ1187B9ABE51                                        Willie Bobo   
4  ARI2JSK1187FB496EF                           Nick Ingman;Gavyn Wright   

                        location  latitude  longitude  
0                                      NaN        NaN  
1              Morris Plains, NJ  40.82624  -74.47995  
2       New York, NY [Manhattan]  40.79086  -73.96644  
3  New York, NY [Spanish Harlem]  40.79195  -73.94512  
4                London, England  51.50632   

# clean up workspace

In [2]:
!rm -r ./data/log_data/
!rm -r ./data/song_data/
!rm -r ./test/

rm: cannot remove './test/': No such file or directory
