In [61]:
import os
import configparser
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, date_format, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, TimestampType

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEY']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEY']['AWS_SECRET_ACCESS_KEY']

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [118]:
song_schema = StructType([
    StructField("artist_id", StringType(), False),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("artist_name", StringType(), False),
    StructField("duration", DoubleType(), False),
    StructField("num_songs", IntegerType(), True),
    StructField("song_id", StringType(), False),
    StructField("title", StringType(), False),
    StructField("year", IntegerType(), True),
])

In [153]:
song_data = 'data/song_data/A/A/A/TRAAAAW128F429D538.json'
song_df = spark.read.json(song_data, schema=song_schema)
song_df.limit(3).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,song_id,title,year
0,ARD7TVE1187B99BFB1,,California - LA,,Casual,218.93179,SOMZWCG12A8C13C480,I Didn't Mean To,0


In [117]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [109]:
song_data = 'data/song_data/A/A/A/TRAAAAW128F429D538.json'
song_df = spark.read.json(song_data)
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [95]:
song_df.count()

1

In [154]:
songs_table = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration', 
                            col('artist_id').alias('artist')).drop_duplicates(subset=['song_id'])
songs_table.count()

1

In [155]:
artists_table = song_df.select('artist_id', 
                              col('artist_name').alias('name'), 
                              col('artist_location').alias('location'), 
                              col('artist_latitude').alias('latitude'), 
                              col('artist_longitude').alias('longitude')) \
                    .drop_duplicates(subset=['artist_id'])

In [None]:
songs_table.drop_duplicates(subset=['song_id'])

In [163]:
log_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType(), True),
    StructField("firstName", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("itemInSession", IntegerType(), True),
    StructField("lastName", StringType(), True),
    StructField("length", DoubleType(), True),
    StructField("level", StringType(), True),
    StructField("location", StringType(), True),
    StructField("method", StringType(), True),
    StructField("page", StringType(), True),
    StructField("registration", DoubleType(), True),
    StructField("sessionId", IntegerType(), True),
    StructField("song", StringType(), True),
    StructField("status", IntegerType(), True),
    StructField("ts", LongType(), True),
    StructField("userAgent", StringType(), True),
    StructField("userId", IntegerType(), True),
])

In [164]:
log_data = 'data/log_data/2018-11-01-events.json'
log_df = spark.read.json(log_data, schema=log_schema)
# log_df = log_df.where(log_df.page=='NextSong')
log_df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,,,,,,,,,,,,,,,,,
1,,,,,,,,,,,,,,,,,,
2,,,,,,,,,,,,,,,,,,


In [89]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [82]:
# Left outer join
df_joined = (log_df.alias('a').join(song_df.alias('b'),
                             (col('a.song') == col('b.title'))
                             & (col('a.length') == col('b.duration')),
                             how='left')
      .withColumn('songplay_id', monotonically_increasing_id())
      .select('songplay_id',
              col('ts').alias('start_time'),
              col('userId').alias('user_id'),
              'level',
              'song_id',
              'artist_id',
              col('sessionId').alias('session_id'),
              'location',
              col('userAgent').alias('user_agent'))
     )
df_joined.limit(3).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,1541106106796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
1,1,1541106352796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
2,2,1541106496796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."


In [85]:
time_table.select(col('start_time').alias('ts'),'year','month').limit(1).toPandas()

Unnamed: 0,ts,year,month
0,1541106106796,2018,11


In [None]:
time_table = log_df.select(col('ts').alias('start_time'), 
                           hour('timestamp').alias('hour'), 
                           dayofmonth('timestamp').alias('day'), 
                           weekofyear('timestamp').alias('week'), 
                           month('timestamp').alias('month'), 
                           year('timestamp').alias('year'), 
                           dayofweek('timestamp').alias('weekday')) 
time_table.limit(1).toPandas()

In [86]:
songplays_table = df_joined.alias('a').join(time_table.select(col('start_time').alias('ts'),'year','month').alias('b'), 
                                     col('a.start_time')==col('b.ts')).drop('ts')
songplays_table.limit(3).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,1541106106796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
1,1,1541106352796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
2,2,1541106496796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11


In [87]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [80]:
final = df.join(time_table.select(col('start_time').alias('ts'),'year','month').alias('a'), df.start_time==col('a.ts')).drop('ts')
final.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,1541106106796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
1,1,1541106352796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
2,2,1541106496796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
3,3,1541106673796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
4,4,1541107053796,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11


In [161]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0), T.TimestampType())
log_df = log_df.withColumn('timestamp', get_timestamp(log_df.ts))
log_df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39,2018-11-01 20:57:10.796


In [162]:
time_table = log_df.select(col('ts').alias('start_time'), 
                       hour('timestamp').alias('hour'), 
                       dayofmonth('timestamp').alias('day'), 
                       weekofyear('timestamp').alias('week'), 
                       month('timestamp').alias('month'), 
                       year('timestamp').alias('year'), 
                       dayofweek('timestamp').alias('weekday')) \
                 .drop_duplicates(subset=['start_time'])
time_table.limit(1).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1541106673796,21,1,44,11,2018,5


In [81]:
time_table.limit(1).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1541106106796,21,1,44,11,2018,5


In [59]:
output_data = "data/"
test = spark.read.parquet(f'{output_data}songs/*/*/*.parquet')
test.limit(1).toPandas()

Unnamed: 0,song_id,title,artist_id,duration
0,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,218.93179


Unnamed: 0,song_id,title,duration
0,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179
