# Parallel ETL

In [1]:
# Load the `sql` IPython extension; it provides the `%sql` magic used in the cells below.
%load_ext sql

In [2]:
import configparser
from time import time
from urllib.parse import quote

import boto3
import matplotlib.pyplot as plt
import pandas as pd

# STEP 1: Get the params of the created redshift cluster 
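- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that gives Redshift access to read from S3
    - The S3 locations of the input data (`LOG_DATA`, `LOG_JSONPATH`, `SONG_DATA`)
    - The AWS access key and secret, used by boto3 to browse the S3 bucket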

In [3]:
# Read AWS credentials, cluster connection details, and S3 input paths from dwh.cfg.
config = configparser.ConfigParser()
# A `with` block guarantees the file handle is closed once parsing is done.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

DWH_ENDPOINT = config.get("DWH", "DWH_ENDPOINT")
DWH_ROLE_ARN = config.get("DWH", "DWH_ROLE_ARN")

LOG_DATA = config.get("S3", "LOG_DATA")
LOG_JSONPATH = config.get("S3", "LOG_JSONPATH")
SONG_DATA = config.get("S3", "SONG_DATA")

# STEP 2: Connect to the Redshift Cluster

In [4]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://awsuser:***@dwhclusterp3.censvj08nksa.us-west-2.redshift.amazonaws.com:5439/dev


'Connected: awsuser@dev'

In [31]:
# Resource-level S3 handle, used to browse object listings in the input bucket.
s3 = boto3.resource(
    service_name="s3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

# Bucket holding the song_data/ and log_data/ prefixes explored below.
sampleDbBucket = s3.Bucket("udacity-dend")

In [None]:
# Peek at a handful of keys rather than the full listing. The prefix appears to hold
# one JSON file per track, so printing every object would flood the notebook output.
PREVIEW_COUNT = 10
for obj in sampleDbBucket.objects.filter(Prefix="song_data").limit(PREVIEW_COUNT):
    print(obj.key)

In [34]:
# The download cells below call `s3.download_fileobj(bucket, key, fileobj)` using the
# client-level signature, so `s3` is rebound here to a low-level S3 client.
s3 = boto3.client(
    service_name="s3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

In [None]:
# Save the 2018-11-01 event-log file locally so its raw JSON layout can be inspected.
with open('log_data_file', 'wb') as log_file:
    s3.download_fileobj(
        Bucket='udacity-dend',
        Key='log_data/2018/11/2018-11-01-events.json',
        Fileobj=log_file,
    )

In [35]:
# Save a single song record locally so its raw JSON layout can be inspected.
with open('song_data_file', 'wb') as song_file:
    s3.download_fileobj(
        Bucket='udacity-dend',
        Key='song_data/A/A/A/TRAAAAK128F9318786.json',
        Fileobj=song_file,
    )

# STEP 3: Create Tables

In [29]:
%%time

staging_events_table_drop = "DROP TABLE IF EXISTS staging_events;"

staging_events_table_create= ("""
CREATE TABLE staging_events (
    "songplay_id" BIGINT IDENTITY(1,1), 
    "artist" TEXT,
    "auth" TEXT,
    "firstName" TEXT,
    "gender" CHAR,
    "itemInSession" INTEGER,
    "lastName" TEXT,
    "length" DOUBLE PRECISION,
    "level" TEXT,
    "location" TEXT,
    "method" TEXT,
    "page" TEXT,
    "registration" DOUBLE PRECISION,
    "sessionId" INTEGER,
    "song" TEXT,
    "status" INTEGER,
    "ts" BIGINT,
    "userAgent" TEXT,
    "userId" TEXT
);
""")

%sql $staging_events_table_drop
%sql $staging_events_table_create

 * postgresql://awsuser:***@dwhclusterp3.censvj08nksa.us-west-2.redshift.amazonaws.com:5439/dev
Done.
 * postgresql://awsuser:***@dwhclusterp3.censvj08nksa.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 7.31 ms, sys: 2.84 ms, total: 10.1 ms
Wall time: 545 ms


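# STEP 4: Load the partitioned event logs into the cluster
Use the COPY command to load the JSON event logs at `LOG_DATA` into `staging_events` using your IAM role credentials. The JSONPaths file at `LOG_JSONPATH` tells COPY how to map each JSON field to a table column.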

In [30]:
%%time

staging_songs_copy = ("""
    COPY staging_events 
    FROM '{}'
    CREDENTIALS 'aws_iam_role={}'
    REGION 'us-west-2'
    FORMAT AS json '{}';
""").format(LOG_DATA, DWH_ROLE_ARN, LOG_JSONPATH)

%sql $staging_songs_copy

 * postgresql://awsuser:***@dwhclusterp3.censvj08nksa.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 3.4 ms, sys: 662 µs, total: 4.06 ms
Wall time: 25.6 s


# STEP 5: Create the staging table for the song data

In [None]:
%%time

staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs;"

staging_songs_table_create = ("""
CREATE TABLE staging_songs (
    "song_id" BIGINT IDENTITY(1,1), 
    "num_songs" INTEGER,
    "artist_id" TEXT,
    "artist_latitude" TEXT,
    "artist_longitude" TEXT,
    "artist_location" TEXT,
    "artist_name" TEXT,
    "song_id" TEXT,
    "title" TEXT,
    "duration" DOUBLE PRECISION,
    "year" INTEGER
);
""")

%sql $staging_songs_table_drop
%sql $staging_songs_table_create

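# STEP 6: Load the song data into the cluster
Use the COPY command to load the song JSON files at `SONG_DATA` into `staging_songs` using your IAM role credentials. `json 'auto'` matches each JSON key to the column with the same name, so no JSONPaths file is needed.

- Note how much slower this is than the event-log load (≈13 min here vs ≈26 s in STEP 4), presumably because the song data is spread over many small files.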

In [28]:
%%time

staging_songs_copy = ("""
COPY staging_songs 
FROM '{}'
credentials 'aws_iam_role={}'
region 'us-west-2'
json 'auto'
""").format(SONG_DATA, DWH_ROLE_ARN)

%sql $staging_songs_copy

 * postgresql://awsuser:***@dwhclusterp3.censvj08nksa.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 5.14 ms, sys: 0 ns, total: 5.14 ms
Wall time: 12min 59s


In [None]:
test = ("""
select * from stl_load_errors;
""")

%sql $test