In [11]:
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import types as types

In [3]:
# Build (or reuse) the notebook's Spark session.
# The "spark.jars.packages" setting adds the hadoop-aws 2.7.0 package to the session.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)


# Song data tables (songs, artists)

In [4]:
# Songs Data
# Explicit schema for the raw song JSON files, listed as (column name, Spark type)
# pairs. Every field keeps the StructField default of nullable=True.
song_columns = [
    ("num_songs", types.IntegerType()),
    ("artist_id", types.StringType()),
    ("artist_latitude", types.FloatType()),
    ("artist_longitude", types.FloatType()),
    ("artist_location", types.StringType()),
    ("artist_name", types.StringType()),
    ("song_id", types.StringType()),
    ("title", types.StringType()),
    ("duration", types.FloatType()),
    ("year", types.IntegerType()),
]
song_schema = types.StructType(
    [types.StructField(col_name, col_type) for col_name, col_type in song_columns]
)

# Load every JSON file matched by the glob, applying the schema above instead of
# letting Spark infer one, then print the resulting schema.
song_files_glob = 'work/data/song-data/song_data/*/*/*/*.json'
song_data = spark.read.json(song_files_glob, schema=song_schema)
song_data.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)



In [5]:
# Songs Table
# Project the song data onto the songs-table columns and drop exact duplicate
# rows (lossless: only fully identical rows are collapsed).
df_songs = (
    song_data
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
    .dropDuplicates()
)

# Write as parquet partitioned by year and artist_id.
# mode='overwrite' is required for re-runs: the default save mode raises an error
# when the target path already exists, so the cell failed on "Restart & Run All"
# after its first successful execution.
df_songs.write.parquet(path=os.path.join(os.getcwd(), 'work', 'data', 'songs.parquet'),
                       mode='overwrite',
                       partitionBy=['year', 'artist_id'])

In [6]:
# Artists Table
# Project the artist columns out of the song data and strip the "artist_" prefix
# from the descriptive columns (artist_id keeps its name).
# The artist columns are projected from song-level records (each row also carries
# song_id/title), so identical artist rows can repeat; dropDuplicates() collapses
# those exact repeats without discarding any distinct row.
df_artists = (
    song_data
    .select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
    .withColumnRenamed('artist_name', 'name')
    .withColumnRenamed('artist_location', 'location')
    .withColumnRenamed('artist_latitude', 'latitude')
    .withColumnRenamed('artist_longitude', 'longitude')
    .dropDuplicates()
)

# mode='overwrite' keeps the cell re-runnable: the default save mode raises an
# error when artists.parquet already exists.
df_artists.write.parquet(path=os.path.join(os.getcwd(), 'work', 'data', 'artists.parquet'),
                         mode='overwrite')

# Log data tables (users, time, songplays)

In [8]:
# Logs Data
# Explicit schema for the raw log JSON files, listed as (column name, Spark type)
# pairs. Every field keeps the StructField default of nullable=True.
# NOTE(review): these names must match the JSON keys exactly -- confirm the raw
#   logs really use snake_case keys; any mismatched column would load as null.
# NOTE(review): if `ts` is stored as an epoch number (e.g. milliseconds), reading
#   it straight into TimestampType may apply the wrong unit -- verify on a sample row.
# NOTE(review): FloatType is single precision; confirm that is wide enough for
#   the values held in `registration`.
log_columns = [
    ("artist", types.StringType()),
    ("auth", types.StringType()),
    ("first_name", types.StringType()),
    ("gender", types.StringType()),
    ("item_in_session", types.IntegerType()),
    ("last_name", types.StringType()),
    ("length", types.FloatType()),
    ("level", types.StringType()),
    ("location", types.StringType()),
    ("method", types.StringType()),
    ("page", types.StringType()),
    ("registration", types.FloatType()),
    ("session_id", types.IntegerType()),
    ("song", types.StringType()),
    ("status", types.IntegerType()),
    ("ts", types.TimestampType()),
    ("user_agent", types.StringType()),
    ("user_id", types.IntegerType()),
]
log_schema = types.StructType(
    [types.StructField(col_name, col_type) for col_name, col_type in log_columns]
)

# Load all log files with the schema above, then keep only the rows whose
# `page` value is 'NextSong'.
log_raw = spark.read.json('work/data/log-data/*.json', schema=log_schema)
log_data = log_raw.filter(log_raw.page == 'NextSong')
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- item_in_session: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- user_id: integer (nullable = true)



In [9]:
# Users Table
# Project the user attributes out of the filtered log data.
# The projection is taken from log rows (presumably many per user -- verify), so
# the same user row would otherwise be written repeatedly. dropDuplicates()
# removes only fully identical rows, so a user whose `level` differs between
# events still keeps one row per distinct level.
df_users = (
    log_data
    .select('user_id', 'first_name', 'last_name', 'gender', 'level')
    .dropDuplicates()
)

# mode='overwrite' keeps the cell re-runnable: the default save mode raises an
# error when users.parquet already exists.
df_users.write.parquet(path=os.path.join(os.getcwd(), 'work', 'data', 'users.parquet'),
                       mode='overwrite')

In [12]:
# Time Table
# One row per distinct event timestamp. All other columns are derived from
# start_time, so de-duplicating the timestamps first de-duplicates the table.
start_times = (
    log_data
    .select('ts')
    .withColumnRenamed('ts', 'start_time')
    .dropDuplicates()
)

# Break each timestamp into its calendar parts.
start_time = F.col('start_time')
df_time = (
    start_times
    .withColumn('hour', F.hour(start_time))
    .withColumn('day', F.dayofmonth(start_time))
    .withColumn('week', F.weekofyear(start_time))
    .withColumn('month', F.month(start_time))
    .withColumn('year', F.year(start_time))
    # Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday).
    .withColumn('weekday', F.dayofweek(start_time))
)

# Write as parquet partitioned by year and month.
# mode='overwrite' keeps the cell re-runnable: the default save mode raises an
# error when time.parquet already exists.
df_time.write.parquet(path=os.path.join(os.getcwd(), 'work', 'data', 'time.parquet'),
                      mode='overwrite',
                      partitionBy=['year', 'month'])

In [None]:
# Songplays Table
# TODO: not implemented yet.

# Planned columns:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent