# ETL Walkthrough

Here we will walk through the ETL process used in `etl.py`.

In [1]:
from platform import python_version
print(python_version())

3.6.3


In [2]:
#Load modules

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
import os
import configparser
import zipfile
import pandas as pd
import datetime
from pyspark.sql.functions import hour, year, month, dayofmonth, weekofyear, dayofweek
from pyspark.sql.types import IntegerType

## AWS credentials

In [3]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

# Set AWS key and secret key
KEY = config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET = config.get('AWS','AWS_SECRET_ACCESS_KEY')

os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS','AWS_SECRET_ACCESS_KEY')

## Uncomment to unzip the zipped data

In [4]:
# Unzip song and log data
#with zipfile.ZipFile('data/log-data.zip', 'r') as zip_ref:
#    zip_ref.extractall('unzip_data/log_data')

#with zipfile.ZipFile('data/song-data.zip', 'r') as zip_ref:
#    zip_ref.extractall('unzip_data/song_data/')

## Loading Data

#### Load Song Data

In [5]:
#Create spark session
spark = SparkSession.builder\
                    .config('spark.jars.packages','org.apache.hadoop:hadoop-aws:2.7.0')\
                    .getOrCreate()

In [6]:
# Load song data
song_data = spark.read.json('unzip_data/song_data/song_data/A/*/*/*.json')

In [7]:
#Check shcema
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
#check data
song_data.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

#### Load Log Data

In [9]:
# Load song data
log_data = spark.read.json('unzip_data/log_data/*.json')

In [10]:
# get start time
log_data = log_data.withColumn('start_time', F.from_unixtime(log_data['ts']/1000).cast("timestamp")).filter(log_data['page']=='NextSong')

In [11]:
# Check Schema
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [12]:
#check data
log_data.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|         start_time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smi

## Create Tables

## Time table
Time table contains: start time, hour, day, week, month, year, weekday.

In [13]:
# Get start time
time_table = log_data.select(['start_time'])

# Convert from start time to hour/day/week/month/year/weekday
time_table = time_table.select(
    'start_time',
    hour("start_time").alias('hour'),
    dayofmonth("start_time").alias('day'),
    weekofyear("start_time").alias('week'),
    month("start_time").alias('month'),
    year("start_time").alias('year'), 
    dayofweek("start_time").alias('weekday')
)

In [14]:
time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:26|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:21|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:41|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:09|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:55|   5| 15|  46|   11|2018|      5|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [15]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [16]:
time_table.describe('month','year').show()

+-------+-----+------+
|summary|month|  year|
+-------+-----+------+
|  count| 6820|  6820|
|   mean| 11.0|2018.0|
| stddev|  0.0|   0.0|
|    min|   11|  2018|
|    max|   11|  2018|
+-------+-----+------+



## User table
User table contains :user id, first name, last name, gender, and level.

In [17]:
user_table = log_data.select(['userId','firstName','lastName','gender','level'])\
    .dropDuplicates(subset=['userId']).sort('lastName').where(log_data['userId']!= '')

# Change userid type from string to integer
user_table = user_table.withColumn("userId", user_table["userId"].cast(IntegerType()))

In [18]:
user_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    66|    Kevin|Arellano|     M| free|
|    34|   Evelin|   Ayala|     F| free|
|    99|      Ann|   Banks|     F| free|
|   100|    Adler| Barrera|     M| free|
|    42|   Harper| Barrett|     M| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [19]:
user_table.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



## Songs table
Songs table contains: song id, title, artist id, year, duration

In [20]:
songs_table = song_data.select(['song_id','title','artist_id','year','duration']).dropDuplicates(subset=['song_id']).sort('title')

