# Import Library

In [15]:
import boto3
import pandas as pd
import psycopg2 # for postgres
import json
import configparser # to read .config file

In [16]:
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [17]:
# Try to read key
config.get('AWS', 'KEY')

'AKIAURWGSMJGKNIZB4UV'

In [18]:
# Define config variable from config file
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')

DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')

DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

('awsuser', 'Passw0rd', 'myfirstdb')

In [20]:
pd.DataFrame({
    "Param": 
        ['DWH_CLUSTER_TYPE', 'DWH_NUM_NODES', 'DWH_NODE_TYPE', 'DWH_CLUSTER_IDENTIFIER', 'DWH_DB', 'DWH_DB_USER', 'DWH_DB_PASSWORD', 'DWH_PORT', 'DWH_IAM_ROLE_NAME'],
    "Value": 
        [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
})

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,single-node
1,DWH_NUM_NODES,1
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,try-naufal-redshift
4,DWH_DB,myfirstdb
5,DWH_DB_USER,awsuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,redshift-s3-access


# Connect to  Required Amazon Services

In [21]:
# Connect to AWS EC2
ec2 = boto3.resource('ec2',
                     region_name='ap-southeast-3',
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET
                       )

In [22]:
# Connect to other resources
# Connect to AWS S3
s3 = boto3.resource('s3',
                     region_name='ap-southeast-3',
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET
                       )
# Connect to AWS IAM
iam = boto3.client('iam',
                     region_name='ap-southeast-3',
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET
                       )
# Connect to AWS Redshift
redshift = boto3.client('redshift',
                     region_name='ap-southeast-3',
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET
                       )

In [23]:
# Try access bucket resources
bucket = s3.Bucket("naufal-redshift-trial-bucket")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='')]
log_data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

In [24]:
# Try access bucket resources but with filter
bucket = s3.Bucket("naufal-redshift-trial-bucket")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='all')]
log_data_files

['allevents_pipe.txt', 'allusers_pipe.txt']

# Attach iam permission to our cluster for security reason 

In [33]:
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

In [37]:
# This is basically the url role of the user
roleArn 

'arn:aws:iam::312875311692:role/redshift-s3-access'

# Create Redshift Cluster

In [36]:
# For more info/other functions open Redshift sdk python docs
try:
    response = redshift.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        
        # Identifier and credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # Roles
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

In [40]:
rs_info = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
rs_info

{'Clusters': [{'ClusterIdentifier': 'try-naufal-redshift',
   'NodeType': 'dc2.large',
   'ClusterStatus': 'creating',
   'ClusterAvailabilityStatus': 'Modifying',
   'MasterUsername': 'awsuser',
   'DBName': 'myfirstdb',
   'AutomatedSnapshotRetentionPeriod': 1,
   'ManualSnapshotRetentionPeriod': -1,
   'ClusterSecurityGroups': [],
   'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-01ed892c7b5358b3c',
     'Status': 'active'}],
   'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
     'ParameterApplyStatus': 'in-sync'}],
   'ClusterSubnetGroupName': 'default',
   'VpcId': 'vpc-027a391276a2484a3',
   'AvailabilityZone': 'ap-southeast-3b',
   'PreferredMaintenanceWindow': 'sun:19:30-sun:20:00',
   'PendingModifiedValues': {'MasterUserPassword': '****'},
   'ClusterVersion': '1.0',
   'AllowVersionUpgrade': True,
   'NumberOfNodes': 1,
   'PubliclyAccessible': True,
   'Encrypted': False,
   'ClusterPublicKey': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDasyOjXktntj

In [25]:
# Get only cluster info
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername', 'DBName', 'Endpoint', 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=['Key', 'Value'])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

  pd.set_option('display.max_colwidth', -1)


Unnamed: 0,Key,Value
0,ClusterIdentifier,try-naufal-redshift
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,myfirstdb
5,Endpoint,"{'Address': 'try-naufal-redshift.crzycxx5xuqb.ap-southeast-3.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-027a391276a2484a3


In [26]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [27]:
DB_NAME

