# Test ETL implementation

This notebook help to test our ETL implementation script.
Main steps:
1. Create Redshift cluster on AWS
2. Create database
3. Load the data from S3 to staging tables, then transform and insert data to database

## 1. Create Redshift Cluster on AWS

In [1]:
import pandas as pd
import boto3
import json

### Create iam and ec2 object

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY = config['AWS']['KEY']
SECRET = config['AWS']['SECRET']
redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )
ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

### Create Role

In [3]:
from botocore.exceptions import ClientError

DWH_IAM_ROLE_NAME = config['DWH']['DWH_IAM_ROLE_NAME']
#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::172826706179:role/dwhRole


### Create Redshift Cluster

In [4]:
DWH_CLUSTER_TYPE = config['DWH']['DWH_CLUSTER_TYPE']
DWH_NODE_TYPE = config['DWH']['DWH_NODE_TYPE']
DWH_NUM_NODES = config['DWH']['DWH_NUM_NODES']

DWH_DB = config['CLUSTER']['DB_NAME']
DWH_CLUSTER_IDENTIFIER = config['DWH']['DWH_CLUSTER_IDENTIFIER']
DWH_DB_USER = config['CLUSTER']['DB_USER']
DWH_DB_PASSWORD = config['CLUSTER']['DB_PASSWORD']

In [5]:

#roleArn = config['IAM_ROLE']['ARN']

try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [5]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.c32slsdkedl6.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-d2f351aa
7,NumberOfNodes,4


In [6]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.c32slsdkedl6.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::172826706179:role/dwhRole


### Set up TCP connection

In [7]:
DWH_PORT = config['CLUSTER']['DB_PORT']

try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-fc2885b7')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## 2. Create database

In [10]:
%load_ext autoreload
%autoreload 2

import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def drop_tables(cur, conn):
    """drop tables(if exists) in the database 
    :param cur: cursor object
    :param conn: psycopg's connection object (connect to a database in Redshift cluster)
    
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """create tables in the database
    :param cur: cursor object
    :param conn: psycopg's connection object (connect to a database in Redshift cluster)
    
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()

    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 3. ETL

In [11]:
import psycopg2
from sql_queries import copy_table_queries, insert_table_queries
%load_ext autoreload
%autoreload 2

def load_staging_tables(cur, conn):
    """load raw data from S3 to staging tables in Redshift.
    :param cur: cursor object
    :param conn: psycopg's connection object (connect to a database in Redshift cluster)
    
    """
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    """insert data from staging tables to other tables.
    :param cur: cursor object
    :param conn: psycopg's connection object (connect to a database in Redshift cluster)
    
    """
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()
    
    load_staging_tables(cur, conn)
    insert_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


#### Check data quality

In [12]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [15]:
cur.execute('SELECT * from songplay LIMIT 5;')
rows=cur.fetchall()
for row in rows:
    print(row)

(173, datetime.datetime(2018, 11, 5, 11, 14), 44, 'paid', 'SOCDOVE12AB01808DE', 'ARYE9E71187B9AA1B3', '269', 'Waterloo-Cedar Falls, IA', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0')
(243, datetime.datetime(2018, 11, 5, 14, 43, 54), 44, 'paid', 'SOZARNI12A67020744', 'AR3WLE91187B99430A', '269', 'Waterloo-Cedar Falls, IA', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0')
(165, datetime.datetime(2018, 11, 5, 17, 0, 27), 73, 'paid', 'SOBANHD12A58A7BB7C', 'ARSUFX91187FB3B73E', '255', 'Tampa-St. Petersburg-Clearwater, FL', '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"')
(124, datetime.datetime(2018, 11, 7, 18, 11, 11), 15, 'paid', 'SODTRKW12A6D4F9A51', 'AR71MIY1187B9BA0C3', '221', 'Chicago-Naperville-Elgin, IL-IN-WI', '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Saf

In [17]:
cur.execute('SELECT * from users LIMIT 5;')
rows=cur.fetchall()
for row in rows:
    print(row)

(12, 'Austin', 'Rosales', 'M', 'free')
(18, 'Jacob', 'Rogers', 'M', 'free')
(33, 'Bronson', 'Harris', 'M', 'free')
(35, 'Molly', 'Taylor', 'F', 'free')
(38, 'Gianna', 'Jones', 'F', 'free')


In [18]:
cur.execute('SELECT * from songs LIMIT 5;')
rows=cur.fetchall()
for row in rows:
    print(row)

('SOAADUU12AB0183B6F', 'Intro / Locataire (Instrumental)', 'AR70XXH1187FB44B55', 0, 101.92934)
('SOAAFUV12AB018831D', 'Where Do The Children Play? (LP Version)', 'AR5ZGC11187FB417A3', 0, 216.05832)
('SOAAGXT12A8C13A94D', 'Beautiful Zelda (2007 Digital Remaster)', 'ARTC1LV1187B9A4858', 1968, 145.03138)
('SOAAKBE12A8C139075', 'Emília', 'ART1OPW1187FB3C5EF', 0, 159.7122)
('SOAAKLA12A58A7A3CC', 'Snow Day (LP Version)', 'ARGWNT41187FB463F1', 0, 211.90485)


In [19]:
cur.execute('SELECT * from artists LIMIT 5;')
rows=cur.fetchall()
for row in rows:
    print(row)

('AR00Y9I1187B999412', 'Akercocke', '', None, None)
('AR01SCU1187B9A693C', 'Street Dogs', 'Boston, MA', None, None)
('AR02T3I1187FB4D0E5', 'Aberfeldy', 'Edinburgh, Scotland', None, None)
('AR03IZC1187FB3E058', 'Aidonia', '', None, None)
('AR05IU31187B9B9A1A', 'Alex Ubago', 'Vitoria, Spain', 42.84751, -2.67973)


In [13]:
cur.execute('SELECT * from time LIMIT 10;')
rows=cur.fetchall()
for row in rows:
    print(row)

(datetime.datetime(2018, 11, 2, 16, 35), 16, 2, 44, 11, 2018, 5)
(datetime.datetime(2018, 11, 3, 18, 19, 10), 18, 3, 44, 11, 2018, 6)
(datetime.datetime(2018, 11, 3, 19, 33, 39), 19, 3, 44, 11, 2018, 6)
(datetime.datetime(2018, 11, 5, 1, 48), 1, 5, 45, 11, 2018, 1)
(datetime.datetime(2018, 11, 5, 11, 8, 56), 11, 5, 45, 11, 2018, 1)
(datetime.datetime(2018, 11, 5, 11, 36, 56), 11, 5, 45, 11, 2018, 1)
(datetime.datetime(2018, 11, 5, 18, 26, 7), 18, 5, 45, 11, 2018, 1)
(datetime.datetime(2018, 11, 6, 8, 49, 19), 8, 6, 45, 11, 2018, 2)
(datetime.datetime(2018, 11, 6, 16, 38, 15), 16, 6, 45, 11, 2018, 2)
(datetime.datetime(2018, 11, 6, 20, 12, 11), 20, 6, 45, 11, 2018, 2)


In [20]:
conn.close()

### Delete Redshift Cluster

In [21]:
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.c32slsdkedl6.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2019, 12, 29, 8, 19, 50, 484000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-fc2885b7',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-d2f351aa',
  'AvailabilityZone': 'us-west-2b',
  'PreferredMaintenanceWindow': 'sat:13:30-sat:14:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible