In [17]:
from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col,
    dayofmonth,
    dayofweek,
    hour,
    monotonically_increasing_id,
    month,
    row_number,
    udf,
    weekofyear,
    year,
)
from pyspark.sql.types import DateType, TimestampType

In [2]:
# Obtain the notebook-wide Spark session (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [5]:
# Path prefixes; downstream code concatenates them directly, so each keeps its trailing '/'.
input_data, output_data = 'unzipped_data/', 'output/'

def process_song_data(spark=spark, input_data=input_data, output_data=output_data):
    """Build the ``songs`` and ``artists`` tables from the raw song JSON files.

    Reads every file matching ``<input_data>song_data/*/*/*/*.json`` and writes
    two parquet datasets under ``output_data``, overwriting previous output:

    * ``songs``   -- (song_id, title, artist_id, year, duration), partitioned
      by year and artist_id.
    * ``artists`` -- (artist_id, name, location, latitude, longitude), one row
      per artist_id.

    Parameters
    ----------
    spark : SparkSession
        Session used for reading. The default is the notebook-level ``spark``
        captured when this cell was executed.
    input_data : str
        Prefix of the raw data. It is string-concatenated, so it must end with '/'.
    output_data : str
        Prefix the parquet tables are written under. It must end with '/'.
    """
    # get filepath to song data file
    song_data = input_data + 'song_data/*/*/*/*.json'

    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table (song_id, title, artist_id, year, duration)
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').distinct()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').parquet(output_data + 'songs')

    # Extract columns to create the artists table (artist_id, name, location, latitude, longitude).
    # A whole-row distinct() would keep several rows for one artist_id whenever that
    # artist appears with different name/location strings on different songs, so
    # deduplicate on the key. Which variant survives is not guaranteed.
    artists_table = (
        df.select(
            'artist_id',
            col('artist_name').alias('name'),
            col('artist_location').alias('location'),
            col('artist_latitude').alias('latitude'),
            col('artist_longitude').alias('longitude'),
        )
        .dropDuplicates(['artist_id'])
    )

    # write artists table to parquet files
    artists_table.write.mode('overwrite').parquet(output_data + 'artists')

In [6]:
# Pass the configuration explicitly. The function's default arguments were bound
# when its def cell ran, so they would go stale if the config cell is edited and re-run.
process_song_data(spark=spark, input_data=input_data, output_data=output_data)

In [15]:
# NOTE(review): repeats the path configuration from an earlier cell; keep one config cell.
# Prefixes are string-concatenated downstream, so both keep a trailing '/'.
input_data, output_data = 'unzipped_data/', 'output/'

