# Data Warehouses and ETL on AWS


In [24]:
# Enable IPython's autoreload extension; mode 2 re-imports all modules before
# each cell runs, so edits to the project's .py files are picked up without a
# kernel restart.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
import sys
import logging
import warnings
import boto3
import json
import pandas as pd
from rich import traceback
from pathlib import Path
from IPython.core import display as ICD

In [26]:
# Make the project's source directory importable for the local imports that
# follow this cell.
src_path: str = "../src"
if src_path not in sys.path:
    # Guard keeps re-running this cell from stacking duplicate entries.
    sys.path.append(src_path)

# Install rich's traceback handler; the return value is bound to ``_`` so the
# cell produces no output.
_ = traceback.install()

# force=True drops any handlers already attached to the root logger before
# reconfiguring it; afterwards only ERROR and above are emitted.
logging.basicConfig(force=True)
logging.getLogger().setLevel(logging.ERROR)

# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")

In [27]:
from utils import (
    process_config,
    open_db_port,
    delete_cluster,
    delete_iam_roles,
    get_db_connection,
)
from create_dwh import main as run_create_dwh
from create_tables import main as run_create_tables
from etl import main as run_etl
from sql_queries import STAR_TABLES, get_simple_select_query

In [28]:
# Parse the two config files: the user file is read for the AWS key/secret,
# the warehouse file for general settings such as the region.
user_config = process_config(Path("../_user.cfg"))
dwh_config = process_config(Path("../dwh.cfg"))

## 1. Explore source data


### 1.1. Get S3 client

In [29]:
# Region comes from the warehouse config, credentials from the user config.
aws_region = dwh_config.get("GENERAL", "REGION")

# NOTE: despite its name, ``s3_client`` holds a boto3 *resource* object
# (created via ``boto3.resource``), not a low-level client.
s3_client = boto3.resource(
    service_name="s3",
    region_name=aws_region,
    aws_access_key_id=user_config.get("AWS", "KEY"),
    aws_secret_access_key=user_config.get("AWS", "SECRET"),
)

In [30]:
# Handle to the "udacity-dend" bucket; the exploration cells below list and
# download objects through it.
udacity_bucket = s3_client.Bucket("udacity-dend")

### 1.2. Explore the song dataset

In [31]:
# Collect at most five object summaries whose key starts with "song_data".
song_objects = list(udacity_bucket.objects.limit(5).filter(Prefix="song_data"))
song_objects

