## Startup Spark locally

In [1]:
# import findspark
# findspark.init()

# import pyspark
# sc = pyspark.SparkContext(appName="myAppName")


In [2]:
# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

In [109]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

In [110]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

## Load song data

For some reason, we can not load all the data at once. the function bellow
  read the 30 files at once in each step

In [5]:
def load_data(l_files):
    # NOTE: for some reason, locally I could not load more than 30 files by step
    i_step = 30
    df = None
    for ii in range(0, len(l_files), i_step):
        if ii == 0:
            continue
        df_new = spark.read.json(l_files[ii-i_step:ii])
        if isinstance(df, type(None)):
            df = df_new
        else:
            df = df.union(df_new)
    return df

In [115]:
data = './data/songs/A/*/*.json'
output_data = './data/output'
l_files = !ls {data}
l_files2 = list(l_files)
print(f'total fies: {len(l_files)}')

total fies: 604


In [9]:
# df = spark.read.json(l_files2)
# df.printSchema()

In [163]:
%%time
df = load_data(l_files2)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

CPU times: user 82.8 ms, sys: 19 ms, total: 102 ms
Wall time: 3.82 s


## extract columns to create songs table

In [12]:
# songs - songs in music database
# song_id, title, artist_id, year, duration

df.createOrReplaceTempView("staging_songs")
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM staging_songs
    ORDER BY song_id
""")

songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [164]:
songs_table = df['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [165]:
songs_table.limit(1).show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
+------------------+--------------------+------------------+----+---------+



In [166]:
# write songs table to parquet files partitioned by year and artist
s_song_s3_path = f"s3://{output_data}/songs.parquet"
s_song_s3_path = f"{output_data}/songs.parquet"
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(s_song_s3_path)


## extract columns to create artists table

In [104]:
# artists - artists in music database
# artist_id, name, location, latitude, longitude
artists_table = spark.sql("""
    SELECT (
        artist_id,
        artist_name AS name,
        artist_location AS location,
        artist_latitude AS latitude,
        artist_longitude AS longitude)
    FROM staging_songs
    ORDER BY artist_id
""")

artists_table.printSchema()

root
 |-- named_struct(artist_id, artist_id, name, artist_name AS `name`, location, artist_location AS `location`, latitude, artist_latitude AS `latitude`, longitude, artist_longitude AS `longitude`): struct (nullable = false)
 |    |-- artist_id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)



In [107]:
# artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artists_table = df.select(
    col("artist_id").alias("artist_id"),
    col("artist_name").alias("name"),
    col("artist_location").alias("location"),
    col("artist_latitude").alias("latitude"),
    col("artist_longitude").alias("longitude"))
artists_table = artists_table.drop_duplicates(subset=['artist_id'])
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [108]:
artists_table.limit(1).show()

+------------------+-------------------+--------------------+--------+---------+
|         artist_id|               name|            location|latitude|longitude|
+------------------+-------------------+--------------------+--------+---------+
|AR9ODB41187FB459B2|Organized Konfusion|SPRINGFIELD, Virg...|    null|     null|
+------------------+-------------------+--------------------+--------+---------+



## Load log data

In [177]:
data = './data/logs/*/*/*.json'
df = spark.read.json(data)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [29]:
# filter by actions for song plays
df.createOrReplaceTempView("staging_events")
df = spark.sql("""
    SELECT * FROM staging_events
    WHERE page='NextSong'
    """)

In [178]:
df = df.filter(df.page == 'NextSong')

## extract columns for users table

In [30]:
# users - users in the app
# user_id, first_name, last_name, gender, level

users_table = spark.sql("""
    SELECT DISTINCT
        userId AS user_id,
        firstName AS first_name,
        lastName AS last_name,
        gender AS gender,
        level AS level
    FROM staging_events
    ORDER BY user_id
""")

users_table.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [114]:
users_table = df.select(
    col("userId").alias("user_id"),
    col("firstName").alias("first_name"),
    col("lastName").alias("last_name"),
    col("gender").alias("gender"),
    col("level").alias("level"))
users_table = users_table.drop_duplicates(subset=['user_id'])
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [116]:
# write users table to parquet files
s_s3_path = "{}/users.parquet".format(output_data)
users_table.write.mode('overwrite').parquet(s_s3_path)

## extract columns to create time table

In [31]:
gr = spark.sql("""
    SELECT ts
    FROM staging_events
    LIMIT 1
