# Project 3: Creating a Redshift Cluster Using the AWS Python SDK
## A Data Warehouse Redshift Project

# Part 0. Make sure we have an AWS secret and access key

- Create a new IAM user in your AWS account
- Give it `AdministratorAccess` from the `Attach existing policies directly` tab
- Take note of the access key and secret 
- Edit the file `dwh.cfg` in the same folder as this notebook and fill in the following values:
    
```ini
[AWS]
KEY= YOUR_AWS_KEY
SECRET= YOUR_AWS_SECRET
```
<font color='red' size=20>
*** Make sure to run the clean-up code at the end ***
</font>

#### Import Python packages

In [1]:
import configparser
import json
import time
from urllib.parse import quote_plus

import boto3
import pandas as pd
import psycopg2
from botocore.exceptions import ClientError

# Part I. Spin-up Redshift

#### Load DWH Params from a file

In [2]:
# Parse dwh.cfg. The file is opened in a `with` block so the handle is closed
# as soon as parsing finishes instead of being left open for the kernel's lifetime.
config = configparser.ConfigParser()
with open('dwh.cfg') as config_file:
    config.read_file(config_file)

# AWS credentials (section [AWS]).
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster hardware settings (section [DWH]).
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and database credentials (section [DWH]).
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

# Name of the IAM role created further down in this notebook.
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Last expression of the cell, so the table is rendered as the cell output.
# NOTE: the table includes DWH_DB_PASSWORD in clear text; clear this output
# before sharing or committing the notebook.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })


Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


#### Reset DWH Params for Cluster

In [3]:
# Blank out the two values that are only known once the cluster exists:
# the cluster endpoint (CLUSTER.host) and the role ARN (IAM_ROLE.ARN).
config['CLUSTER']['host'] = ''
config['IAM_ROLE']['ARN'] = ''

# Write the cleared configuration back to dwh.cfg.
with open('dwh.cfg', mode='w') as cfg_out:
    config.write(cfg_out)

#### Create clients for IAM, EC2, S3 and Redshift

