# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the complete datasets.

In [84]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, IntegerType, LongType, ShortType, StringType

In [85]:
# Load the AWS credentials from the local config file and export them as
# environment variables.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it parsed. An empty result therefore means 'dl.cfg' is missing;
# fail here with a clear message instead of a later KeyError on 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError(
        "Could not read 'dl.cfg'; it must contain an [AWS] section with "
        "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY."
    )

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [86]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    The session comes from the builder's ``getOrCreate()`` call, with
    ``spark.jars.packages`` set to ``org.apache.hadoop:hadoop-aws:2.7.0``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


# Session shared by every cell below
spark = create_spark_session()

In [87]:
# Root location the source JSON datasets are read from
input_data = "s3a://udacity-dend/"
# Root location the parquet tables are written to
output_data = "s3a://udacity-dend-bucket-ny/"

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data

In [88]:
# Glob pattern for the song JSON files under the input root
song_data = f"{input_data}song_data/*/*/*/*.json"

# Testing: this assignment replaces the pattern above with the local sample files
song_data = './data/song_data/*/*/*/*.json'

In [89]:
# (column name, Spark type) pairs describing one song record
_song_columns = [
    ("artist_id", StringType()),
    ("artist_latitude", DoubleType()),
    ("artist_location", StringType()),
    ("artist_longitude", DoubleType()),
    ("artist_name", StringType()),
    ("duration", DoubleType()),
    ("num_songs", ShortType()),
    ("song_id", StringType()),
    ("title", StringType()),
    ("year", ShortType()),
]

# Explicit schema applied when reading the song JSON files
songSchema = StructType(
    [StructField(name, dtype) for name, dtype in _song_columns]
)

In [90]:
# Read the song JSON files with the explicit schema; multiLine is enabled
# exactly as in the keyword-argument form of DataFrameReader.json().
song_reader = spark.read.schema(songSchema).option("multiLine", True)
df = song_reader.json(song_data)

In [8]:
# read song data file without an explicit schema (testing)
#df1 = spark.read.json(song_data, multiLine=True)

In [9]:
#df1.count()

In [10]:
#df2.count()

In [11]:
#df1.printSchema()

In [12]:
#df.show(1)

In [91]:
# Register the song DataFrame as a temp view so the SQL queries below can reference it
df.createOrReplaceTempView("staging_songs")

In [14]:
#spark.sql('''
#          SELECT DISTINCT song_id
#                       , title
#                       , artist_id
#                       , year
#                       , duration
#            FROM staging_songs
#          '''
#          ).show()

## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration

In [92]:
# Query selecting the songs-table columns from the staging view.
# LIMIT 1 keeps a single record while the ETL is being developed.
songs_query = '''
                        SELECT DISTINCT song_id
                                      , title
                                      , artist_id
                                      , year
                                      , duration
                          FROM staging_songs
                         LIMIT 1
                        '''

# extract columns to create songs table
songs_table = spark.sql(songs_query)

In [16]:
# Persist the songs table as parquet, partitioned by year and artist;
# any existing output at the target path is overwritten.
songs_path = output_data + 'songs'
(
    songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(songs_path)
)

In [100]:
# Sanity check: report how many rows the songs table holds
print("    songs_table count:", songs_table.count())

    songs_table count: 1


# Verify Songs Table

In [17]:
# Read the songs parquet output back in to verify what was written
parquetFile = spark.read.parquet(output_data + 'songs')

In [18]:
# Show the schema of the songs table as read back from parquet
parquetFile.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [19]:
# Display the rows of the songs table as read back from parquet
parquetFile.show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|138.63138|   0|AREDBBQ1187B98AFF5|
+------------------+--------------------+---------+----+------------------+



## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude

In [20]:
# Query selecting the artists-table columns from the staging view.
# LIMIT 1 keeps a single record while the ETL is being developed.
# NOTE(review): the latitude alias is spelled `lattitude`; it is kept as-is
# because that is the column name written to parquet. Confirm whether the
# spelling is intended.
artists_query = '''
                          SELECT DISTINCT artist_id
                                        , artist_name AS name
                                        , artist_location AS location
                                        , artist_latitude AS lattitude
                                        , artist_longitude AS longitude
                            FROM staging_songs
                           LIMIT 1
                          '''

