#### Importing Necessary Libraries:

In [14]:
import boto3
import pandas as pd
import psycopg2
import json

####  Configuing Cluster:

In [15]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [16]:
KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")

DWH_CLUSTER_TYPE = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [17]:
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER",  "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER,  DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,single-node
1,DWH_NUM_NODES,1
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,redshift-cluster-nep
4,DWH_DB,flight
5,DWH_DB_USER,awsuser
6,DWH_PORT,5439
7,DWH_IAM_ROLE_NAME,redshift-s3-access


Conntecting to EC2:

In [18]:
ec2 = boto3.resource('ec2',
                     region_name='ap-south-1',
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET
                     )

Connecting to S3, IAM, and REDSHIFT:

In [19]:
s3 = boto3.resource('s3', 
                    region_name='ap-south-1',
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET
                    )

iam = boto3.client('iam', 
                   region_name='ap-south-1',
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                   )

redshift = boto3.client('redshift', 
                        region_name='ap-south-1',
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET
                        )

#### Listing Objects in S3:

In [20]:
bucket = s3.Bucket('dipesh-test-buck')
log_data_files = [ filename.key for filename in bucket.objects.filter(Prefix='')]
log_data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

Assigning role to the Redshift:

In [21]:
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

Creating Redshift Cluster:

In [22]:
try:
    response = redshift.create_cluster(
        ClusterType = DWH_CLUSTER_TYPE,
        NodeType = DWH_NODE_TYPE,

        # Identitfier and credentials for the cluster
        DBName = DWH_DB,
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
        MasterUsername = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,
        # Roles (for s3 access)
        IamRoles = [roleArn],
    )
except Exception as e:
    print(e)


An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


Describe cluster property:

In [23]:
redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

{'ClusterIdentifier': 'redshift-cluster-nep',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'ClusterAvailabilityStatus': 'Available',
 'MasterUsername': 'awsuser',
 'DBName': 'flight',
 'Endpoint': {'Address': 'redshift-cluster-nep.c0mnzsmyyuqe.ap-south-1.redshift.amazonaws.com',
  'Port': 5439},
 'ClusterCreateTime': datetime.datetime(2022, 8, 5, 4, 47, 51, 158000, tzinfo=tzutc()),
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0e45e233e619568a1',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-0884ceea7b66fdca1',
 'AvailabilityZone': 'ap-south-1a',
 'PreferredMaintenanceWindow': 'mon:08:00-mon:08:30',
 'PendingModifiedValues': {},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes': 1,
 'PubliclyAcce

In [24]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster-nep
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,flight
5,Endpoint,"{'Address': 'redshift-cluster-nep.c0mnzsmyyuqe.ap-south-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0884ceea7b66fdca1
7,NumberOfNodes,1


In [25]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [26]:
DB_USER

'awsuser'

Attach security to the redshift cluster

In [27]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-095757fd4cbec409f')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


#### Connecting to Redshift Cluster:

In [28]:
try:
    conn = psycopg2.connect(host=DWH_ENDPOINT, dbname = DB_NAME, user=DB_USER, password=DWH_DB_PASSWORD, port=DWH_PORT)
except psycopg2.Exception as e:
    print("Erroe: Could not make connection to the postgres database")
    print(e)

conn.set_session(autocommit=True)

In [29]:
try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not get curser to the Database")
    print(e)

Creating tables in redshift cluster:

In [30]:
try:
    cur.execute(
        """ create table if not exists users(
            userid integer not null distkey sortkey,
            username char(8),
            firstname char(30),
            lastname varchar(30),
            city varchar(30),
            state char(2),
            email varchar(100),
            phone char(14),
            likesports boolean,
            liketheatre boolean,
            likeconcerts boolean,
            likejazz boolean,
            likeclassical boolean,
            likeopera boolean,
            likerock boolean,
            likevegas boolean,
            likebroadway boolean,
            likemusicals boolean);
        """)
        
except psycopg2.Error as e:
    print (e)

In [31]:
try:
    cur.execute(
        """ create table if not exists venue(
            venueid integer not null distkey sortkey,
            venuename varchar(100),
            venuecity varchar(30),
            venuestate char(2),
            venueseats integer);
        """
    )
except psycopg2.Error as e:
    print (e)

In [32]:
try:
    cur.execute(
        """ create table if not exists category(
            catid smallint not null distkey sortkey,
            catgroup varchar(10),
            catname varchar(10),
            catdesc varchar(50));


            create table if not exists date(
                dateid integer not null distkey sortkey,
                caldate date not null,
                day character(3) not null,
                week smallint not null,
                month smallint not null,
                qtr character(5),
                year smallint not null,
                holiday boolean default('N'));

            create table if not exists event(
                eventid integer not null distkey,
                venueid smallint not null,
                catid smallint not null,
                dateid integer not null sortkey,
                eventname varchar(200),
                starttime timestamp);
            
            create table if not exists listing(
                listid integer not null distkey,
                sellerid integer not null,
                eventid integer not null,
                dateid smallint not null sortkey,
                numtickets smallint not null,
                priceperticket smallint not null,
                totalprice decimal(8,2),
                listtime timestamp);
        """
    )

except psycopg2.Error as e:
    print (e)

Populating Redshift cluster table with object in s3.

In [33]:
try: 
    cur.execute(
        """ copy users from 's3://dipesh-test-buck/allusers_pipe.txt'
            credentials 'aws_iam_role=arn:aws:iam::321503493735:role/redshift-s3-access'
            delimiter '|'
            region 'ap-south-1'
        """)

except psycopg2.Error as e:
    print (e)

Selecting Table users from Redshift:

In [34]:
try:
    cur.execute(""" select * from users; """ )
except psycopg2.Error as e:
    print("Error: Could not select from users table")
    print(e)

In [35]:
row = cur.fetchone()
print(row)

(1, 'JSG99FHE', 'Rafael                        ', 'Taylor', 'Kent', 'WA', 'Etiam.laoreet.libero@sodalesMaurisblandit.edu', '(664) 602-4412', True, True, None, False, True, None, None, True, False, True)


#### Closing connection:

In [36]:
try: 
    conn.close()
except psycopg2.Error as e:
    print (e)

#### Deleting Cluster

In [37]:
redshift.delete_cluster( ClusterIdentifier = DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot = True)

{'Cluster': {'ClusterIdentifier': 'redshift-cluster-nep',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'flight',
  'Endpoint': {'Address': 'redshift-cluster-nep.c0mnzsmyyuqe.ap-south-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 8, 5, 4, 47, 51, 158000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0e45e233e619568a1',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0884ceea7b66fdca1',
  'AvailabilityZone': 'ap-south-1a',
  'PreferredMaintenanceWindow': 'mon:08:00-mon:08:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  