In [18]:
import boto3
import psycopg2
import pandas as pd
import json


In [19]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [20]:
KEY                      =config.get("AWS", "KEY")
SECRET                   =config.get("AWS", "SECRET")

DWH_CLUSTER_TYPE         =config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES            =config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE            =config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER   =config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB                   =config.get("DWH", "DWH_DB")
DWH_DB_USER              =config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD          =config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT                 =config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME        =config.get("DWH", "DWH_IAM_ROLE_NAME")


('awsuser', 'Passw0rd123', 'myfirstdb')

In [21]:
ec2 = boto3.resource('ec2', region_name ="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

In [23]:
s3 = boto3.resource('s3', region_name ="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
iam = boto3.client('iam', region_name ="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
redshift = boto3.client('redshift', region_name ="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

In [26]:
bucket=s3.Bucket("alexanderjacbucket")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='allevents')]
log_data_files


['allevents_pipe.txt']

In [29]:
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']


'arn:aws:iam::678659281608:role/redshift-s3-access'

In [30]:
try:
    response = redshift.create_cluster(
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,
    
#     Identifiers & Credentials
    DBName=DWH_DB,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,
        
#   Roles (for s3 access)
    IamRoles=[roleArn]
    )  
except Exception as e:
    print(e)

In [32]:
redshift.describe_clusters(ClusterIdentifier= DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

{'ClusterIdentifier': 'my-first-redshift',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'creating',
 'ClusterAvailabilityStatus': 'Modifying',
 'MasterUsername': 'awsuser',
 'DBName': 'myfirstdb',
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0ad5bd314344e9c64',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-09ab21723304dbae8',
 'AvailabilityZone': 'us-east-1e',
 'PreferredMaintenanceWindow': 'fri:03:00-fri:03:30',
 'PendingModifiedValues': {'MasterUserPassword': '****'},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes': 1,
 'PubliclyAccessible': True,
 'Encrypted': False,
 'ClusterPublicKey': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCEMsH6hCeGIZokFliqTbpXKGFS1Tr+6kDTBCmjHJ4HAjb4WMO15UcXxEB6GTWNkI9DXts0sjmbF29ns

In [35]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", 'VpcId']
    x =[(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier= DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)       

  pd.set_option('display.max_colwidth', -1)


Unnamed: 0,key,Value
0,ClusterIdentifier,my-first-redshift
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,myfirstdb
5,Endpoint,"{'Address': 'my-first-redshift.cymhepowljvv.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-09ab21723304dbae8


In [38]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [41]:
try:
    vpc = ec2.VPC(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IdProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

'ec2.ServiceResource' object has no attribute 'VPC'


In [47]:
try:
    conn = psycopg2.connect(host=DWH_ENDPOINT, dbname=DB_NAME, user=DB_USER, password="Passw0rd123", port=5439)
except psycopg2.Error as e:
    print("Error: Could not make connection to the Postgres database")
    print(e)
conn.set_session(autocommit=True)

In [48]:
try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not get curser to the Database")
    print(e)

In [49]:
try:
    cur.execute(
    """create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean);"""
    )
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [50]:
try:
    cur.execute("""create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer);""")
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [51]:
try:
    cur.execute("""create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));
    
create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(5) not null,
    qtr character(5) not null,
    year smallint not null,
    holiday boolean default('N'));

create table event(
    eventid integer not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey,
    eventname varchar(200),
    starttime timestamp);
    
create table listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    numtickets smallint not null,
    pricepertickets decimal(8,2),
    totalprice decimal(8,2),
    listtime timestamp);
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [52]:
try:
    cur.execute("""
    copy users from 's3://alexanderjacbucket/allusers_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [53]:
try:
    cur.execute("""
    select * from users;
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [55]:
row = cur.fetchone()
print(row)


(5, 'AEB55QTM', 'Reagan', 'Hodge', 'Forest Lake', 'NS', 'Cum@accumsan.com', '(476) 519-9131', None, None, True, False, None, None, True, True, False, True)


In [57]:
# migrate all other tables from s3 to redshift
try:
    cur.execute("""
    copy event from 's3://alexanderjacbucket/allevents_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)
    
try:
    cur.execute("""
    copy venue from 's3://alexanderjacbucket/venue_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)
    
try:
    cur.execute("""
    copy date from 's3://alexanderjacbucket/date2008_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)
    
try:
    cur.execute("""
    copy category from 's3://alexanderjacbucket/category_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)
    
try:
    cur.execute("""
    copy listing from 's3://alexanderjacbucket/listings_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::678659281608:role/redshift-s3-access'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [58]:
try:
    conn.close()
except psycopg2.Error as e:
    print(e)

In [60]:
redshift.delete_cluster(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True
)

{'Cluster': {'ClusterIdentifier': 'my-first-redshift',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'myfirstdb',
  'Endpoint': {'Address': 'my-first-redshift.cymhepowljvv.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 10, 2, 22, 36, 27, 58000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0ad5bd314344e9c64',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-09ab21723304dbae8',
  'AvailabilityZone': 'us-east-1e',
  'PreferredMaintenanceWindow': 'fri:03:00-fri:03:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'Num