'myfirstdb'

## Create VPC Security Group

In [48]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-01ed892c7b5358b3c')


# Connect Python with AWS Postgres DB

In [30]:
try:
    conn = psycopg2.connect(host=DWH_ENDPOINT, dbname=DB_NAME, user=DB_USER, password='Passw0rd', port=5439)
except Exception as e:
    print("Error: Could not make connection to the Postgres database")
    print(e)

In [31]:
conn.set_session(autocommit=True)  

In [34]:
# Cursor is used to connect to AWS redshift
try:
    cur = conn.cursor()
except Exception as e:
    print("Error: Could not get cursor to Database")
    print(e)

# Create Table for DWH Based on Data's on Bucket alluser

In [36]:
try:
    cur.execute("""CREATE TABLE users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean);""")
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [37]:
try:
    cur.execute("""CREATE TABLE venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer);""")
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

In [40]:
try:
    cur.execute("""CREATE TABLE category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));
    
    CREATE TABLE date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(5) not null,
    qtr character(5) not null,
    year smallint not null,
    holiday boolean default('N'));
    
    CREATE TABLE event(
    eventid integer not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey,
    eventname varchar(200),
    starttime timestamp);
    
    CREATE TABLE listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    numtickets smallint not null,
    priceperticket decimal(8,2),
    listtime timestamp);
    
    """)
except pyscopg2.Error as e:
    print("Error: Issue creating table")
    print(e)


# Copy data From AWS S3 into Redshift

In [42]:
# credentials are copied from aws iam redshift-s3-access arn
try:
    cur.execute("""
    copy users from 's3://naufal-redshift-trial-bucket/allusers_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::312875311692:role/redshift-s3-access'
    delimiter '|'
    region 'ap-southeast-3'

    """)
except Exception as e:
    print("Error: Issue copying data")
    print(e)

In [54]:
# credentials are copied from aws iam redshift-s3-access arn
try:
    cur.execute("""
    SELECT * FROM users;
    """)
except Exception as e:
    print("Error: Connecting to database")
    print(e)

In [56]:
# Read rows of queried data
row = cur.fetchone()
while row:
    print(row)
    row = cur.fetchone()

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)




(44558, 'ICB17UXP', 'Mia', 'Suarez', 'Ketchikan', 'SK', 'fames@in.edu', '(311) 488-0150', None, None, False, False, False, None, None, None, None, None)
(44559, 'DDJ59GOC', 'Roary', 'Mcmillan', 'Gastonia', 'WI', 'amet.orci.Ut@turpisNullaaliquet.ca', '(571) 605-0668', False, False, True, False, False, False, None, True, None, None)
(44560, 'CXF69SUR', 'Hiram', 'Norman', 'Grambling', 'NL', 'arcu.Vestibulum@dictumsapienAenean.com', '(940) 256-0286', None, None, None, False, False, False, None, None, False, None)
(44563, 'SVM65BHS', 'Vladimir', 'Whitehead', 'Port St. Lucie', 'AZ', 'quis.turpis@vulputatelacus.com', '(941) 214-7050', None, None, None, True, None, False, True, False, False, None)
(44564, 'INK46XES', 'Joan', 'Copeland', 'Wahoo', 'WV', 'volutpat.Nulla.dignissim@tristiqueneque.edu', '(857) 904-8132', None, True, False, False, None, None, None, None, True, True)
(44568, 'OGP29NZA', 'Ariel', 'Ward', 'Boston', 'PE', 'Nullam.velit@Nullamvelit.com', '(381) 485-1837', False, False, T

In [55]:
print(row)

None


In [59]:
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'try-naufal-redshift',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'myfirstdb',
  'Endpoint': {'Address': 'try-naufal-redshift.crzycxx5xuqb.ap-southeast-3.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 11, 26, 14, 15, 35, 381000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-01ed892c7b5358b3c',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-027a391276a2484a3',
  'AvailabilityZone': 'ap-southeast-3b',
  'PreferredMaintenanceWindow': 'sun:19:30-sun:20:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrad