# Amazon Data Engineering

In [59]:
# Needed packages
import boto3 
import pandas as pd 
import matplotlib.pyplot as plt
from botocore.exceptions import ClientError
import configparser 
from time import time



Let use the Configpaser to parse the config file and use our variables to create and lauch a cluster on the amazon aws.  

In [76]:
config = configparser.ConfigParser()
config.read_file(open('config.cfg'))

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET') 

DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_PASSWORD')
DWH_DB_PORT = config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE = config.get('DWH', 'DWH_IAM_ROLE')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')

pd.DataFrame ( {'Param': ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_CLUSTER_IDENTIFIER", "DWH_DB_USER",  "DWH_DB_PORT", "DWH_IAM_ROLE", "DWH_NODE_TYPE"], 
                'Value': [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_CLUSTER_IDENTIFIER, DWH_DB_USER, DWH_DB_PORT, DWH_IAM_ROLE, DWH_NODE_TYPE]} )

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_CLUSTER_IDENTIFIER,redshift-cluster
3,DWH_DB_USER,awsuser
4,DWH_DB_PORT,5439
5,DWH_IAM_ROLE,myRedshiftRole
6,DWH_NODE_TYPE,dc2.large


Creating the Amazon Instances to Connect to. 

In [17]:
ec2 = boto3.resource('ec2', 
        region_name = 'us-west-2', 
        aws_access_key_id = KEY, 
        aws_secret_access_key = SECRET
)

s3 = boto3.resource('s3', 
        region_name = 'us-west-2', 
        aws_access_key_id = KEY, 
        aws_secret_access_key = SECRET
)

iam = boto3.client('iam',
        region_name = 'us-west-2', 
        aws_access_key_id = KEY, 
        aws_secret_access_key = SECRET
)

redshift = boto3.client ('redshift',
        region_name = 'us-west-2', 
        aws_access_key_id = KEY, 
        aws_secret_access_key = SECRET

)

### Testing the S3

Since I am using the sample data provided by amazon I am going to test the connection to the sample buckets in amazon.  Below we see we get results back from amazon S3.   


In [19]:
sampleBucket = s3.Bucket("awssampledbuswest2")

for obj in sampleBucket.objects.filter(Prefix='ssbgz'): 
    print(obj)

s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/customer0002_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/dwdate.tbl.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0000_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0001_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0002_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0003_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0004_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0005_part_00.gz&#39;)
s3.ObjectSummary(bucket_name=&#39;awssampledbuswest2&#39;, key=&#39;ssbgz/lineorder0006_par

Before Launch the cluster I am going to create an I AM ROLE. The reason I am doing this is to show case and end to end.  Once a cluster is created with the nessary roles and policies in place all we need to connect to the cluster is endpoint, and the credentials.    

In [54]:
try: 
    print('Created a new I AM Role')
    dwhrole = iam.create_role(
        Path='/', 
        RoleName=DWH_IAM_ROLE, 
        Description='Allows Redshift clusters to call aws services on my behalf',
        AssumeRolePolicyDocument=json.dumps(
            {
                'Statement': [{
                    'Actiion': 'sts:AssumeRole', 
                    'Effect': 'Allow', 
                    'Principal': {
                        'Service':'redshift-amazonaws.com'
                    }
                }], 
                'Version': '2012-10-17'
            }
        )
    )
except Exception as e: 
    print(e)

print('Attached Policy')

iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE, 
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
)['ResponseMetadata']['HTTPStatusCode']


ARNROLE = iam.get_role(RoleName=DWH_IAM_ROLE)['Role']['Arn']


Created a new I AM Role
name &#39;DWH_IAM_ROLENAME&#39; is not defined
Attached Policy
Get I am role ARN


### Create and Launch Redshift Cluster

In [31]:
try: 
    response = redshift.create_cluster(
        ClusterType = DWH_CLUSTER_TYPE, 
        NodeType = DWH_NODE_TYPE, 
        NumberOfNodes = int(DWH_NUM_NODES), 
        DBName = DWH_DB, 
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER, 
        MasterUsername = DWH_DB_USER, 
        MasterUserPassword = DWH_DB_PASSWORD,

        IamRoles = [ARNROLE]
    )
except Exception as e: 
    print(e)

Use the below code to see the status of the cluster, you may need to execute this code a number of time until you see cluster becoming "Available".  May take serveral minutes.

In [53]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "NumberOfNodes"]
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,dev
5,NumberOfNodes,4


After the cluster becomes available we need to access the cluster endpoint and ARN role we will do this by using the below code. We can print the results of this and lets make a note of the endpoint and role arn we going to need that info to connect to the cluster. 

In [51]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

Before we proceed to connecting to the cluster we need to open port to be able to connect from the outside.   I already have define the port so this already exists.  If you haven't you will see a different message.  

In [46]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_DB_PORT),
        ToPort=int(DWH_DB_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id=&#39;sg-8c8917a6&#39;)
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule &quot;peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW&quot; already exists


### CONNECTING TO CLUSTER

In [47]:
%load_ext sql

In [50]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_DB_PORT,DWH_DB)
%sql $conn_string

### CREATE THE SCHEMA AND TABLE

I have decided to create a distributed table schema this reason for this is using this schema once the tables are loaded we can quickly query our data. 

What this will do is to create partition on the table. 

In [73]:
%%sql 

