In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id
from pyspark.sql.types import TimestampType

In [26]:
import zipfile

# Unpack the bundled song archive into the local data/ folder.
with zipfile.ZipFile('data/song-data.zip', mode='r') as song_archive:
    song_archive.extractall(path='data/')

In [3]:
# Parse the local settings file. ConfigParser.read returns the list of files
# it managed to load, which the notebook echoes as this cell's output.
config = configparser.ConfigParser()
config.read(['dl.cfg'])

# Exporting the AWS credentials to the environment is currently disabled:
#os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']


['dl.cfg']

In [5]:
# Root folders for the raw inputs and for the generated parquet tables.
# output_data has no trailing slash on purpose: every consumer in this notebook
# appends a sub-path that already starts with "/" (e.g. output_data + "/songs/"),
# so a trailing slash here produced paths like "data/output_data//songs/".
input_data = "data/"
output_data = "data/output_data"

# Glob patterns passed to spark.read.json for the song and log JSON files.
song_data = input_data + 'song_data/*/*/*/*.json'
log_data = input_data + 'log_data/*.json'

In [2]:
# Create (or reuse) the Spark session, pulling in the hadoop-aws package.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [6]:
# Load every song JSON file matched by the glob and show the inferred schema.
df_song = spark.read.format("json").load(song_data)
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [17]:
# Columns that make up the songs table.
song_fields = [
    "song_id",
    "title",
    "artist_id",
    "year",
    "duration",
]
# Project those columns and keep one copy of each distinct row.
songs_table = df_song.select(*song_fields).distinct()

In [27]:
# Persist songs as parquet, partitioned by year and artist, replacing any previous run.
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data + "/songs/")

In [23]:
# SQL expressions that rename the artist_* columns to the artists-table names.
artists_fields = [
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as latitude",
    "artist_longitude as longitude",
]
# Evaluate the expressions and keep one copy of each distinct row.
artists_table = df_song.selectExpr(*artists_fields).distinct()

In [28]:
# Persist artists as parquet, replacing any previous run.
artists_table.write.mode('overwrite').parquet(output_data + "/artists/")

In [7]:
# Load the event-log JSON files and show the inferred schema.
df_log = spark.read.format("json").load(log_data)
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
# Keep only the log events whose page is 'NextSong'.
df_log = df_log.where(col("page") == 'NextSong')

In [40]:
# SQL expressions that rename the camelCase log columns to snake_case user columns.
users_fields = [
    "userId as user_id",
    "firstName as first_name",
    "lastName as last_name",
    "gender",
    "level",
]
# Evaluate the expressions and keep one copy of each distinct row.
users_table = df_log.selectExpr(*users_fields).distinct()

In [41]:
# Persist users as parquet, replacing any previous run.
users_table.write.mode('overwrite').parquet(output_data + "/users/")

In [9]:
def _ms_to_datetime(epoch_ms):
    """Convert epoch milliseconds to a datetime.

    Returns None when the input is None: the `ts` column is nullable, and
    dividing None by 1000 would raise TypeError inside the UDF and fail the job.
    """
    if epoch_ms is None:
        return None
    return datetime.fromtimestamp(epoch_ms / 1000)


# UDF wrapping the converter; the result is stored in a new "timestamp" column.
get_timestamp = udf(_ms_to_datetime, TimestampType())
df_log = df_log.withColumn("timestamp", get_timestamp(col("ts")))

In [10]:
# UDF converting epoch milliseconds to a datetime (None stays None).
# The previous lambda called `to_date`, which is never imported in this
# notebook, and the UDF was defined but never applied.
get_datetime = udf(
    lambda epoch_ms: None if epoch_ms is None else datetime.fromtimestamp(epoch_ms / 1000),
    TimestampType(),
)
# start_time is the column the time and songplays cells read from df_log.
df_log = df_log.withColumn("start_time", get_datetime(col("ts")))

In [11]:
# SQL expressions for the time table. The calendar functions read the
# start_time column already present on df_log; the output start_time column
# itself is taken from df_log's "timestamp" column.
time_fields = [
    "timestamp as start_time",
    "hour(start_time) as hour",
    "dayofmonth(start_time) as day",
    "weekofyear(start_time) as week",
    "month(start_time) as month",
    "year(start_time) as year",
    "dayofweek(start_time) as weekday",
]
# Evaluate the expressions, keep one copy of each distinct row, show the schema.
time_table = df_log.selectExpr(*time_fields).distinct()
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [24]:
# Persist the time table as parquet, partitioned by year and month, replacing any previous run.
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + "/time/")

In [12]:
# Read back the songs table written as parquet under output_data.
song_df = spark.read.format("parquet").load(output_data + "/songs/")

In [12]:
# Echo the log DataFrame so the notebook displays its repr (column names and types).
df_log

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, timestamp: timestamp, datetime: timestamp]

In [15]:
# Output columns of the songplays table; month and year are derived from
# start_time so they can be used as partition columns when writing.
songplays_fields = [
    "start_time",
    "userId as user_id",
    "level",
    "song_id",
    "artist_id",
    "sessionId as session_id",
    "location",
    "userAgent as user_agent",
    "month(start_time) as month",
    "year(start_time) as year",
]

# Match log events to song metadata on BOTH song title and artist name.
# Joining on title alone pairs a play with every same-titled song by any
# artist, attaching the wrong song_id/artist_id and duplicating plays.
songplays_table = (
    df_log.join(
        df_song,
        (df_log.song == df_song.title) & (df_log.artist == df_song.artist_name),
        "inner",
    )
    .distinct()
    .selectExpr(songplays_fields)
    # Surrogate key: unique per row, not guaranteed to be consecutive.
    .withColumn("songplay_id", monotonically_increasing_id())
)

songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- songplay_id: long (nullable = false)



In [20]:
# Persist songplays as parquet, partitioned by year and month, replacing any previous run.
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + "/songplays/")