In [None]:
from pyspark.sql import SparkSession



In [None]:
spark = (SparkSession
        .Builder()
        .appName("Python Spark SQL basic example")
        .master("local[2]")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
)
spark.sparkContext.setLogLevel('WARN')

# Song Data

In [24]:
df_songs = spark.read.json("./data/song_data/A/*/*/*.json")
df_songs.printSchema()



root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



                                                                                

The infered schema looks good. We can create one to make it more explicit

In [None]:
from pyspark.sql.types import (
    StructType,
    StructField, 
    StringType, 
    DoubleType,
    LongType,
    IntegerType
)
# create a schema simialr to this
# |-- artist_id: string (nullable = true)
# |-- artist_latitude: double (nullable = true)
# |-- artist_location: string (nullable = true)
# |-- artist_longitude: double (nullable = true)
# |-- artist_name: string (nullable = true)
# |-- duration: double (nullable = true)
# |-- num_songs: long (nullable = true)
# |-- song_id: string (nullable = true)
# |-- title: string (nullable = true)
# |-- year: long (nullable = true)
songs_schema = StructType([
    StructField('artist_id', StringType()),
    StructField('artist_latitude', DoubleType()),
    StructField('artist_location', StringType()),
    StructField('artist_longitude', DoubleType()),
    StructField('artist_name', StringType()),
    StructField('duration', DoubleType()),
    StructField('num_songs', IntegerType()),
])

In [None]:
df_songs = spark.read.json("./data/song_data/A/*/*/*.json", 
                           schema = songs_schema)

### 1. Songs Table

In [23]:
df_songs.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,233.22077,1,SOVIYJY12AF72A4B00,The Dead Next Door (Digitally Remastered 99),1983
1,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,287.92118,1,SOVYXYL12AF72A3373,Rebel Yell (1999 Digital Remaster),1983
2,ARQ846I1187B9A7083,,,,Yvonne S. Moriarty / Walt Fowler / Ladd McInto...,196.04853,1,SOEPTVC12A67ADD0DA,"To Zucchabar [""Gladiator"" - Music from the Mot...",0
3,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,247.53587,1,SOLQYSZ12AB0181F97,Mony Mony (Live),1987
4,AR3TZ691187FB3DBB1,,,,Russell Watson / Pino Palladino / Robbie McInt...,273.44934,1,SOVPFJK12A6701CB16,Barcelona - (Friends until the end),2000


In [None]:
songs_tablo = (
    df_songs.
    select('song_id', 'title', 'artist_id', 'year', 'duration')
    .drop_duplicates(["song_id"])
)
songs_tablo.write.parquet(path = "songs_table",
                          partitionBy= ["year", "artist_id"])


## 2. Artist Table


In [None]:
artists_table = (
    df_songs
    .select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
    .withColumnRenamed("artist_name", "name")
    .drop_duplicates(["artist_id"])
)

In [None]:
artists_table.write.parquet(path = "path")

In [None]:
df_logs = spark.read.json("./data/log_data/2018/11/*.json")
df_logs.printSchema()

# 2. Log Data

In [66]:
df_logs = spark.read.json("./data/log_data/2018/11/*.json")
df_logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Some datatypes are not accurate, so, we can
* Fix them individually
* Or enforce a schema to fix them when reading the data

In [67]:
from pyspark.sql.types import (
    StructType,
    StructField, 
    StringType, 
    DoubleType,
    LongType,
    TimestampType,
    IntegerType
)

In [68]:

