# Data Lake

In [1]:
import os
import configparser
from etl import *
from datetime import datetime
from pyspark.sql import SparkSession
from resource_reader import *
from sql_queries import *
from pyspark.sql.functions import udf, to_timestamp, col, year, month, dayofmonth, hour, weekofyear, dayofweek, monotonically_increasing_id

# 1. Load configuration parameters

In [2]:
rr = ResourceReader()
config = rr.config_reader('dwh.yml')

ACCESS_KEY_ID = config['AWS']['ACCESS_KEY_ID']
SECRET_ACCESS_KEY = config['AWS']['SECRET_ACCESS_KEY']
BUCKET_NAME = config['S3']['BUCKET_NAME']
REGION = config['AWS']['REGION']

# import AWS credential 
os.environ["AWS_ACCESS_KEY_ID"] = ACCESS_KEY_ID
os.environ["AWS_SECRET_ACCESS_KEY"] = SECRET_ACCESS_KEY
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.11.95,org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell'
    

Configs loaded!


### Getting files in the bucket

In [3]:
rr.setup_aws_services(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION)
# rr.create_s3_bucket(config)
# Don't forget to upload song & log data to <your own bucket if any>
# rr.upload_file('whatever.txt', BUCKET_NAME)

Establish AWS services


In [4]:
response = rr.s3_client.list_buckets()
print(response['Buckets'])

[{'Name': 'myeyesbucket.test', 'CreationDate': datetime.datetime(2022, 8, 21, 17, 52, 22, tzinfo=tzutc())}]


In [5]:
bucket = rr.s3_resource.Bucket(config['S3']['BUCKET_NAME'])
song_data_files = [f.key for f in bucket.objects.filter(Prefix='song_data')]
log_data_files = [f.key for f in bucket.objects.filter(Prefix='log_data')]
for _ in song_data_files[:5]:
    print(_)
print('---------------------------------------')
for _ in log_data_files[:5]:
    print(_)

song_data/A/A/A/TRAAAAW128F429D538.json
song_data/A/A/A/TRAAABD128F429CF47.json
song_data/A/A/A/TRAAADZ128F9348C2E.json
song_data/A/A/A/TRAAAEF128F4273421.json
song_data/A/A/A/TRAAAFD128F92F423A.json
---------------------------------------
log_data/2018/11/2018-11-01-events.json
log_data/2018/11/2018-11-02-events.json
log_data/2018/11/2018-11-03-events.json
log_data/2018/11/2018-11-04-events.json
log_data/2018/11/2018-11-05-events.json


# 2. Spark Session Creation

In [6]:
spark_session = create_spark_session_s3(config)

S3 spark session created


## 2.1 Load Song Data

In [7]:
songs_data = init_spark_data(spark_session, config['AWS']['SONG_DATA'])

Spark data has established


### 2.1.1 Verify song data schema & data

In [8]:
songs_data.printSchema()
songs_data.show(3, truncate=True)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|   

## 2.2 Load log data

In [11]:
logs_data = init_spark_data(spark_session, config['AWS']['LOG_DATA'])

Spark data has established


### 2.2.1 Verify log data schema & data

In [12]:
logs_data.printSchema()
logs_data.show(3, truncate=True)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

### 2.2.2 Inquiry `NextSong` data

In [13]:
logs_data_nextsong = logs_data.filter(logs_data.page == 'NextSong')

### 2.2.3 Verify `NextSong` log data schema

In [14]:
logs_data_nextsong.printSchema()
logs_data_nextsong.show(3, truncate=True)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

# 3. Table Creation

## 3.1 `songs` table

In [15]:
songs_tbl = create_table(spark_session, songs_data, songs_table_query_template)

### 3.1.1 View `songs` schema & data

In [16]:
songs_tbl.printSchema()
songs_tbl.show(5, truncate=True)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: long (nullable = true)
 |-- artist_id: string (nullable = true)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|   0|ARDR4AC1187FB371A1|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|ARMAC4T1187FB3FA4C|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|138.63138|   0|AREDBBQ1187B98AFF5|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|218.38322|   0|AR7SMBG1187B9B9066|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



### 3.1.2 Process `songs` data via Parquet

In [17]:
song_path = rr.resource_path_builder('output-data', 'parquet', 'song-data')
save_as_parquet_format(songs_tbl, song_path, order_by=['artist_id', 'year'])

[PARQUET] files saved success


### 3.1.3 Read `songs` data from parquet package