# extract columns to create artists table
artists_table = spark.sql(artists_query)

In [21]:
# Preview the extracted artists record before writing it out
artists_table.show()

+------------------+--------+------------+---------+---------+
|         artist_id|    name|    location|lattitude|longitude|
+------------------+--------+------------+---------+---------+
|ARPBNLO1187FB3D52F|Tiny Tim|New York, NY| 40.71455|-74.00712|
+------------------+--------+------------+---------+---------+



In [22]:
# Persist the artists table as parquet (unpartitioned);
# any existing output at the target path is overwritten.
artists_path = output_data + 'artists'
artists_writer = artists_table.write.mode('overwrite')
artists_writer.parquet(artists_path)

# Verify Artists Table

In [23]:
# Read the artists parquet output back in and show its schema
parquetFile = spark.read.parquet(output_data + 'artists')
parquetFile.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [24]:
# Display the rows of the artists table as read back from parquet
parquetFile.show()

+------------------+--------+------------+---------+---------+
|         artist_id|    name|    location|lattitude|longitude|
+------------------+--------+------------+---------+---------+
|ARPBNLO1187FB3D52F|Tiny Tim|New York, NY| 40.71455|-74.00712|
+------------------+--------+------------+---------+---------+



# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.
- Filter records by `NextSong` action

In [25]:
# Glob pattern for the log JSON files under the input root
log_data = f"{input_data}log_data/*/*/*.json"

# Testing: this assignment replaces the pattern above with the local sample files
log_data = './data/log_data/*.json'

In [26]:
# (column name, Spark type) pairs describing one log event record
_log_columns = [
    ("artist", StringType()),
    ("auth", StringType()),
    ("firstName", StringType()),
    ("gender", StringType()),
    ("itemInSession", ShortType()),
    ("lastName", StringType()),
    ("length", DoubleType()),
    ("level", StringType()),
    ("location", StringType()),
    ("method", StringType()),
    ("page", StringType()),
    ("registration", DoubleType()),  # Convert to Long before writing to parquet files
    ("sessionId", ShortType()),
    ("song", StringType()),
    ("status", ShortType()),
    ("ts", LongType()),
    ("userAgent", StringType()),
    ("userId", StringType()),  # Convert to Short before writing to parquet files
]

# Explicit schema applied when reading the log JSON files
logSchema = StructType(
    [StructField(name, dtype) for name, dtype in _log_columns]
)

# read log data file
df = spark.read.schema(logSchema).json(log_data)

