#### Importing the Python Libraries

In [1]:
import configparser
import os
import glob
import zipfile
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear,dayofweek
from pyspark.sql.types import *

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

#### Defining the function for creating Spark Session

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

#### Printing the current working directory so that data files can be extracted there

In [3]:
cwd = os.getcwd()
print("Current working directory:", cwd)

Current working directory: /workspace/home


#### Defining the variables for the log data and song data zip file

In [4]:
# Defining the variables files1 and files2 for log data and song data respectively
files1 = glob.glob('/workspace/home/data/log-data.zip')
files2 = glob.glob('/workspace/home/data/song-data.zip')

#Printing full directory path for log data
print("Log Data------>",files1)

#Printing full directory path for song data
print("Song Data----->",files2)

Log Data------> ['/workspace/home/data/log-data.zip']
Song Data-----> ['/workspace/home/data/song-data.zip']


#### Extracting the Log Data Zip file

In [5]:
# Extracting the Log Data Zip file
for file in files1:
    print('Unzipping Log Data Zip file:',file)

    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall('/workspace/home/data/extracted_data_folder/log-data/')

Unzipping Log Data Zip file: /workspace/home/data/log-data.zip


#### Extracting the Song Data Zip file

In [6]:
for file in files2:
    print('Unzipping Song Data Zip file:',file)

    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall('/workspace/home/data/extracted_data_folder/song-data/')

Unzipping Song Data Zip file: /workspace/home/data/song-data.zip


#### Defining the variables for the input & output data directory

In [7]:
input_data = "data/extracted_data_folder/"
output_data = "output/"

#### Setting the song data variable

In [8]:
# With only partial dataset as suggested in the Knowledge support ticket
song_data = input_data + "/song-data/song_data/A/A/*/"

#### Reading the JSON file into the Spark Data Frame

In [9]:
# Reading the JSON file into the Data Frame
song_data_df = spark.read.json(song_data)

# Total count of records in Song Data Frame
print("Total count of records in Song Data Frame----->",song_data_df.count())

Total count of records in Song Data Frame-----> 36


#### Creating Songs Data Table with smallert set of the columns from above data frame

In [10]:

songs_data_table = song_data_df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()


#### Displaying the sample records from the Songs Data Table

In [11]:
songs_data_table.show(15, True)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOQHXMF12AB0182363|     Young Boy Blues|ARGSJW91187B9B1D6B|   0|218.77506|
|SOZVMJI12AB01808AF|     Synthetic Dream|ARNPAGP1241B9C7FD4|   0|165.69424|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOMJBYD12A6D4F8557|Keepin It Real (S...|ARD0S291187B9B7BF5|   0|114.78159|
|SOLLHMX12AB

#### Creating the Spark Data Frame for the Log Data

In [12]:
log_data = input_data + "/log-data/"
log_data_df = spark.read.json(log_data)
log_data_df.show(5, True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

#### Processing Song Data

In [13]:
# Getting filepath to song data file
song_data = input_data + "/song-data/song_data/A/A/*/"

# Reading song data file
df = spark.read.json(song_data)

# Extracting columns to create songs table with distinct
songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()

# Writing songs table to parquet files partitioned by year and artist
songs_table.write.parquet(os.path.join(output_data, 'songs/songs.parquet'), partitionBy=['year', 'artist_id'], mode='overwrite')

# Extracting columns to create artists table
artists_table =  artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

# Dropping duplicates from the artists table
artists_table = artists_table.drop_duplicates(subset=['artist_id'])

# Writing artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists/artists.parquet'), mode='overwrite')

#Displating 5 Rows from Artists Table
artists_table.show(5, True)

+------------------+--------------------+-----------------+---------------+----------------+
|         artist_id|         artist_name|  artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+-----------------+---------------+----------------+
|AR0RCMP1187FB3F427|    Billie Jo Spears|     Beaumont, TX|       30.08615|       -94.10158|
|ARI3BMM1187FB4255E|        Alice Stuart|       Washington|        38.8991|         -77.029|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|       40.82624|       -74.47995|
|ARNTLGG11E2835DDB9|                 Clp|                 |           null|            null|
|ARKRRTF1187B9984DA|    Sonora Santanera|                 |           null|            null|
+------------------+--------------------+-----------------+---------------+----------------+
only showing top 5 rows



#### Processing Log Data

In [14]:
# Getting filepath to log data file
log_data = input_data + "/log-data/"

# Reading log data file
log_data_df = spark.read.json(log_data)

# Filtering by actions for song plays
log_data_df = log_data_df.where('page="NextSong"')

# Extracting columns for users table
users_table = log_data_df['userId', 'firstName', 'lastName', 'gender', 'level']

# Removing duplicates from the users tables
users_table = users_table.drop_duplicates(subset=['userId'])

# Writing the users table to parquet files
users_table.write.parquet(os.path.join(output_data, 'users/users.parquet'), mode='overwrite')

#printing rows for the users table
print("printing the users table")

#Displating 5 Rows from Artists Table
users_table.show(5, True)

# Creating timestamp column from original timestamp column with UDF lambda function
get_timestamp = udf(lambda x: str(int(int(x)/1000)))

print("get_timestamp", get_timestamp)

#print log data schema
log_data_df.printSchema()

printing the users table
+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    51|     Maia|   Burke|     F| free|
|     7|   Adelyn|  Jordan|     F| free|
|    15|     Lily|    Koch|     F| paid|
|    54|    Kaleb|    Cook|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows

get_timestamp <function <lambda> at 0x7ff3388db8c8>
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: strin

#### Processing the Time Table and creatomg UDF lambda function

In [15]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
log_data_df = log_data_df.withColumn("date_time", get_datetime(log_data_df.ts))

log_data_df = log_data_df.withColumn("date_time", get_timestamp(log_data_df['ts']))

log_data_df = log_data_df.withColumn("hour", hour(log_data_df['date_time']))
log_data_df = log_data_df.withColumn("day", dayofmonth(log_data_df['date_time']))
log_data_df = log_data_df.withColumn("week", weekofyear(log_data_df['date_time']))
log_data_df = log_data_df.withColumn("month", month(log_data_df['date_time']))
log_data_df = log_data_df.withColumn("year", year(log_data_df['date_time']))
log_data_df = log_data_df.withColumn("weekday", dayofweek(log_data_df['date_time']))

#Extracting the selected columns from the log data
time_table = log_data_df['ts','date_time','hour','day','week','month','year','weekday']


#printing

time_table.show(2, False)

+-------------+----------+----+----+----+-----+----+-------+
|ts           |date_time |hour|day |week|month|year|weekday|
+-------------+----------+----+----+----+-----+----+-------+
|1542241826796|1542241826|null|null|null|null |null|null   |
|1542242481796|1542242481|null|null|null|null |null|null   |
+-------------+----------+----+----+----+-----+----+-------+
only showing top 2 rows



In [16]:

get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts / 1000), TimestampType()) 
#get_timestamp = udf(lambda x: datetime.utcfromtimestamp(int(x) / 1000), TimestampType()) --->working
log_data_df = log_data_df.withColumn("start_time", get_timestamp(log_data_df['ts']))

