In [1]:
import configparser
import datetime
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import monotonically_increasing_id
from datetime import datetime
#import pyarrow.parquet as pq

In [2]:
#CREATE SPARK SEASSION
#Create a spark session with hadoop-aws package
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

print('Creating spark session on AWS')
spark = create_spark_session()

#DATA
input_data = "s3a://udacity-dend/"
song_input_data = "data/song-data/song-data/A/A/A/*.json"
log_input_data = "data/log-data/*.json"
output_data = "s3a://dend-project4-udacity"
OUTPUT_DATA_LOCAL     = "data/output_data/"

Creating spark session on AWS


In [3]:
song_data_path = song_input_data
df_sd = spark.read.json(song_data_path)

In [4]:
df_sd.printSchema()
df_sd.show(5, truncate=False)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+-------------------------------------------+----+
|artist_id         |artist_latitude|artist_location            |artist_longitude|artist_name          |duration |num_songs|song_id           |title                                      |year|
+------------------+---------------+---------------------------+----------------+---------------------+---------+---------+------------------+-------------------------------------------+----+
|AR

In [5]:
df_sd.createOrReplaceTempView("songs_table_DF")
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songs_table_DF
    ORDER BY song_id
""")
songs_table.printSchema()
songs_table.show()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOCIWDW12A8C13D406|           Soul Deep|ARMJAGH1187FB546F3|1969|148.03546|
|SOFSOCN12A8C143F5D|      Face the Ashes|ARXR32B1187FB57099|2007|209.60608|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SOMJBYD12A6D4F8557|Keepin It Real (S...|ARD0S291187B9B7BF5|   0|114.78159|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SONHOTT12A8C13493C|     Something Girls|AR7G5I41187FB4CE6C|1982|233.40363|
|SOQHXMF12AB0182363|     Young Boy Blues|AR

In [6]:
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path = output_data + "songs_table" + "_" + now
print(songs_table_path)

s3a://dend-project4-udacitysongs_table_2019-11-10-08-43-03-023571


In [7]:
# Write DF data to JSON file(s)
# Ref: https://stackoverflow.com/questions/29908892/save-a-large-spark-dataframe-as-a-single-json-file-in-s3
#df_sd.write.mode('append').json(songs_table_path)
# -------
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
# Partitioning: https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path = OUTPUT_DATA_LOCAL + "songs_table.parquet" + "_" + now

# Use this instead if you want to store output to S3.
#songs_table_path = OUTPUT_DATA + "songs_table.parquet" + "_" + now
#print(songs_table_path)

# NOTE: this command doesn't have partitioning!!
#songs_table.write.parquet(songs_table_path)

# Write DF to Spark parquet file (partitioned by year and artist_id)
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)

In [8]:
df_sd.createOrReplaceTempView("artists_table_DF")
artists_table = spark.sql("""
    SELECT  artist_id        AS artist_id, 
            artist_name      AS name, 
            artist_location  AS location, 
            artist_latitude  AS latitude, 
            artist_longitude AS longitude 
    FROM artists_table_DF
    ORDER BY artist_id desc
""")
artists_table.printSchema()
artists_table.show(5, truncate=False)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+---------------------+-----------+--------+---------+
|artist_id         |name                 |location   |latitude|longitude|
+------------------+---------------------+-----------+--------+---------+
|ARXR32B1187FB57099|Gob                  |           |null    |null     |
|ARNTLGG11E2835DDB9|Clp                  |           |null    |null     |
|ARMJAGH1187FB546F3|The Box Tops         |Memphis, TN|35.14968|-90.04892|
|ARKRRTF1187B9984DA|Sonora Santanera     |           |null    |null     |
|ARKFYS91187B98E58F|Jeff And Sheri Easter|           |null    |null     |
+------------------+---------------------+-----------+--------+---------+
only showing top 5 rows



In [9]:
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA_LOCAL + "artists_table.parquet" + "_" + now

# Use this instead if you want to store output to S3.
#artists_table_path = OUTPUT_DATA + "artists_table.parquet" + "_" + now

#print(artists_table_path)
songs_table.write.parquet(artists_table_path)

In [10]:
# Read local song_data
df_ld = spark.read.json(log_input_data)

# Use this instead if you want to read log_data from S3.
#df_ld = spark.read.json(INPUT_DATA_LD)

In [11]:
df_ld.printSchema()
df_ld.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [12]:
df_ld_filtered = df_ld.filter(df_ld.page == 'NextSong')
df_ld_filtered.show(20)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [13]:
df_ld_filtered.createOrReplaceTempView("users_table_DF")
users_table = spark.sql("""
    SELECT  DISTINCT userId    AS user_id, 
                     firstName AS first_name, 
                     lastName  AS last_name, 
                     gender, 
                     level
    FROM users_table_DF
    ORDER BY last_name
""")
users_table.printSchema()
users_table.show(20)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
|     91|    Jayden|     Bell|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     58|     Emily|   Benson|     F| paid|
|     72|    Hayden|    Brock|     F| paid|
|     51|      Maia|    Burke|     F| free|
|     32|      Lily|    Burns|     F| free|
|     90|    Andrea|   Butler|     F| free|
|     64|    Hannah|  Calhoun|     F| free|
|     27|    Carlos|   Carter|     M| free|
|     94|      Noah|   Chavez|     M| free|
|    

In [14]:
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
users_table_path = OUTPUT_DATA_LOCAL + "users_table.parquet" + "_" + now
print(users_table_path)
users_table.write.parquet(users_table_path)

data/output_data/users_table.parquet_2019-11-10-08-48-28-131475


In [15]:
# 
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t
#from datetime import datetime

@udf(t.TimestampType())
def get_timestamp (ts):
    return datetime.fromtimestamp(ts / 1000.0)
    
df_ld_filtered = df_ld_filtered.withColumn("timestamp", get_timestamp("ts"))
                    

df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+-------------------------------

In [16]:
# Create a new column with datetime
# Ref: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dateformat#pyspark.sql.functions.from_unixtime
@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

df_ld_filtered = df_ld_filtered.withColumn("datetime", get_datetime("ts"))
df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-----

In [17]:
# Ref: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

df_ld_filtered.createOrReplaceTempView("time_table_DF")
time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM time_table_DF
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|start_time         |hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:46|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:05:52|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:08:16|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:11:13|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:17:33|21  |1  |44  |11   |2018|5      |
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [18]:
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = OUTPUT_DATA_LOCAL + "time_table.parquet" + "_" + now
print(time_table_path)
time_table.write.parquet(time_table_path)

data/output_data/time_table.parquet_2019-11-10-08-49-34-546528


In [19]:
# Ref: https://stackoverflow.com/questions/33745964/how-to-join-on-multiple-columns-in-pyspark
df_ld_sd_joined = df_ld_filtered.join(df_sd, (df_ld_filtered.artist == df_sd.artist_name) & (df_ld_filtered.song == df_sd.title))
df_ld_sd_joined.printSchema()
df_ld_sd_joined.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = 

In [20]:
df_ld_sd_joined = df_ld_sd_joined.withColumn("songplay_id", f.monotonically_increasing_id())


df_ld_sd_joined.createOrReplaceTempView("songplays_table_DF")
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_DF
    ORDER BY (user_id, session_id) 
""")

songplays_table.printSchema()
songplays_table.show(5, truncate=False)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+



In [21]:
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = OUTPUT_DATA_LOCAL + "songplays_table.parquet" + "_" + now
print(songplays_table_path)
time_table.write.parquet(songplays_table_path)

data/output_data/songplays_table.parquet_2019-11-10-08-50-46-824185
