## Purpose: develop and test the code for the ETL pipeline
### The data is first read from sub-folders of the data folder (already extracted from the provided zip files):
#### song-data   - input data store for songs
#### log-data    - input data store for logs
#### output-data - output data folder written by the program

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


# Load settings from dl.cfg and export the AWS credentials as environment
# variables (presumably so the S3A connector can pick them up — confirm).
config = configparser.ConfigParser()
# A context manager closes the file handle after parsing; the previous bare
# open() call left it open for the lifetime of the kernel.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
# Data locations from the [LOCAL] section of dl.cfg.
# Song and log data currently share the same INPUT_DATA root.
OUTPUT_DATA_LOCAL = config['LOCAL']['OUTPUT_DATA_WORKSPACE']
input_data = config['LOCAL']['INPUT_DATA']
SONG_DATA_LOCAL = input_data
LOG_DATA_LOCAL = input_data

In [12]:
# Create (or reuse) the Spark session; the hadoop-aws package is pulled in
# for s3a:// access.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [13]:
# Root location of the song data (kept for reference; the read below builds
# its own path from input_data).
song_data_path = SONG_DATA_LOCAL

# Root of the log data. The previous literal lacked the f-prefix, so the
# "{input_data}" placeholder was never substituted.
log_data = os.path.join(input_data, "log_data/")

# Only the song_data/A/A/A subset is read to keep test runs fast; use
# "song_data/*/*/*/*.json" instead to process the full data set.
song_data = os.path.join(input_data, "song_data/A/A/A/*.json")
print(song_data)
df = spark.read.json(song_data)


s3a://udacity-dend/song_data/A/A/A/*.json


In [14]:
# Print the schema Spark inferred from the song JSON files.
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [18]:
# Preview the first 5 song records without truncating long column values.
df.show(5, truncate=False)

+------------------+---------------+---------------------------------+----------------+-------------------------+---------+---------+------------------+------------------------------------------------------+----+
|artist_id         |artist_latitude|artist_location                  |artist_longitude|artist_name              |duration |num_songs|song_id           |title                                                 |year|
+------------------+---------------+---------------------------------+----------------+-------------------------+---------+---------+------------------+------------------------------------------------------+----+
|ARTC1LV1187B9A4858|51.4536        |Goldsmith's College, Lewisham, Lo|-0.01802        |The Bonzo Dog Band       |301.40036|1        |SOAFBCP12A8C13CC7D|King Of Scurf (2007 Digital Remaster)                 |1972|
|ARA23XO1187B9AF18F|40.57885       |Carteret, New Jersey             |-74.21956       |The Smithereens          |192.522  |1        |SOKTJDS12AF72A2

In [19]:
# Songs dimension: selected song columns, sorted by song_id.
# The temp view is still registered so the data can also be queried via SQL.
df.createOrReplaceTempView("songs_table_df")
songs_table = (
    df.select("song_id", "title", "artist_id", "year", "duration")
      .orderBy("song_id")
)
print("Songs table schema: ")
songs_table.printSchema()

Songs table schema: 
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [None]:
# Number of song records loaded into df.
df.count()

In [None]:
# Preview the first 5 rows of the songs table, untruncated.
songs_table.show(5, truncate=False)

In [None]:
# write songs table to parquet files partitioned by year and artist.
# The timestamp suffix gives every run its own output directory.
output_data = OUTPUT_DATA_LOCAL
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
# os.path.join supplies the separator when OUTPUT_DATA_WORKSPACE has no
# trailing slash; plain "+" concatenation would produce a sibling path.
songs_table_path = os.path.join(output_data, f"songs_table.parquet_{now}")
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(songs_table_path)


In [23]:
# Artists dimension. Each song record carries its artist's columns, so an
# artist with several songs would appear several times; DISTINCT removes
# those duplicate rows.
# NOTE(review): if one artist_id has differing location/lat/long values
# across songs, more than one row per artist can still remain.
df.createOrReplaceTempView("artists_table_df")
artists_table = spark.sql("""
    SELECT DISTINCT artist_id        AS artist_id,
                    artist_name      AS name,
                    artist_location  AS location,
                    artist_latitude  AS latitude,
                    artist_longitude AS longitude
    FROM artists_table_df
    ORDER BY artist_id DESC
""")

In [24]:
# Inspect the artists table: schema, then the first 5 rows untruncated.
artists_table.printSchema()
artists_table.show(5, truncate=False)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+-------------------------+------------------+--------+----------+
|artist_id         |name                     |location          |latitude|longitude |
+------------------+-------------------------+------------------+--------+----------+
|ARZKCQM1257509D107|Dataphiles               |                  |null    |null      |
|ARZ5H0P1187B98A1DD|Snoop Dogg               |Long Beach, CA    |33.76672|-118.1924 |
|ARY589G1187B9A9F4E|Talkdemonic              |Portland, OR      |45.51179|-122.67563|
|ARXR32B1187FB57099|Gob                      |                  |null    |null      |
|ARXQBR11187B98A2CC|Frankie Goes To Hollywood|Liverpool, England|null    |null      |
+------------------+-------------------------+------------------+--------+----------+
only showing top 5 row

In [25]:
# Write the artists table to a run-specific parquet directory.
# os.path.join supplies the separator if OUTPUT_DATA_WORKSPACE has no
# trailing slash.
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = os.path.join(OUTPUT_DATA_LOCAL, f"artists_table.parquet_{now}")
artists_table.write.parquet(artists_table_path)