# Adding the columns to the time table from the start_time column basically 
# parsing the hour, day, week, month, year etc.

log_data_df = log_data_df.withColumn("hour", hour(log_data_df['start_time']))
log_data_df = log_data_df.withColumn("day", dayofmonth(log_data_df['start_time']))
log_data_df = log_data_df.withColumn("week", weekofyear(log_data_df['start_time']))
log_data_df = log_data_df.withColumn("month", month(log_data_df['start_time']))
log_data_df = log_data_df.withColumn("year", year(log_data_df['start_time']))
log_data_df = log_data_df.withColumn("weekday", dayofweek(log_data_df['start_time']))
log_data_df.printSchema()

#Extracting the selected columns from the log data
time_table = log_data_df['start_time','hour','day','week','month','year','weekday'].distinct()


#printing

time_table.show(2, False)

# Writing time table to parquet files partitioned by year and month
time_table.write.parquet(os.path.join(output_data, 'time/time.parquet'), partitionBy=['year', 'month'], mode='overwrite')


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- start_time: timestamp (nullable =

#### SongsPlays Table Processing

In [17]:

# Reading in song data to use for songplays table
input_song_df = spark.read.json(input_data+'/song-data/song_data/A/*/*/*.json')

In [18]:
input_song_df.printSchema()
#log_data_df.printSchema()
time_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [19]:


# Extracting columns from joined song and log datasets to create songplays table 
songplays_table_joined_log_table = log_data_df.join(input_song_df, 
                                   (log_data_df.song == input_song_df.title) , how='inner')

#songplays_table_joined_log_table.printSchema()

songplays_table_output = songplays_table_joined_log_table.alias("s").join(time_table.alias("t"), \
    (time_table.alias("t").start_time == songplays_table_joined_log_table.alias("s").start_time), \
                                                                   how='inner') 




#songplays_table.printSchema()
songplays_table = songplays_table_output.select("s.userId", "t.start_time","s.song_id","s.artist_id", "s.level","s.sessionId","s.location","s.userAgent","t.year","t.month")

                             

# Selecting the distinct values Songplays table data frame
#songplays_table = songplays_table_joined_time_table["userId", "start_time", "song_id", "artist_id",
#                                                    "level","songplay_id", "sessionId", "location",
#                                                    "userAgent","year","month"]







# Extracting the columns from the Song
#input_song_df = input_song_df['datetime', 'userId', 'level', 'song', 'artist', 'sessionId', 'location', 'userAgent'].distinct()

# Writing songplays table to parquet files partitioned by year and month
#songplays_table = song_log_joined_table.distinct() \
#.select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
#.withColumn("songplay_id", F.row_number().over( Window.partitionBy('timestamp').orderBy("timestamp"))) \
#.withColumnRenamed("userId","user_id")\
#.withColumnRenamed("timestamp","start_time")  \
#.withColumnRenamed("sessionId","session_id")  \
#.withColumnRenamed("userAgent", "user_agent") \

# Writing songplays table to parquet files partitioned by year and month
#songplays_table.write.parquet(output_data + 'songplays/songplays.parquet',partitionBy=["t.year","t.month"])


songplays_table.show(5)

songplays_table.write.parquet(os.path.join(output_data, 'songplays/songplays.parquet'), partitionBy=["year", "month"], mode='overwrite')

+------+--------------------+------------------+------------------+-----+---------+--------------------+--------------------+----+-----+
|userId|          start_time|           song_id|         artist_id|level|sessionId|            location|           userAgent|year|month|
+------+--------------------+------------------+------------------+-----+---------+--------------------+--------------------+----+-----+
|    15|2018-11-21 21:56:...|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| paid|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|    10|2018-11-14 05:06:...|SOGDBUF12A8C140FAA|AR558FS1187FB45658| free|      484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|    24|2018-11-19 09:14:...|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|      672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
|    80|2018-11-27 22:35:...|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|      992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
+------+--------------------+------------