# # We want a schema like this one
#  |-- artist: string (nullable = true)
#  |-- auth: string (nullable = true)
#  |-- firstName: string (nullable = true)
#  |-- gender: string (nullable = true)
#  |-- itemInSession: long (nullable = true)
#  |-- lastName: string (nullable = true)
#  |-- length: double (nullable = true)
#  |-- level: string (nullable = true)
#  |-- location: string (nullable = true)
#  |-- method: string (nullable = true)
#  |-- page: string (nullable = true)
#  |-- registration: double (nullable = true)
#  |-- sessionId: long (nullable = true)
#  |-- song: string (nullable = true)
#  |-- status: long (nullable = true)
#  |-- ts: long (nullable = true)
#  |-- userAgent: string (nullable = true)
#  |-- userId: string (nullable = true)
logs_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", LongType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", TimestampType()),
    StructField("sessionId", LongType()),
    StructField("song", StringType()),
    StructField("status", LongType()),
    StructField("ts", TimestampType()),
    StructField("userAgent", StringType()),
    StructField("userId", StringType())
])

In [69]:
df_logs = spark.read.json("./data/log_data/2018/11/*.json",
                         schema=logs_schema)


In [70]:
df_logs.limit(5).toPandas()

ValueError: year 50841 is out of range

It looks like we need to get our hands a little bit dirty. A quick fix using the schema didn't do it

In [81]:
logs_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", IntegerType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", LongType()),
    StructField("sessionId", IntegerType()),
    StructField("song", StringType()),
    StructField("status", IntegerType()),
    StructField("ts", LongType()),
    StructField("userAgent", StringType()),
    StructField("userId", IntegerType())
])
df_logs = spark.read.json("./data/log_data/2018/11/*.json",
                          schema=logs_schema)
df_logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)



In [82]:
from pyspark.sql.functions import from_unixtime

In [83]:
# fixing the timestamp columns
df_logs = df_logs.withColumn('ts', from_unixtime(df_logs["ts"]/1000))
df_logs = df_logs.withColumn('registration', from_unixtime(df_logs["registration"]/1000))
df_logs.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,,583,Sehr kosmisch,200,2018-11-15 00:30:26,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,,583,The Big Gundown,200,2018-11-15 00:41:21,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,,583,Marry Me,200,2018-11-15 00:45:41,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",


In [84]:
df_logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)



In [88]:
# renaming from camel case to  snake case
df_logs = (
    df_logs
    .withColumnRenamed('firstName', 'first_name')
    .withColumnRenamed('lastName', 'last_name')
    .withColumnRenamed('userId', 'user_id')
    .withColumnRenamed('itemInSession','item_in_session')
    .withColumnRenamed('sessionId','session_id')
    .withColumnRenamed('userAgent', 'user_agent')
)

+--------------------+----------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+------------+---------+--------------------+------+-------------------+--------------------+------+
|              artist|      auth|first_name|gender|itemInSession|lastName|   length|level|            location|method|    page|registration|sessionId|                song|status|                 ts|           userAgent|userId|
+--------------------+----------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+------------+---------+--------------------+------+-------------------+--------------------+------+
|            Harmonia| Logged In|      Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|        null|      583|       Sehr kosmisch|   200|2018-11-15 00:30:26|"Mozilla/5.0 (X11...|  null|
|         The Prodigy| Logged In|      Ryan|     M|            1|   Smith|260.07465| free|Sa

## 2.1 Songplay table

* songplays - records in log data associated with song plays i.e. records with page `NextSong`
    * songplay_id, 
    * start_time, 
    * user_id, 
    * level, 
    * song_id, 
    * artist_id, 
    * session_id, 
    * location, 
    * user_agent

In [64]:
import pyspark.sql.functions as f

In [None]:

# the full table will requires a join. We can then select a subset of the columns and filter on the page.
# to later join it with the songs table

songplay_table = (
    df_logs
    .filter(f.lower(df_logs["page"]) == "nextsong")
    .select('ts', 'userId',  'level', 'song', 'artist', 'sessionId', 'location', 'userAgent')
    .join(df_songs, on = df_logs["song"] == df_songs["title"], how = "inner")
    .withColumnsRenamed(
        {
            "ts": "start_time",
            "userId": "user_id",
            "sessionId": "session_id",
            "userAgent": "user_agent" 
        }
    ).withColumn('songplay_id', f.monotonically_increasing_id())
)

## 2.2 Users Table


In [None]:
users_table = (
    df_logs.select('userId', 'first')
)