## Project test notebook 
Used to build the ETL with small, local data set

~~Songs table files are partitioned by year and then artist. ~~

~~Time table files are partitioned by year and month. ~~

Songplays table files are partitioned by year and month.

### imports

In [1]:
import configparserx
from datetime import datetime
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, udf, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import types as t

## credentials

In [2]:
# Read and set AWS creds
config = configparser.ConfigParser()
config.read_file(open('../dl.cfg'))

# KEY = config.get('AWS','KEY')
# SECRET = config.get('AWS','SECRET')
# # REGION_NAME = config.get('AWS','REGION_NAME')


os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
S3_BUCKET = config.get('AWS', 'S3_BUCKET')
# Set hadoop configuration
# sc = SparkContext()
# sc.setLogLevel("DEBUG")
# hadoopConf = sc._jsc.hadoopConfiguration()
# hadoopConf.set("fs.s3a.access.key", KEY)
# hadoopConf.set("fs.s3a.secret.key", SECRET)

### create and get a spark session

In [3]:
spark = SparkSession \
    .builder \
    .appName("Exploring data subset") \
    .getOrCreate()

### read in song data

In [4]:
song_data_dir =  '../sample_data/song_data'
song_files = f'{song_data_dir}/*/*/*/*.json'
song_data_df = spark.read.json(song_files)

In [5]:
song_data_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [6]:
song_data_df.count()

71

### extract columns to create songs table
### write songs table to parquet files partitioned by year and artist

In [7]:
songs = song_data_df.select \
    ('song_id', 'title', 'artist_id', 'year', 'duration') \
    .dropDuplicates()

In [8]:
songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [9]:
songs.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [10]:
songs.count()

71

In [11]:
songs.write \
    .mode("overwrite") \
    .partitionBy('year', 'artist_id') \
    .parquet(f's3a://{S3_BUCKET}/songs')

### extract columns to create artists table
### write artists table to parquet files

In [12]:
artists = song_data_df.select \
    ('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') \
    .dropDuplicates()

In [13]:
artists.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [14]:
artists.show(5)

+------------------+---------------+---------------+---------------+----------------+
|         artist_id|    artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+---------------+---------------+---------------+----------------+
|AR3JMC51187B9AE49D|Backstreet Boys|    Orlando, FL|       28.53823|       -81.37739|
|AR0IAWL1187B9A96D0|   Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARWB3G61187FB49404|    Steve Morse| Hamilton, Ohio|           null|            null|
|AR47JEX1187B995D81|   SUE THOMPSON|     Nevada, MO|       37.83721|       -94.35868|
|ARHHO3O1187B989413|      Bob Azzam|               |           null|            null|
+------------------+---------------+---------------+---------------+----------------+
only showing top 5 rows



In [15]:
artists.count()

69

In [16]:
artists.write \
    .mode("overwrite") \
    .parquet('artists')

### get filepath to log data file and read log data file, filtering by actions for song plays

In [17]:
log_data_dir =  '../sample_data/log_data'

In [18]:
log_data_df = spark.read.json(log_data_dir).filter("page = 'NextSong'")

In [19]:
log_data_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [20]:
log_data_df.show(4)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

### extract columns for users table and write users table to parquet files

In [21]:
users = log_data_df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

In [22]:
users.printSchema()

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [23]:
users.count()

104

In [24]:
users.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [25]:
users.write \
    .mode("overwrite") \
    .parquet('users')

### create timestamp column from original timestamp column
### create datetime column from original timestamp column
### extract columns to create time table
### write time table to parquet files partitioned by year and month

In [26]:
@udf("string")
def ts_from_epoch(epoch):
    return datetime.fromtimestamp(epoch / 1000).strftime('%Y-%m-%d %H:%M:%S')

In [27]:
time_table = log_data_df.select(to_timestamp(ts_from_epoch("ts"), "yyyy-MM-dd HH:mm:ss").alias("ts")) \
    .select("ts", hour("ts").alias("hour"), \
            dayofmonth("ts").alias("day"),  \
            weekofyear("ts").alias("week"), \
            month("ts").alias("month"),\
            year("ts").alias("year"), \
            date_format('ts', 'w').alias("weekday")) \
    .withColumnRenamed("ts", "start_time") 

In [28]:
time_table.count()

6820

