### Test notebook for Data Lake Project: Udacity Data Engineering NanoDegree

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField,  DoubleType, StringType, IntegerType, TimestampType

In [3]:
#parse configuration file
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [4]:
# access aws
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [5]:
#create a spark session 
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [10]:
#extract data from local file in workspace
#import zipfile
#with zipfile.ZipFile("data/song-data.zip","r") as zip_ref:
#    zip_ref.extractall("data/song-data")

In [11]:
# with zipfile.ZipFile("data/log-data.zip","r") as zip_ref:
#     zip_ref.extractall("data/log-data")

In [6]:
#define schema to increase performance
song_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", StringType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", IntegerType()),
    StructField("song_id", StringType()),
    StructField("title", StringType()),
    StructField("year", IntegerType()),
])

In [7]:
#read in song data from json files using spark
song_data = "data/song-data/song_data/*/*/*/*.json"
dfs = spark.read.json(song_data, schema=song_schema)

In [8]:
dfs.head(1)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)]

In [10]:
#create path to read data from s3
#this dataset was taking too long to read on local machine so further testing done on AWS EMR Cluster
#song_s3 = "s3a://udacity-dend/song_data/*/*/*/*.json"

In [11]:
#dfs3 = spark.read.json(song_s3, schema=song_schema)

In [1]:
#dfs3.head(1)

In [9]:
#create a view in order to run queries
dfs.createOrReplaceTempView("song_table")

In [14]:
#dfs3.createOrReplaceTempView("song_table")

In [10]:
#create song table
#check fields: song_id, title, artist_id, year, duration
dim_songs = spark.sql("""
                 SELECT song_id,
                 title,
                 artist_id,
                 year,
                 duration
                 FROM song_table
                 WHERE song_id IS NOT NULL
             """)

In [11]:
dim_songs.write.mode('overwrite').partitionBy("year", "artist_id").parquet("data/songs")

In [16]:
#partition for columns that are queried often and not as granular
#dim_songs.write.mode('overwrite').partitionBy("year", "artist_id").parquet("s3a://udacity-dlake/songtable")

Processing the dataset located in S3 was too time consuming locally so decided to do further testing on
an AWS EMR Cluster.

In [12]:
#create artist table
#artists fields check: artist_id, name, location, lattitude, longitude
dim_artists = spark.sql("""
                     SELECT DISTINCT artist_id,
                     artist_name AS name,
                     artist_location AS location,
                     artist_latitude AS latitude,
                     artist_longitude AS longitude
                     FROM song_table
                     WHERE artist_id IS NOT NULL
                """)

In [13]:
dim_artists.write.mode('overwrite').parquet("data/artists")

In [15]:
#Let's look at the log data with this path
log_data = "data/log-data/*.json"

In [16]:
#read in logfiles
#reading files from s3 did not take long on emr cluster so decided against creating a schema here
dfl = spark.read.json(log_data)

In [17]:
dfl.head(2)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [18]:
#using python filter for page = NextSong
dfl = dfl.filter(dfl.page == "NextSong")

In [19]:
#create view to run queries
dfl.createOrReplaceTempView("log_table")

In [21]:
#create user table
#user fields check: user_id, first_name, last_name, gender, level
dim_users = spark.sql("""
                  SELECT DISTINCT userId AS user_id,
                  firstName AS first_name,
                  lastName AS last_name,
                  gender,
                  level
                  FROM log_table
                  WHERE userId IS NOT NULL
              """)

In [22]:
dim_users.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [24]:
#write table to parquet file
dim_users.write.mode('overwrite').parquet("data/users")

In [25]:
#convert ts field to timestamp to extract time field later on
time_convert = spark.sql("""
                 SELECT to_timestamp(ts/1000) as start_times
                 FROM log_table
                 WHERE ts IS NOT NULL
            """)

In [26]:
#test dim_users table
dim_users.createOrReplaceTempView("user_table")

In [27]:
#test query for dim_users
spark.sql("SELECT * FROM user_table LIMIT 5").show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+



In [28]:
#create temp view to run queries
time_convert.createOrReplaceTempView("time_table")

In [29]:
#create time table from temp view time table
#fields chech: start_time, hour, day, week, month, year, weekday
dim_time = spark.sql("""
                       SELECT start_times as start_time,
                       hour(start_times) as hour,
                       dayofmonth(start_times) as day,
                       weekofyear(start_times) as week,
                       month(start_times) as month,
                       year(start_times) as year,
                       dayofweek(start_times) as weekday
                       FROM time_table
                    """)

In [30]:
dim_time.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [31]:
#write table to parquet file, partition by less granular fields
dim_time.write.mode('overwrite').partitionBy("year", "month").parquet("data/time")

In [32]:
#test if reading in parquet file is working correctly
songdata = spark.read.parquet('data/songs')

In [33]:
songdata.show(3)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
+------------------+--------------------+---------+----+------------------+
only showing top 3 rows



In [34]:
songdata.createOrReplaceTempView("songdata")

In [35]:
#run queries to test 
spark.sql("SELECT * FROM songdata LIMIT 5").show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
+------------------+--------------------+---------+----+------------------+



In [58]:
#create songplays table from both log and song files
#table fields check songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#month, year column used to partition
fact_songplays = spark.sql("""
                     SELECT monotonically_increasing_id() as songplay_id,
                     to_timestamp(lt.ts/1000) as start_time,
                     month(to_timestamp(lt.ts/1000)) as month,
                     year(to_timestamp(lt.ts/1000)) as year,
                     lt.userId as user_id,
                     lt.level as level,
                     st.song_id as song_id,
                     st.artist_id as artist_id,
                     lt.sessionId as session_id,
                     lt.location as location,
                     lt.userAgent as user_agent
                     FROM log_table lt
                     JOIN song_table st ON lt.song = st.title AND lt.artist = st.artist_name
""")

In [61]:
fact_songplays.show()

+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|month|year|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-21 21:56:...|   11|2018|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+--------------------+-----+----+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [62]:
#write songplays table to
fact_songplays.write.mode('overwrite').partitionBy("year", "month").parquet("data/songplays")

In [63]:
#create temp view to test queries
fact_songplays.createOrReplaceTempView("songplays")

In [64]:
spark.sql("SELECT month, year, user_id, location FROM songplays").show()

+-----+----+-------+--------------------+
|month|year|user_id|            location|
+-----+----+-------+--------------------+
|   11|2018|     15|Chicago-Napervill...|
+-----+----+-------+--------------------+