In [21]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOFFKZS12AB017F194|A Higher Place (A...|ARBEBBY1187B9B43DB|1994|236.17261|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SORAMLE12AB017C8B0|      Auguri Cha Cha|ARHHO3O1187B989413|   0|191.84281|
|SOZHPGD12A8C1394FE|     Baby Come To Me|AR9AWNF1187B9AB0B4|   0|236.93016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [22]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



## Artist table
Artist table contains: artist id, name, location, lattitude, longitude

In [23]:
artist_table = song_data.select(['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']).dropDuplicates(subset=['artist_id']).sort('artist_name')
artist_table = artist_table.selectExpr('artist_id as artist_id','artist_name as name',\
                                            'artist_location as location','artist_latitude as latitude',\
                                            'artist_longitude as longitude')

In [24]:
artist_table.show(5)

+------------------+---------------+---------------+--------+---------+
|         artist_id|           name|       location|latitude|longitude|
+------------------+---------------+---------------+--------+---------+
|AR558FS1187FB45658|        40 Grit|               |    null|     null|
|AR7G5I41187FB4CE6C|       Adam Ant|London, England|    null|     null|
|ARI3BMM1187FB4255E|   Alice Stuart|     Washington| 38.8991|  -77.029|
|ARL7K851187B99ACD2|      Andy Andy|               |    null|     null|
|AR3JMC51187B9AE49D|Backstreet Boys|    Orlando, FL|28.53823|-81.37739|
+------------------+---------------+---------------+--------+---------+
only showing top 5 rows



In [25]:
artist_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



## Song plays table
Songplays table contains: songplay_id, start time, user id, level, song id, artist id, session, location, user agent.

In [26]:
# Get data from log data
songplays_table = log_data.select(['start_time','userId','level','song','sessionId','location','userAgent','length']).filter(log_data['page']=='NextSong')

#get song, songid, and artistid to join
song_artist_table = song_data.select(['title','song_id','artist_id','duration']).dropDuplicates(subset=['song_id'])

# Create songplay_id column
songplays_table = songplays_table.withColumn("songplay_id", F.monotonically_increasing_id())

#join songid and artistid
songplays_table = songplays_table.join(song_artist_table,\
                                      songplays_table.song == song_artist_table.title,\
                                      how='left')

#drop ts song and title columns
songplays_table = songplays_table.drop('title')

# Change userid type from string to integer
songplays_table = songplays_table.withColumn("userId", songplays_table["userId"].cast(IntegerType()))

# create month column
songplays_table = songplays_table.withColumn("month",\
                                             month(songplays_table["start_time"]).alias('month'))   
# create year column
songplays_table = songplays_table.withColumn("year",\
                                             year(songplays_table["start_time"]).alias('year'))   

In [27]:
songplays_table.show(5)

+-------------------+------+-----+--------------------+---------+--------------------+--------------------+---------+-----------+-------+---------+--------+-----+----+
|         start_time|userId|level|                song|sessionId|            location|           userAgent|   length|songplay_id|song_id|artist_id|duration|month|year|
+-------------------+------+-----+--------------------+---------+--------------------+--------------------+---------+-----------+-------+---------+--------+-----+----+
|2018-11-15 00:30:26|    26| free|       Sehr kosmisch|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|655.77751|          0|   null|     null|    null|   11|2018|
|2018-11-15 00:41:21|    26| free|     The Big Gundown|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|260.07465|          1|   null|     null|    null|   11|2018|
|2018-11-15 00:45:41|    26| free|            Marry Me|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|205.45261|          2|   null|     null|    null|   1

In [28]:
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- userId: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- length: double (nullable = true)
 |-- songplay_id: long (nullable = false)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



#### SQL Method
The method below uses the SQL to create the table matching song title, artist, and duration.

In [29]:
log_data.createOrReplaceTempView("events_sql")
time_table.createOrReplaceTempView("time_sql")
songs_table.createOrReplaceTempView("song_sql")
artist_table.createOrReplaceTempView("artist_sql")

In [30]:
songplays_table_test = spark.sql("""
SELECT 
e.start_time, e.userId, e.level,
s.song_id, s.artist_id, e.sessionId,
e.userAgent, t.year, t.month
FROM events_sql e
JOIN song_sql s ON e.song = s.title AND e.length = s.duration
JOIN artist_sql a ON e.artist = a.name AND a.artist_id = s.artist_id
JOIN time_sql t ON e.start_time = t.start_time
""")

In [31]:
songplays_table_test.createOrReplaceTempView("songplays_sql")

In [32]:
spark.sql("""
SELECT count(*)
FROM songplays_sql
""")

DataFrame[count(1): bigint]

## Write files

In [33]:
output_data = "s3a://udacity-datalakes-project/"

In [34]:
songs_table.write.mode("overwrite").partitionBy('year','artist_id').parquet(output_data+'songs_table')

Py4JJavaError: An error occurred while calling o201.parquet.
: java.io.IOException: Bucket udacity-datalakes-project does not exist
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
artist_table.write.mode("overwrite").parquet(output_data+'artist_table')

In [None]:
user_table.write.mode("overwrite").parquet(output_data+'user_table')

In [None]:
songplays_table.write.mode("overwrite").partitionBy('year','month').parquet(output_data+'songplays_table')   

In [None]:
time_table.write.mode("overwrite").partitionBy('year','month').parquet(output_data+'time_table')