In [29]:
time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-14 18:30:26|  18| 14|  46|   11|2018|     46|
|2018-11-14 18:41:21|  18| 14|  46|   11|2018|     46|
|2018-11-14 18:45:41|  18| 14|  46|   11|2018|     46|
|2018-11-14 21:44:09|  21| 14|  46|   11|2018|     46|
|2018-11-14 23:48:55|  23| 14|  46|   11|2018|     46|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [30]:
time_table.write \
    .mode("overwrite") \
    .partitionBy('year', 'month') \
    .parquet('time')

### read in song data to use for songplays table
### extract columns from joined song and log datasets to create songplays table 
### write songplays table to parquet files partitioned by year and month

In [31]:
e = log_data_df.alias('e')
s = song_data_df.alias('s')

join_conditions = [e.artist == s.artist_name, e.song == s.title]
date_format = "yyyy-MM-dd HH:mm:ss"

songplays = e.join(s, join_conditions, how='left') \
    .withColumn("songplay_id", monotonically_increasing_id()) \
    .select("songplay_id", 
            to_timestamp(ts_from_epoch("ts"), date_format).alias("start_time"),
            year(ts_from_epoch("ts")).alias("year"), 
            month(ts_from_epoch("ts")).alias("month"), 
            "userId",
            "level",
            "song_id",
            "artist_id",
            "sessionId",
            "location",
            "userAgent") \
    .withColumnRenamed("userId", "user_id") \
    .withColumnRenamed("sessionId", "session_id") \
    .withColumnRenamed("userAgent", "user_agent") \
    .dropDuplicates()

In [32]:
songplays.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [33]:
songplays.count()

6820

In [34]:
songplays.show(5)

+-----------+-------------------+----+-----+-------+-----+-------+---------+----------+--------------------+--------------------+
|songplay_id|         start_time|year|month|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-----------+-------------------+----+-----+-------+-----+-------+---------+----------+--------------------+--------------------+
|        167|2018-11-15 08:42:58|2018|   11|     97| paid|   null|     null|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|        174|2018-11-15 08:51:43|2018|   11|     60| free|   null|     null|       531|Tampa-St. Petersb...|Mozilla/5.0 (Wind...|
|        178|2018-11-15 08:57:44|2018|   11|     30| paid|   null|     null|       324|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|
|        632|2018-11-21 03:09:07|2018|   11|     36| paid|   null|     null|       728|Janesville-Beloit...|"Mozilla/5.0 (Win...|
|        766|2018-11-21 08:52:09|2018|   11|     26| free|   null|     null|       811|San

In [35]:
songplays.write \
    .mode("overwrite") \
    .partitionBy('year', 'month') \
    .parquet('songplays')

In [36]:
songplays.select(year("start_time"), month("start_time")).show(5)

+----------------+-----------------+
|year(start_time)|month(start_time)|
+----------------+-----------------+
|            2018|               11|
|            2018|               11|
|            2018|               11|
|            2018|               11|
|            2018|               11|
+----------------+-----------------+
only showing top 5 rows



In [37]:
time_in = spark.read.parquet("time")
songplays_in = spark.read.parquet("songplays/year=2018/month=11")
songs_in = spark.read.parquet("songs")

In [38]:
time_in.show(3)

+-------------------+----+---+----+-------+----+-----+
|         start_time|hour|day|week|weekday|year|month|
+-------------------+----+---+----+-------+----+-----+
|2018-11-14 18:30:26|  18| 14|  46|     46|2018|   11|
|2018-11-14 18:41:21|  18| 14|  46|     46|2018|   11|
|2018-11-14 18:45:41|  18| 14|  46|     46|2018|   11|
+-------------------+----+---+----+-------+----+-----+
only showing top 3 rows



In [39]:
songplays_in.show(3)

+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|        192|2018-11-15 09:24:07|     30| paid|              null|              null|       324|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|
|        882|2018-11-21 15:56:47|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|       1056|2018-11-14 02:27:17|     58| paid|              null|              null|       522|Augusta-Richmond ...|"Mozilla/5.0 (Win...|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
only showing top 3 rows



In [40]:
songs_in.show(3)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
+------------------+--------------------+---------+----+------------------+
only showing top 3 rows



In [41]:
songplays_in.printSchema()

root
 |-- songplay_id: long (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [42]:
time_in.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [43]:
songs_in.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

