In [1]:
%load_ext sql

In [2]:
import json
import socket
import boto3 as b3
import configparser
import pandas as pd

### IAM User Authorization
The IAM user needs the following permissions policies applied:
1. `AmazonS3ReadOnlyAccess` to access the project data sources on S3
1. `IAMFullAccess` to create an IAM Role that makes Redshift able to access S3 bucket
1. `AmazonRedshiftFullAccess` to create a RedShift Cluster
1. `AmazonEC2FullAccess` to open an incoming TCP port to access the Redshift cluster ednpoint

### Setup IAC DWH Access
Load paremeters from `aws.cfg`

In [3]:
dwh_cfg = configparser.ConfigParser()
dwh_cfg.read_file(open('dwh.cfg'))

AWS_KEY    = dwh_cfg.get('AWS','AWS_KEY')
AWS_SECRET = dwh_cfg.get('AWS','AWS_SECRET')

DWH_CLUSTER_TYPE = dwh_cfg.get('DWH','DWH_CLUSTER_TYPE')
DWH_NUM_NODES    = dwh_cfg.get('DWH','DWH_NUM_NODES')
DWH_NODE_TYPE    = dwh_cfg.get('DWH','DWH_NODE_TYPE')

DB_CLUSTER  = dwh_cfg.get('DB','DB_CLUSTER')
DB_NAME     = dwh_cfg.get('DB','DB_NAME')
DB_USER     = dwh_cfg.get('DB','DB_USER')
DB_PASSWORD = dwh_cfg.get('DB','DB_PASSWORD')
DB_PORT     = dwh_cfg.get('DB','DB_PORT')

IAM_ROLE_NAME = dwh_cfg.get('IAM', 'IAM_ROLE_NAME')

Create clients for EC2, IAM, S3 and Redshift

In [4]:
ec2 = b3.resource('ec2',
                 region_name = 'us-west-2',
                 aws_access_key_id = AWS_KEY,
                 aws_secret_access_key = AWS_SECRET)

iam = b3.client('iam',
                 region_name = 'us-west-2',
                 aws_access_key_id = AWS_KEY,
                 aws_secret_access_key = AWS_SECRET)

s3 = b3.resource('s3',
                 region_name = 'us-west-2',
                 aws_access_key_id = AWS_KEY,
                 aws_secret_access_key = AWS_SECRET)

db = b3.client('redshift',
                 region_name = 'us-west-2',
                 aws_access_key_id = AWS_KEY,
                 aws_secret_access_key = AWS_SECRET)

Check access to the project data sources on S3

In [5]:
sampleDbBucket = s3.Bucket('udacity-dend')
for obj in sampleDbBucket.objects.filter(Prefix = 'song-data/A/A/Q/'):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQAL128F92EA7A7.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQBF12903CF796B.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQCI128F4257A4F.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQCK128F92E8C33.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQGL128F9308363.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQIH128F428BDEA.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQLJ128F428E870.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQOU128F92D5955.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQPI128F1489569.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQPP12903CE6658.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/Q/TRAAQTM128F426

Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [6]:
try:
    print('0.3.1 Creating a new IAM Role') 
    dwhRole = iam.create_role(
        Path = '/',
        RoleName = IAM_ROLE_NAME,
        Description = 'Allows Redshift clusters to call AWS services on your behalf.',
        AssumeRolePolicyDocument = json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'}
        )
    )    
except Exception as e: print(e)
    
print('0.3.2 Attaching Policy')
iam.attach_role_policy(RoleName = IAM_ROLE_NAME,
                       PolicyArn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
                      )['ResponseMetadata']['HTTPStatusCode']

print('0.3.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName = IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

0.3.1 Creating a new IAM Role
0.3.2 Attaching Policy
0.3.3 Get the IAM role ARN
arn:aws:iam::460272965982:role/RedshiftRole


Create a RedShift Cluster

In [7]:
try:
    response = db.create_cluster(        
        ClusterType        = DWH_CLUSTER_TYPE,
        NumberOfNodes      = int(DWH_NUM_NODES),
        NodeType           = DWH_NODE_TYPE,
        ClusterIdentifier  = DB_CLUSTER,
        DBName             = DB_NAME,
        MasterUsername     = DB_USER,
        MasterUserPassword = DB_PASSWORD,
        IamRoles           = [roleArn]  
    )
except Exception as e: print(e)

Check the cluster status

In [8]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 'NumberOfNodes', 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data = x, columns = ['Key', 'Value'])

myClusterProps = db.describe_clusters(ClusterIdentifier = DB_CLUSTER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cqgzvwwklrxn.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-82d787fa
7,NumberOfNodes,4


Once the cluster becomes available, note the cluster endpoint. Save the endpoint and role ARN to `aws.cfg` so they can be reused in the ETL process.

In [9]:
DB_ENDPOINT = myClusterProps['Endpoint']['Address']
print('DB_ENDPOINT :: ', DB_ENDPOINT)

dwh_cfg['DB']['DB_ENDPOINT'] = DB_ENDPOINT
dwh_cfg['IAM']['IAM_ROLE_ARN'] = roleArn

with open('dwh.cfg', 'w') as configfile: dwh_cfg.write(configfile)

DB_ENDPOINT ::  dwhcluster.cqgzvwwklrxn.us-west-2.redshift.amazonaws.com


Open an incoming TCP port to access the cluster ednpoint

In [10]:
try:
    vpc = ec2.Vpc(id = myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName  = defaultSg.group_name,
        CidrIp     = socket.gethostbyname(socket.getfqdn()) + '/32', # only from the current IP
        IpProtocol = 'TCP',
        FromPort   = int(DB_PORT),
        ToPort     = int(DB_PORT)
    )
except Exception as e: print(e)

ec2.SecurityGroup(id='sg-19bc003b')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 172.18.0.2/32, TCP, from port: 5439, to port: 5439, ALLOW" already exists


Test SQL connection to the cluster

In [11]:
conn_string='postgresql://{}:{}@{}:{}/{}'.format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:P1S$w0r9@dwhcluster.cqgzvwwklrxn.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

Z Clean up AWS resources

In [12]:
#Uncomment & run to delete the created resources
db.delete_cluster(ClusterIdentifier = DB_CLUSTER, SkipFinalClusterSnapshot = True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cqgzvwwklrxn.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 12, 29, 16, 50, 4, 509000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-19bc003b',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-82d787fa',
  'AvailabilityZone': 'us-west-2b',
  'PreferredMaintenanceWindow': 'fri:12:00-fri:12:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  'Ia

Check the cluster status until it becomes `deleting`

In [13]:
myClusterProps = db.describe_clusters(ClusterIdentifier=DB_CLUSTER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cqgzvwwklrxn.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-82d787fa
7,NumberOfNodes,4


Detach IAM role policy and delete IAM role

In [14]:
#Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName = IAM_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')
iam.delete_role(RoleName = IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '3c43c2a4-a978-40e6-ba58-6048ff7cc336',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3c43c2a4-a978-40e6-ba58-6048ff7cc336',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Tue, 29 Dec 2020 18:20:36 GMT'},
  'RetryAttempts': 0}}