<h1>Data Lake ETL</h1>

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['S3']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['S3']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
# input_data = "s3a://udacity-dend/"

input_data = "data/"

# output_data = "s3a://datalake-target-s3/data_model/"

output_data = "data_model/"

In [5]:
song_data = input_data + "song_data/*/*/*"
df = spark.read.json(song_data)

In [6]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
songs_table = df.select("song_id","title","artist_id","year","duration")

In [8]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [9]:
# Write songs to parquet tables partitioned by year and artist
songs_table.write.partitionBy("year","artist_id").parquet(output_data + "songs.parquet")

AnalysisException: 'path file:/workspace/home/data_model/songs.parquet already exists.;'

In [10]:
artists_table = df.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude")

In [11]:
artists_table.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|       40.82624|       -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [12]:
# write artists table to parquet files
artists_table.write.parquet(output_data + "artists.parquet")

AnalysisException: 'path file:/workspace/home/data_model/artists.parquet already exists.;'

In [13]:
### log data
log_data = input_data + "log_data/*"
df = spark.read.json(log_data)

In [14]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
df.select("page").distinct().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [16]:
df.select('ts').show(12)

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
|1542247071796|
|1542252577796|
|1542253449796|
|1542253460796|
|1542260074796|
|1542260277796|
|1542260935796|
|1542261224796|
|1542261356796|
+-------------+
only showing top 12 rows



In [17]:
df2 = df.withColumn("Timestamp",F.from_unixtime(col("ts")/1000))

In [18]:
df2.select("ts","Timestamp").show(10)

+-------------+-------------------+
|           ts|          Timestamp|
+-------------+-------------------+
|1542241826796|2018-11-15 00:30:26|
|1542242481796|2018-11-15 00:41:21|
|1542242741796|2018-11-15 00:45:41|
|1542247071796|2018-11-15 01:57:51|
|1542252577796|2018-11-15 03:29:37|
|1542253449796|2018-11-15 03:44:09|
|1542253460796|2018-11-15 03:44:20|
|1542260074796|2018-11-15 05:34:34|
|1542260277796|2018-11-15 05:37:57|
|1542260935796|2018-11-15 05:48:55|
+-------------+-------------------+
only showing top 10 rows



In [19]:
df_songs = df2.filter(df.page == "NextSong")

In [20]:
df_songs.select("userId").distinct().count()

96

In [21]:
last_order_time = df_songs.groupBy('userId').agg(F.max('Timestamp').alias('last_order_time'))

In [22]:
df_recent = df_songs.join(last_order_time, on='userId', how='left')

In [23]:
df_songs_recent = df_recent.filter(df_recent.Timestamp == df_recent.last_order_time)

In [24]:
df_songs_recent.select("userId").count()

96

In [25]:
user_table = df_songs_recent.select("userId","firstName","lastName","gender","level").distinct()

In [26]:
user_table.sort(F.desc("userId")).show()

+------+---------+---------+------+-----+
|userId|firstName| lastName|gender|level|
+------+---------+---------+------+-----+
|    99|      Ann|    Banks|     F| free|
|    98|   Jordyn|   Powell|     F| free|
|    97|     Kate|  Harrell|     F| paid|
|    96|   Cierra|   Finley|     F| free|
|    95|     Sara|  Johnson|     F| paid|
|    94|     Noah|   Chavez|     M| free|
|    92|    Ryann|    Smith|     F| free|
|    91|   Jayden|     Bell|     M| free|
|    90|   Andrea|   Butler|     F| free|
|     9|    Wyatt|    Scott|     M| free|
|    89|  Kynnedi|  Sanchez|     F| free|
|    88| Mohammad|Rodriguez|     M| paid|
|    87|   Dustin|      Lee|     M| free|
|    86|    Aiden|     Hess|     M| free|
|    85|  Kinsley|    Young|     F| paid|
|    84|  Shakira|     Hunt|     F| free|
|    83|  Stefany|    White|     F| free|
|    82|    Avery| Martinez|     F| paid|
|    81|   Sienna|    Colon|     F| free|
|    80|    Tegan|   Levine|     F| paid|
+------+---------+---------+------

In [26]:
user_table.write.parquet(output_data + "users.parquet")

AnalysisException: 'path file:/workspace/home/data_model/users.parquet already exists.;'

In [None]:
### Here is the time table start

In [27]:
time_table = df2.select('Timestamp')

In [28]:
time_table.printSchema()

