# 1. Imports

In [22]:
import os
import configparser
import pandas
from pyspark.sql import SparkSession
from datetime import datetime

In [23]:
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t

# 2. Config

In [24]:
config = configparser.ConfigParser()

config.read_file(open('dl.cfg'))

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

song_data_path = config['S3']['SONG_DATA']
log_data_path = config['S3']['LOG_DATA']
song_output_path = config['S3']['SONG_OUTPUT_PATH']

print(f'{song_data_path}, {log_data_path}')
print(f'{song_output_path}')

s3a://udacity-data/project-4/song_data/*/*/*/*.json, s3a://udacity-data/project-4/log_data/*.json
s3a://udacity-data/project-4/output/


**Use these paths for the final ELT code**

In [25]:
song_data_large_path = config['S3']['SONG_DATA_UDACITY']
log_data_large_path = config['S3']['LOG_DATA_UDACITY']

# 3. SparkSession 

In [26]:
os.environ['PYSPARK_PYTHON']='/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON']='/usr/bin/python3'

In [27]:
spark = SparkSession.builder.config('spark.jars.packages', 
                                    'org.apache.hadoop:hadoop-aws:2.7.0').getOrCreate()

In [28]:
spark

In [7]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key",
                                     os.environ['AWS_ACCESS_KEY_ID'])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 
                                     os.environ['AWS_SECRET_ACCESS_KEY'])
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

# 4. Load data from S3

In [9]:
song_data = spark.read.json(song_data_path)

In [10]:
song_data.limit(2).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0


In [11]:
log_data = spark.read.json(log_data_path)

In [12]:
log_data.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [13]:
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [14]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Helper functions

In [14]:
def get_time():
    n = datetime.now()

    timestamp = f"""
    {n.year}-{n.month}-{n.day}_{n.hour}-{n.minute}-{n.second}-{n.microsecond}
    """.strip()
    
    return timestamp

# 5. Create `songs` table and save with Parquet

In [36]:
song_data.createOrReplaceTempView('song_data')

In [37]:
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM song_data
    ORDER BY song_id
""")

In [38]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [40]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [77]:
songs_table_path = f'{song_output_path}songs_table.{get_time()}'
songs_table.write.partitionBy('year', 'artist_id').parquet(songs_table_path)

# 6. Create `artists` table and save with Parquet

In [None]:
song_data.createOrReplaceTempView('song_data')

In [78]:
artists_table = spark.sql("""
    SELECT
        artist_id AS artist_id,
        artist_name AS name,
        artist_location AS location,
        artist_latitude AS latitude,
        artist_longitude AS longitude
    FROM song_data
    ORDER BY artist_id DESC
""")

In [79]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [80]:
artists_table.show(5)

+------------------+------------------+--------------+--------+---------+
|         artist_id|              name|      location|latitude|longitude|
+------------------+------------------+--------------+--------+---------+
|ARYKCQI1187FB3B18F|             Tesla|              |    null|     null|
|ARXR32B1187FB57099|               Gob|              |    null|     null|
|ARWB3G61187FB49404|       Steve Morse|Hamilton, Ohio|    null|     null|
|ARVBRGZ1187FB4675A|      Gwen Stefani|              |    null|     null|
|ARULZCI1241B9C8611|Luna Orbit Project|              |    null|     null|
+------------------+------------------+--------------+--------+---------+
only showing top 5 rows



In [81]:
artists_table_path = f'{song_output_path}artists_table.{get_time()}'
artists_table.write.parquet(artists_table_path)

### Filter `log_data`

In [82]:
log_data = log_data.filter(log_data.page == 'NextSong')
log_data.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

# 7. Create `users` table and save with Parquet

In [83]:
log_data.createOrReplaceTempView('log_data')

In [84]:
users_table = spark.sql("""
    SELECT 
        DISTINCT userId AS user_id,
        firstName AS first_name,
        lastName AS last_name,
        gender,
        level
    FROM log_data
    ORDER BY last_name
""")

In [85]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [112]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [88]:
users_table_path = f'{song_output_path}users_table.{get_time()}'
users_table.write.parquet(users_table_path)

# 8. Create `time` table and save with Parquet

In [38]:
@udf(t.TimestampType())
def make_timestamp(ts):
    return datetime.fromtimestamp(ts / 1000.0)

@udf(t.StringType())
def make_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

In [33]:
log_data_time = log_data.withColumn('time', make_timestamp('ts'))

In [37]:
log_data_time.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|                time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-14 18:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| fr

In [41]:
log_data_time_final = log_data_time.withColumn('datetime', make_datetime('ts'))

In [42]:
log_data_time_final.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- datetime: string (nullable = true)



In [44]:
log_data_time_final.show(2, truncate=False)

+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+-----------------------+-------------------+
|artist     |auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration     |sessionId|song           |status|ts           |userAgent                                                                                                                                |userId|time                   |datetime           |
+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+-----------------------------------

In [47]:
log_data_time_final.createOrReplaceTempView('log_data_time')

In [49]:
time_table = spark.sql("""
    SELECT DISTINCT datetime AS start_time,
        hour(time) AS hour,
        day(time) AS day,
        weekofyear(time) AS week,
        month(time) AS month,
        year(time) AS year,
        dayofweek(time) AS weekday
    FROM log_data_time
    ORDER BY start_time
""")

In [50]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [51]:
time_table.show(2)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 15:57:10|  15|  1|  44|   11|2018|      5|
|2018-11-01 16:01:46|  16|  1|  44|   11|2018|      5|
+-------------------+----+---+----+-----+----+-------+
only showing top 2 rows



In [53]:
time_table_path = f'{song_output_path}time_table.{get_time()}'
time_table.write.parquet(time_table_path)

# 9. Create `songplays` table and write with Parquet

In [56]:
joined_data = log_data_time_final.join(song_data,
    (log_data_time_final.artist == song_data.artist_name) &
    (log_data_time_final.song == song_data.title)
)
joined_data = joined_data.withColumn('songplay_id', f.monotonically_increasing_id())

In [57]:
joined_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)

In [59]:
joined_data.show(2, truncate=False)

+------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+--------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+-----------------------+-------------------+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------+----+-----------+
|artist|auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration     |sessionId|song          |status|ts           |userAgent                                                                                                                                |userId|time                   |datetime           |artist_id         |artist_latitude|artist_location|artist_longitude|artist_name|duration |nu

In [60]:
joined_data.createOrReplaceTempView('joined_data')

In [62]:
songplays_table = spark.sql("""
    SELECT songplay_id AS songplay_id,
        time AS start_time,
        userId as user_id,
        level AS level,
        song_id AS song_id,
        artist_id AS artist_id,
        sessionId AS session_id,
        location AS location,
        userAgent AS user_agent
    FROM joined_data
    ORDER BY (user_id, session_id)
""")

In [63]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [64]:
songplays_table.show(2)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-21 15:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [None]:
songplays_table_path = f'{song_output_path}songplays_table.{get_time()}'
songplays_table.write.parquet(songplays_table_path)