In [18]:
song_data_parquet = read_parquet_format(spark_session, str(song_path) + '/')
song_data_parquet.printSchema()
song_data_parquet.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)

+------------------+--------------------+---------+------------------+----+
|           song_id|               title| duration|         artist_id|year|
+------------------+--------------------+---------+------------------+----+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|ARPBNLO1187FB3D52F|2000|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|ARDNS031187B9924F0|2005|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|ARLTWXK1187FB5A3F8|   0|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|ARKFYS91187B98E58F|   0|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|ARPFHN61187FB575F6|   0|
+------------------+--------------------+---------+------------------+----+
only showing top 5 rows



## 3.2. `artists` table

In [19]:
artists_tbl = create_table(spark_session, songs_data, artists_table_query_template)

### 3.2.1 View `artists` schema & data

In [20]:
artists_tbl.printSchema()
artists_tbl.show(5, truncate=True)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

+------------------+------------------+---------------+---------------+----------------+
|         artist_id|       artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------------+---------------+---------------+----------------+
|ARYKCQI1187FB3B18F|             Tesla|               |           null|            null|
|ARXR32B1187FB57099|               Gob|               |           null|            null|
|ARWB3G61187FB49404|       Steve Morse| Hamilton, Ohio|           null|            null|
|ARVBRGZ1187FB4675A|      Gwen Stefani|               |           null|            null|
|ARULZCI1241B9C8611|Luna Orbit Project|               |           null|            null|
+------------------+------------------+---------------+-

### 3.2.2 Process `artists` data via Parquet

In [21]:
artist_path = rr.resource_path_builder('output-data', 'parquet', 'artist-data')
save_as_parquet_format(artists_tbl, artist_path, order_by='artist_id')

[PARQUET] files saved success


### 3.2.3 Read `artists` data from parquet package

In [22]:
artist_data_parquet = read_parquet_format(spark_session, str(artist_path) + '/')
artist_data_parquet.printSchema()
artist_data_parquet.show(5)

root
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_id: string (nullable = true)

+--------------------+--------------------+---------------+----------------+------------------+
|         artist_name|     artist_location|artist_latitude|artist_longitude|         artist_id|
+--------------------+--------------------+---------------+----------------+------------------+
|Montserrat Caball...|                    |           null|            null|ARDR4AC1187FB371A1|
|The Dillinger Esc...|   Morris Plains, NJ|       40.82624|       -74.47995|ARMAC4T1187FB3FA4C|
|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|ARNF6401187FB57032|
|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|AROUOZZ1187B9ABE51|
|Nick Ingman;Gavyn...|     London, England|       51.50632|        -0.12714|ARI2JSK1187FB496EF|


## 3.3 `users` table

In [23]:
users_tbl = create_table(spark_session, logs_data_nextsong, users_table_query_template)

### 3.3.1 View `users` schema & data

In [24]:
users_tbl.printSchema()
users_tbl.show(5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      7|    Adelyn|   Jordan|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     86|     Aiden|     Hess|     M| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     44|    Aleena|    Kirby|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



### 3.3.2 Process `users` data via Parquet

In [25]:
user_path = rr.resource_path_builder('output-data', 'parquet', 'user-data')
save_as_parquet_format(users_tbl, user_path, order_by=['first_name', 'last_name'])

[PARQUET] files saved success


### 3.3.3 Read `users` data from parquet package

In [26]:
user_data_parquet = read_parquet_format(spark_session, str(user_path) + '/')
user_data_parquet.printSchema()
user_data_parquet.show(5)

root
 |-- user_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

+-------+------+-----+----------+---------+
|user_id|gender|level|first_name|last_name|
+-------+------+-----+----------+---------+
|     50|     F| free|       Ava| Robinson|
|     13|     F| free|       Ava| Robinson|
|     15|     F| paid|      Lily|     Koch|
|     15|     F| free|      Lily|     Koch|
|     29|     F| paid|Jacqueline|    Lynch|
+-------+------+-----+----------+---------+
only showing top 5 rows



## 3.4 `time` table

### 3.4.1 Format column data `ts` as timestamp

In [27]:
# add "time_stamp" column by extract from "ts" data
logs_data_nextsong = logs_data_nextsong.withColumn('time_stamp', to_timestamp(logs_data_nextsong.ts / 1000))
logs_data_nextsong.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)



### 3.4.2 Definition of `time` table

In [28]:
time_tbl = create_table(spark_session, logs_data_nextsong, time_table_query_template)

### 3.4.3 View `time` schema & data