root
 |-- Timestamp: string (nullable = true)



In [29]:
time_table = time_table.withColumn('hour', F.hour('Timestamp'))
time_table = time_table.withColumn('day', F.dayofmonth('Timestamp'))
time_table = time_table.withColumn('weekofyear', F.weekofyear('Timestamp'))
time_table = time_table.withColumn('month', F.month('Timestamp'))
time_table = time_table.withColumn('year', F.year('Timestamp'))
time_table = time_table.withColumn('weekday', F.dayofweek('Timestamp'))

In [30]:
time_table.show(10)

+-------------------+----+---+----------+-----+----+-------+
|          Timestamp|hour|day|weekofyear|month|year|weekday|
+-------------------+----+---+----------+-----+----+-------+
|2018-11-15 00:30:26|   0| 15|        46|   11|2018|      5|
|2018-11-15 00:41:21|   0| 15|        46|   11|2018|      5|
|2018-11-15 00:45:41|   0| 15|        46|   11|2018|      5|
|2018-11-15 01:57:51|   1| 15|        46|   11|2018|      5|
|2018-11-15 03:29:37|   3| 15|        46|   11|2018|      5|
|2018-11-15 03:44:09|   3| 15|        46|   11|2018|      5|
|2018-11-15 03:44:20|   3| 15|        46|   11|2018|      5|
|2018-11-15 05:34:34|   5| 15|        46|   11|2018|      5|
|2018-11-15 05:37:57|   5| 15|        46|   11|2018|      5|
|2018-11-15 05:48:55|   5| 15|        46|   11|2018|      5|
+-------------------+----+---+----------+-----+----+-------+
only showing top 10 rows



In [44]:
time_table = time_table.selectExpr("Timestamp as start_time","hour as hour","day as day","weekofyear as weekofyear",
                                   "month as month","year as year","weekday as weekday")

In [31]:
time_table.show(5)

+-------------------+----+---+----------+-----+----+-------+
|          Timestamp|hour|day|weekofyear|month|year|weekday|
+-------------------+----+---+----------+-----+----+-------+
|2018-11-15 00:30:26|   0| 15|        46|   11|2018|      5|
|2018-11-15 00:41:21|   0| 15|        46|   11|2018|      5|
|2018-11-15 00:45:41|   0| 15|        46|   11|2018|      5|
|2018-11-15 01:57:51|   1| 15|        46|   11|2018|      5|
|2018-11-15 03:29:37|   3| 15|        46|   11|2018|      5|
+-------------------+----+---+----------+-----+----+-------+
only showing top 5 rows



In [32]:
time_table.write.parquet(output_data + "time.parquet")

AnalysisException: 'path file:/workspace/home/data_model/time.parquet already exists.;'

In [33]:
song_df = spark.read.json(song_data)

In [34]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [36]:
song_df = song_df.withColumnRenamed("title", "song")
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- song: string (nullable = true)
 |-- year: long (nullable = true)



In [35]:
df_songs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [37]:
songplays_table = df_songs.join(song_df, on='song', how='left')
songplays_table.printSchema()

root
 |-- song: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = tru

In [39]:
songplays_table = songplays_table.select("Timestamp","userId","level","song_id","artist_id","sessionId","location","userAgent")
songplays_table.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)



In [40]:
songplays_table = songplays_table.selectExpr("Timestamp as start_time","userId as userId","song_id as song_id","artist_id as artist_id",
                                   "sessionId as sessionId","location as location","userAgent as userAgent")
songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)



In [41]:
songplays_table.write.parquet(output_data + "songplays.parquet")

In [None]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data =

    # read log data file
    df = 
    
    # filter by actions for song plays
    df = 

    # extract columns for users table    
    artists_table = 
    
    # write users table to parquet files
    artists_table

    # create timestamp column from original timestamp column
    get_timestamp = udf()
    df = 
    
    # create datetime column from original timestamp column
    get_datetime = udf()
    df = 
    
    # extract columns to create time table
    time_table = 
    
    # write time table to parquet files partitioned by year and month
    time_table

    # read in song data to use for songplays table
    song_df = 

    # extract columns from joined song and log datasets to create songplays table 
    songplays_table = 

    # write songplays table to parquet files partitioned by year and month
    songplays_table


def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend/"
    output_data = "s3a://datalake-target-s3/data_model/"
    
    process_song_data(spark, input_data, output_data)    
    process_log_data(spark, input_data, output_data)


if __name__ == "__main__":
    main()