In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
spark = SparkSession \
    .builder \
    .appName("Data wrangling with Spark SQL") \
    .getOrCreate()

In [3]:
song_data = "data/song-data.json"
print(song_data)

data/song-data.json


In [4]:
songs_df = spark.read.json(song_data)

In [5]:
songs_df.createOrReplaceTempView("staging_songs")

In [6]:
spark.sql('''
SELECT 
  song_id
, title
, artist_id
, year
, duration 
FROM staging_songs 
LIMIT 2
''').show()

+------------------+----------------+------------------+----+---------+
|           song_id|           title|         artist_id|year| duration|
+------------------+----------------+------------------+----+---------+
|SOMZWCG12A8C13C480|I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
+------------------+----------------+------------------+----+---------+



In [7]:
song_table = spark.sql('''
SELECT 
  song_id
, title
, artist_id
, year
, duration 
FROM staging_songs 
''')

In [8]:
song_table.show()

+------------------+----------------+------------------+----+---------+
|           song_id|           title|         artist_id|year| duration|
+------------------+----------------+------------------+----+---------+
|SOMZWCG12A8C13C480|I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
+------------------+----------------+------------------+----+---------+



In [9]:
dest_dir = "/data/song_table.parquet"

In [10]:
# write songs table to parquet files partitioned by year and artist
song_table.write.partitionBy("year", "artist_id").parquet(dest_dir)

In [11]:
artist_table = spark.sql('''
SELECT 
  artist_id
, artist_name
, artist_location
, artist_latitude
, artist_longitude 
FROM staging_songs 
''')

In [12]:
artist_table.show()

+------------------+-----------+---------------+---------------+----------------+
|         artist_id|artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-----------+---------------+---------------+----------------+
|ARD7TVE1187B99BFB1|     Casual|California - LA|           null|            null|
+------------------+-----------+---------------+---------------+----------------+



In [13]:
dest_dir = "/data/artist_table.parquet"

In [14]:
artist_table.write.parquet(dest_dir)

In [15]:
log_data = "data/log-data.json"
print(log_data)

data/log-data.json


In [16]:
logs_df = spark.read.json(log_data)

In [17]:
logs_df.take(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [18]:
logs_df.createOrReplaceTempView("staging_logs")

In [19]:
user_table = spark.sql('''
SELECT 
  userId
, firstName
, lastName
, gender
, level 
FROM staging_logs 
''')

In [20]:
user_table.take(1)

[Row(userId='26', firstName='Ryan', lastName='Smith', gender='M', level='free')]

In [21]:
dest_dir = "/data/user_table.parquet"

In [22]:
user_table.write.parquet(dest_dir)

In [23]:
ts = spark.sql('''
SELECT ts from staging_logs limit 1''')

In [24]:
spark.udf.register("get_timestamp", lambda x: datetime.datetime.fromtimestamp(x/1000))

<function __main__.<lambda>(x)>

In [25]:
timestamp_df = spark.sql('''
SELECT 
  get_timestamp(ts) as ts
FROM staging_logs 
''')

In [26]:
timestamp_df.take(1)

[Row(ts='java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2018,MONTH=10,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=15,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=30,SECOND=26,MILLISECOND=796,ZONE_OFFSET=?,DST_OFFSET=?]')]

In [27]:
timestamp_df.createOrReplaceTempView("staging_ts")

In [28]:
spark.udf.register("get_time", lambda x: datetime.datetime.fromtimestamp(x/1000).isoformat())
spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x/1000).hour))
spark.udf.register("get_day", lambda x: int(datetime.datetime.fromtimestamp(x/1000).day))
spark.udf.register("get_week", lambda x: str(datetime.datetime.fromtimestamp(x/1000).isocalendar()[1]))
spark.udf.register("get_month", lambda x: int(datetime.datetime.fromtimestamp(x/1000).month))
spark.udf.register("get_year", lambda x: int(datetime.datetime.fromtimestamp(x/1000).year))
spark.udf.register("get_weekday", lambda x: str(datetime.datetime.fromtimestamp(x/1000).weekday()))

<function __main__.<lambda>(x)>

In [29]:
time_table = spark.sql('''
SELECT 
  get_time(ts) as start_time
, get_hour(ts) as hour
, get_day(ts) as day
, get_week(ts) as week
, get_month(ts) as month
, get_year(ts) as year
, get_weekday(ts) as weekday
FROM staging_logs
'''
)

In [30]:
dest_dir = "/data/time_table.parquet"

In [31]:
time_table.write.parquet(dest_dir)

In [32]:
songplay_table = spark.sql('''
SELECT DISTINCT
  S.ARTIST_ID
, E.LEVEL
, S.ARTIST_LOCATION
, E.SESSIONID, S.SONG_ID
, get_time(e.ts) as start_time
, E.USERAGENT
, E.USERID
, S.duration
FROM STAGING_logS E
JOIN STAGING_SONGS S
ON E.song = S.title
AND E.artist = S.artist_name
WHERE E.PAGE = 'NextSong'
''')

In [33]:
songplay_table.take(1)

[]

In [34]:
dest_dir = "/data/song_play"
songplay_table.write.partitionBy("start_time").parquet(dest_dir)