In [29]:
time_tbl.printSchema()
time_tbl.show(3, truncate=False)

root
 |-- time_stamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-----------------------+----+-----+---+----+----+-------+
|time_stamp             |year|month|day|hour|week|weekday|
+-----------------------+----+-----+---+----+----+-------+
|2018-11-15 17:15:12.796|2018|11   |15 |17  |46  |5      |
|2018-11-16 01:48:45.796|2018|11   |16 |1   |46  |6      |
|2018-11-16 05:20:27.796|2018|11   |16 |5   |46  |6      |
+-----------------------+----+-----+---+----+----+-------+
only showing top 3 rows



### 3.4.5 Process `time` data via Parquet

In [30]:
time_path = rr.resource_path_builder('output-data', 'parquet', 'time-data')
save_as_parquet_format(time_tbl, time_path, order_by=['year', 'month'])

[PARQUET] files saved success


### 3.4.6 Read `time` data from parquet package

In [31]:
time_data_parquet = read_parquet_format(spark_session, str(time_path) + '/')
time_data_parquet.printSchema()
time_data_parquet.show(5)

root
 |-- time_stamp: timestamp (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+--------------------+---+----+----+-------+----+-----+
|          time_stamp|day|hour|week|weekday|year|month|
+--------------------+---+----+----+-------+----+-----+
|2018-11-15 17:15:...| 15|  17|  46|      5|2018|   11|
|2018-11-16 01:48:...| 16|   1|  46|      6|2018|   11|
|2018-11-16 05:20:...| 16|   5|  46|      6|2018|   11|
|2018-11-21 08:09:...| 21|   8|  47|      4|2018|   11|
|2018-11-22 03:27:...| 22|   3|  47|      5|2018|   11|
+--------------------+---+----+----+-------+----+-----+
only showing top 5 rows



## 3.5 `songplays` table

### 3.5.1 Concatenate songs data and logs data as `songplays` table data

_Add definition `songplay_id` column into log data_

In [32]:
logs_data_nextsong = logs_data_nextsong.withColumn('songplay_id', monotonically_increasing_id())
logs_data_nextsong.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)
 |-- songplay_id: long (nullable = false)



In [33]:
logs_data_nextsong.show(3, truncate=True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+-----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          time_stamp|songplay_id|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+-----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 07:30:...|          0|
|The Prodigy|Logged In|     

### 3.5.2 Definition of `songplays` table

In [35]:
songplays_tbl = create_nextsong_table(spark_session, songs_data, logs_data_nextsong)
songplays_tbl.printSchema()
songplays_tbl.show(3, truncate=False)

root
 |-- songplay_id: long (nullable = false)
 |-- song_id: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)
 |-- level: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+------------------+-----------------------+-----+------------------+----------+-------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|songplay_id|song_id           |time_stamp             |level|artist_id         |session_id|user_id|location                          |user_agent                                                                                                                               |
+-----------+------------------+-----------------------+-----+------------------+

### 3.5.3 Process `songplays` data via Parquet

In [36]:
songplays_path = rr.resource_path_builder('output-data', 'parquet', 'songplays-data')
save_as_parquet_format(songplays_tbl, songplays_path)

[PARQUET] files saved success


### 3.5.4 Read time data from parquet package

In [37]:
songplays_parquet = read_parquet_format(spark_session, str(songplays_path) + '/')
songplays_parquet.printSchema()
songplays_parquet.show(3, truncate=True)

root
 |-- songplay_id: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)
 |-- level: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+------------------+--------------------+-----+------------------+----------+-------+--------------------+--------------------+
|songplay_id|           song_id|          time_stamp|level|         artist_id|session_id|user_id|            location|          user_agent|
+-----------+------------------+--------------------+-----+------------------+----------+-------+--------------------+--------------------+
|        882|SOZCTXZ12AB0182364|2018-11-22 04:56:...| paid|AR5KOSW1187FB35FF4|       818|     15|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+------------------+--------------------+-----+-----------

# Clean Up Storage

### <font color=red> _BE CAREFUL!!! Keep your eyes lean on your files & folders_</font>

In [44]:
import shutil

# Put them here and double check your path
# print(song_path)
# print(artist_path)
# print(user_path)
# print(songplay_path)
# print(time_path)

In [45]:
# shutil.rmtree(song_path)
# shutil.rmtree(artist_path)
# shutil.rmtree(user_path)
# shutil.rmtree(songplay_path)
# rr.delete_s3_bucket(BUCKET_NAME)