# ETL notebook version

In [2]:
from pyspark.sql import SparkSession
import os
import configparser
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s  [%(name)s] %(message)s')
LOG = logging.getLogger('etl')

# Make sure that your AWS credentials are loaded as env vars

In [4]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package


In [5]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load data from local or S3

## Process song_data

In [74]:
# get filepath to song data file
CWD = os.getcwd()
# local data, output_data directory
input_data = f"{CWD}/data/"
output_data = f"{CWD}/output_data/"

song_data = f"{input_data}song_data/*/*/*/*.json"
LOG.info(f"Here you go, song_data: {song_data}")

# read all song json files into df
df_s = spark.read.json(song_data)

2020-09-23 14:13:59,905 INFO  [etl] Here you go, song_data: /home/fxrc/Learn/UdacityNanodegree/Udacity-Data-Engineering/Data-Lake-Spark/data/song_data/*/*/*/*.json


In [75]:
# quick inspect
LOG.info(f"df_s count: {df_s.count()}")
df_s.printSchema()
df_s.show(5)

2020-09-23 14:14:01,347 INFO  [etl] df_s count: 71


root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|   

In [76]:
# easy local env double check
!pwd
!find ./data/song_data -name '*.json' -type f | wc -l

/home/fxrc/Learn/UdacityNanodegree/Udacity-Data-Engineering/Data-Lake-Spark
71


In [77]:
## Creates or replaces a local temporary view with this DataFrame, so as to use pyspark.sql
df_s.createOrReplaceTempView("song_view")

In [93]:
# extract columns to create songs table
songs_table = spark.sql("""
SELECT DISTINCT song_id,
                title,
                artist_id,
                year,
                duration
FROM song_view  
""")
songs_table.printSchema()
songs_table.show(5)
LOG.info(f"songs_table count: {songs_table.count()}")

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



2020-09-23 14:22:15,728 INFO  [etl] songs_table count: 71


In [97]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_id").parquet(path=output_data+'songs.parquet', mode="overwrite")

In [80]:
yc = spark.sql("""
-- select distinct song_id from song_table
select year, count(*) from song_table 
group by 1 
order by 2 desc
""")
yc.printSchema()
yc.show(truncate=False)

root
 |-- year: long (nullable = true)
 |-- count(1): long (nullable = false)

+----+--------+
|year|count(1)|
+----+--------+
|0   |43      |
|2004|4       |
|1997|2       |
|2003|2       |
|1994|2       |
|2005|2       |
|2000|2       |
|2007|1       |
|1986|1       |
|1969|1       |
|1987|1       |
|1972|1       |
|1964|1       |
|1982|1       |
|1992|1       |
|1999|1       |
|1985|1       |
|1984|1       |
|2008|1       |
|1993|1       |
+----+--------+
only showing top 20 rows



In [92]:
# extract columns to create artists table
df_s.createOrReplaceTempView("song_view")
artists_table = spark.sql("""
SELECT DISTINCT artist_id, 
                artist_name, 
                artist_location, 
                artist_latitude, 
                artist_longitude
FROM song_view
""")
artists_table.printSchema()
artists_table.show()
LOG.info(f"artists_table count: {artists_table.count()}")

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARBEBBY1187B9B43DB|           Tom Petty|     Gainesville, FL|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|ARMBR4Y1187B9990EB|        David Martin|     California - SF|       37.77916|      -122.42005|
|ARD0S291187B9B7BF5|             Rated R|                Ohio|           null|            null|


2020-09-23 14:21:52,592 INFO  [etl] artists_table count: 69


In [82]:
ac = spark.sql("""
-- select distinct artist_id from song_table
select year, artist_id, count(*) from song_table 
group by 1,2 
order by 3 desc
""")
ac.printSchema()
ac.show(truncate=False)

root
 |-- year: long (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- count(1): long (nullable = false)

+----+------------------+--------+
|year|artist_id         |count(1)|
+----+------------------+--------+
|0   |ARD7TVE1187B99BFB1|2       |
|0   |ARNTLGG11E2835DDB9|2       |
|0   |ARNPAGP1241B9C7FD4|1       |
|0   |ARPFHN61187FB575F6|1       |
|1987|ARD842G1187B997376|1       |
|2003|AR0IAWL1187B9A96D0|1       |
|0   |ARI2JSK1187FB496EF|1       |
|0   |AR36F9J1187FB406F1|1       |
|0   |ARMBR4Y1187B9990EB|1       |
|1984|AR8ZCNI1187B9A069B|1       |
|0   |ARKULSX1187FB45F84|1       |
|0   |ARGSJW91187B9B1D6B|1       |
|0   |ARIG6O41187B988BDD|1       |
|1969|ARMJAGH1187FB546F3|1       |
|2004|ARVBRGZ1187FB4675A|1       |
|0   |AROGWRA122988FEE45|1       |
|0   |ARKRRTF1187B9984DA|1       |
|0   |AR9AWNF1187B9AB0B4|1       |
|2004|ARP6N5A1187B99D1A3|1       |
|2004|ARYKCQI1187FB3B18F|1       |
+----+------------------+--------+
only showing top 20 rows



In [83]:
# write artists table to parquet files
artists_table.write.parquet(output_data+'artists.parquest', mode="overwrite")

## Process log_data

In [122]:
# path log_data
# log_data = f"{input_data}log_data/*/*/*.json"
log_data = f"{input_data}log_data/*.json"
LOG.info(f"Here you go, log_data: {log_data}")

# read all song json files into df
df_l = spark.read.json(log_data)

2020-09-23 14:50:21,032 INFO  [etl] Here you go, log_data: /home/fxrc/Learn/UdacityNanodegree/Udacity-Data-Engineering/Data-Lake-Spark/data/log_data/*.json


In [123]:
# quick inspect
LOG.info(f"df_l count: {df_l.count()}")
df_l.printSchema()
df_l.show(5)

2020-09-23 14:50:22,597 INFO  [etl] df_l count: 8056


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

## users table

In [124]:
# filter by actions for song plays
df_l = df_l.filter(df_l.page == 'NextSong')

In [125]:
df_l.createOrReplaceTempView("log_view")

# extract columns for users table
users_table = spark.sql("""
SELECT DISTINCT userId as user_id,
                firstName as first_name,
                lastName as last_name,
                gender,
                level
FROM log_view
""")
users_table.printSchema()
users_table.show()
LOG.info(f"users_table count: {users_table.count()}")

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
|     89|   Kynnedi|  Sanchez|     F| free|
|     57| Katherine|      Gay|     F| free|
|     74|    Braden|   Parker|     M| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     75|    Joseph|Gutierrez|     M| free|
|    

2020-09-23 14:50:27,372 INFO  [etl] users_table count: 104


In [112]:
# write users table to parquet files
users_table.write.parquet(output_data+"users.parquet", mode = "overwrite")

## time table

In [126]:
# create timestamp column from original timestamp column

import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t
from datetime import datetime
from pyspark.sql.functions import to_timestamp

# https://knowledge.udacity.com/questions/192909
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x:  datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))
df_l = df_l.withColumn("timestamp", get_timestamp(df_l.ts))
    
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
df_l = df_l.withColumn("datetime", get_datetime(df_l.ts))

df_l.printSchema()
df_l.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------

In [159]:
# extract columns to create time table
df_l.createOrReplaceTempView("log_view")

time_table = spark.sql("""
SELECT  DISTINCT timestamp AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
FROM log_view
""")

time_table.printSchema()
time_table.show(5)

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-14 22:14:16|  22| 14|  46|   11|2018|      4|
|2018-11-15 12:38:46|  12| 15|  46|   11|2018|      5|
|2018-11-15 14:48:46|  14| 15|  46|   11|2018|      5|
|2018-11-14 01:19:37|   1| 14|  46|   11|2018|      4|
|2018-11-13 10:42:47|  10| 13|  46|   11|2018|      3|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [160]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet(output_data+"time.parquet", mode = "overwrite")

## create fact table: songplays

In [162]:
# quick check before joining

df_l.printSchema()
df_l.show(5)

df_s.printSchema()
df_s.show(5)

time_table.printSchema()
time_table.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------

In [164]:
# extract columns from joined song and log datasets to create songplays table
from pyspark.sql.functions import monotonically_increasing_id
df_l.createOrReplaceTempView("logs")
df_s.createOrReplaceTempView("songs")
time_table.createOrReplaceTempView("time")

songplays_table = spark.sql("""
SELECT
    monotonically_increasing_id() as songplay_id,
    l.datetime,
    l.userId as user_id,
    l.level,
    s.song_id,
    s.artist_id,
    l.sessionId as session_id,
    l.location,
    l.userAgent,
    t.year,
    t.month
FROM logs l
JOIN songs s ON l.song = s.title AND l.artist = s.artist_name AND l.length = s.duration
JOIN time t ON l.timestamp = t.start_time
""")

# df_sp = df_l.join(df_s, (df_l.artist == df_s.artist_name) & (df_l.song == df_s.title) & (df_l.length == df_s.duration))
# df_sp = df_sp.join(time_table, (df_sp.datetime == time_table.start_time))

# songplays_table = songplays_table.withColumn('songplay_id', )
songplays_table.printSchema()
songplays_table.show(5)
LOG.info(f"songplays table count: {songplays_table.count()}")

root
 |-- songplay_id: long (nullable = false)
 |-- datetime: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-----------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|  datetime|user_id|level|           song_id|         artist_id|session_id|            location|           userAgent|year|month|
+-----------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-21|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   

2020-09-23 16:00:49,541 INFO  [etl] songplays table count: 1


In [165]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").parquet(output_data+"songplays.parquet", mode = "overwrite")

# Query play

In [8]:
# Read songs_table
# input_data_parquet = output_data + "songplays.parquet"
real_songplays = "songplays.parquet.emr-1st-try/year=2018/month=11/part-00032-d1e1f19c-ec3c-4dba-9d65-91062627bbfd.c000.snappy.parquet"
df = spark.read.parquet(real_songplays)
df.printSchema()
df.show()

root
 |-- songplay_id: long (nullable = true)
 |-- datetime: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)

+------------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
| songplay_id|  datetime|user_id|level|           song_id|         artist_id|session_id|            location|           userAgent|
+------------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|274877906944|2018-11-15|     49| paid|SOLQSYZ12A58A7919B|ARQSM561187FB4A0CF|       621|San Francisco-Oak...|Mozilla/5.0 (Wind...|
|274877906945|2018-11-14|     16| paid|SOYPWPS12AC4688650|ARCPYBD1187B9A56CE|       479|Birmingham-Hoover...|"Mo