# Data Lakes Project

In [25]:
import pandas as pd
import matplotlib
import configparser
from datetime import datetime
import pyspark.sql.functions as f
from pyspark.sql import types as t
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


In [26]:

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [27]:
config.read('dl.cfg')
SONG_DATA_LOCAL=config['LOCAL']['INPUT_DATA_SONG_DATA_LOCAL']
LOG_DATA_LOCAL=config['LOCAL']['INPUT_DATA_LOG_DATA_LOCAL']
OUTPUT_DATA_LOCAL=config['LOCAL']['OUTPUT_DATA_LOCAL']

In [28]:
print(SONG_DATA_LOCAL)

data/song_data/*/*/*/*.json


In [29]:
# get filepath to song data file
#song_data = "data/song-data.zip"
#song_data = "s3://udacity-dend/song_data/song-data.zip"
#song_data = "s3://udacity-dend/log_data/log-data.zip"
song_data = SONG_DATA_LOCAL
# read song data file
df = spark.read.json(song_data)


In [30]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [31]:
df.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [32]:
# extract columns to create songs table
df.createOrReplaceTempView("songs_table")
songs_table = spark.sql("""
                        SELECT song_id, title, artist_id, year, duration
                        FROM songs_table
                        ORDER BY song_id
""")
songs_table.show(5)
songs_table.printSchema()
songs_table.write.parquet(OUTPUT_DATA_LOCAL + "songs_table.parquet")

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [33]:
# extract columns to create artists table
df.createOrReplaceTempView("artists_table")
artists_table = spark.sql("""
                        SELECT artist_id, 
                        artist_name as name,  
                        artist_location as location, 
                        artist_latitude as lattitude, 
                        artist_longitude as longitude
                        FROM artists_table
                        ORDER BY artist_id
""")
artists_table.show(5)
artists_table.printSchema()
artists_table.write.parquet(OUTPUT_DATA_LOCAL + "artists_table.parquet")

+------------------+--------------------+--------------------+---------+---------+
|         artist_id|                name|            location|lattitude|longitude|
+------------------+--------------------+--------------------+---------+---------+
|AR051KA1187B98B2FF|               Wilks|                    |     null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|   8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX| 30.08615|-94.10158|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|     null|     null|
|AR1Y2PT1187FB5B9CE|         John Wesley|             Brandon| 27.94017|-82.32547|
+------------------+--------------------+--------------------+---------+---------+
only showing top 5 rows

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [34]:
# get filepath to log data file
log_data = LOG_DATA_LOCAL
# read log data file
df = spark.read.json(log_data)

In [35]:
df.show(5)
df.printSchema()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [36]:
 # filter by actions for song plays
df = df.filter(df.page == 'NextSong')

In [37]:
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [38]:
# extract columns for users table
df.createOrReplaceTempView("users_table")
users_table = spark.sql("""
                        SELECT userId as user_id, 
                        firstName as first_name,  
                        lastName as last_name, 
                        gender, 
                        level
                        FROM users_table
                        ORDER BY user_id
""")
users_table.show(5)
users_table.printSchema()
users_table.write.parquet(OUTPUT_DATA_LOCAL + "users_table.parquet")

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     10|    Sylvie|     Cruz|     F| free|
|     10|    Sylvie|     Cruz|     F| free|
|     10|    Sylvie|     Cruz|     F| free|
|     10|    Sylvie|     Cruz|     F| free|
|     10|    Sylvie|     Cruz|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [39]:
# create datetime column from original timestamp column
@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
df = df.withColumn("datetime", get_datetime("ts"))
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|first

In [40]:
# create timestamp column from original timestamp column
@udf(t.TimestampType())
def get_timestamp(ts):
    return datetime.fromtimestamp(ts / 1000.0)
df = df.withColumn("timestamp", get_timestamp("ts"))
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-----

In [41]:
# extract columns to create time table
df.createOrReplaceTempView("time_table")
time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM time_table
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5)
time_table.write.parquet(OUTPUT_DATA_LOCAL + "time_table.parquet")

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:46|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:52|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:16|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:13|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:33|  21|  1|  44|   11|2018|      5|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [42]:
# create view of joined dataframes
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW joined_artists_users 
    AS SELECT *
    FROM time_table
    JOIN artists_table ON artists_table.artist_name = time_table.artist
""")
# read in song data to use for songplays table
# REFERENCE https://stackoverflow.com/questions/16555454/how-to-generate-auto-increment-field-in-select-query
songplays_table = spark.sql("""
            SELECT  row_number() OVER (ORDER BY userId, sessionId) AS songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM joined_artists_users
""")
songplays_table.printSchema()
songplays_table.show(5)
songplays_table.write.parquet(OUTPUT_DATA_LOCAL + "songplays_table.parquet")

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          1|2018-11-14 20:16:...|    101| free|SORRZGD12A6310DBC3|ARVBRGZ1187FB4675A|       603|New Orleans-Metai...|"Mozilla/5.0 (Win...|
|          2|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|

In [43]:
# Read fact table songplays from output file
input_data_parquet = OUTPUT_DATA_LOCAL + "songplays_table.parquet"
df = spark.read.parquet(input_data_parquet)
df.printSchema()
df.show(5)

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          1|2018-11-14 20:16:...|    101| free|SORRZGD12A6310DBC3|ARVBRGZ1187FB4675A|       603|New Orleans-Metai...|"Mozilla/5.0 (Win...|
|          2|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|