# Exercise 3: Parallel ETL

In [11]:
%load_ext sql

In [2]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time

# STEP 1: Get the params of the created redshift cluster 
- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that give access to Redshift to read from S3

In [3]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_DB= config.get("DWH","DWH_DB")
DWH_DB_USER= config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")

print(DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT)


dwh dwhuser Passw0rd 5439


In [4]:
# FILL IN THE REDSHIFT ENPOINT HERE

DWH_ENDPOINT="dwhcluster.c7sugqgmfjzq.us-west-2.redshift.amazonaws.com" 

DWH_ROLE_ARN="arn:aws:iam::988451517811:role/dwhRole"

# STEP 2: Connect to the Redshift Cluster

In [None]:
import psycopg2

def run_query(sql):
    try:
        conn = psycopg2.connect(
            dbname='dwh',
            user='dwhuser',
            password='Passw0rd',
            host='dwhcluster.c7sugqgmfjzq.us-west-2.redshift.amazonaws.com',
            port='5439'
        )
        cur = conn.cursor()
        cur.execute(sql.strip())

        conn.commit()  # in case it's not autocommit
        cur.close()
        conn.close()
        return "✅ Query executed."

    except Exception as e:
        print("❌ Query failed:", e)
        return None

In [5]:
import boto3

s3 = boto3.resource(
    's3',
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

sampleDbBucket = s3.Bucket('udacity-labs')


In [6]:
for obj in sampleDbBucket.objects.filter(Prefix="tickets"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/full/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/full/full.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00000-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00001-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00002-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00003-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00004-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz')
s3.ObjectSummary(bucket_name='udacity-labs', key='tickets/split/part-00005-d33afb94-b8af-407d-abd5-

# STEP 3: Create Tables

In [69]:
query = 'DROP TABLE IF EXISTS "sport_event_ticket";'
print(run_query(query))
    
query = """CREATE TABLE "sport_event_ticket" (
        "id" double precision DEFAULT nextval('sport_event_ticket') NOT NULL,
        "sporting_event_id" double precision NOT NULL,
        "sport_location_id" double precision NOT NULL,
        "seat_level" numeric(1,0) NOT NULL,
        "seat_section" character varying(15) NOT NULL,
        "seat_row" character varying(10) NOT NULL,
        "seat" character varying(10) NOT NULL,
        "ticketholder_id" double precision,
        "ticket_price" numeric(8,2) NOT NULL
    );
"""

result = run_query(query)
print(result)

✅ Query executed.
✅ Query executed.


# STEP 4: Load Partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/split/part` using your iam role credentials. Use gzip delimiter `;`.

In [73]:

query2 = """
COPY sport_event_ticket FROM 's3://udacity-labs/tickets/split/part'
CREDENTIALS 'aws_iam_role=arn:aws:iam::988451517811:role/dwhRole'
gzip delimiter ';' REGION 'us-west-2'
""".format(DWH_ROLE_ARN)

result = run_query(query2)
print(result)

✅ Query executed.


# STEP 5: Create Tables for the non-partitioned data

In [74]:

query = 'DROP TABLE IF EXISTS "sport_event_ticket_full";'
print(run_query(query))

query = """CREATE TABLE "sport_event_ticket_full" (
    "id" double precision DEFAULT nextval('sport_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);"""
print(run_query(query))

✅ Query executed.
✅ Query executed.


# STEP 6: Load non-partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/full/full.csv.gz` using your iam role credentials. Use gzip delimiter `;`.

- Note how it's slower than loading partitioned data

In [None]:

query2 = """
COPY sport_event_ticket_full FROM 's3://udacity-labs/tickets/full/full.csv.gz'
CREDENTIALS 'aws_iam_role=arn:aws:iam::988451517811:role/dwhRole'
gzip delimiter ';' REGION 'us-west-2'
""".format(DWH_ROLE_ARN)

result = run_query(query2)
print(result)