In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .getOrCreate()
spark

In [2]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F


In [3]:
song_data_path = os.path.join('data','song_data','*','*','*','*.json')
song_data_path

'data/song_data/*/*/*/*.json'

In [4]:
df_song = spark.read.json(song_data_path)

In [5]:
songs_table = df_song.select(['song_id','title','artist_id','year','duration'])

In [7]:
songs_table.take(10)

[Row(song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', artist_id='AREBBGV1187FB523D2', year=0, duration=173.66159),
 Row(song_id='SOGXHEG12AB018653E', title='It Makes No Difference Now', artist_id='AR0RCMP1187FB3F427', year=1992, duration=133.32853),
 Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934)]

In [10]:
song_table_path = os.path.join('output','songs.parquet')


songs_table.write.mode('overwrite').partitionBy("year","artist_id").parquet(song_table_path)

## Artist

In [11]:
artists_table = df_song.select(['artist_id','artist_name','artist_location','artist_latitude','artist_longitude'])

In [12]:
artist_table_path = os.path.join('output','artist.parquet')

In [14]:
artists_table.write.mode('overwrite').parquet(artist_table_path)

# Log data

In [15]:
log_data_path = os.path.join('data','log_data','*.json')
log_data_path

'data/log_data/*.json'

In [16]:
df_log = spark.read.json(log_data_path)

In [17]:
df_log.take(3)

[Row(artist='Mitch Ryder & The Detroit Wheels', auth='Logged In', firstName='Tegan', gender='F', itemInSession=65, lastName='Levine', length=205.03465, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='Jenny Take A Ride (LP Version)', status=200, ts=1543363215796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='The Spill Canvas', auth='Logged In', firstName='Tegan', gender='F', itemInSession=66, lastName='Levine', length=358.03383, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='The TIde (LP Version)', status=200, ts=1543363420796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='Mogwai', a

## User

In [18]:
df_log_next_song = df_log.filter(df_log.page == 'NextSong')

In [19]:
df_log_next_song.take(3)

[Row(artist='Mitch Ryder & The Detroit Wheels', auth='Logged In', firstName='Tegan', gender='F', itemInSession=65, lastName='Levine', length=205.03465, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='Jenny Take A Ride (LP Version)', status=200, ts=1543363215796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='The Spill Canvas', auth='Logged In', firstName='Tegan', gender='F', itemInSession=66, lastName='Levine', length=358.03383, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='The TIde (LP Version)', status=200, ts=1543363420796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='Mogwai', a

In [20]:
df_users = df_log_next_song.select(['userId','firstName','lastName','gender','level'])

In [21]:
user_table_path = os.path.join('output','users.parquet')
user_table_path

'output/users.parquet'

In [22]:
df_users.write.mode('overwrite').parquet(user_table_path)

In [23]:
df_log_next_song.take(3)

[Row(artist='Mitch Ryder & The Detroit Wheels', auth='Logged In', firstName='Tegan', gender='F', itemInSession=65, lastName='Levine', length=205.03465, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='Jenny Take A Ride (LP Version)', status=200, ts=1543363215796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='The Spill Canvas', auth='Logged In', firstName='Tegan', gender='F', itemInSession=66, lastName='Levine', length=358.03383, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='The TIde (LP Version)', status=200, ts=1543363420796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80'),
 Row(artist='Mogwai', a

In [24]:
df_log_next_song.where(col("ts").isNull()).show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+



In [25]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0))
df_log_next_song = df_log_next_song.withColumn("ts_timestamp",get_timestamp("ts"))

In [26]:
df_log_next_song.take(3)

[Row(artist='Mitch Ryder & The Detroit Wheels', auth='Logged In', firstName='Tegan', gender='F', itemInSession=65, lastName='Levine', length=205.03465, level='paid', location='Portland-South Portland, ME', method='PUT', page='NextSong', registration=1540794356796.0, sessionId=992, song='Jenny Take A Ride (LP Version)', status=200, ts=1543363215796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='80', ts_timestamp='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2018,MONTH=10,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=28,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=15,MILLISECOND=796,ZONE_OFFSET=?,DST_OFFSET=?]'),
 Row(artist='The Spill 

In [27]:
df_log_next_song = df_log_next_song.withColumn('ts_year',year(df_log_next_song.ts_timestamp))
df_log_next_song = df_log_next_song.withColumn('ts_month',month(df_log_next_song.ts_timestamp))
df_log_next_song = df_log_next_song.withColumn('ts_day',dayofmonth(df_log_next_song.ts_timestamp))
df_log_next_song = df_log_next_song.withColumn('ts_weekofyear',weekofyear(df_log_next_song.ts_timestamp))
df_log_next_song = df_log_next_song.withColumn('ts_hour',hour(df_log_next_song.ts_timestamp))
df_log_next_song = df_log_next_song.withColumn('ts_weekday',weekofyear(df_log_next_song.ts_timestamp))

time_table = df_log_next_song.select(['ts_timestamp','ts_hour','ts_day','ts_weekofyear','ts_month','ts_weekday'])

In [28]:
time_table.take(3)

[Row(ts_timestamp='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2018,MONTH=10,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=28,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=15,MILLISECOND=796,ZONE_OFFSET=?,DST_OFFSET=?]', ts_hour=None, ts_day=None, ts_weekofyear=None, ts_month=None, ts_weekday=None),
 Row(ts_timestamp='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2018,MONTH=10,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=28,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=3,SECOND=40,MILL

In [29]:
df_song.createOrReplaceTempView('songs_staging')
df_log_next_song.createOrReplaceTempView('logs_staging')

In [30]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [31]:
df_log_next_song.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_timestamp: string (nullable = true)
 |-- ts_year: integer (nullable = true)
 |-- ts_month: integer (nullable = true)
 |-- ts_day: integer (nullable = true)
 |-- ts_weekofyear: integer (nullable = true)
 |-- ts_hour: integer (nullable = true)
 |-- ts_weekday: integer (nullable = true)



In [32]:
spark.sql("""
            select 
                logs_staging.registration
            FROM logs_staging as logs_staging
            LEFT JOIN songs_staging as songs_staging
            on logs_staging.artist=songs_staging.artist_name
        """)

DataFrame[registration: double]

In [39]:
songplays_table = spark.sql("""
            select 
                logs_staging.registration,
                logs_staging.userId,
                logs_staging.level,
                songs_staging.song_id,
                songs_staging.artist_id,
                logs_staging.sessionId,
                logs_staging.location,
                logs_staging.userAgent,
                logs_staging.ts_year as year,
                logs_staging.ts_month as month
            from logs_staging
            left join songs_staging
            on logs_staging.artist=songs_staging.artist_name
        """)

In [40]:
songplays_table

DataFrame[registration: double, userId: string, level: string, song_id: string, artist_id: string, sessionId: bigint, location: string, userAgent: string, year: int, month: int]

In [41]:
songplays_table.write.mode('overwrite').partitionBy("year","month").parquet('output/songplay.parquet')