In [4]:
# Region and credentials shared by every boto3 handle created in this cell.
aws_kwargs = dict(
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Resource-style handles for EC2 and S3.
ec2 = boto3.resource('ec2', **aws_kwargs)
s3 = boto3.resource('s3', **aws_kwargs)

# Client-style handles for IAM and Redshift.
iam = boto3.client('iam', **aws_kwargs)
redshift = boto3.client('redshift', **aws_kwargs)

#### Create an IAM Role to Access the S3 Bucket
- Create an IAM role that allows Redshift to read from the S3 bucket (read-only access)

In [5]:
# Create the IAM role that Redshift assumes, attach the S3 read-only policy to
# it, and look up the role's ARN for use when creating the cluster.
try:
    print("1.0 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        # Trust policy: lets the Redshift service assume this role.
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    # An existing role is fine (the cell was probably re-run); anything else
    # is a real failure and must not be hidden.
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print()
    print("1.0.1 Role with name %s already exists: %s" % (DWH_IAM_ROLE_NAME, e))

print("1.2 Attaching Policy")
iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

1.0 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN


#### Create a Redshift Cluster

- Create a Redshift cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [6]:
# Request creation of the Redshift cluster described by the DWH_* settings.
try:
    print("1.4 Createing Cluster")
    response = redshift.create_cluster(
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # Use the configured port so it matches the ingress rule and the
        # connection string built later from DWH_PORT.
        Port=int(DWH_PORT),

        #Roles (for s3 access)
        IamRoles=[roleArn]
    )
except ClientError as e:
    # Re-running the cell against an existing cluster is harmless; every
    # other error is re-raised so later cells do not run against a cluster
    # that was never created.
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

1.4 Createing Cluster


#### Check Cluster's Status

In [7]:
def prettyRedshiftProps(props):
    """Return a two-column (Key, Value) DataFrame of selected cluster properties.

    Parameters
    ----------
    props : dict
        A cluster description mapping; only the keys listed in ``keysToShow``
        are kept, in the order they appear in ``props``.

    Returns
    -------
    pandas.DataFrame
        One row per retained property, with columns ``Key`` and ``Value``.

    Side effect: sets the pandas option ``display.max_colwidth`` to ``None``
    so long values are shown without truncation.
    """
    # None means "do not truncate". The former sentinel -1 was deprecated in
    # pandas 1.0 and is not accepted by pandas 2.x.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Look up our cluster by identifier and keep the first entry of the 'Clusters' list.
cluster_descriptions = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_descriptions['Clusters'][0]
# Uncomment to render the selected properties as a table:
#prettyRedshiftProps(myClusterProps)

In [8]:
# Poll the cluster until its status is 'available', printing one dash per poll.
# The wait is bounded so a cluster that never becomes available (for example
# because creation failed) cannot hang the notebook forever.
DWH_POLL_SECONDS = 2
DWH_WAIT_TIMEOUT_SECONDS = 30 * 60

DWH_Status = myClusterProps['ClusterStatus']
print('1.5 Please wait while we create the cluster')
wait_deadline = time.monotonic() + DWH_WAIT_TIMEOUT_SECONDS
while DWH_Status != 'available':
    if time.monotonic() > wait_deadline:
        raise TimeoutError(
            "Cluster %s is still '%s' after %d seconds"
            % (DWH_CLUSTER_IDENTIFIER, DWH_Status, DWH_WAIT_TIMEOUT_SECONDS)
        )
    print('-', end='')
    time.sleep(DWH_POLL_SECONDS)
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    DWH_Status = myClusterProps['ClusterStatus']

# myClusterProps already holds the description that reported 'available',
# so no further describe_clusters call is needed here.
print("!")
print('Cluster is now available')

1.5 Please wait while we create the cluster
---------------------------------------------------------------------------------------------------!
Cluster is now available


#### Obtain Endpoint and ARN

In [9]:
# Read the cluster endpoint address and the ARN of the first attached IAM role
# from the cluster description.
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

print("1.6 Getting the Endpoint and ARN")
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)
print()
print("1.7 Saving the Endpoint and ARN to dwh.cfg file: ")

# Store both values in the config (CLUSTER.host and IAM_ROLE.ARN) and write
# the updated configuration back to dwh.cfg.
config['CLUSTER']['host'] = DWH_ENDPOINT
config['IAM_ROLE']['ARN'] = DWH_ROLE_ARN
with open('dwh.cfg', mode='w') as cfg_out:
    config.write(cfg_out)

1.6 Getting the Endpoint and ARN
DWH_ENDPOINT ::  dwhcluster.cj35o0lqimdx.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::580635780882:role/dwhRole

1.7 Saving the Endpoint and ARN to dwh.cfg file: 


#### Open TCP Port to Access Cluster Endpoint

In [10]:
# Open the cluster port for inbound TCP traffic on the cluster's security group.
print("1.7 Authorize Security Group Ingress: ", end='')
vpc = ec2.Vpc(id=myClusterProps['VpcId'])

# Prefer the security group actually attached to the cluster; the first group
# listed for the VPC is not guaranteed to be the one the cluster uses.
cluster_sg_refs = myClusterProps.get('VpcSecurityGroups') or []
if cluster_sg_refs:
    defaultSg = ec2.SecurityGroup(cluster_sg_refs[0]['VpcSecurityGroupId'])
else:
    defaultSg = list(vpc.security_groups.all())[0]

try:
    # The resource supplies its own group id, so GroupName is not passed.
    # NOTE(review): 0.0.0.0/0 exposes the port to the whole internet; restrict
    # the CIDR to your own address range for anything beyond a short-lived demo.
    defaultSg.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except ClientError as e:
    # A duplicate rule just means the port is already open; any other error
    # is re-raised instead of being silently ignored.
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print()
    print("1.7.1 Authorize Security Group Ingress already exists")
print(defaultSg)

1.7 Authorize Security Group Ingress: 
1.7.1 Authorize Security Group Ingress already exists
ec2.SecurityGroup(id='sg-f09ee7a4')


#### Connect to Cluster

In [11]:
%load_ext sql
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
# print(conn_string)
%sql $conn_string

'Connected: dwhuser@dwh'

# Part II: Run Create Table

In [12]:
# Execute create_tables.py from the notebook's folder; its progress messages
# appear as this cell's output.
%run -i 'create_tables.py'

1.8 Loding Config file
1.9 Connecting to redshift database
2.0 Droping tables
2.1 Create ables tables
2.2 Closed connection to redshift database


# Part III: Run ETL Table

In [13]:
# Execute etl.py from the notebook's folder; its progress messages appear as
# this cell's output.
%run -i 'etl.py'

2.3 Loding Config file
2.4 connect to redshift database
2.5 Load staging tables
2.6 Insert tables


# Part IV: Clean-up

#### Delete Cluster

In [15]:
# Delete the cluster (without a final snapshot) and poll until it is gone.
# Completion is signalled by describe_clusters raising ClusterNotFound.
try:
    redshift_delete = redshift.delete_cluster(
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        SkipFinalClusterSnapshot=True
    )
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    DWH_Status = myClusterProps['ClusterStatus']
    print('2.7 Please wait while we delete the cluster')
    print()
    while DWH_Status == 'deleting':
        print('-', end='')
        time.sleep(2)
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        DWH_Status = myClusterProps['ClusterStatus']

    # Reached only when the cluster still exists but is no longer 'deleting';
    # report it instead of ending the cell without any message.
    print("!")
    print("Cluster status is now '%s'; deletion has not completed" % DWH_Status)

except ClientError as e:
    # Only a missing cluster counts as success; other errors are re-raised.
    if e.response['Error']['Code'] != 'ClusterNotFound':
        raise
    print("!")
    print("2.8 Cluster successfully deleted")

!
2.8 Cluster successfully deleted


#### Delete IAM Role

In [16]:
# Detach the S3 read-only managed policy from the role and report success
# when the call returns HTTP 200.
detach_role_policy_response = iam.detach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)
detach_http_status = detach_role_policy_response['ResponseMetadata']['HTTPStatusCode']
if detach_http_status == 200:
    print('2.9 Successfully detach role policy: ', DWH_IAM_ROLE_NAME, "PolicyArn=arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")

2.9 Successfully detach role policy:  dwhRole PolicyArn=arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess


In [17]:
# Delete the IAM role and report success based on the status of the delete
# call itself, not on the earlier detach response.
delete_role_response = iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
if delete_role_response['ResponseMetadata']['HTTPStatusCode'] == 200:
    print('3.0 successfully deleted role ', DWH_IAM_ROLE_NAME)

3.0 successfully deleted role  dwhRole