In [None]:
# get filepath to log data (root location; the log_data/ sub-folder is
# appended below)
log_data = LOG_DATA_LOCAL

# read log data files. The previous code loaded "song_data/*/*/*", i.e. the
# song records, which have no `page` column, so the NextSong filter failed.
# NOTE(review): assumes log files live under <INPUT_DATA>/log_data/<year>/<month>/*.json — confirm
df_log_data = spark.read.json(os.path.join(log_data, "log_data/*/*/*.json"))
# filter by actions for song plays
df_log_data_filtered = df_log_data.filter(df_log_data.page == 'NextSong')

# extract columns for users table
# NOTE: DISTINCT is over all five columns, so a user whose level changed
# appears once per distinct level.
df_log_data_filtered.createOrReplaceTempView("users_table_df")
users_table = spark.sql("""
    SELECT DISTINCT userId    AS user_id,
                    firstName AS first_name,
                    lastName  AS last_name,
                    gender,
                    level
    FROM users_table_df
    ORDER BY last_name
""")

In [None]:
# Inspect the users table: schema, then the first 5 rows untruncated.
users_table.printSchema()
users_table.show(5, truncate=False)

In [None]:
#write users table to parquet files
# Run-specific output directory; os.path.join supplies the separator if
# OUTPUT_DATA_WORKSPACE has no trailing slash.
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
users_table_path = os.path.join(OUTPUT_DATA_LOCAL, f"users_table.parquet_{now}")
users_table.write.parquet(users_table_path)

In [None]:
#create column with timestamp
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t

@udf(t.TimestampType())
def get_timestamp(ts):
    """Convert an epoch-milliseconds value to a datetime.

    ``datetime.fromtimestamp`` interprets the value in the executor's local
    time zone. A null ``ts`` yields null.
    """
    # Without this guard a null ts raises TypeError and fails the Spark task.
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000.0)

#add column timestamp, and use results of user-defined-function above to populate it
df_log_data_filtered = df_log_data_filtered.withColumn("timestamp", get_timestamp("ts"))
df_log_data_filtered.printSchema()
df_log_data_filtered.show(5)

In [None]:
#create column with datetime
@udf(t.StringType())
def get_datetime(ts):
    """Format an epoch-milliseconds value as 'YYYY-MM-DD HH:MM:SS'.

    Sub-second precision is dropped and the executor's local time zone is
    used. A null ``ts`` yields null.
    """
    # Without this guard a null ts raises TypeError and fails the Spark task.
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

df_log_data_filtered = df_log_data_filtered.withColumn("datetime", get_datetime("ts"))
df_log_data_filtered.printSchema()
df_log_data_filtered.show(5, truncate=False)

In [None]:
# Row count of the song DataFrame `df`.
# NOTE(review): this cell sits among the log-data cells; it may have been
# meant as df_log_data_filtered.count() — confirm.
df.count()

In [None]:
#create time table
# start_time is the `timestamp` column, the same value songplays uses for its
# start_time. The formatted `datetime` string used previously was a
# different type and dropped milliseconds, so its keys could not match
# songplays rows.
df_log_data_filtered.createOrReplaceTempView("time_table_df")
time_table = spark.sql("""
    SELECT  DISTINCT timestamp AS start_time,
                     hour(timestamp)       AS hour,
                     dayofmonth(timestamp) AS day,
                     weekofyear(timestamp) AS week,
                     month(timestamp)      AS month,
                     year(timestamp)       AS year,
                     dayofweek(timestamp)  AS weekday
    FROM time_table_df
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

In [None]:
#write time table to parquet file
# Run-specific output directory; os.path.join supplies the separator if
# OUTPUT_DATA_WORKSPACE has no trailing slash.
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = os.path.join(OUTPUT_DATA_LOCAL, f"time_table.parquet_{now}")
print(time_table_path)
time_table.write.parquet(time_table_path)

In [None]:
#create songplays table
# Inner-join each log event to song records whose artist name and title
# match the event's `artist` and `song` fields.
same_artist = df_log_data_filtered.artist == df.artist_name
same_title = df_log_data_filtered.song == df.title
df_joined = df_log_data_filtered.join(df, same_artist & same_title)
df_joined.printSchema()
df_joined.show(5)

In [None]:
# Preview more joined rows (15) to sanity-check the artist/title match.
df_joined.show(15)

In [None]:
#add column to df and create table
# monotonically_increasing_id gives unique (but not consecutive) ids.
df_joined = df_joined.withColumn("songplay_id", f.monotonically_increasing_id())

df_joined.createOrReplaceTempView("songplays_table_df")
# Sort by a plain column list; the previous "(user_id, session_id)" ordered by
# an anonymous struct expression rather than by the two columns.
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id,
            timestamp   AS start_time,
            userId      AS user_id,
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_df
    ORDER BY user_id, session_id
""")

songplays_table.printSchema()

In [None]:
# Preview the first 5 songplays rows, untruncated.
songplays_table.show(5, truncate=False)

In [None]:
# Number of log events that matched a song record in the join.
df_joined.count()

In [None]:
#write songplays table to parquet files
# Run-specific output directory; os.path.join supplies the separator if
# OUTPUT_DATA_WORKSPACE has no trailing slash.
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = os.path.join(OUTPUT_DATA_LOCAL, f"songplays_table.parquet_{now}")
print("Writing output to: ", songplays_table_path)
songplays_table.write.parquet(songplays_table_path)