# DATA WAREHOUSING PET PROJECT 

Infrastructure - AWS S3, AWS Redshift

In [3]:
#Import necessary libraries

import boto3
import configparser
import json
import pandas as pd
import psycopg2

### CONFIGURATION OF AWS SERVICES

The first part of this pet project involves configuring the AWS services: S3, IAM, EC2 and Redshift.

In [4]:
# Parse the AWS / data-warehouse settings from the local config file.
# The file is opened in a `with` block so the handle is closed as soon as
# parsing finishes instead of staying open for the life of the kernel.

config = configparser.ConfigParser()
with open('cluster.config') as config_file:
    config.read_file(config_file)

In [None]:
# Confirm the access key is present without echoing its value: anything
# displayed here is stored in the notebook's saved output.
config.has_option("AWS", "KEY")

In [None]:
# Retrieve the configuration details into module-level variables.

# AWS credentials
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# Cluster sizing
DWH_CLUSTER_TYPE = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
# Backward-compatible alias for the original (misspelled) variable name.
DWH_NUM_MODES = DWH_NUM_NODES
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")

# Cluster identity and database login
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

# IAM role that the cluster uses
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Echo only non-secret values; the password is deliberately left out so it
# is never written into the notebook's saved output.
(DWH_DB_USER, DWH_DB)

### A sanity check to make sure Config Parameters and Values are accurate

In [None]:
# Tabulate the parsed settings for a visual check.
# The password is masked so it never lands in the saved cell output, and the
# node-count row is labelled with the actual config key (DWH_NUM_NODES).
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_MODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })

### INSTANTIATING AWS SERVICES

The services are instantiated here to be used as objects

In [8]:
# EC2 resource handle for eu-west-3, authenticated with the configured keys.
ec2 = boto3.resource(
    service_name="ec2",
    region_name="eu-west-3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [9]:
# S3 resource handle for eu-west-3, authenticated with the configured keys.
s3 = boto3.resource(
    service_name="s3",
    region_name="eu-west-3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [10]:
# IAM client for eu-west-3, authenticated with the configured keys.
iam = boto3.client(
    service_name="iam",
    region_name="eu-west-3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [11]:
# Redshift client for eu-west-3, authenticated with the configured keys.
redshift = boto3.client(
    service_name="redshift",
    region_name="eu-west-3",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [12]:
# Collect the key of every object in the source bucket (an empty prefix
# matches all keys) and display the resulting list.
bucket = s3.Bucket("gregg-test-bucket")
log_data_files = []
for s3_object in bucket.objects.filter(Prefix=''):
    log_data_files.append(s3_object.key)
log_data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

### A role is assigned to AWS Redshift to be able to retrieve data from S3

In [13]:
# Look up the IAM role named in the config and keep its ARN for later use.
role_description = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)
roleArn = role_description['Role']['Arn']

In [None]:
# Display the resolved role ARN.
# NOTE(review): an ARN embeds the AWS account ID; consider clearing this
# output before sharing the notebook.
roleArn

### AWS REDSHIFT CLUSTER CREATION

In [18]:
# Request a new Redshift cluster built from the configured settings.
# Any failure (for example, the cluster already exists) is reported rather
# than raised, so the notebook can continue against an existing cluster.
try:
    cluster_request = dict(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,

        # Identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # Roles (for S3 access)
        IamRoles=[roleArn],
    )
    # NumberOfNodes is required for a multi-node cluster and must be omitted
    # for a single-node one, so it is only added when it applies.
    if DWH_CLUSTER_TYPE == "multi-node":
        cluster_request["NumberOfNodes"] = int(DWH_NUM_MODES)

    response = redshift.create_cluster(**cluster_request)
except Exception as e:
    # Show the actual error; a bare print() hid every failure.
    print(e)




In [None]:
# Show the full description of the cluster requested above.
redshift.describe_clusters(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER
)["Clusters"][0]

In [None]:
#Defining a function to return a pandas DataFrame consisting of cluster properties.

def prettyRedShiftProps(props):
    """
    Summarise a Redshift cluster description as a two-column DataFrame.

    Parameters
    ----------
    props : dict
        Cluster properties, e.g. one entry of the "Clusters" list returned
        by ``describe_clusters``.

    Returns
    -------
    pandas.DataFrame
        Columns "key" and "Value", one row per property in ``props`` whose
        name is in the fixed list of keys of interest. Rows follow the
        iteration order of ``props``; missing keys are simply skipped.

    Notes
    -----
    As a side effect this sets the global pandas option
    ``display.max_colwidth`` so long values are shown untruncated.
    """
    # None means "no truncation". The former value -1 was deprecated in
    # pandas 1.0 and is rejected with a ValueError by recent releases.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "VpcId"]
    rows = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=rows, columns=["key", "Value"])

# Fetch the current description of our cluster to feed the helper above.
cluster_listing = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_listing["Clusters"][0]

# Render the selected properties as a table.
prettyRedShiftProps(myClusterProps)

In [21]:
# Cluster creation is asynchronous, so a description taken right after the
# create request may not carry the endpoint yet. Block until the cluster is
# available, then re-read its description before extracting connection details.
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)["Clusters"][0]

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']   # host name to connect to
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']  # first role attached to the cluster
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [None]:
# Display the database name reported by the cluster.
DB_NAME

### SETTING UP VIRTUAL PRIVATE CLOUD 

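A security group in the cluster's VPC is opened to inbound TCP traffic on the Redshift port so that the database can be reached from outside the VPC.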

In [None]:
# Open the warehouse port on a security group of the cluster's VPC.
# NOTE(review): this takes the first security group returned for the VPC and
# treats it as the default one - confirm that holds for this account.
# The 0.0.0.0/0 CIDR admits traffic from any IPv4 address.
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    vpc_security_groups = list(vpc.security_groups.all())
    defaultSg = vpc_security_groups[0]
    print(defaultSg)

    ingress_rule = dict(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT),
    )
    defaultSg.authorize_ingress(**ingress_rule)
except Exception as ingress_error:
    # E.g. the rule already exists; report and carry on.
    print(ingress_error)

### CONNECTION TO DATABASE

This part of the pet project focuses on connecting to the Database (Redshift) and creating fact and dimension tables.

In [23]:
# Open a connection to the warehouse.
# The password comes from the config (the same value the cluster was created
# with) instead of a literal in the notebook, and the port is the one the
# cluster itself reports in its endpoint.
try:
    conn = psycopg2.connect(
        host=DWH_ENDPOINT,
        dbname=DB_NAME,
        user=DB_USER,
        password=DWH_DB_PASSWORD,
        port=myClusterProps['Endpoint']['Port'],
    )
except psycopg2.Error as e:
    print("Error: Could not make connection to the database")
    print(e)
    # Re-raise: continuing would hit an undefined (or stale) `conn` below.
    raise
else:
    # Commit every statement immediately; no explicit transactions are used.
    conn.set_session(autocommit=True)

In [24]:
# Obtain a cursor on the open connection; psycopg2 errors are reported, not raised.
try:
    cur = conn.cursor()
except psycopg2.Error as cursor_error:
    print("Error: Could not get curser to the Database", cursor_error, sep="\n")

In [25]:
# DDL for the users table: userid is both distribution key and sort key;
# the like* columns are boolean flags.
users_ddl = """create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likebroadway boolean,
    likemusicals boolean);"""

# Run the DDL; psycopg2 errors are printed instead of raised.
try:
    cur.execute(users_ddl)
except psycopg2.Error as ddl_error:
    print("Error: Issue creating table")
    print(ddl_error)

In [27]:
# DDL for the venue table: venueid is both distribution key and sort key.
venue_ddl = """create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer);"""

# Run the DDL; psycopg2 errors are printed instead of raised.
try:
    cur.execute(venue_ddl)
except psycopg2.Error as ddl_error:
    print("Error: Issue creating table")
    print(ddl_error)

In [29]:
# Create the category, date, event and listing tables with a single
# execute() call containing four CREATE TABLE statements.
#   - category and date: the id column is both distkey and sortkey.
#   - event and listing: distributed on their own id, sorted on dateid.
# psycopg2 errors are printed rather than raised.
# NOTE(review): the bucket also holds sales_tab.txt but no sales table is
# created here - confirm whether that is intentional.
try:
    cur.execute("""create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));
    
        create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(5) not null,
    qtr character(5) not null,
    year smallint not null,
    holiday boolean default('N'));
    
        create table event(
    eventid integer not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey,
    eventname varchar(200),
    starttime timestamp);
    
        create table listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    numtickets smallint not null,
    priceperticket decimal(8,2),
    listtime timestamp);
    
    """)
    
except psycopg2.Error as e:
    print("Error: Issue creating table")
    print(e)

### Data is copied from S3 to Redshift cluster in this section

In [None]:
# Load allusers_pipe.txt from S3 into the users table.
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN)
# rather than an ARN typed into the notebook.
try:
    cur.execute("""
    copy users from 's3://gregg-test-bucket/allusers_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into users")
    print(e)

In [None]:
# Load category_pipe.txt from S3 into the category table.
# (The original statement targeted the users table by mistake.)
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN).
try:
    cur.execute("""
    copy category from 's3://gregg-test-bucket/category_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into category")
    print(e)

In [None]:
# Load venue_pipe.txt from S3 into the venue table.
# (The original statement targeted the users table by mistake.)
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN).
try:
    cur.execute("""
    copy venue from 's3://gregg-test-bucket/venue_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into venue")
    print(e)

In [None]:
# Load date2008_pipe.txt from S3 into the date table.
# (The original statement targeted the users table by mistake.)
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN).
try:
    cur.execute("""
    copy date from 's3://gregg-test-bucket/date2008_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into date")
    print(e)

In [None]:
# Load allevents_pipe.txt from S3 into the event table.
# (The original statement targeted the users table by mistake.)
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN).
try:
    cur.execute("""
    copy event from 's3://gregg-test-bucket/allevents_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into event")
    print(e)

In [None]:
# Load listings_pipe.txt from S3 into the listing table.
# (The original statement targeted the users table by mistake.)
# The COPY authenticates with the role attached to the cluster (DWH_ROLE_ARN).
try:
    cur.execute("""
    copy listing from 's3://gregg-test-bucket/listings_pipe.txt'
    credentials 'aws_iam_role={role_arn}'
    delimiter '|'
    region 'eu-west-3'
    """.format(role_arn=DWH_ROLE_ARN))

except psycopg2.Error as e:
    print("Error: Issue copying data into listing")
    print(e)

### Sanity check of Database tables

In [None]:
# Query the users table so the next cell can spot-check the loaded rows.
try:
    cur.execute("""
        select * from users;
    """)
except psycopg2.Error as e:
    # This cell runs a query, so the message no longer says "creating table".
    print("Error: Issue querying table")
    print(e)

In [None]:
# Print only the first row of the result as a spot check.
# The original `while` loop always hit `break` after one pass (after fetching
# and discarding a second row), so a plain `if` says the same thing directly.
row = cur.fetchone()
if row:
    print(row)

### Closing and Deleting the cluster

In [None]:
# Close the database connection; psycopg2 errors are reported, not raised.
try:
    conn.close()
except psycopg2.Error as close_error:
    print(close_error)

In [None]:
# Tear the cluster down; no final snapshot is taken.
redshift.delete_cluster(
    SkipFinalClusterSnapshot=True,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
)