In [1]:
# Import libraries
import configparser
import os
import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
# Load the AWS credentials from the local dl.cfg file and export them as
# environment variables for the rest of the notebook.
config = configparser.ConfigParser()
# read_file() on an explicitly opened file is used so that a missing dl.cfg
# raises immediately (config.read() silently skips files it cannot open).
# The `with` block guarantees the file handle is closed after parsing.
with open('dl.cfg') as config_file:
    config.read_file(config_file)

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    """Build (or reuse) the SparkSession used by this notebook.

    The session is requested with the ``org.apache.hadoop:hadoop-aws:2.7.0``
    package, and the Hadoop option
    ``mapreduce.fileoutputcommitter.algorithm.version`` is set to ``"2"`` on
    the session's Hadoop configuration before it is returned.

    Returns:
        The SparkSession obtained from ``getOrCreate()``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    session = builder.getOrCreate()

    # Set directly on the live Hadoop configuration of the (possibly reused) session.
    hadoop_conf = session.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    return session

In [4]:
# Create (or reuse) the Spark session that the cells below read and write with.
spark = create_spark_session()

In [5]:
# Base S3 location the raw song and log JSON files are read from.
input_data = "s3a://udacity-dend/"
# Base S3 location the output tables are written to as parquet.
output_data = "s3a://data-lake-simple-storage/"

# Import song data and process data for songs and artists tables

In [6]:
# Glob for the song-data JSON files. Only the A/A/A sub-directory is matched,
# not the whole song_data tree.
song_data = input_data + 'song_data/A/A/A/*.json'

In [7]:
# Read the song JSON files matched by `song_data` into a DataFrame and print
# the number of seconds the read took.
start = time.time()
df = spark.read.json(song_data)
end = time.time()
print(end - start)

17.680700302124023


In [8]:
# Songs table: project the five song attributes out of the song data.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_columns)

In [9]:
# Write the songs table as parquet files partitioned by year and artist_id,
# replacing any output from a previous run, and print the elapsed seconds.
start = time.time()

# The destination is built from output_data (not a hard-coded bucket) so that
# every table follows the single configured output location.
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(output_data + "songs")

end = time.time()
print(end - start)

194.0408115386963


In [10]:
# Artists table: pick the artist attributes out of the song data, drop the
# "artist_" prefix from the descriptive columns, and remove rows that are
# identical across all five columns.
artists_table = df.selectExpr(
    "artist_id",
    "artist_name AS name",
    "artist_location AS location",
    "artist_latitude AS latitude",
    "artist_longitude AS longitude",
).dropDuplicates()

In [11]:
# Persist the artists table as parquet under output_data, replacing any
# previous output, and print the elapsed seconds.
start = time.time()
artists_writer = artists_table.write.mode("overwrite")
artists_writer.parquet(output_data + 'artists')
end = time.time()
print(end - start)

183.8410041332245


# Import log data and process users, time and songplays tables

In [12]:
# Glob for the log JSON files. Only the 2018/11 directory is matched.
log_data = input_data + 'log_data/2018/11/*.json'

# Read the log JSON files and print the number of seconds the read took.
# NOTE: this rebinds `df`, which until now held the song data. The songs and
# artists extraction cells above also read from `df`, so re-running them after
# this cell would operate on log data instead.
start = time.time()
df = spark.read.json(log_data)
end = time.time()
print(end - start)

13.325629949569702


In [13]:
# Keep only the log rows whose page is 'NextSong'.
df = df.where(col('page') == 'NextSong')

In [14]:
# Users table: select and rename the user attributes from the log rows, with
# userId cast to int. Duplicates are dropped over all five columns, so a user
# that appears with two different `level` values keeps one row per level.
user_columns = [
    col('userId').cast('int').alias('user_id'),
    col('firstName').alias('first_name'),
    col('lastName').alias('last_name'),
    col('gender'),
    col('level'),
]
users_table = df.select(*user_columns).dropDuplicates()

In [None]:
# Persist the users table as parquet under output_data, replacing any
# previous output, and print the elapsed seconds.
start = time.time()
users_writer = users_table.write.mode("overwrite")
users_writer.parquet(output_data + 'users')
end = time.time()
print(end - start)

In [None]:
# Replace the original `ts` column with a timestamp: the value is divided by
# 1000, rounded to a whole number and cast to timestamp (so any sub-second
# part is discarded). Presumably `ts` arrives as epoch milliseconds - confirm.
# NOTE: the column is overwritten in place, so this cell is not safe to run
# twice on the same `df`; reload the log data before re-running it.
df = df.withColumn('ts', (F.round(col('ts')/1000)).cast("timestamp"))

In [None]:
# Time table: one row per distinct timestamp (newest first), plus the calendar
# parts derived from it. Columns are appended in the order listed below.
time_table = (
    df.selectExpr('ts AS start_time')
    .dropDuplicates()
    .orderBy('start_time', ascending=False)
)
time_parts = [
    ('hour', F.hour),
    ('day', F.dayofmonth),
    ('week', F.weekofyear),
    ('month', F.month),
    ('year', F.year),
    ('weekday', F.dayofweek),
]
for part_name, part_fn in time_parts:
    time_table = time_table.withColumn(part_name, part_fn('start_time'))

In [None]:
# Persist the time table as parquet partitioned by year and month, replacing
# any previous output, and print the elapsed seconds.
start = time.time()
time_writer = time_table.write.partitionBy("year", "month").mode("overwrite")
time_writer.parquet(output_data + 'time')
end = time.time()
print(end - start)

In [None]:
# Read the songs table back from the parquet output written earlier; it is the
# song side of the join that builds the songplays table.
song_df = spark.read.parquet(output_data + 'songs')

In [None]:
# Inspect the schema of the log DataFrame before joining.
df.printSchema()

In [None]:
# Inspect the schema of the songs DataFrame as read back from parquet.
song_df.printSchema()

In [None]:
# Build the songplays table from the log rows joined to the songs table.
# - songplay_id is assigned with monotonically_increasing_id() before the
#   join, so ids are unique but not consecutive.
# - The join uses Spark's default (inner) join on song title only, so log rows
#   without a matching title are dropped.
#   NOTE(review): different songs sharing a title will all match; consider
#   tightening the condition once the available columns are confirmed.
# - userId is cast to int so user_id has the same type as users_table.user_id.
songplays_table = (
    df.withColumn('songplay_id', F.monotonically_increasing_id())
    .join(song_df, song_df.title == df.song)
    .select(
        'songplay_id',
        col('ts').alias('start_time'),
        col('userId').cast('int').alias('user_id'),
        'level',
        'song_id',
        'artist_id',
        col('sessionId').alias('session_id'),
        'location',
        col('userAgent').alias('user_agent'),
    )
)

In [None]:
# Add year and month columns derived from start_time; they serve as the
# partition columns for the songplays output.
songplays_table = songplays_table.withColumn("year", F.year("start_time"))
songplays_table = songplays_table.withColumn("month", F.month("start_time"))

In [None]:
# Preview the songplays rows before writing them out.
songplays_table.show()

In [None]:
# Persist the songplays table as parquet partitioned by year and month,
# replacing any previous output, and print the elapsed seconds.
start = time.time()
songplays_writer = songplays_table.write.partitionBy('year', 'month').mode("overwrite")
songplays_writer.parquet(output_data + 'songplays')
end = time.time()
print(end - start)

In [None]:
# Read the songplays output back from parquet and print its schema to check
# what was actually written.
songplays_df = spark.read.parquet(output_data + 'songplays')
songplays_df.printSchema()

In [None]:
# Preview the songplays rows as read back from parquet.
songplays_df.show()