# project_2_data_warehouse_with_redshift
<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/93/Amazon_Web_Services_Logo.svg/2560px-Amazon_Web_Services_Logo.svg.png" width="100" height="100">

In this project, I'm a data engineer for a music streaming startup called Sparkify.

Sparkify has expanded its user base and song database, and now wants to move its processes, data, and analytics applications to the cloud.

Their data is stored in S3, with one directory containing JSON logs of user activity on the app, and another containing JSON metadata for the songs in their app.  
My task is to build an ETL pipeline that extracts data from S3, stages it in Redshift, and transforms it into a set of dimensional tables.  
Why? So their analytics team can continue finding insights into what songs users are listening to.

In [None]:
import json
import configparser
import psycopg2

import pandas as pd

#!pip install boto3
import boto3
%load_ext sql

# 1. Setup
- Create a **new IAM user**.
- Under **Attach existing policies directly**, assign **`AdministratorAccess`**.
- Store the **access key** and **secret** in the **2_cloud_data_wh_redshift_boto3.cfg** file (same folder as this notebook).
- Fill in the configuration file as follows:
    ```
    [AWS]
    KEY=YOUR_AWS_KEY
    SECRET=YOUR_AWS_SECRET
    ```
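
Later cells read a `[DWH]` section in addition to `[AWS]`, so it can help to check the file for missing keys before making any AWS call. The following is a minimal sketch: the helper name `missing_keys` and the sample values are illustrative assumptions, while the key names are the ones read later in this notebook.

```python
import configparser

# Keys this notebook reads, grouped by section.
REQUIRED = {
    "AWS": ["KEY", "SECRET"],
    "DWH": [
        "DWH_REGION", "DWH_IAM_ROLE_NAME", "DWH_CLUSTER_TYPE", "DWH_NODE_TYPE",
        "DWH_NUM_NODES", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER",
        "DWH_DB_PASSWORD", "DWH_PORT",
    ],
}


def missing_keys(config, required=REQUIRED):
    """Return 'SECTION.KEY' strings for every required option that is absent."""
    missing = []
    for section, keys in required.items():
        for key in keys:
            if not config.has_option(section, key):
                missing.append(f"{section}.{key}")
    return missing


# Hypothetical, deliberately incomplete config used only to demonstrate the check.
sample = configparser.ConfigParser()
sample.read_string(
    "[AWS]\nKEY=YOUR_AWS_KEY\nSECRET=YOUR_AWS_SECRET\n"
    "[DWH]\nDWH_REGION=us-west-2\n"
)
print(missing_keys(sample))
```

An empty list means every key the notebook expects is present.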

## 1.1. Troubleshoot  
If your **keys are not working**, such as encountering an **InvalidAccessKeyId** error, follow these steps to **create a new pair of access keys**:
1. Go to the **[IAM Dashboard](https://console.aws.amazon.com/iam/home)**.  
2. View the details of the **Admin user** you created.  
3. Select **Security Credentials** → **Create access key**.  
4. A new **Access Key ID** and **Secret** will be **generated**.  
5. Update the `.cfg` file with the **new** credentials.

## 1.2. Load configuration from a file

In [None]:
# Load configuration file (the context manager closes the file handle after reading)
config = configparser.ConfigParser()
with open("2.3_cloud_data_warehouse_aws_services_config_values.cfg") as config_file:
    config.read_file(config_file)

# AWS Credentials
aws_key = config.get("AWS", "KEY")
aws_secret = config.get("AWS", "SECRET")

# AWS Region
dwh_region = config.get("DWH", "DWH_REGION")

# IAM Role
dwh_iam_role_name = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Redshift Cluster Configuration
dwh_cluster_type = config.get("DWH", "DWH_CLUSTER_TYPE")
dwh_node_type = config.get("DWH", "DWH_NODE_TYPE")
dwh_num_nodes = config.get("DWH", "DWH_NUM_NODES")
dwh_cluster_identifier = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")

# Database Configuration
dwh_db = config.get("DWH", "DWH_DB")
dwh_db_user = config.get("DWH", "DWH_DB_USER")
dwh_db_password = config.get("DWH", "DWH_DB_PASSWORD")
dwh_port = config.get("DWH", "DWH_PORT")

# Display parameters in a DataFrame
pd.DataFrame(
    {
        "Param": [
            "DWH_REGION",
            "DWH_IAM_ROLE_NAME",
            "DWH_CLUSTER_TYPE",
            "DWH_NODE_TYPE",
            "DWH_NUM_NODES",
            "DWH_CLUSTER_IDENTIFIER",
            "DWH_DB",
            "DWH_DB_USER",
            "DWH_DB_PASSWORD",
            "DWH_PORT",
        ],
        "Value": [
            dwh_region,
            dwh_iam_role_name,
            dwh_cluster_type,
            dwh_node_type,
            dwh_num_nodes,
            dwh_cluster_identifier,
            dwh_db,
            dwh_db_user,
            dwh_db_password,
            dwh_port,
        ],
    }
)
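
The table above prints `DWH_DB_PASSWORD` in plain text, which then stays in the saved notebook output. One possible way to avoid that is to mask the value before displaying it. This is only a sketch; `mask_secret` is a hypothetical helper, not part of the original notebook.

```python
def mask_secret(value, visible=2):
    """Keep the first `visible` characters and replace the rest with '*'."""
    if len(value) <= visible:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible)


print(mask_secret("Passw0rd"))  # Pa******
```

In the DataFrame cell, `dwh_db_password` could be replaced with `mask_secret(dwh_db_password)`.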

# 2. Initialize AWS services

In [None]:
iam = boto3.client(
    "iam",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

ec2 = boto3.resource(
    "ec2",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

s3 = boto3.resource(
    "s3",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

redshift = boto3.client(
    "redshift",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)
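
The four calls above repeat the same three keyword arguments. A small sketch of one way to factor them out follows; the helper name `aws_kwargs` and the placeholder values are assumptions made for illustration.

```python
def aws_kwargs(region, key, secret):
    """Collect the keyword arguments shared by every boto3 client/resource call."""
    return {
        "region_name": region,
        "aws_access_key_id": key,
        "aws_secret_access_key": secret,
    }


# Placeholder values; in the notebook these would be dwh_region, aws_key, aws_secret.
common = aws_kwargs("us-west-2", "YOUR_AWS_KEY", "YOUR_AWS_SECRET")
print(sorted(common))

# With boto3 imported, each service handle could then be created as:
#   iam = boto3.client("iam", **common)
#   ec2 = boto3.resource("ec2", **common)
#   s3 = boto3.resource("s3", **common)
#   redshift = boto3.client("redshift", **common)
```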

## 2.1. IAM Role Setup

In [None]:
# Create the role
try:
    print("1.1 Creating a new IAM Role")
    dwh_role = iam.create_role(
        Path="/",
        RoleName=dwh_iam_role_name,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps({
            "Statement": [{
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {"Service": "redshift.amazonaws.com"}
            }],
            "Version": "2012-10-17"
        })
    )
except Exception as e:
    print(e)

# Attach Policy
print("1.2 Attaching Policy")

status_code = iam.attach_role_policy(
    RoleName=dwh_iam_role_name,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)["ResponseMetadata"]["HTTPStatusCode"]
print("HTTP status:", status_code)

# Get and print the IAM role ARN
print("1.3 Get the IAM role ARN")
role_arn = iam.get_role(RoleName=dwh_iam_role_name)["Role"]["Arn"]

print(role_arn)
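
The trust policy passed as `AssumeRolePolicyDocument` is what allows the Redshift service to assume the role. To make that document easier to inspect, it can be built separately and pretty-printed. This sketch uses a hypothetical helper, `trust_policy`, and reproduces the same statement as the cell above.

```python
import json


def trust_policy(service="redshift.amazonaws.com"):
    """Build a trust policy that lets the given AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Principal": {"Service": service},
            }
        ],
    }


document = json.dumps(trust_policy(), indent=2)
print(document)
```

The resulting string could be passed directly as `AssumeRolePolicyDocument=document`.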

## 2.2. S3: check sample data, verify dataset presence.  
The code lists and prints objects in the S3 bucket "udacity-dend" that have keys starting with "song_data".

In [None]:
sample_song_db_bucket = s3.Bucket("udacity-dend")

for obj in sample_song_db_bucket.objects.filter(Prefix="song_data"):
    print(obj)

# Uncomment the following lines to list all objects in the bucket
#for obj in sample_song_db_bucket.objects.all():
#    print(obj)
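
Listing every object under a prefix can produce a very long output. A small sketch of previewing only the first few items with `itertools.islice` follows; `first_n` is a hypothetical helper and the keys below are made-up placeholders, not actual bucket contents.

```python
from itertools import islice


def first_n(iterable, n=10):
    """Return at most n items without consuming the rest of the iterable."""
    return list(islice(iterable, n))


# Stand-in for sample_song_db_bucket.objects.filter(Prefix="song_data").
fake_keys = (f"song_data/example_{i}.json" for i in range(1000))
print(first_n(fake_keys, 3))
```

boto3 resource collections also offer a `limit()` method that serves a similar purpose.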

## 2.3. Redshift: create cluster

In [None]:
try:
    response = redshift.create_cluster(
        # Parameters for hardware
        ClusterType=dwh_cluster_type,
        NodeType=dwh_node_type,
        NumberOfNodes=int(dwh_num_nodes),

        # Parameters for identifiers & credentials
        DBName=dwh_db,
        ClusterIdentifier=dwh_cluster_identifier,
        MasterUsername=dwh_db_user,
        MasterUserPassword=dwh_db_password,

        # Parameter for role (to allow S3 access)
        IamRoles=[role_arn],

        # Make the cluster publicly accessible
        PubliclyAccessible=True  
    )
except Exception as e:
    print(e)

## 2.4. Monitor cluster status
Format and display the cluster properties in a DataFrame. Re-run this cell until **ClusterStatus** is **available**.

In [None]:
def pretty_redshift_props(props):
    pd.set_option("display.max_colwidth", None)
    
    keys_to_show = [
        "ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", 
        "DBName", "Endpoint", "NumberOfNodes", "VpcId", "PubliclyAccessible"
    ]
    
    x = [(k, v) for k, v in props.items() if k in keys_to_show]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

my_cluster_props = redshift.describe_clusters(ClusterIdentifier=dwh_cluster_identifier)["Clusters"][0]
pretty_redshift_props(my_cluster_props)
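
Instead of re-running the cell by hand, the status check could be wrapped in a polling loop. The sketch below is generic and takes the status lookup as a callable, so it runs without AWS access; `wait_for_status` and its parameters are illustrative assumptions.

```python
import time


def wait_for_status(get_status, target="available", interval=30,
                    max_attempts=40, sleep=time.sleep):
    """Call get_status() until it returns target; return the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        if get_status() == target:
            return attempt
        sleep(interval)
    raise TimeoutError(f"status did not become {target!r} after {max_attempts} attempts")


# Simulated sequence of statuses, with sleeping disabled for the demonstration.
statuses = iter(["creating", "creating", "available"])
print(wait_for_status(lambda: next(statuses), sleep=lambda seconds: None))  # 3
```

In the notebook, `get_status` could be `lambda: redshift.describe_clusters(ClusterIdentifier=dwh_cluster_identifier)["Clusters"][0]["ClusterStatus"]`.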

## 2.5. Take note of endpoint & IAM role
Do not run this cell until **ClusterStatus** is **available**; the `Endpoint` entry is only populated once the cluster is up.

In [None]:
dwh_endpoint = my_cluster_props["Endpoint"]["Address"]
dwh_role_arn = my_cluster_props["IamRoles"][0]["IamRoleArn"]

print("DWH_ENDPOINT:", dwh_endpoint)
print("DWH_ROLE_ARN:", dwh_role_arn)

# 3. Open network access
## 3.1. Allow inbound traffic to Redshift by updating the security group

In [None]:
try:
    vpc = ec2.Vpc(id=my_cluster_props["VpcId"])
    default_sg = list(vpc.security_groups.all())[0]
    print(default_sg)

    default_sg.authorize_ingress(
        GroupName=default_sg.group_name,
        CidrIp="0.0.0.0/0",
        IpProtocol="TCP",
        FromPort=int(dwh_port),
        ToPort=int(dwh_port),
    )
except Exception as e:
    print(e)
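
The rule above uses `0.0.0.0/0`, which opens the Redshift port to every IPv4 address. For anything beyond a short-lived exercise, a narrower range is worth considering. As a sketch, a single client address can be turned into a `/32` CIDR block with the standard `ipaddress` module; `single_host_cidr` is a hypothetical helper and `203.0.113.7` is a documentation-only example address.

```python
import ipaddress


def single_host_cidr(ip):
    """Validate an IPv4 address and return it as a one-host CIDR block."""
    address = ipaddress.IPv4Address(ip)  # raises ValueError on invalid input
    return f"{address}/32"


print(single_host_cidr("203.0.113.7"))  # 203.0.113.7/32
```

The result could be passed as `CidrIp` in place of `"0.0.0.0/0"`.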

## 3.2. PostgreSQL: connect to the cluster

In [None]:
conn_string = f"postgresql://{dwh_db_user}:{dwh_db_password}@{dwh_endpoint}:{dwh_port}/{dwh_db}?sslmode=verify-full&sslrootcert=system"
%sql $conn_string
%sql SELECT current_user;
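
If the database password contains characters such as `@` or `/`, interpolating it directly into the URL can break parsing. A minimal sketch of percent-encoding the credentials first is shown below; `build_conn_string` and the sample values are assumptions, and the SSL query parameters from the cell above are left out for brevity.

```python
from urllib.parse import quote


def build_conn_string(user, password, host, port, db):
    """Build a PostgreSQL URL with percent-encoded user name and password."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{db}"
    )


print(build_conn_string("dwhuser", "p@ss/word", "example-host", 5439, "dwh"))
# postgresql://dwhuser:p%40ss%2Fword@example-host:5439/dwh
```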

# 4. Queries

Below is the content of sql_queries.py, which is imported by create_tables.py and etl.py. It defines queries that:
- DROP the staging, fact, and dimension tables if they exist
- CREATE those tables
- COPY data from S3 into the staging tables in Redshift
- INSERT data INTO the fact and dimension tables FROM the staging tables

In [None]:
%%sql

CREATE SCHEMA IF NOT EXISTS song_play_analysis;
SET search_path TO song_play_analysis;

In [None]:
# DROP TABLES

staging_events_table_drop = "DROP TABLE IF EXISTS staging_events;"
staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs;"
songplay_table_drop = "DROP TABLE IF EXISTS songplays;"
user_table_drop = "DROP TABLE IF EXISTS users;"
song_table_drop = "DROP TABLE IF EXISTS songs;"
artist_table_drop = "DROP TABLE IF EXISTS artists;"
time_table_drop = "DROP TABLE IF EXISTS time;"

# CREATE TABLES

staging_events_table_create= ("""
CREATE TABLE staging_events (
    artist varchar(max),
    auth varchar(10),
    firstName varchar(25),
    gender varchar(1),
    itemInSession integer sortkey distkey,
    lastName varchar(25),
    length float,
    level varchar(4),
    location varchar(max),
    method varchar(4),
    page varchar(max),
    registration float,
    sessionId integer,
    song varchar(max),
    status smallint,
    ts bigint,
    user_agent varchar(max),
    user_id integer
);
""")

staging_songs_table_create = ("""
CREATE TABLE staging_songs (
    num_songs integer sortkey distkey,
    artist_id varchar(20),
    artist_latitude float,
    artist_longitude float,
    artist_location varchar(max),
    artist_name varchar(max),
    song_id varchar(20),
    title varchar(max),
    duration float,
    year smallint
);
""")

songplay_table_create = ("""
CREATE TABLE songplays (
    songplay_id integer identity(0,1) not null sortkey distkey,
    start_time timestamp not null,
    user_id integer not null,
    level varchar(4) not null,
    song_id varchar(20) not null,
    artist_id varchar(20) not null,
    session_id integer not null,
    location varchar(max),
    user_agent varchar(max) not null
);
""")

user_table_create = ("""
CREATE TABLE users (
    user_id integer not null sortkey distkey,
    first_name varchar(25) not null,
    last_name varchar(25) not null,
    gender varchar(1) not null,
    level varchar(4) not null
);
""")

song_table_create = ("""
CREATE TABLE songs (
    song_id varchar(20) not null sortkey distkey,
    title varchar(max) not null,
    artist_id varchar(20) not null,
    year smallint not null,
    duration float not null
);
""")

artist_table_create = ("""
CREATE TABLE artists (
    artist_id varchar(20) not null sortkey distkey,
    name varchar(max),
    location varchar(max),
    latitude float,
    longitude float
);
""")

time_table_create = ("""
CREATE TABLE time (
    time_id integer identity(0,1) not null sortkey distkey,
    start_time timestamp not null,
    hour smallint not null,
    day smallint not null,
    week smallint not null,
    month smallint not null,
    year smallint not null,
    weekday integer not null
);""")

# STAGING TABLES

staging_events_copy = ("""
COPY staging_events
    FROM 's3://udacity-dend/log_data/'
    credentials 'aws_iam_role={}'
    JSON 's3://udacity-dend/log_json_path.json'
    COMPUPDATE off 
    REGION 'us-west-2'
""".format(config.get('IAM_ROLE', 'ARN')))

staging_songs_copy = ("""
    COPY staging_songs 
    FROM 's3://udacity-dend/song_data/'
    credentials 'aws_iam_role={}'
    JSON 'auto'
    COMPUPDATE off 
    REGION 'us-west-2'
""".format(config.get('IAM_ROLE', 'ARN')))

# FINAL TABLES

songplay_table_insert = ("""
INSERT INTO songplays (
    start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
)
SELECT
    TIMESTAMP 'epoch' + se.ts/1000 * INTERVAL '1 second' AS start_time,
    se.user_id,
    se.level,
    ss.song_id,
    ss.artist_id,
    se.sessionId,
    se.location,
    se.user_agent
FROM staging_events se
JOIN staging_songs ss
  ON se.song = ss.title 
  AND se.artist = ss.artist_name
  AND se.length = ss.duration
WHERE se.page = 'NextSong';
""")

user_table_insert = ("""
INSERT INTO users (
    user_id, first_name, last_name, gender, level
)
SELECT DISTINCT
    user_id,
    firstName,
    lastName,
    gender,
    level
FROM staging_events
WHERE page = 'NextSong' AND user_id IS NOT NULL;
""")

song_table_insert = ("""
INSERT INTO songs (
    song_id, title, artist_id, year, duration
)
SELECT DISTINCT
    song_id,
    title,
    artist_id,
    year,
    duration
FROM staging_songs;
""")

artist_table_insert = ("""
INSERT INTO artists (
    artist_id, name, location, latitude, longitude
)
SELECT DISTINCT
    artist_id,
    artist_name AS name,
    artist_location AS location,
    artist_latitude AS latitude,
    artist_longitude AS longitude
FROM staging_songs;
""")

time_table_insert = ("""
INSERT INTO time (
    start_time, hour, day, week, month, year, weekday
)
SELECT DISTINCT
    start_time,
    EXTRACT(hour FROM start_time),
    EXTRACT(day FROM start_time),
    EXTRACT(week FROM start_time),
    EXTRACT(month FROM start_time),
    EXTRACT(year FROM start_time),
    EXTRACT(weekday FROM start_time)
FROM (
    SELECT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS start_time
    FROM staging_events
    WHERE page = 'NextSong'
) AS time_data;
""")

# ANALYTICS QUERIES

top_artists_query = ("""
SELECT a.name, COUNT(sp.artist_id) AS artist_count
FROM artists a
JOIN songplays sp ON a.artist_id = sp.artist_id
GROUP BY a.name
ORDER BY artist_count DESC
LIMIT 5;
""")

top_genders_query = ("""
SELECT gender, COUNT(gender) AS gender_count
FROM staging_events
WHERE page = 'NextSong'
GROUP BY gender
ORDER BY gender_count DESC;
""")

peak_hours_query = ("""
SELECT hour, COUNT(hour) AS peak_hour_count
FROM time
GROUP BY hour
ORDER BY peak_hour_count DESC
LIMIT 5;
""")

# QUERY LISTS

create_table_queries = [staging_events_table_create, staging_songs_table_create, songplay_table_create, user_table_create, song_table_create, artist_table_create, time_table_create]
drop_table_queries = [staging_events_table_drop, staging_songs_table_drop, songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop, time_table_drop]
copy_table_queries = [staging_events_copy, staging_songs_copy]
insert_table_queries = [songplay_table_insert, user_table_insert, song_table_insert, artist_table_insert, time_table_insert]

analytics_queries = [
    ("Who are the top 5 most played artists?", top_artists_query),
    ("Which gender listened to more songs?", top_genders_query),
    ("What are the 5 peak listening hours?", peak_hours_query)
]
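
The `TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second'` expression in the inserts above converts a millisecond Unix timestamp into a timestamp. The same arithmetic can be sketched in Python to sanity-check individual values. The function names and the sample `ts` value are illustrative, and the weekday mapping assumes Redshift's Sunday-based numbering (0 to 6).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ts_to_start_time(ts_ms):
    """Mirror the SQL: integer-divide milliseconds by 1000, then add to the epoch."""
    return EPOCH + timedelta(seconds=ts_ms // 1000)


def time_parts(ts_ms):
    """Derive some of the columns stored in the time table."""
    start = ts_to_start_time(ts_ms)
    return {
        "start_time": start,
        "hour": start.hour,
        "day": start.day,
        "month": start.month,
        "year": start.year,
        # Python counts Monday as 0; shift so that Sunday is 0.
        "weekday": (start.weekday() + 1) % 7,
    }


print(time_parts(1541106106796))
```

For the sample value, `start_time` is 2018-11-01 21:01:46 UTC, a Thursday.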

# 5. Create tables

Below is the content of the create_tables.py script.  
The script connects to the database, drops the tables if they already exist, and then creates them.

In [None]:
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()

    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()
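
The connection string above is filled from `config['CLUSTER'].values()`, so it depends on the order of the keys in dwh.cfg. A sketch of building it from named keys instead is shown below. The key names (`HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_PORT`) and all sample values are assumptions, since the contents of dwh.cfg are not shown here.

```python
import configparser


def build_dsn(cluster):
    """Build a psycopg2-style DSN from a [CLUSTER] section, independent of key order."""
    return (
        f"host={cluster['HOST']} dbname={cluster['DB_NAME']} "
        f"user={cluster['DB_USER']} password={cluster['DB_PASSWORD']} "
        f"port={cluster['DB_PORT']}"
    )


# Hypothetical section with the keys deliberately out of order.
sample = configparser.ConfigParser()
sample.read_string("""
[CLUSTER]
DB_PORT=5439
HOST=example-host
DB_NAME=dwh
DB_USER=dwhuser
DB_PASSWORD=secret
""")
print(build_dsn(sample["CLUSTER"]))
```

The returned string could be passed to `psycopg2.connect()` in place of the positional `format` call.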

## 5.1. Check for data loading errors

In [None]:
%%sql
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 5;

## 5.2. Check for created and loaded tables

In [None]:
%sql SELECT tablename FROM pg_tables WHERE schemaname = 'song_play_analysis';

## 5.3. Check for the tables' characteristics

In [None]:
%%sql
SET search_path TO song_play_analysis;

SELECT table_name, column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'song_play_analysis'
ORDER BY table_name, column_name;

# 6. ETL

Below is the content of etl.py. This script:
- connects to the Sparkify Redshift database,
- loads log_data and song_data into the staging tables,
- transforms them into the five analytics tables (one fact table and four dimension tables),
- runs three analytics queries.

In [None]:
def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()


def run_analytics_queries(cur, conn):
    print("\n─── Songplay Analytics ───")
    for question, query in analytics_queries:
        print(f"\n{question}")
        cur.execute(query)
        rows = cur.fetchall()
        for row in rows:
            print(row)
    conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()
    
    load_staging_tables(cur, conn)
    insert_tables(cur, conn)
    run_analytics_queries(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()
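
The staging-to-dimension step can be tried locally without a cluster. The sketch below uses an in-memory SQLite database as a stand-in for Redshift, with a reduced `staging_events` table and made-up rows, to show how the `users` insert removes exact duplicates and skips rows that are not `NextSong` events or have no user.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Reduced versions of the staging and dimension tables (SQLite types, not Redshift).
cur.execute(
    "CREATE TABLE staging_events ("
    "user_id INTEGER, firstName TEXT, lastName TEXT, gender TEXT, level TEXT, page TEXT)"
)
cur.execute(
    "CREATE TABLE users ("
    "user_id INTEGER, first_name TEXT, last_name TEXT, gender TEXT, level TEXT)"
)

# Made-up sample rows.
rows = [
    (1, "Ada", "Lovelace", "F", "free", "NextSong"),
    (1, "Ada", "Lovelace", "F", "free", "NextSong"),  # exact duplicate
    (2, "Alan", "Turing", "M", "paid", "Home"),       # not a NextSong event
    (None, None, None, None, "free", "NextSong"),     # no user
    (3, "Grace", "Hopper", "F", "paid", "NextSong"),
]
cur.executemany("INSERT INTO staging_events VALUES (?, ?, ?, ?, ?, ?)", rows)

# Same shape as user_table_insert.
cur.execute("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT DISTINCT user_id, firstName, lastName, gender, level
FROM staging_events
WHERE page = 'NextSong' AND user_id IS NOT NULL;
""")
conn.commit()

users = cur.execute("SELECT user_id, level FROM users ORDER BY user_id").fetchall()
print(users)  # [(1, 'free'), (3, 'paid')]
conn.close()
```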

# 7. Clean up AWS resources

## 7.1. Delete cluster

In [None]:
redshift.delete_cluster(ClusterIdentifier=dwh_cluster_identifier, SkipFinalClusterSnapshot=True)

Re-run the following cell until `describe_clusters` raises `ClusterNotFoundFault`, which means the cluster has been deleted.

In [None]:
my_cluster_props = redshift.describe_clusters(ClusterIdentifier=dwh_cluster_identifier)["Clusters"][0]
pretty_redshift_props(my_cluster_props)
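
As an alternative to re-running the cell, boto3 provides a `cluster_deleted` waiter on the Redshift client that polls until the cluster is gone. The wrapper below is a sketch; `wait_until_deleted` and its default delay and attempt counts are assumptions.

```python
def wait_until_deleted(redshift_client, cluster_identifier, delay=30, max_attempts=40):
    """Block until the cluster no longer exists, polling every `delay` seconds."""
    waiter = redshift_client.get_waiter("cluster_deleted")
    waiter.wait(
        ClusterIdentifier=cluster_identifier,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
```

In the notebook this could be called as `wait_until_deleted(redshift, dwh_cluster_identifier)` before deleting the IAM role.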

## 7.2. Delete IAM role

In [None]:
iam.detach_role_policy(RoleName=dwh_iam_role_name, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=dwh_iam_role_name)