def process_log_data(spark = spark, input_data = input_data, output_data = output_data):
    """Build the ``users``, ``time`` and ``songplays`` tables from the event logs.

    Reads ``<input_data>log_data/*.json`` (keeping only ``page == 'NextSong'``
    events) and ``<input_data>song_data/*/*/*/*.json``, then writes three parquet
    datasets under ``output_data``, overwriting previous output:

    * ``users``     -- (user_id, first_name, last_name, gender, level), one row
      per user_id, taken from that user's latest event.
    * ``time``      -- (start_time, hour, day, week, month, year, weekday), one
      row per distinct start_time, partitioned by year and month.
    * ``songplays`` -- log events inner-joined to songs on (title, artist name,
      duration), partitioned by year and month.

    Parameters
    ----------
    spark : SparkSession
        Session used for reading. The default is the notebook-level ``spark``
        captured when this cell was executed.
    input_data, output_data : str
        Path prefixes. They are string-concatenated, so each must end with '/'.
    """
    # read log data file
    df_log = spark.read.json(input_data + 'log_data/*.json')

    # keep only song-play events
    df_log = df_log.filter(df_log.page == 'NextSong')

    # Users table. ``level`` is recorded per event, so a user whose level differs
    # between events would get several rows from a whole-row distinct(). Keep each
    # user's most recent event (highest ts) so there is exactly one row per user_id.
    latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
    users_table = (
        df_log
        .withColumn('_event_rank', row_number().over(latest_event_first))
        .filter(col('_event_rank') == 1)
        .select(
            col('userId').alias('user_id'),
            col('firstName').alias('first_name'),
            col('lastName').alias('last_name'),
            'gender',
            'level',
        )
    )

    # write users table to parquet files
    users_table.write.mode('overwrite').parquet(output_data + 'users')

    # ts holds epoch milliseconds. A native cast avoids the Python-UDF round trip,
    # keeps sub-second precision, and yields null (instead of a TypeError inside
    # the worker) when ts is missing.
    df_log = df_log.withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))

    # time table: one row per distinct start_time (several events may share a ts)
    time_table = (
        df_log.select('start_time').distinct()
        .withColumn('hour', hour('start_time'))
        .withColumn('day', dayofmonth('start_time'))
        .withColumn('week', weekofyear('start_time'))
        .withColumn('month', month('start_time'))
        .withColumn('year', year('start_time'))
        .withColumn('weekday', dayofweek('start_time'))
    )

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + 'time')

    # read in song data to use for songplays table
    df_song = spark.read.json(input_data + 'song_data/*/*/*/*.json')

    # Songplays: match each play to a song on title, artist name and duration.
    # The song-side ``year`` column is dropped by the select and replaced by the
    # play's own year, which is derived from start_time.
    songplays_table = (
        df_log
        .join(
            df_song,
            (df_log.song == df_song.title)
            & (df_log.artist == df_song.artist_name)
            & (df_log.length == df_song.duration),
            'inner',
        )
        .distinct()
        .select(
            'start_time',
            col('userId').alias('user_id'),
            'level',
            'song_id',
            'artist_id',
            col('sessionId').alias('session_id'),
            'location',
            col('userAgent').alias('user_agent'),
        )
        .withColumn('songplay_id', monotonically_increasing_id())
        .withColumn('year', year('start_time'))
        .withColumn('month', month('start_time'))
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + 'songplays')

In [16]:
# Pass the configuration explicitly. The function's default arguments were bound
# when its def cell ran, so they would go stale if the config cell is edited and re-run.
process_log_data(spark=spark, input_data=input_data, output_data=output_data)

In [6]:
# Get the filepath to the song data files. Build it from the shared input_data prefix
# so this exploration reads the same files as process_song_data (the previous bare
# "song_data/..." path pointed at a different directory).
song_data = input_data + 'song_data/*/*/*/*.json'

# read song data file
df = spark.read.json(song_data)

In [8]:
# Peek at the first rows, then report how many song records were loaded.
df.show(n=5)
print(df.count())

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [9]:
# Songs dimension: one row per distinct (song_id, title, artist_id, year, duration) combination.
songs_table = (
    df
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
    .distinct()
)

In [12]:
# Write the songs table to parquet files partitioned by year and artist.
# The path comes from the shared output_data prefix rather than a hard-coded 'output/'.
songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').parquet(output_data + 'songs')

In [14]:
# Extract columns to create the artists table (artist_id, name, location, latitude, longitude).
# Deduplicate on artist_id: a whole-row distinct() would keep several rows for one
# artist whenever it appears with different name/location strings on different songs.
artists_table = (
    df.select(
        'artist_id',
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude'),
    )
    .dropDuplicates(['artist_id'])
)

In [16]:
# Preview the artists dimension.
artists_table.show(n=5)

+------------------+---------------+---------------+--------+----------+
|         artist_id|           name|       location|latitude| longitude|
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|            Gob|               |    null|      null|
|AROGWRA122988FEE45|Christos Dantis|               |    null|      null|
|ARBGXIG122988F409D|     Steel Rain|California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)| -13.442|  -41.9952|
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows



In [17]:
# Write the artists table to parquet files under the shared output_data prefix
# (rather than a hard-coded 'output/' path).
artists_table.write.mode('overwrite').parquet(output_data + 'artists')