CREATE SCHEMA IF NOT EXISTS dist;
SET search_path TO dist;

DROP TABLE IF EXISTS part cascade;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS supplier;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS dwdate;
DROP TABLE IF EXISTS lineorder;

CREATE TABLE part (
  p_partkey     	integer     	not null	sortkey distkey,
  p_name        	varchar(22) 	not null,
  p_mfgr        	varchar(6)      not null,
  p_category    	varchar(7)      not null,
  p_brand1      	varchar(9)      not null,
  p_color       	varchar(11) 	not null,
  p_type        	varchar(25) 	not null,
  p_size        	integer     	not null,
  p_container   	varchar(10)     not null
);

CREATE TABLE supplier (
  s_suppkey     	integer        not null sortkey,
  s_name        	varchar(25)    not null,
  s_address     	varchar(25)    not null,
  s_city        	varchar(10)    not null,
  s_nation      	varchar(15)    not null,
  s_region      	varchar(12)    not null,
  s_phone       	varchar(15)    not null)
diststyle all;

CREATE TABLE customer (
  c_custkey     	integer        not null sortkey,
  c_name        	varchar(25)    not null,
  c_address     	varchar(25)    not null,
  c_city        	varchar(10)    not null,
  c_nation      	varchar(15)    not null,
  c_region      	varchar(12)    not null,
  c_phone       	varchar(15)    not null,
  c_mktsegment      varchar(10)    not null)
diststyle all;

CREATE TABLE dwdate (
  d_datekey            integer       not null sortkey,
  d_date               varchar(19)   not null,
  d_dayofweek	      varchar(10)   not null,
  d_month      	    varchar(10)   not null,
  d_year               integer       not null,
  d_yearmonthnum       integer  	 not null,
  d_yearmonth          varchar(8)	not null,
  d_daynuminweek       integer       not null,
  d_daynuminmonth      integer       not null,
  d_daynuminyear       integer       not null,
  d_monthnuminyear     integer       not null,
  d_weeknuminyear      integer       not null,
  d_sellingseason      varchar(13)    not null,
  d_lastdayinweekfl    varchar(1)    not null,
  d_lastdayinmonthfl   varchar(1)    not null,
  d_holidayfl          varchar(1)    not null,
  d_weekdayfl          varchar(1)    not null)
diststyle all;

CREATE TABLE lineorder (
  lo_orderkey      	    integer     	not null,
  lo_linenumber        	integer     	not null,
  lo_custkey           	integer     	not null,
  lo_partkey           	integer     	not null distkey,
  lo_suppkey           	integer     	not null,
  lo_orderdate         	integer     	not null sortkey,
  lo_orderpriority     	varchar(15)     not null,
  lo_shippriority      	varchar(1)      not null,
  lo_quantity          	integer     	not null,
  lo_extendedprice     	integer     	not null,
  lo_ordertotalprice   	integer     	not null,
  lo_discount          	integer     	not null,
  lo_revenue           	integer     	not null,
  lo_supplycost        	integer     	not null,
  lo_tax               	integer     	not null,
  lo_commitdate         integer         not null,
  lo_shipmode          	varchar(10)     not null
);




### LOADING THE TABLES

The below function will load the tables from S3

In [57]:
def loadTables(schema, tables):
    loadTimes = []
    SQL_SET_SCEMA = "SET search_path TO {};".format(schema)
    %sql $SQL_SET_SCEMA
    
    for table in tables:
        SQL_COPY = """
copy {} from 's3://awssampledbuswest2/ssbgz/{}' 
credentials 'aws_iam_role={}'
gzip region 'us-west-2';
        """.format(table,table, ARNROLE)

        print("======= LOADING TABLE: ** {} ** IN SCHEMA ==> {} =======".format(table, schema))
        print(SQL_COPY)

        t0 = time()
        %sql $SQL_COPY
        loadTime = time()-t0
        loadTimes.append(loadTime)

        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))
    return pd.DataFrame({"table":tables, "loadtime_"+schema:loadTimes}).set_index('table')

In [74]:
# list tables and have the function to take care the loading. 

tables = ["customer","dwdate","supplier", "part", "lineorder"]

#-- Insertion twice for each schema (WARNING!! EACH CAN TAKE MORE THAN 10 MINUTES!!!)
distStats = loadTables("dist", tables)




Below query will be checking the contents of our lineorder table.

In [75]:
%%sql 
SELECT count(*) 
FROM dist.lineorder; 

1 rows affected. 600037902


### CLEANING UP THE RESOURCES

In [77]:
print('')#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!




In [68]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,awsuser
4,DBName,dev
5,NumberOfNodes,4


In [69]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE)
#### CAREFUL!!

{&#39;ResponseMetadata&#39;: {&#39;RequestId&#39;: &#39;08a89ce6-4928-44aa-8408-8a61004e7fa5&#39;,
  &#39;HTTPStatusCode&#39;: 200,
  &#39;HTTPHeaders&#39;: {&#39;x-amzn-requestid&#39;: &#39;08a89ce6-4928-44aa-8408-8a61004e7fa5&#39;,
   &#39;content-type&#39;: &#39;text/xml&#39;,
   &#39;content-length&#39;: &#39;200&#39;,
   &#39;date&#39;: &#39;Thu, 01 Oct 2020 23:25:33 GMT&#39;},
  &#39;RetryAttempts&#39;: 0}}

THE END