In [1]:
import os
from typing import List
import configparser
from pprint import pprint
from datetime import datetime

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, year, month, dayofmonth, hour, \
    weekofyear, date_format, dayofweek, max, monotonically_increasing_id, desc
from pyspark.sql.types import StructType, StructField, StringType, \
    DoubleType, IntegerType, TimestampType

In [2]:
spark = SparkSession.builder \
                    .appName('SparkETL') \
                    .getOrCreate()

21/08/01 18:02:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/01 18:02:58 WARN MacAddressUtil: Failed to find a usable hardware address from the network interfaces; using random bytes: 07:eb:e7:7a:9f:75:36:93


In [3]:
log_data_path = '../../data/log_data'
song_data_path = '../../data/song_data'

In [4]:
def get_directories_that_contain_jsons(path: str) -> List[str]:
    folders = []
    for dirpath, dirnames, filenames in os.walk(song_data_path):
        if filenames and '.DS_Store' not in filenames:
            folders.append(dirpath)
    return folders
            
song_data_specific_paths = get_directories_that_contain_jsons(song_data_path)

In [5]:
df_log_data = spark.read.json(log_data_path)

                                                                                

In [6]:
df_song_data = spark.read.json(song_data_specific_paths)

                                                                                

In [None]:
from pyspark.sql.types import StructType

schema = [i for i in df_log_data.schema] 

In [None]:
schema

In [None]:
StructType(schema)

In [None]:
df_log_data.limit(10).toPandas()

In [None]:
df_song_data.limit(10).toPandas()

## songs table

In [12]:
df_songs = (df_song_data
                .select("song_id", 
                        "title", 
                        "artist_id", 
                        "year", 
                        "duration")
                .dropDuplicates(["song_id"])
           )

df_songs.toPandas()

                                                                                

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
2,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
3,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
4,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934
...,...,...,...,...,...
66,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
67,SOHUOAP12A8AE488E9,Floating,ARD842G1187B997376,1987,491.12771
68,SORAMLE12AB017C8B0,Auguri Cha Cha,ARHHO3O1187B989413,0,191.84281
69,SOVYKGO12AB0187199,Crazy Mixed Up World,ARH4Z031187B9A71F2,1961,156.39465


## Artist Table

In [14]:
df_artists = (df_song_data
            .select(
                "artist_id",
                col("artist_name").alias("name"),
                col("artist_location").alias("location"),
                col("artist_latitude").alias("latitude"),
                col("artist_longitude").alias("longitude"))
            .distinct()
    )

df_artists.toPandas()

                                                                                

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARXR32B1187FB57099,Gob,,,
2,AROGWRA122988FEE45,Christos Dantis,,,
3,ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.44200,-41.99520
...,...,...,...,...,...
64,ARGIWFO1187B9B55B7,Five Bolt Main,,,
65,AR62SOJ1187FB47BB5,Chase & Status,,,
66,AR1Y2PT1187FB5B9CE,John Wesley,Brandon,27.94017,-82.32547
67,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,


## Users Table

In [None]:
df_log_data.printSchema()

In [58]:
(
    df_log_data
    .select(col('userId').cast('int'),
            'ts')
    .groupBy('userId')
    .max('ts')
    .orderBy('userId')
).toPandas()

                                                                                

Unnamed: 0,userId,max(ts)
0,,1543596347796
1,2.0,1542450490796
2,3.0,1541191397796
3,4.0,1543541644796
4,5.0,1543607664796
...,...,...
93,97.0,1543416646796
94,98.0,1543479789796
95,99.0,1542532333796
96,100.0,1543402126796


In [71]:
df_users = (
    df_log_data
        .select(
            col("userId").alias("user_id"),
            col("firstName").alias("first_name"),
            col("lastName").alias("last_name"),
            col("gender"),
            col("level"),
            col("ts")
        )
        .orderBy(col("user_id"),
                col("ts").desc())
)

df_users.toPandas()

                                                                                