[s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')]

Print contents of the first few files

In [32]:
# Download each listed JSON object and parse it into a dict keyed by the file
# name without its extension. Non-JSON keys (e.g. the bare "song_data/" entry)
# are skipped. After the transpose each row is one file.
songs_preview = pd.DataFrame(
    {
        Path(key).stem: json.loads(
            udacity_bucket.Object(key).get()["Body"].read().decode("utf-8")
        )
        for key in (summary.key for summary in song_objects)
        if ".json" in key
    }
).T
songs_preview

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
TRAAAAK128F9318786,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009
TRAAAAV128F421A322,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
TRAAABD128F429CF47,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969
TRAAACN128F9355673,AR9Q9YC1187FB5609B,,New Jersey,,Quest_ Pup_ Kevo,252.94322,1,SOFRDWL12A58A7CEF7,Hit Da Scene,0


In [33]:
# Let pandas infer a concrete dtype per column and show the result;
# ``songs_preview`` itself is not reassigned here.
songs_preview.infer_objects().dtypes

artist_id            object
artist_latitude     float64
artist_location      object
artist_longitude    float64
artist_name          object
duration            float64
num_songs             int64
song_id              object
title                object
year                  int64
dtype: object

### 1.3. Explore the log dataset

In [34]:
# Collect at most five object summaries whose key starts with "log_data".
log_objects = list(udacity_bucket.objects.limit(5).filter(Prefix="log_data"))
log_objects

[s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')]

Print contents of the first log file

In [35]:
# Index 1 rather than 0: per the listing above, the first entry is the bare
# "log_data/" prefix, not an events file.
s3_object = log_objects[1]

In [36]:
# The log file holds one JSON event per line. Download it once and split it
# into lines.
log_lines = (
    udacity_bucket.Object(s3_object.key)
    .get()["Body"]
    .read()
    .decode("utf-8")
    .split("\n")
)

# Skip blank lines: a trailing newline at the end of the file would otherwise
# produce an empty string and make json.loads raise JSONDecodeError.
log_preview = pd.DataFrame(
    [json.loads(line) for line in log_lines if line.strip()]
)
log_preview.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540345000000.0,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540345000000.0,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [37]:
# Show the dtype pandas infers for each log column; ``log_preview`` itself is
# not reassigned here.
log_preview.infer_objects().dtypes

artist            object
auth              object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
method            object
page              object
registration     float64
sessionId          int64
song              object
status             int64
ts                 int64
userAgent         object
userId            object
dtype: object

## 2. Create Amazon Redshift cluster


In [38]:
# create_dwh.main, imported above as run_create_dwh. The two returned clients
# are kept because the teardown cell at the end of the notebook passes them to
# delete_cluster / delete_iam_roles.
cluster_props, redshift_client, iam_client = run_create_dwh()

Waiting for Redshift cluster to become available...
Redshift cluster is ready to be used!
ec2.SecurityGroup(id='sg-09c515e9660d60716')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


Reload config files after cluster creation, as they might include new fields

In [39]:
# Re-parse both config files so values written during cluster creation are
# visible to the cells that follow.
user_config = process_config(Path("../_user.cfg"))
dwh_config = process_config(Path("../dwh.cfg"))

## 3. Create staging and star schema tables


Create tables

In [40]:
# create_tables.main, imported above under this alias; called without
# arguments, presumably for its side effects only (no value is captured).
run_create_tables()

## 4. Perform ETL


In [41]:
# etl.main, imported above under this alias; called without arguments,
# presumably for its side effects only (no value is captured).
run_etl()

Output()

Output()

## 5. Perform some example queries for validation


### 5.1. Get DB connection

In [42]:
# Connection and cursor are shared by all query cells below; the connection
# is closed in section 6.
conn, cur = get_db_connection(dwh_config)

### 5.2. Get a preview of all star schema tables

In [43]:
# Show the first five rows of every star-schema table.
for table_name, table_cols in STAR_TABLES.items():
    # Header labels are the first space-separated token of each entry in
    # STAR_TABLES.
    # NOTE(review): assumes each entry starts with the column name and that
    # the entries are in the same order as the columns returned by SELECT * -
    # confirm against sql_queries.
    columns = [col.split(" ")[0] for col in table_cols]

    cur.execute(get_simple_select_query(table_name, ("*",), limit=5))
    print(f"Preview of '{table_name}':")
    ICD.display(pd.DataFrame(cur.fetchall(), columns=columns))
    print("\n")

Preview of 'dim_users':


Unnamed: 0,user_id,first_name,last_name,gender,level
0,53,Celeste,Williams,F,free
1,69,Anabelle,Simpson,F,free
2,62,Connar,Moreno,M,free
3,101,Jayden,Fox,M,free
4,95,Sara,Johnson,F,paid




Preview of 'dim_artists':


Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARTMSN91187FB3A3B7,Annihilator,,,
1,ARXSPYZ1187B98972A,Carl Belew,"Salina, OK",36.2929,-95.15261
2,ARK4B1I1187FB4FBF1,Hari Mata Hari,,,
3,AR0693R1187FB59D32,Dusminguet,,,
4,AR051VM1187B9B7F27,Chicken Shack,"Birmingham, England",52.47859,-1.9086




Preview of 'dim_songs':


Unnamed: 0,song_id,title,artist_id,year,duration
0,SOIGHOD12A8C13B5A1,Indian Angel,ARY589G1187B9A9F4E,2004,171
1,SOGJSEF12AB01847D4,Love's Been Good To Me,ARXSPYZ1187B98972A,0,131
2,SOHESEE12A6D4FBEA1,Holding On To A Life,AR2MIPD1187B9AD547,2004,213
3,SOGABSW12AB018E300,Baby Girl,ARNHAJ71187FB42C19,0,183
4,SORRIYU12A67ADC983,Noi,AR8H2Y81187B989747,2007,286




Preview of 'dim_time':


Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-03 01:04:33,1,3,44,11,2018,6
1,2018-11-03 01:05:23,1,3,44,11,2018,6
2,2018-11-03 01:05:50,1,3,44,11,2018,6
3,2018-11-03 01:08:36,1,3,44,11,2018,6
4,2018-11-03 01:12:26,1,3,44,11,2018,6




Preview of 'fact_songplays':


Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,42,2018-11-15 11:11:25,80,paid,SOLZOBD12AB0185720,Usher,611,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,106,2018-11-22 11:45:32,97,paid,SOYEKUR12AAF3B5274,OneRepublic,828,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,170,2018-11-14 05:06:03,10,free,SOROLCY12AB0182652,Percubaba,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
3,234,2018-11-13 20:09:53,29,paid,SOBEUMD12AB018A9BC,Edward Sharpe & The Magnetic Zeros,556,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,298,2018-11-15 17:44:25,97,paid,SOFXNQP12AB0184F1A,Cat Power,605,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."






### 5.3. How many records are there in each table?

In [44]:
# Report the row count of every star-schema table. Table names come from the
# project's own STAR_TABLES mapping, not from user input.
for table_name in STAR_TABLES:
    cur.execute(f"SELECT count(*) FROM {table_name}")
    (record_count,) = cur.fetchone()
    print(f"{table_name} has {record_count} records.")

dim_users has 105 records.
dim_artists has 10025 records.
dim_songs has 14896 records.
dim_time has 8023 records.
fact_songplays has 1144 records.


### 5.4. Who are the top 5 users with the highest activity?

In [45]:
# Count songplays per user in a subquery, then join once to dim_users for the
# names. The counts are taken from fact_songplays alone, so a duplicated
# user_id in dim_users cannot inflate them. Ties on the count are broken by
# user_id.
cur.execute(
    """
    SELECT
        sub.user_id, du.first_name, du.last_name, sub.counted
    FROM
        (
            SELECT
                fs.user_id, count(*) AS counted
            FROM
                fact_songplays fs
            GROUP BY
                fs.user_id
        ) sub
    JOIN
        dim_users du ON sub.user_id = du.user_id
    ORDER BY
        sub.counted DESC, sub.user_id
    LIMIT 5
    """
)
# Labels mirror the selected columns (last_name, not "second_name").
pd.DataFrame(cur.fetchall(), columns=("user_id", "first_name", "last_name", "count"))

Unnamed: 0,user_id,first_name,second_name,count
0,80,Tegan,Levine,147
1,49,Chloe,Cuevas,118
2,97,Kate,Harrell,84
3,24,Layla,Griffin,77
4,15,Lily,Koch,59


## 6. Close and shut down all resources


In [46]:
# Release the cursor first, then the connection; ``finally`` guarantees the
# connection is closed even if closing the cursor fails.
try:
    cur.close()
finally:
    conn.close()

In [47]:
# Tear down what was set up in section 2, using the clients returned by
# run_create_dwh(): first the Redshift cluster, then the IAM roles.
# NOTE(review): whether these calls wait for deletion to finish is not visible
# here - check utils before relying on it.
delete_cluster(redshift_client, dwh_config)
delete_iam_roles(iam_client, dwh_config)