In [113]:
import boto3
import pandas as pd
import psycopg2
import json

In [183]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [184]:
#Checking the AWS parameters
KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")

#Checking the DWH parameters
DWH_CLUSTER_TYPE = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [185]:
#Checking the DWH parameters and summarizing them in a dataframe
pd.DataFrame(
{
    "Parameter": ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES",
                  "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER",
                  "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", 
                  "DWH_PORT", "DWH_IAM_ROLE_NAME"],
    "Value": [DWH_CLUSTER_TYPE, DWH_NUM_NODES, 
              DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, 
              DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, 
              DWH_PORT,DWH_IAM_ROLE_NAME]
}
)

Unnamed: 0,Parameter,Value
0,DWH_CLUSTER_TYPE,single-node
1,DWH_NUM_NODES,1
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,myredshift-cluster-1
4,DWH_DB,myfirstdb
5,DWH_DB_USER,awsuser
6,DWH_DB_PASSWORD,Aw$u$er2023
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,redshift-s3-access


In [186]:
#creating the object for ec2 
#The EC2 will be used to attach the VPC and the security group to the cluster
ec2 = boto3.resource('ec2',
                    region_name = "us-east-1",
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET
                    )

#creating the object for s3
s3 = boto3.resource('s3',
                    region_name = "us-east-1",
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET
                    )

#creating the object for iam
iam= boto3.client('iam',
                    region_name = "us-east-1",
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET
                    )

#creating the object for redshift
redshift= boto3.client('redshift',
                    region_name = "us-east-1",
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET
                    )

In [187]:
#checking the files in the s3 bucket
mybucket = s3.Bucket("collen-bucket1")
my_data_files = [filename.key for filename in mybucket.objects.filter(Prefix = '')] #empty prefix returns all the files, it does not filter really
my_data_files # the displayed files are just objects in the bucket

