### AWS INFRASTRUCTURE AS CODE USING PYTHON FOR REDSHIFT DW


It is pretty simple to create and maintain a few users and servers in the cloud using the provided UI, but as they increase in number, this gets more cumbersome.

This is where Infrastructure as Code (IaC) comes into play. IaC lets us automate, maintain, deploy, replicate and share complex infrastructure as easily as we maintain code.

In AWS, IaC can be implemented with the AWS CLI, the AWS SDKs, or CloudFormation.

AWS CloudFormation is a service that gives developers and businesses an easy way to create a collection of related AWS and third-party resources, and to provision and manage them in an orderly and predictable fashion. It uses template files in JSON or YAML format that describe all resources, permissions, users, and so on.
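To make the idea of a template concrete, here is a minimal sketch that builds one as a Python dictionary and serialises it to JSON. The logical resource name `RedshiftCluster` and the parameter name `MasterPassword` are illustrative assumptions; the property values mirror the cluster settings used later in this notebook. It is not used anywhere else in this walkthrough, which takes the SDK route.

```python
import json

# Hypothetical minimal CloudFormation template describing one Redshift cluster.
template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Parameters": {
        # NoEcho keeps the password out of console and API output.
        "MasterPassword": {"Type": "String", "NoEcho": True},
    },
    "Resources": {
        "RedshiftCluster": {
            "Type": "AWS::Redshift::Cluster",
            "Properties": {
                "ClusterType": "single-node",
                "NodeType": "dc2.large",
                "DBName": "first-redshift-db",
                "MasterUsername": "awsuser",
                # The password is supplied at deploy time, not stored in the file.
                "MasterUserPassword": {"Ref": "MasterPassword"},
            },
        }
    },
}

template_json = json.dumps(template, indent=2)
print(template_json)
```

Keeping the password as a template parameter means the file itself can be committed to version control without exposing the secret.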

For our case, we’ll go the SDK way, using Python to access and manage Redshift and its resources. We will create a configuration file, ```redshift_cluster.config```, that provides all the information needed to access and manage a Redshift cluster that has already been created.

This file makes programmatic access to the cluster easier while we write code. The information in this file is obtained from the "properties" window of the Redshift cluster in AWS.
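As a rough illustration of what such a file could look like, the sketch below parses a sample layout from a string. The section and key names are the ones read later in this notebook; every value in angle brackets is a placeholder, and the variable names `SAMPLE_CONFIG` and `sample` are only for this example.

```python
import configparser

# Hypothetical contents of the config file; bracketed values are placeholders.
SAMPLE_CONFIG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[DWH]
DWH_CLUSTER_TYPE = single-node
DWH_NUM_NODE = 1
DWH_NODE_TYPE = dc2.large
DWH_CLUSTER_IDENTIFIER = my-first-redshift-cluster
DWH_DB = first-redshift-db
DWH_DB_USER = awsuser
DWH_DB_PASSWORD = <your-db-password>
DWH_PORT = 5439
DWH_IAM_ROLE_NAME = <your-iam-role-name>
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CONFIG)

# get() always returns a string; getint() converts numeric settings.
port = sample.getint("DWH", "DWH_PORT")
print(sample.get("DWH", "DWH_CLUSTER_IDENTIFIER"), port)
```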

Creating an IAM role comes in handy because we do not need to explicitly provide the access key and secret key for the AWS services to communicate with each other. We'll just use this role.
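This notebook assumes the role already exists. If it does not, something along these lines could create it: a sketch, assuming `iam_client` is a boto3 IAM client and that read-only S3 access is enough. The function name `create_redshift_s3_role` is hypothetical.

```python
import json

# Trust policy that lets the Redshift service assume the role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# AWS-managed policy granting read-only access to S3.
S3_READ_ONLY_ARN = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"


def create_redshift_s3_role(iam_client, role_name):
    """Create the role, attach S3 read access, and return the role ARN.

    `iam_client` is expected to be a boto3 IAM client.
    """
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
        Description="Allows Redshift to read from S3 on our behalf",
    )
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=S3_READ_ONLY_ARN)
    return response["Role"]["Arn"]
```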



<img src="../images/aws_property.png" alt="Alternative text" />

In [1]:
import boto3
import pandas as pd
import psycopg2
import json

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('../configs/cluster.config'))

In [3]:
#Test Access To the files
config.get('AWS', 'KEY')

'########'

SECURELY ACCESSING CONFIGURATION FILE

In [4]:
KEY= config.get('AWS', 'KEY')
SECRET= config.get('AWS', 'SECRET')
DWH_CLUSTER_TYPE=config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_DB= config.get('DWH', 'DWH_DB')
DWH_NUM_NODE=config.get('DWH', 'DWH_NUM_NODE')
DWH_NODE_TYPE=config.get('DWH', 'DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER=config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB_USER=config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD=config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT=config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE_NAME=config.get('DWH', 'DWH_IAM_ROLE_NAME')

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

('awsuser', 'Testing321', 'first-redshift-db')

In [5]:
pd.DataFrame({'param':
                ['KEY', 'SECRET', 'DWH_CLUSTER_TYPE', 'DWH_DB', 'DWH_NUM_NODE', 'DWH_NODE_TYPE','DWH_CLUSTER_IDENTIFIER', 'DWH_DB_USER', 'DWH_DB_PASSWORD', 'DWH_PORT', 'DWH_IAM_ROLE_NAME'],
              
              'value':
                [KEY, SECRET, DWH_CLUSTER_TYPE, DWH_DB, DWH_NUM_NODE, DWH_NODE_TYPE,DWH_CLUSTER_IDENTIFIER, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]})

Unnamed: 0,param,value
0,KEY,########
1,SECRET,########
2,DWH_CLUSTER_TYPE,single-node
3,DWH_DB,first-redshift-db
4,DWH_NUM_NODE,1
5,DWH_NODE_TYPE,dc2.large
6,DWH_CLUSTER_IDENTIFIER,my-first-redshift-cluster
7,DWH_DB_USER,awsuser
8,DWH_DB_PASSWORD,Testing321
9,DWH_PORT,5439
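The table above shows the database password in clear text. One way to keep a summary like this safe to share is to mask sensitive values before building the DataFrame. The helper names `mask` and `safe_rows`, and the choice of which parameters count as sensitive, are assumptions for this sketch.

```python
# Parameters whose values should never be shown in full (assumed list).
SENSITIVE = {"KEY", "SECRET", "DWH_DB_PASSWORD"}


def mask(value, visible=2):
    """Replace all but the last `visible` characters with '#'."""
    text = str(value)
    if len(text) <= visible:
        return "#" * len(text)
    return "#" * (len(text) - visible) + text[-visible:]


def safe_rows(params):
    """Return (name, value) pairs with sensitive values masked."""
    return [
        (name, mask(value) if name in SENSITIVE else value)
        for name, value in params.items()
    ]


rows = safe_rows({"DWH_DB_USER": "awsuser", "DWH_DB_PASSWORD": "Testing321"})
print(rows)
```

The resulting pairs can be passed to `pd.DataFrame(rows, columns=['param', 'value'])` in place of the raw values.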


CONNECT TO AWS

For security purposes, I store my key and secret in a separate folder that is ignored by version control in this project.

In [6]:
key_config = configparser.ConfigParser()
key_config.read_file(open('../private_config/redshift_cluster.config'))

KEY = key_config.get('AWS','KEY')
SECRET = key_config.get('AWS','SECRET')

In [7]:
s3 = boto3.resource('s3', 
                        region_name='us-east-2', 
                        aws_access_key_id=KEY, 
                        aws_secret_access_key=SECRET)


iam = boto3.client('iam', 
                        region_name='us-east-2', 
                        aws_access_key_id=KEY, 
                        aws_secret_access_key=SECRET)

redshift = boto3.client('redshift', 
                        region_name='us-east-2', 
                        aws_access_key_id=KEY, 
                        aws_secret_access_key=SECRET)
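The three calls above repeat the same region and credentials. A possible alternative, sketched here, is to build one boto3 session and derive every client and resource from it. The function name `make_aws_handles` is hypothetical, and `session` is assumed to be a `boto3.session.Session`.

```python
def make_aws_handles(session):
    """Create all clients and resources from one shared session.

    `session` is expected to be a boto3.session.Session that already
    carries the credentials and the region.
    """
    return {
        "s3": session.resource("s3"),
        "ec2": session.resource("ec2"),
        "iam": session.client("iam"),
        "redshift": session.client("redshift"),
    }


# Usage (needs boto3 and valid credentials, so it is left commented out):
# import boto3
# session = boto3.session.Session(aws_access_key_id=KEY,
#                                 aws_secret_access_key=SECRET,
#                                 region_name='us-east-2')
# handles = make_aws_handles(session)
# s3, iam, redshift = handles["s3"], handles["iam"], handles["redshift"]
```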

CONNECT TO A BUCKET AND ACCESS OBJECTS WITHIN IT

In [8]:
bucket = s3.Bucket('wachira-dataeng-redshift')
log_data_files = [filename.key for filename in bucket.objects.all()]
log_data_files

['clean_output.csv']
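Listing every object is fine for a small bucket. For larger ones it may help to narrow the listing, for example with the `Prefix` filter that boto3 bucket collections support. The helper below is a sketch; the name `list_keys` and the suffix check are my own additions.

```python
def list_keys(bucket, prefix="", suffix=""):
    """Return object keys under `prefix` that end with `suffix`.

    `bucket` is expected to be a boto3 S3 Bucket resource. The prefix is
    applied by S3 itself; the suffix is checked locally.
    """
    return [
        obj.key
        for obj in bucket.objects.filter(Prefix=prefix)
        if obj.key.endswith(suffix)
    ]


# Usage with the bucket from the cell above:
# csv_files = list_keys(bucket, suffix='.csv')
```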

ARN

Amazon Resource Names (ARNs) uniquely identify AWS resources. We need an ARN whenever a resource must be specified unambiguously across all of AWS, such as in IAM policies, Amazon Relational Database Service (Amazon RDS) tags, and API calls.
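An ARN follows the general pattern `arn:partition:service:region:account-id:resource`. The sketch below splits one into its parts, using the role ARN from this notebook as the example. The names `Arn` and `parse_arn` are hypothetical helpers, not part of boto3.

```python
from typing import NamedTuple


class Arn(NamedTuple):
    partition: str
    service: str
    region: str
    account_id: str
    resource: str


def parse_arn(arn):
    """Split an ARN string into its five components."""
    # Limit the split so that colons inside the resource part are preserved.
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    return Arn(*parts[1:])


role = parse_arn("arn:aws:iam::320105466872:role/redshift-s3-access-role")
print(role.service, role.account_id, role.resource)
# IAM is a global service, so the region field of this ARN is empty.
```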


In [9]:
#Look up the ARN of the IAM role that has permission to access S3 buckets
roleARN = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
roleARN

'arn:aws:iam::320105466872:role/redshift-s3-access-role'

CREATE REDSHIFT CLUSTER

In [None]:
try:
    response = redshift.create_cluster(
            ClusterType=DWH_CLUSTER_TYPE,
            NodeType=DWH_NODE_TYPE,

            #Identifier and credentials
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            DBName=DWH_DB,
            MasterUsername=DWH_DB_USER,
            MasterUserPassword=DWH_DB_PASSWORD,

            #Roles For s3 access
            IamRoles=[
                    roleARN
                    ],
    )
    

except Exception as e:
    print(e)
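`create_cluster` returns as soon as the request is accepted, while the cluster itself takes a few minutes to start. A simple polling loop like the sketch below can block until it is ready. The function name `wait_for_cluster` and the default timings are assumptions; boto3 also ships a built-in waiter, `redshift.get_waiter('cluster_available')`, that serves the same purpose.

```python
import time


def wait_for_cluster(redshift_client, cluster_id, poll_seconds=30, max_attempts=40):
    """Poll describe_clusters until the cluster status is 'available'.

    `redshift_client` is expected to be a boto3 Redshift client.
    Returns the cluster description, or raises TimeoutError.
    """
    for _ in range(max_attempts):
        response = redshift_client.describe_clusters(ClusterIdentifier=cluster_id)
        cluster = response["Clusters"][0]
        if cluster["ClusterStatus"] == "available":
            return cluster
        time.sleep(poll_seconds)
    raise TimeoutError(
        f"cluster {cluster_id!r} was not available after {max_attempts} checks"
    )


# Usage with the client and identifier defined earlier:
# cluster_details = wait_for_cluster(redshift, DWH_CLUSTER_IDENTIFIER)
```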

DESCRIBE CLUSTER DETAILS

In [10]:
cluster_details = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
cluster_details

{'ClusterIdentifier': 'my-first-redshift-cluster',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'ClusterAvailabilityStatus': 'Available',
 'MasterUsername': 'awsuser',
 'DBName': 'first-redshift-db',
 'Endpoint': {'Address': 'my-first-redshift-cluster.cgr5ywmtttya.us-east-2.redshift.amazonaws.com',
  'Port': 5439},
 'ClusterCreateTime': datetime.datetime(2023, 1, 11, 9, 27, 5, 369000, tzinfo=tzutc()),
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0c1d82624169294b3',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-0b7131ad04a3a0fb3',
 'AvailabilityZone': 'us-east-2b',
 'PreferredMaintenanceWindow': 'sat:07:00-sat:07:30',
 'PendingModifiedValues': {},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes'

In [11]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=['key', 'value'])
    
prettyRedshiftProps(cluster_details)

Unnamed: 0,key,value
0,ClusterIdentifier,my-first-redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,first-redshift-db
5,Endpoint,"{'Address': 'my-first-redshift-cluster.cgr5ywmtttya.us-east-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0b7131ad04a3a0fb3



ATTACH VPC TO THE REDSHIFT CLUSTER USING ec2 CONNECTION

In [None]:
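try:
    # The ec2 resource is not created in the earlier cells, so create it here
    ec2 = boto3.resource('ec2',
                         region_name='us-east-2',
                         aws_access_key_id=KEY,
                         aws_secret_access_key=SECRET)

    vpc = ec2.Vpc(id=cluster_details['VpcId'])
    defaultSG = list(vpc.security_groups.all())[0]

    # Open the Redshift port for inbound TCP traffic.
    # NOTE: 0.0.0.0/0 allows connections from any IP address.
    defaultSG.authorize_ingress(
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT),
            GroupName=defaultSG.group_name
    )

    print("VPC Attached to Redshift through ec2")


except Exception as e:
    print(e)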

VPC Attached to Redshift through ec2
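The rule above uses `0.0.0.0/0`, which accepts connections from any address. A narrower variant is to allow a single IPv4 host. The sketch below only builds the keyword arguments; the helper name `single_host_ingress` is hypothetical and `203.0.113.10` is a documentation-only example address.

```python
import ipaddress


def single_host_ingress(ip, port):
    """Build authorize_ingress keyword arguments for one IPv4 address."""
    address = ipaddress.ip_address(ip)  # raises ValueError on malformed input
    if address.version != 4:
        raise ValueError("this sketch only handles IPv4 addresses")
    return {
        "CidrIp": f"{address}/32",  # /32 matches exactly one host
        "IpProtocol": "tcp",
        "FromPort": int(port),
        "ToPort": int(port),
    }


rule = single_host_ingress("203.0.113.10", "5439")
print(rule)

# Usage with the security group from the cell above:
# defaultSG.authorize_ingress(**single_host_ingress(my_ip, DWH_PORT))
```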


CONNECTING TO THE REDSHIFT DATA WAREHOUSE

In [12]:
DWH_ENDPOINT = cluster_details['Endpoint']
END_POINT = DWH_ENDPOINT['Address']
END_POINT


'my-first-redshift-cluster.cgr5ywmtttya.us-east-2.redshift.amazonaws.com'

In [26]:
try: 
    conn = psycopg2.connect(dbname=DWH_DB,
                            host=END_POINT,
                            port=int(DWH_PORT),
                            user=DWH_DB_USER,
                            password=DWH_DB_PASSWORD)
    # Enable autocommit only once the connection actually exists
    conn.set_session(autocommit=True)
    print("Connected Successfully")
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Redshift database")
    print(e)


Connected Successfully


Making the connection using a single connection string built from the configuration file

In [None]:
# try: 
#     conn = psycopg2.connect(f"host={END_POINT} port={DWH_PORT} dbname={DWH_DB} user={DWH_DB_USER} password={DWH_DB_PASSWORD}")
#     print("Connected Successfully")
# except psycopg2.Error as e: 
#     print("Error: Could not make connection to the Redshift database")
#     print(e)
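Building the connection string with an f-string breaks if a value contains a space or a quote, which is easy to hit with passwords. The sketch below quotes every value following the libpq keyword/value rules (single quotes around the value, backslash-escaping of quotes and backslashes). The helper name `build_dsn` is hypothetical, and `example-host` is a placeholder.

```python
def build_dsn(**params):
    """Build a libpq-style 'key=value' connection string with quoted values."""
    parts = []
    for key, value in params.items():
        text = str(value)
        # Escape backslashes first, then single quotes.
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        parts.append(f"{key}='{escaped}'")
    return " ".join(parts)


dsn = build_dsn(host="example-host", port=5439, dbname="first-redshift-db", user="awsuser")
print(dsn)

# Usage with the values from this notebook:
# conn = psycopg2.connect(build_dsn(host=END_POINT, port=DWH_PORT, dbname=DWH_DB,
#                                   user=DWH_DB_USER, password=DWH_DB_PASSWORD))
```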

#### LOAD LOCAL DATA

In [18]:
df = pd.read_csv('../data/clean_data.csv')
df.head()

Unnamed: 0,track_id,type,traveled_d,avg_speed,lat,lon,speed,lon_acc,lat_acc,time
0,1,Car,48.85,9.770344,37.977391,23.737688,4.9178,0.0518,-0.0299,0.0
1,2,Motorcycle,98.09,19.839417,37.977642,23.7374,16.9759,-0.0361,-0.0228,0.0
2,3,Motorcycle,63.8,18.228752,37.977997,23.737264,20.1906,-0.0795,-0.3395,0.0
3,4,Motorcycle,145.72,26.229014,37.978135,23.737072,2.7555,-0.0302,0.0948,0.0
4,5,Motorcycle,138.01,24.841425,37.978134,23.737103,0.0,0.0,0.0,0.0


Set the target S3 bucket and object key

In [19]:
bucket_name = 'wachira-dataeng-redshift'
file_name = 'clean_output.csv'

Load Data To s3 bucket

In [20]:
try:
    s3.meta.client.upload_file(Filename='../data/final_clean.csv', Bucket=bucket_name, Key=file_name)
    print("Successfully loaded data!")
except Exception as e:
    print("Unsuccessful!!!")
    print(e)

Successfully loaded data!


CREATING A TABLE IN THE REDSHIFT WAREHOUSE

We can extract the schema for the table directly from the dataframe created earlier.

In [21]:
pd.io.json.build_table_schema(df)

{'fields': [{'name': 'index', 'type': 'integer'},
  {'name': 'track_id', 'type': 'integer'},
  {'name': 'type', 'type': 'string'},
  {'name': 'traveled_d', 'type': 'number'},
  {'name': 'avg_speed', 'type': 'number'},
  {'name': 'lat', 'type': 'number'},
  {'name': 'lon', 'type': 'number'},
  {'name': 'speed', 'type': 'number'},
  {'name': 'lon_acc', 'type': 'number'},
  {'name': 'lat_acc', 'type': 'number'},
  {'name': 'time', 'type': 'number'}],
 'primaryKey': ['index'],
 'pandas_version': '1.4.0'}
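The schema dictionary can also be turned into a `CREATE TABLE` statement instead of typing the columns by hand. This is a sketch under a few assumptions: the type mapping in `TYPE_MAP` is my own choice, the function name `ddl_from_schema` is hypothetical, and column names are used unquoted.

```python
# Assumed mapping from Table Schema types to Redshift column types.
TYPE_MAP = {
    "integer": "INTEGER",
    "number": "FLOAT",
    "string": "VARCHAR",
    "boolean": "BOOLEAN",
    "datetime": "TIMESTAMP",
}


def ddl_from_schema(schema, table_name):
    """Generate a CREATE TABLE statement from a Table Schema dictionary."""
    primary = set(schema.get("primaryKey", []))
    columns = []
    for field in schema["fields"]:
        sql_type = TYPE_MAP[field["type"]]  # KeyError flags an unmapped type
        suffix = " PRIMARY KEY" if field["name"] in primary else ""
        columns.append(f"    {field['name']} {sql_type}{suffix}")
    return f"CREATE TABLE {table_name} (\n" + ",\n".join(columns) + "\n);"


# A shortened version of the schema shown above.
example_schema = {
    "fields": [
        {"name": "index", "type": "integer"},
        {"name": "track_id", "type": "integer"},
        {"name": "type", "type": "string"},
        {"name": "avg_speed", "type": "number"},
    ],
    "primaryKey": ["index"],
}

print(ddl_from_schema(example_schema, "clean_output"))
```

Generating the statement this way keeps the column order identical to the dataframe, which matters because `COPY` assigns values by position.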

In [160]:
create_table = """CREATE TABLE clean_output(
                    index INTEGER PRIMARY KEY,
                    track_id INTEGER,
                    type VARCHAR,
                    traveled_d FLOAT,
                    avg_speed FLOAT,
                    lat FLOAT,
                    lon FLOAT,
                    speed FLOAT,
                    lon_acc FLOAT,
                    lat_acc FLOAT,
                    time FLOAT
);"""

CREATE CURSOR TO EXECUTE QUERY

In [27]:
cur = conn.cursor()

In [164]:
try:
    cur.execute(create_table)
    print("Successfully created table")
except Exception as e:
    print("Error: Problem creating the table")
    print(e)


Successfully created table


COPY DATA FROM OUR S3 BUCKET TO THE TABLE CREATED USING THE "COPY" COMMAND

In [22]:
COPY_CMD = """ 
COPY clean_output
FROM 's3://wachira-dataeng-redshift/clean_output.csv'
iam_role 'arn:aws:iam::320105466872:role/redshift-s3-access-role'
region 'us-east-2'
delimiter ',';
"""

In [139]:
# COPY_CMD = """ 
# COPY clean_output FROM 's3://wachira-dataeng-redshift' 
# iam_role 'arn:aws:iam::320105466872:role/redshift-s3-access-role'
# delimiter '|';
# """

In [28]:
try:
    cur.execute(COPY_CMD)
    print("Successfully Loaded from s3!")

except Exception as e:
    print("Loading Error")
    print(e)


Successfully Loaded from s3!


Earlier, I had a problem with the COPY command. I had used the data file that contained the column names as its first row, which made the data tricky to load: the header row did not conform to the table's schema, because all the column names are strings while the columns have different data types.
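Another way around the header issue is to keep the header row in the file and tell Redshift to skip it with the `IGNOREHEADER` option of `COPY`. The sketch below assembles such a command as a string. The helper name `build_copy_command` and its parameters are hypothetical.

```python
def build_copy_command(table, s3_uri, iam_role_arn, region, skip_header_rows=0):
    """Assemble a Redshift COPY statement for a comma-delimited file."""
    lines = [
        f"COPY {table}",
        f"FROM '{s3_uri}'",
        f"IAM_ROLE '{iam_role_arn}'",
        f"REGION '{region}'",
        "DELIMITER ','",
    ]
    if skip_header_rows:
        # IGNOREHEADER n makes COPY skip the first n rows of each file.
        lines.append(f"IGNOREHEADER {int(skip_header_rows)}")
    return "\n".join(lines) + ";"


copy_with_header = build_copy_command(
    table="clean_output",
    s3_uri="s3://wachira-dataeng-redshift/clean_output.csv",
    iam_role_arn="arn:aws:iam::320105466872:role/redshift-s3-access-role",
    region="us-east-2",
    skip_header_rows=1,
)
print(copy_with_header)
```

The resulting string can be passed to `cur.execute(...)` in the same way as `COPY_CMD`.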

<img src="../images/redshift-s3.png" alt="Alternative text" />