In [27]:
# Confirm the log DataFrame picked up the explicit schema
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: short (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: short (nullable = true)
 |-- song: string (nullable = true)
 |-- status: short (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [28]:
#df2 = spark.read.json(log_data)
#df2.printSchema()

In [29]:
# Row count of the log data before the page filter is applied
df.count()

8056

In [30]:
# filter by actions for song plays
# Keep only song-play events: rows whose `page` column equals 'NextSong'
df = df.where(df['page'] == 'NextSong')

In [31]:
# Row count of the log data after keeping only 'NextSong' events
df.count()

6820

In [32]:
# Register the filtered log DataFrame as a temp view so the SQL queries below can reference it
df.createOrReplaceTempView("staging_events")

In [33]:
#spark.sql('''
#          SELECT DISTINCT CAST(userId AS short)
#            FROM staging_events
#           LIMIT 10
#          '''
#          ).show()

In [34]:
#spark.sql('''
#          SELECT DISTINCT CAST(registration AS long)
#            FROM staging_events
#           LIMIT 10
#          '''
#          ).show()

In [35]:
#spark.sql('''
#          SELECT to_timestamp(ts/1000) as ts
#               , EXTRACT(hour FROM to_timestamp(ts/1000)) AS hour
#            FROM staging_events
#          '''
#          ).show()

In [36]:
#spark.sql('''
#          SELECT DISTINCT ts
#                        , EXTRACT(hour FROM ts) AS hour
#                        , EXTRACT(day FROM ts) AS day
#                        , EXTRACT(week FROM ts) AS week
#                        , EXTRACT(month FROM ts) AS month
#                        , EXTRACT(year FROM ts) AS year
#                        , EXTRACT(dayofweek FROM ts) AS weekday
#            FROM (SELECT to_timestamp(ts/1000) AS ts
#                    FROM staging_events)
#          '''
#          ).show()

In [37]:
#spark.sql('''
#          SELECT count(*)
#            FROM staging_events
#          '''
#          ).show()

In [38]:
#spark.sql('''
#          SELECT DISTINCT page
#            FROM staging_events
#          '''
#          ).show()

In [39]:
#spark.udf.register("get_ts", lambda x: datetime.fromtimestamp(x / 1000.0))

In [40]:
#spark.sql('''
#          SELECT ts, CAST(get_ts(ts) AS int)
#          FROM staging_events 
#          LIMIT 1
#          '''
#          ).show()

In [41]:
#df2.count()

## #3: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level

In [42]:
# Preview of the users extraction (up to 10 rows). The inner query numbers
# each user's events from newest to oldest by `ts`; keeping row_num = 1
# selects the values from that user's most recent event.
users_preview_query = '''
        SELECT DISTINCT user_id
                      , first_name
                      , last_name
                      , gender
                      , level
          FROM (SELECT ROW_NUMBER() OVER(PARTITION BY userId 
                                             ORDER BY ts DESC)
                    AS row_num
                     , CAST(userId AS short) AS user_id
                     , firstName AS first_name
                     , lastName AS last_name
                     , gender
                     , level
                     , ts
                  FROM staging_events)
         WHERE row_num = 1
         LIMIT 10
          '''
users_preview = spark.sql(users_preview_query)
users_preview.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     15|      Lily|     Koch|     F| paid|
|      8|    Kaylee|  Summers|     F| free|
|     17|  Makinley|    Jones|     F| free|
|     82|     Avery| Martinez|     F| paid|
|     36|   Matthew|    Jones|     M| paid|
|     11| Christian|   Porter|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      9|     Wyatt|    Scott|     M| free|
|     14|  Theodore|   Harris|     M| free|
+-------+----------+---------+------+-----+



In [43]:
# Users extraction query. The inner query numbers each user's events from
# newest to oldest by `ts`; keeping row_num = 1 selects the values from that
# user's most recent event. LIMIT 1 keeps a single record during development.
users_query = '''
    SELECT DISTINCT user_id
                  , first_name
                  , last_name
                  , gender
                  , level
      FROM (SELECT ROW_NUMBER() OVER(PARTITION BY userId 
                                         ORDER BY ts DESC)
                AS row_num
                 , CAST(userId AS short) AS user_id
                 , firstName AS first_name
                 , lastName AS last_name
                 , gender
                 , level
                 , ts
              FROM staging_events)
     WHERE row_num = 1
     LIMIT 1
'''

# extract columns for users table
users_table = spark.sql(users_query)

# write users table to parquet files (existing output is overwritten)
users_path = output_data + 'users'
users_table.write.mode('overwrite').parquet(users_path)

# Verify Users Table

In [44]:
# Read the users parquet output back in and show its schema
parquetFile = spark.read.parquet(output_data + 'users')
parquetFile.printSchema()

root
 |-- user_id: short (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [45]:
# Display the rows of the users table as read back from parquet
parquetFile.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
+-------+----------+---------+------+-----+



## #4: `time` Table
#### Extract Data for Time Table
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order

In [46]:
# The millisecond `ts` column is converted to a timestamp inside the SQL
# itself (to_timestamp(ts/1000.0)), so no Python UDF is used here.
# LIMIT 1 keeps a single record while the ETL is being developed.
time_query = '''
    SELECT DISTINCT start_time
                  , EXTRACT(hour FROM start_time) AS hour
                  , EXTRACT(day FROM start_time) AS day
                  , EXTRACT(week FROM start_time) AS week
                  , EXTRACT(month FROM start_time) AS month
                  , EXTRACT(year FROM start_time) AS year
                  , EXTRACT(dayofweek FROM start_time) AS weekday
      FROM (SELECT to_timestamp(ts/1000.0) AS start_time
              FROM staging_events)
     LIMIT 1
'''

# extract columns to create time table
time_table = spark.sql(time_query)

# write time table to parquet files partitioned by year and month
time_path = output_data + 'time'
(
    time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(time_path)
)

# Verify Time Table

In [47]:
# Read the time parquet output back in and show its schema
parquetFile = spark.read.parquet(output_data + 'time')
parquetFile.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [48]:
# Display the rows of the time table as read back from parquet
parquetFile.show()

+--------------------+----+---+----+-------+----+-----+
|          start_time|hour|day|week|weekday|year|month|
+--------------------+----+---+----+-------+----+-----+
|2018-11-15 21:04:...|  21| 15|  46|      5|2018|   11|
+--------------------+----+---+----+-------+----+-----+



## #5: `songplays` Table
#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.

In [72]:
#spark.sql('''
#SELECT ROW_NUMBER() OVER(ORDER BY e.ts) AS songplay_id
#     , t.start_time
#     , EXTRACT(year FROM t.start_time) AS year
#     , EXTRACT(month FROM t.start_time) AS month
#     , CAST(e.userId AS short) AS user_id
#     , e.level
#     , s.song_id
#     , s.artist_id
#     , e.sessionId AS session_id
#     , e.location
#     , e.userAgent AS user_agent
#  FROM staging_events e
#       LEFT JOIN staging_songs s
#              ON e.song = s.title
#                 AND e.artist = s.artist_name
#       LEFT JOIN (SELECT ts
#                       , to_timestamp(ts/1000.0) AS start_time
#                    FROM staging_events
#                 ) t
#              ON e.ts = t.ts
#'''
#).show()

+-----------+--------------------+----+-----+-------+-----+-------+---------+----------+--------------------+--------------------+
|songplay_id|          start_time|year|month|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-----------+--------------------+----+-----+-------+-----+-------+---------+----------+--------------------+--------------------+
|          1|2018-11-01 21:01:...|2018|   11|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|          2|2018-11-01 21:05:...|2018|   11|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|          3|2018-11-01 21:08:...|2018|   11|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|          4|2018-11-01 21:11:...|2018|   11|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|          5|2018-11-01 21:17:...|2018|   11|      8| free|   null|     null|      

In [76]:
# Build the songplays fact table from the `staging_events` and `staging_songs`
# temp views.
#
# * `start_time` is derived directly from each event's millisecond `ts`.
#   Joining staging_events back to itself on `ts` (as was done before)
#   multiplies rows whenever two events share the same timestamp.
# * Song and artist IDs are looked up by song title, artist name AND song
#   duration, as the section description requires. The LEFT JOIN keeps
#   events with no matching song (song_id / artist_id are NULL).
# * LIMIT 1 keeps a single record while the ETL is being developed.
songplays_table = spark.sql('''
SELECT ROW_NUMBER() OVER(ORDER BY e.ts) AS songplay_id
     , e.start_time
     , CAST(e.userId AS short) AS user_id
     , e.level
     , s.song_id
     , s.artist_id
     , e.sessionId AS session_id
     , e.location
     , e.userAgent AS user_agent
     , EXTRACT(year FROM e.start_time) AS year
     , EXTRACT(month FROM e.start_time) AS month
  FROM (SELECT *
             , to_timestamp(ts/1000.0) AS start_time
          FROM staging_events
       ) e
       LEFT JOIN staging_songs s
              ON e.song = s.title
                 AND e.artist = s.artist_name
                 AND e.length = s.duration
 LIMIT 1
'''
)

# write songplays table to parquet files partitioned by year and month
# (existing output at the target path is overwritten)
songplays_table.write.mode('overwrite').partitionBy('year', 'month').parquet(output_data + 'songplays')

# Verify Songplays Table

In [77]:
# Read the songplays parquet output back in and show its schema
parquetFile = spark.read.parquet(output_data + 'songplays')
parquetFile.printSchema()

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: short (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: short (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [78]:
# Display the rows of the songplays table as read back from parquet
parquetFile.show()

+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          1|2018-11-01 21:01:...|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+