""")

# gr.collect()
gr.show()

+-------------+
|           ts|
+-------------+
|1542241826796|
+-------------+



In [117]:
df.limit(1).select('ts').show()

+-------------+
|           ts|
+-------------+
|1542241826796|
+-------------+



In [179]:
# testing convertion
import time

x = 1542241826796

s_format = '%m/%d/%Y %H:%M:%S'
print(time.strftime(s_format, time.localtime(x/1000.)))

11/14/2018 22:30:26


In [184]:
# create timestamp column from original timestamp column

import time
from datetime import datetime
from pyspark.sql.functions import udf, col

get_timestamp = udf(lambda x: time.localtime(x/1000.))
get_timestamp = udf(lambda ts: ts/1000)
df2 = df.withColumn('timestamp', get_timestamp('ts'))





In [185]:
# create datetime column from original timestamp column
from pyspark.sql.types import DateType

get_datetime = udf(lambda x: time.strftime(
    '%Y-%m-%d %H:%M:%S', time.localtime(x/1000.0)), DateType())
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts/1000), DateType())
df2 = df2.withColumn('datetime', get_datetime('ts'))


df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: date (nullable = true)



In [187]:
# df2.limit(1).select('datetime').show()

In [42]:
df2.createOrReplaceTempView("staging_events")

gr = spark.sql("""
    SELECT ts
    FROM staging_events
    LIMIT 1
""")

# gr.collect()
# df.select('datetime').show(1)

In [123]:
# df.select('ts').show(1)

In [188]:
# time - timestamps of records in songplays broken down into specific units
# start_time, hour, day, week, month, year, weekday
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear,
                                   date_format, dayofweek)

time_table = spark.sql("""
    SELECT DISTINCT
        datetime AS start_time,
        hour(datetime) AS hour,
        dayofmonth(datetime) AS day,
        weekofyear(datetime) AS week,
        month(datetime) AS month,
        year(datetime) AS year,
        dayofweek(datetime) AS weekday
    FROM staging_events
    ORDER BY start_time
""")

time_table.printSchema()


root
 |-- start_time: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [189]:
time_table = df2.select(
    col('datetime').alias('start_time'),
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    dayofweek('datetime').alias('weekday')
)
time_table = time_table.drop_duplicates(subset=['start_time'])

time_table.printSchema()

root
 |-- start_time: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [191]:
# time_table.select('hour').limit(1).show(1)

In [161]:
# s_s3_path = "{}/time.parquet".format(output_data)
# time_table.write.mode('overwrite').partitionBy(
#     'year', 'month').parquet(s_s3_path)

## extract columns from joined song and log datasets to create songplays table

In [143]:
df.select('ts').limit(1).show(1)

+-------------+
|           ts|
+-------------+
|1542241826796|
+-------------+



In [152]:
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: date (nullable = true)



In [168]:
# read in song data to use for songplays table
s_s3_path = f"{output_data}/songs.parquet"
# songs_table = spark.read.parquet(s_s3_path)

In [170]:
# NOTE: should've read from parque songs table, but I was uneble to save the data
# locally




# songplays - records in log data associated with song plays i.e. records with page NextSong
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

# serial like number:
# source https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
# https://stackoverflow.com/questions/53042432/creating-a-row-number-of-each-row-in-pyspark-dataframe-using-row-number-functi

from pyspark.sql.functions import row_number


window = Window.orderBy(col('song_id'))

songplays_table = (df2.join(
    songs_table.alias('songs'), df2.song == col('songs.title'))
     .select(
        col('timestamp').alias('start_time'),
        col('userId').alias('user_id'),
        col('level').alias('level'),
        col('songs.song_id').alias('song_id'),
        col('songs.artist_id').alias('artist_id'),
        col('sessionId').alias('session_id'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
     )
    .withColumn('songplay_id', row_number().over(window))
)

songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- songplay_id: integer (nullable = true)



In [None]:
df2.createOrReplaceTempView("staging_events")
songs_table.createOrReplaceTempView("songs_table")

songplays_table = spark.sql("""
    SELECT DISTINCT
        events.timestamp AS start_time,
        events.userId AS user_id,
        events.level AS level,
        songs.song_id AS song_id,
        songs.artist_id AS artist_id,
        events.sessionId AS session_id,
        events.location AS location,
        events.userAgent AS user_agent
    FROM staging_events AS events
    INNER JOIN songs_table AS songs
        ON events.song = songs.title
        AND events.length = songs.duration
    ORDER BY song_id
""")

window = Window.orderBy(col('song_id'))
songplays_table = songplays_table.withColumn(
    'songplay_id', row_number().over(window))

songplays_table.printSchema()

In [193]:
# songplays_table.limit(1).toPandas()