['2022Capture.JPG',
 'allevents_pipe.txt',
 'allusers_pipe.txt',
 'bucketfile.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

In [205]:
#Checking the roleArn
roleArn = iam.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']
roleArn

'arn:aws:iam::934595983067:role/redshift-s3-access'

In [189]:
try:
    response = redshift.create_cluster(
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType = DWH_NODE_TYPE,
    
    #Identifier & Credentials
    DBName = DWH_DB,
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    MasterUsername = DWH_DB_USER,
    MasterUserPassword = DWH_DB_PASSWORD,
        
    #Roles for S3 Access
    IamRoles = [roleArn]
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [190]:
DWH_DB

'myfirstdb'

In [204]:
redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)["Clusters"][0]

{'ClusterIdentifier': 'myredshift-cluster-1',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'ClusterAvailabilityStatus': 'Available',
 'MasterUsername': 'awsuser',
 'DBName': 'dev',
 'Endpoint': {'Address': 'myredshift-cluster-1.cxbltfig3ytx.us-east-1.redshift.amazonaws.com',
  'Port': 5439},
 'ClusterCreateTime': datetime.datetime(2023, 1, 23, 15, 32, 22, 469000, tzinfo=tzutc()),
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0ea156f9dbea8a67b',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-08965f21d85699538',
 'AvailabilityZone': 'us-east-1e',
 'PreferredMaintenanceWindow': 'thu:04:30-thu:05:00',
 'PendingModifiedValues': {},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes': 1,
 'PubliclyAccessi

In [203]:
def smartRSProps(properties):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ['ClusterIdentifier',  'NodeType',
  'ClusterStatus',  'ClusterAvailabilityStatus',
  'MasterUsername',  'IamRoles','DBName',
  'Endpoint',  'VpcId']
    x = [(k,v) for k,v in properties.items()  if k in keysToShow]
    return pd.DataFrame(data = x, columns = ["Key", "Value"])

myClusterProperties = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)["Clusters"][0]
smartRSProps(myClusterProperties)

  pd.set_option('display.max_colwidth', -1)


Unnamed: 0,Key,Value
0,ClusterIdentifier,myredshift-cluster-1
1,NodeType,dc2.large
2,ClusterStatus,available
3,ClusterAvailabilityStatus,Available
4,MasterUsername,awsuser
5,DBName,dev
6,Endpoint,"{'Address': 'myredshift-cluster-1.cxbltfig3ytx.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
7,VpcId,vpc-08965f21d85699538
8,IamRoles,[]


In [194]:
DWH_ENDPOINT = myClusterProperties['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProperties['IamRoles']
#DWH_ROLE_ARN = myClusterProperties['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProperties['DBName']
DB_USER = myClusterProperties['MasterUsername']

In [195]:
try:
    vpc = ec2.Vpc(id =myClusterProperties['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName = defaultSg.group_name,
        CidrIp = '0.0.0.0/0',
        IpProtocol = 'TCP',
        FromPort = int(DWH_PORT),
        ToPort = int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0ea156f9dbea8a67b')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [196]:
DB_USER

'awsuser'

In [197]:
#Connecting the Redshift cluster using Psycopg2 library
try:
    conn = psycopg2.connect(host = DWH_ENDPOINT, dbname = DB_NAME, user = DB_USER, password = "Aw$u$er2023", port = 5439)
except psycopg2.Error as e:
    print("Error: Failed to connect to the Database")
    print(e)


conn.set_session(autocommit = True)

#### Create the data model and then create the tables in Redshift

In [198]:
#Executing a query on the database
try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not get the cursor to the Database")
    print(e)

In [199]:
try:
    cur.execute("""create table category(
    catid integer not null,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50)
     );  """)
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "category" already exists



In [200]:
try:
    cur.execute("""create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(5) not null,
    qtr character(5) not null,
    year smallint not null,
    holiday boolean default('N')); """)
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "date" already exists



In [201]:
try:
    cur.execute("""create table event(
    eventid integer not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey, 
    eventname varchar(200),
    starttime timestamp);""")
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "event" already exists



In [202]:
try:
    cur.execute("""create table listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    numtickets smallint not null,
    priceperticket decimal (8,2),
    totalprice  decimal (8,2),
    listtime timestamp);""")
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "listing" already exists



In [159]:
try:
    cur.execute("""create table sales(
    salesid integer not null distkey,
    listid integer not null,
    sellerid integer not null,
    buyerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    qtysold smallint,
    pricepaid decimal (8,2),
    commission decimal (8,2),
    saletime timestamp);""")
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "sales" already exists



In [141]:
try:
    cur.execute("""create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean
    );  """)
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "users" already exists



In [160]:
try:
    cur.execute("""create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer); """)
except psycopg2.Error as e:
    print("Error: Failed to create the table")
    print(e)

Error: Failed to create the table
Relation "venue" already exists



#### Copying data from the S3 bucket to Redshift

In [168]:
#copying and loading the table
try:
    cur.execute("""
    copy category from 's3://collen-bucket1/category_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::934595983067:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

Error: Failed to copy the table
exception name : UnauthorizedException, error type : 135, message: The requested role arn:aws:iam::934595983067:role/redshift-s3-access is not associated to cluster, should retry : 0
DETAIL:  
  -----------------------------------------------
  error:  exception name : UnauthorizedException, error type : 135, message: The requested role arn:aws:iam::934595983067:role/redshift-s3-access is not associated to cluster, should retry : 0
  code:      30000
  context:   
  query:     202945
  location:  xen_aws_credentials_mgr.cpp:411
  process:   padbmaster [pid=1331]
  -----------------------------------------------




In [169]:
#checking the tables and data 
try:
    cur.execute("""
   select * from category;
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

In [None]:
#copying and loading the Date table
try:
    cur.execute("""
    copy date from 's3://collen-bucket1/date2008_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::934595983067:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

In [None]:
#copying and loading the Event table
try:
    cur.execute("""
    copy event from 's3://collen-bucket1/allevents_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::934595983067:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

In [None]:
#copying and loading the Listings table
try:
    cur.execute("""
    copy event from 's3://collen-bucket1/listings_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::934595983067:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

In [170]:
#copying and loading the Sales table
try:
    cur.execute("""
    copy event from 's3://collen-bucket1/sales_tab.txt'
    credentials 'aws_iam_role=arn:aws:iam::934595983067:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

Error: Failed to copy the table
exception name : UnauthorizedException, error type : 135, message: The requested role arn:aws:iam::934595983067:role/redshift-s3-access is not associated to cluster, should retry : 0
DETAIL:  
  -----------------------------------------------
  error:  exception name : UnauthorizedException, error type : 135, message: The requested role arn:aws:iam::934595983067:role/redshift-s3-access is not associated to cluster, should retry : 0
  code:      30000
  context:   
  query:     203099
  location:  xen_aws_credentials_mgr.cpp:411
  process:   padbmaster [pid=1331]
  -----------------------------------------------




In [179]:
#checking the tables and data 
try:
    cur.execute("""
   select * from category;
    """)
except psycopg2.Error as e:
    print("Error: Failed to copy the table")
    print(e)

In [180]:
row = cur.fetchone()
while row:
    print(row)
    row = cur.fetchone()
    break

(2, 'Sports', 'NHL', 'National Hockey League')


In [206]:
#closing the connections
try:
   conn.close()
except psycopg2.Error as e:
    print("Error: Failed to close connection")
    print(e)

In [215]:
#deleting the cluster
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot = True)

{'Cluster': {'ClusterIdentifier': 'myredshift-cluster-1',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'dev',
  'Endpoint': {'Address': 'myredshift-cluster-1.cxbltfig3ytx.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2023, 1, 23, 15, 32, 22, 469000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0ea156f9dbea8a67b',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-08965f21d85699538',
  'AvailabilityZone': 'us-east-1e',
  'PreferredMaintenanceWindow': 'thu:04:30-thu:05:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'Nu