Unnamed: 0,user_id,first_name,last_name,gender,level,ts
0,,,,,free,1543596347796
1,,,,,paid,1543589944796
2,,,,,paid,1543588287796
3,,,,,paid,1543588286796
4,,,,,paid,1543585407796
...,...,...,...,...,...,...
8051,99,Ann,Banks,F,free,1542182809796
8052,99,Ann,Banks,F,free,1542182794796
8053,99,Ann,Banks,F,free,1541777585796
8054,99,Ann,Banks,F,free,1541777355796


## Time Table

Since Spark 1.5 you can use a number of date processing functions:
https://stackoverflow.com/questions/30949202/spark-dataframe-timestamptype-how-to-get-year-month-day-values-from-field

- pyspark.sql.functions.year
- pyspark.sql.functions.month
- pyspark.sql.functions.dayofmonth
- pyspark.sql.functions.dayofweek()
- pyspark.sql.functions.dayofyear
- pyspark.sql.functions.weekofyear()


In [7]:
@udf(returnType=TimestampType())
def parse_timestamp(ts):
    parsed_ts = datetime.fromtimestamp(ts / 1000).replace(microsecond=0)
    return parsed_ts

In [57]:
df_time = (df_log_data
            .withColumn('start_time', parse_timestamp('ts')) 
            .select('start_time')
            .withColumn('week', weekofyear('start_time'))
            .withColumn('hour', hour('start_time'))
            .withColumn('day', dayofmonth('start_time'))
            .withColumn('month', month('start_time'))
            .withColumn('year', year('start_time'))
       )

df_time.toPandas()

                                                                                

Unnamed: 0,start_time,week,hour,day,month,year
0,2018-11-15 00:30:26,46,0,15,11,2018
1,2018-11-15 00:41:21,46,0,15,11,2018
2,2018-11-15 00:45:41,46,0,15,11,2018
3,2018-11-15 01:57:51,46,1,15,11,2018
4,2018-11-15 03:29:37,46,3,15,11,2018
...,...,...,...,...,...,...
8051,2018-11-01 21:42:00,44,21,1,11,2018
8052,2018-11-01 21:50:15,44,21,1,11,2018
8053,2018-11-01 21:52:05,44,21,1,11,2018
8054,2018-11-01 21:55:25,44,21,1,11,2018


## Songplays

In [18]:
df_songplays = (df_song_data
                .join(df_log_data, 
                      df_song_data.artist_name == df_log_data.artist)
                .withColumn('start_time', parse_timestamp(df_log_data.ts))
                .select(col('userId').alias('user_id'),
                       col('level'),
                       col('song_id'),
                       col('artist_id'),
                       col('sessionId'),
                       col('location'),
                       col('userAgent'))
                .where(df_log_data.page == 'NextSong')
                
)

df_songplays.limit(10).toPandas()

                                                                                

Unnamed: 0,user_id,level,song_id,artist_id,sessionId,location,userAgent
0,44,paid,SOBONFF12A6D4F84D8,ARIK43K1187B9AE54C,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...
1,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,34,free,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,495,"Milwaukee-Waukesha-West Allis, WI",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...
3,101,free,SORRZGD12A6310DBC3,ARVBRGZ1187FB4675A,603,"New Orleans-Metairie, LA","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
4,24,paid,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
5,88,paid,SONWXQJ12A8C134D94,ARNF6401187FB57032,888,"Sacramento--Roseville--Arden-Arcade, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
6,80,paid,SOBONFF12A6D4F84D8,ARIK43K1187B9AE54C,903,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
7,49,paid,SOBONFF12A6D4F84D8,ARIK43K1187B9AE54C,1041,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...
8,49,paid,SOFSOCN12A8C143F5D,ARXR32B1187FB57099,724,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...
9,42,paid,SOFFKZS12AB017F194,ARBEBBY1187B9B43DB,632,"New York-Newark-Jersey City, NY-NJ-PA","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
