In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id, from_unixtime
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, to_date
from pyspark.sql.types import IntegerType, TimestampType, DateType

In [2]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("Data Frames practice")
    .getOrCreate()
)

# Raw inputs, both read as JSON: the event-log directory and the song files
# matched by a three-level glob under data/song_data/.
df_log = spark.read.json("data/log-data/")
df_song = spark.read.json("data/song_data/*/*/*")

In [9]:
# Show the schema Spark inferred for the song metadata JSON.
# (Call df_log.printSchema() instead to inspect the event-log schema.)
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [5]:
# Songs dimension: the five song attributes, with fully identical rows collapsed.
song = (
    df_song
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
    .dropDuplicates()
)
song.printSchema()

# Peek at a handful of rows. No sort key is given on purpose: an argument-less
# orderBy() defines no ordering (PySpark rejects it with "should sort by at
# least one column"), so the sample is taken directly.
song.take(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



[Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934),
 Row(song_id='SOTTDKS12AB018D69B', title='It Wont Be Christmas', artist_id='ARMBR4Y1187B9990EB', year=0, duration=241.47546),
 Row(song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', artist_id='ARMAC4T1187FB3FA4C', year=2004, duration=207.77751),
 Row(song_id='SOIAZJW12AB01853F1', title='Pink World', artist_id='AR8ZCNI1187B9A069B', year=1984, duration=269.81832),
 Row(song_id='SONYPOM12A8C13B2D7', title='I Think My Wife Is Running Around On Me (Taco Hell)', artist_id='ARDNS031187B9924F0', year=2005, duration=186.48771)]

In [4]:
# Artists dimension: rename the artist_* columns to shorter names, then drop
# rows that are identical across all five columns.
artist = (
    df_song
    .select(
        'artist_id',
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude'),
    )
    .dropDuplicates()
)
artist.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [5]:
# Most recent event timestamp per user; rows with an empty userId are skipped.
user_temp = (
    df_log
    .filter(df_log.userId != '')
    .groupBy('userId')
    .max('ts')
    .select('userId', col('max(ts)').alias('max_ts'))
)

# Users dimension: take each user's attributes from the log row carrying their
# latest timestamp, so `level` reflects the most recent value seen in the log.
users = (
    df_log
    .join(user_temp, [df_log.userId == user_temp.userId, df_log.ts == user_temp.max_ts])
    .select(
        df_log.userId.alias('user_id'),
        df_log.firstName.alias('first_name'),
        df_log.lastName.alias('last_name'),
        df_log.gender,
        df_log.level,
    )
    # Several log rows can share a user's latest timestamp; without this the
    # join would emit one row per tie instead of one row per user.
    .dropDuplicates(['user_id'])
)

# userId is read from JSON as a string; store it as an integer key.
users = users.withColumn('user_id', users['user_id'].cast(IntegerType()))
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [6]:
# Keep only song-play events.
df_log = df_log.filter(df_log.page == 'NextSong')

# from_unixtime expects seconds, hence the division by 1000 (ts is presumably
# epoch milliseconds -- confirm against the log source). The result is a
# formatted string, so it is cast to the desired column type.
df_log = df_log.withColumn('timestamp', from_unixtime(df_log.ts / 1000).cast(TimestampType()))
df_log = df_log.withColumn('datetime', from_unixtime(df_log.ts / 1000).cast(DateType()))

# Time dimension: calendar parts derived from each play's timestamp.
time = (
    df_log
    .select(
        df_log.timestamp.alias('start_time'),
        hour('timestamp').alias('hour'),
        dayofmonth('timestamp').alias('day'),
        weekofyear('timestamp').alias('week'),
        month('timestamp').alias('month'),
        year('timestamp').alias('year'),
        dayofweek('timestamp').alias('weekday'),
    )
    # Many plays share a start_time; keep one row per distinct timestamp, in
    # line with how the song and artist tables are de-duplicated.
    .dropDuplicates()
)

time.printSchema()
# Persisting the table is left disabled while experimenting:
#time.write.partitionBy('year', 'month').parquet('data/analytics/time/')

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [8]:
# A log event matches a song when title, artist name and exact duration agree.
song_match = (
    (df_song.title == df_log.song)
    & (df_song.artist_name == df_log.artist)
    & (df_song.duration == df_log.length)
)

# Songplays fact table: one row per NextSong event that matches a known song.
# Reads the `timestamp` column that an earlier cell adds to df_log.
# NOTE(review): the time column keeps the name `timestamp` and user_id stays a
# string here, unlike `time.start_time` and the integer `users.user_id`.
songplays = (
    df_log
    .filter(df_log.page == 'NextSong')
    .join(df_song, song_match, 'inner')
    .select(
        monotonically_increasing_id().alias('songplay_id'),
        df_log.timestamp,
        df_log.userId.alias('user_id'),
        df_log.level,
        df_song.song_id,
        df_song.artist_id,
        df_log.sessionId.alias('session_id'),
        df_log.location,
        df_log.userAgent.alias('user_agent'),
        year(df_log.timestamp).alias('year'),
        month(df_log.timestamp).alias('month'),
    )
)

songplays.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [None]:
# Alternative build of the songplays fact table.
#
# This draft could not run as written: it read parquet from an empty path,
# joined against that frame while referencing df_song columns that were not
# part of the join, selected df_log.year / df_log.month (which do not exist),
# and printed an undefined name. It now joins the in-memory df_song, which
# holds every column the join and the select refer to.

# Event time derived from the raw `ts`, so the cell does not depend on columns
# added to df_log elsewhere. from_unixtime takes seconds, hence the / 1000.
event_ts = from_unixtime(df_log.ts / 1000).cast(TimestampType())

songplays1 = (
    df_log
    .filter(df_log.page == 'NextSong')
    .join(df_song, [df_song.title == df_log.song,
                    df_song.artist_name == df_log.artist,
                    df_song.duration == df_log.length])
    .select(
        monotonically_increasing_id().alias('songplay_id'),
        # A timestamp rather than the raw long, so it lines up with time.start_time.
        event_ts.alias('start_time'),
        df_log.userId.alias('user_id'),
        df_log.level,
        df_song.song_id,
        df_song.artist_id,
        df_log.sessionId.alias('session_id'),
        df_log.location,
        df_log.userAgent.alias('user_agent'),
        year(event_ts).alias('year'),
        month(event_ts).alias('month'),
    )
)

songplays1.printSchema()
songplays1.show()