In [53]:
# NOTE(review): repeats the path configuration from an earlier cell; keep one config cell.
# Prefixes are string-concatenated downstream, so both keep a trailing '/'.
input_data, output_data = 'unzipped_data/', 'output/'

# NOTE(review): this cell redefines process_log_data from an earlier cell; the later
# definition silently wins. Keep a single copy of the function in the notebook.
def process_log_data(spark = spark, input_data = input_data, output_data = output_data):
    """Build the ``users``, ``time`` and ``songplays`` tables from the event logs.

    Reads ``<input_data>log_data/*.json`` (keeping only ``page == 'NextSong'``
    events) and ``<input_data>song_data/*/*/*/*.json``, then writes three parquet
    datasets under ``output_data``, overwriting previous output:

    * ``users``     -- (user_id, first_name, last_name, gender, level), one row
      per user_id, taken from that user's latest event.
    * ``time``      -- (start_time, hour, day, week, month, year, weekday), one
      row per distinct start_time, partitioned by year and month.
    * ``songplays`` -- log events inner-joined to songs on (title, artist name,
      duration), partitioned by year and month.

    Parameters
    ----------
    spark : SparkSession
        Session used for reading. The default is the notebook-level ``spark``
        captured when this cell was executed.
    input_data, output_data : str
        Path prefixes. They are string-concatenated, so each must end with '/'.
    """
    # read log data file
    df_log = spark.read.json(input_data + 'log_data/*.json')

    # keep only song-play events
    df_log = df_log.filter(df_log.page == 'NextSong')

    # Users table. ``level`` is recorded per event, so a user whose level differs
    # between events would get several rows from a whole-row distinct(). Keep each
    # user's most recent event (highest ts) so there is exactly one row per user_id.
    latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
    users_table = (
        df_log
        .withColumn('_event_rank', row_number().over(latest_event_first))
        .filter(col('_event_rank') == 1)
        .select(
            col('userId').alias('user_id'),
            col('firstName').alias('first_name'),
            col('lastName').alias('last_name'),
            'gender',
            'level',
        )
    )

    # write users table to parquet files
    users_table.write.mode('overwrite').parquet(output_data + 'users')

    # ts holds epoch milliseconds. A native cast avoids the Python-UDF round trip,
    # keeps sub-second precision, and yields null (instead of a TypeError inside
    # the worker) when ts is missing.
    df_log = df_log.withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))

    # time table: one row per distinct start_time (several events may share a ts)
    time_table = (
        df_log.select('start_time').distinct()
        .withColumn('hour', hour('start_time'))
        .withColumn('day', dayofmonth('start_time'))
        .withColumn('week', weekofyear('start_time'))
        .withColumn('month', month('start_time'))
        .withColumn('year', year('start_time'))
        .withColumn('weekday', dayofweek('start_time'))
    )

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + 'time')

    # read in song data to use for songplays table
    df_song = spark.read.json(input_data + 'song_data/*/*/*/*.json')

    # Songplays: match each play to a song on title, artist name and duration.
    # The song-side ``year`` column is dropped by the select and replaced by the
    # play's own year, which is derived from start_time.
    songplays_table = (
        df_log
        .join(
            df_song,
            (df_log.song == df_song.title)
            & (df_log.artist == df_song.artist_name)
            & (df_log.length == df_song.duration),
            'inner',
        )
        .distinct()
        .select(
            'start_time',
            col('userId').alias('user_id'),
            'level',
            'song_id',
            'artist_id',
            col('sessionId').alias('session_id'),
            'location',
            col('userAgent').alias('user_agent'),
        )
        .withColumn('songplay_id', monotonically_increasing_id())
        .withColumn('year', year('start_time'))
        .withColumn('month', month('start_time'))
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + 'songplays')

In [56]:
# Pass the configuration explicitly. The function's default arguments were bound
# when its def cell ran, so they would go stale if the config cell is edited and re-run.
process_log_data(spark=spark, input_data=input_data, output_data=output_data)