**Prototype, Development, Testing of Sparkify ETL**

In [53]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.window import Window

**Handle Credentials**

In [54]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']= config['AWS']['AWS_SECRET_ACCESS_KEY']

**Spark Session**

In [55]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()

sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", config['AWS']['AWS_ACCESS_KEY_ID'])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config['AWS']['AWS_SECRET_ACCESS_KEY'])

print(spark)

<pyspark.sql.session.SparkSession object at 0x7f7b78661ef0>


**Read Song data file**


In [56]:
#song_data = 'data/song-data/*/*/*/*.json'
song_data = 's3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json'
df_song = spark.read.json(song_data)
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



**Create song table**

In [57]:
df_song.createOrReplaceTempView("staging_songs_table")
songs_table = spark.sql("""SELECT song_id, title, artist_id, year, duration FROM staging_songs_table ORDER BY song_id""").dropDuplicates()
songs_table.printSchema()
songs_table.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+-------------------+------------------+----+---------+
|           song_id|              title|         artist_id|year| duration|
+------------------+-------------------+------------------+----+---------+
|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
+------------------+-------------------+------------------+----+---------+



**Test write song table to partitioned parquet files**

In [6]:
# Reference: https://knowledge.udacity.com/questions/103172
songsParquetPath = "{}{}".format('data/output-data/','songs.parquet')
songs_table.write.mode('overwrite').partitionBy("year","artist_id").parquet(songsParquetPath)

**Create Artists Table Create and write artists table to parquet file**

In [58]:
artists_table = spark.sql(""" SELECT artist_id, artist_name AS name, artist_location AS location, 
                                     artist_latitude AS lattitude, artist_longitude AS longitude 
                              FROM staging_songs_table 
                              ORDER BY artist_id """).dropDuplicates()
artists_table.printSchema()
artists_table.show(5)

artistsParquetPath = "{}{}".format('data/output-data/','artists.parquet')
artists_table.write.mode('overwrite').parquet(artistsParquetPath)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: string (nullable = true)
 |-- longitude: string (nullable = true)

+------------------+-----------+--------+---------+---------+
|         artist_id|       name|location|lattitude|longitude|
+------------------+-----------+--------+---------+---------+
|ARJIE2Y1187B994AB7|Line Renaud|        |     null|     null|
+------------------+-----------+--------+---------+---------+



**Read Log data (picking one sample file from log data)**

In [59]:
log_data = 's3a://udacity-dend/log_data/*/*/*.json'
#log_data =  'data/log-data/*.json'
df_log = spark.read.json(log_data)
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



**Log data staging**

In [60]:
df_log.createOrReplaceTempView("staging_log_data")
log_table = spark.sql("""SELECT * FROM staging_log_data WHERE page='NextSong'""").dropDuplicates()
log_table.show(5)

+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|           Fat Joe|Logged In|     Kate|     F|           21|  Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|
|       Linkin Park|Logged In|     Kate|     F|           33|  Harrell|259.86567| paid|Lansing-East Lans...|

**Create Users table and write parquet file**

In [61]:
users_table = spark.sql(""" SELECT userId AS user_id, firstName AS first_name, lastName AS last_name, gender, level 
                            FROM staging_log_data 
                            ORDER BY user_id """).dropDuplicates()
users_table.printSchema()
users_table.show(5)

usersParquetPath = "{}{}".format('data/output-data/','users.parquet')
users_table.write.mode('overwrite').parquet(usersParquetPath)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|       |      null|     null|  null| paid|
|       |      null|     null|  null| free|
|     10|    Sylvie|     Cruz|     F| free|
|    100|     Adler|  Barrera|     M| free|
|    101|    Jayden|      Fox|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



**create timestamp column from original timestamp column**

In [62]:
# Refernce: https://knowledge.udacity.com/questions/67777

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S'))
log_table = log_table.withColumn('timestamp', get_timestamp('ts'))
 
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S'))
log_table = log_table.withColumn('datetime', get_datetime('ts'))

# see if timestamp and datetime have values as expected.
log_table.show(1)

# extract columns (start_time, hour, day, week, month, year, weekday) to create time table
log_table.createOrReplaceTempView("staging_time")
time_table = spark.sql("""
    SELECT datetime AS start_time, hour(timestamp) AS hour, day(timestamp) AS day, 
           weekofyear(timestamp) AS week, month(timestamp) AS month, year(timestamp) AS year,
           dayofweek(timestamp) AS weekday
    FROM staging_time
    ORDER BY start_time
""").dropDuplicates()

time_table.printSchema()
time_table.show(5)
 
# write time table to parquet files partitioned by year and month
timesParquetPath = "{}{}".format('data/output-data/','times.parquet')
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(timesParquetPath)

+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+-------------------+
| artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          timestamp|           datetime|
+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+-------------------+
|Fat Joe|Logged In|     Kate|     F|           21| Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|2018-11-15 15:33:52|2018-11-15 15:33:

**Create songplays table and write parquet file**

In [63]:
# Reference: https://knowledge.udacity.com/questions/150979
# Refernce: https://stackoverflow.com/questions/40508489/spark-merge-2-dataframes-by-adding-row-index-number-on-both-dataframes

# read in song data to use for songplays table
songs_table = spark.read.parquet('data/output-data/songs.parquet')
artist_table = spark.read.parquet('data/output-data/artists.parquet')

# extract columns from joined song and log datasets to create songplays table 
#songplays: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
w = Window().orderBy('song_id')
df_joined = log_table.join(songs_table.alias("st"),   log_table.song   == col('st.title')) \
                     .join(artist_table.alias("at"),  log_table.artist == col('at.name')) \
                     .select(
                         col('timestamp').alias('start_time'),
                         col('userId').alias('user_id'),
                         'level',
                         'st.song_id',
                         'at.artist_id',
                         col('sessionId').alias('session_id'),
                         'at.location',
                         col('userAgent').alias('user_agent'),
                         year(col('timestamp')).alias('year'), 
                         month(col('timestamp')).alias('month')) \
                     .withColumn('songplay_id', row_number().over(w))

df_joined.createOrReplaceTempView("songplays_staging")
songplays_table = spark.sql("""SELECT * FROM songplays_staging""").dropDuplicates()
songplays_table.show(5)

# write songplays table to parquet files partitioned by year and month
# Reference: https://stackoverflow.com/questions/50962934/partition-column-is-moved-to-end-of-row-when-saving-a-file-to-parquet
# According to the above reference parition field year and month are not written into the parquet file and only as folder names..year and month

#songplays_table
songplaysParquetPath = "{}{}".format('data/output-data/','songplays.parquet')
songplays_table.write.mode('overwrite').partitionBy("year","month").parquet(songplaysParquetPath)

+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+
|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|year|month|songplay_id|
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+



In [43]:
# Alternative Way - if not using parquet files.
#df_joined = df_log.join(df_song, (df_log.song   == df_song.title) & 
#                                 (df_log.artist == df_song.artist_name) &  
#                                 (df_log.length == df_song.duration), 'left_outer').select(
#                                    df_log.ts.alias('start_time'),
#                                    col('userId').alias('user_id'),
#                                    df_log.level,
#                                    col('song_id'),
#                                    col('artist_id'),
#                                    df_log.sessionId.alias('session_id'),
#                                    col('artist_location').alias('location'),
#                                    col('userAgent').alias('user_agent')
#                                 ).withColumn('songplay_id', row_number().over(w))
