In [15]:
"""
Bilding a relational data model
Connect to athena and query data
Build ETL Jobs
Save Results to s3 Buckets
Glue Deployment
Build data models on redshift
Copy data to redshift
"""  

'\nBilding a relational data model\nConnect to athena and query data\nBuild ETL Jobs\nSave Results to s3 Buckets\nGlue Deployment\nBuild data models on redshift\nCopy data to redshift\n'

In [16]:
import boto3
import pandas as pd 
from io import StringIO
import time
import awswrangler as wr  
import redshift_connector
import psycopg2
import json
import sqlalchemy
import configparser


In [17]:
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))
config.get("DWH","DWH_IAM_ROLE_NAME")

'new-s3-redshift'

In [18]:
# AWS Credentials
AWS_Key = config.get('AWS','Key')
AWS_Secret_Key = config.get('AWS','Secret_Key')

# Data WareHouse Info
DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH","DWH_IAM_ROLE_NAME")
DWH_END_POINT = config.get("DWH","DWH_END_POINT")

DWH_CLUSTER_IDENTIFIER,DWH_IAM_ROLE_NAME

('youtube-api-2023', 'new-s3-redshift')

In [19]:
# Connecting Redshift,IAM& s3 to AWS
session = boto3.session.Session()

s3_Buckets = boto3.resource('s3',
                            region_name="us-east-1")

IAM = boto3.client('iam',
                   region_name="us-east-1")

ec2 = boto3.client('ec2',
                        region_name = "us-east-1")

Redshift = boto3.client('redshift',
                        region_name = "us-east-1")

In [20]:
# Creating IAM Role for the Redshift Access to the s3 Bucket
RoleARN = IAM.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']
RoleARN

'arn:aws:iam::807724986505:role/new-s3-redshift'

In [21]:
# Creating Redshift Cluster 
try:
    response = Redshift.create_cluster(
    ClusterType = DWH_CLUSTER_TYPE,
    NodeType = DWH_NODE_TYPE,
        
    # Identifiers & Credentials
    DBName = DWH_DB,
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    MasterUsername = DWH_DB_USER,
    MasterUserPassword = 'Olajide1965',
    
    # Role for s3 
    IamRoles = [RoleARN]
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [22]:
# Get valuable infor from Cluster 

def prettyredshift(props):
    pd.set_option('display.max_colwidth',-1)
    keysToShow = ["ClusterIdentifier","NodeType","Endpoint","VpcId","MasterUsername","DBName"] 
    x = [(k,v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data = x, columns =["Key","Value"])

MyClusterProps = Redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyredshift(MyClusterProps) 

  pd.set_option('display.max_colwidth',-1)


Unnamed: 0,Key,Value
0,ClusterIdentifier,youtube-api-2023
1,NodeType,dc2.large
2,MasterUsername,awsuser
3,DBName,flight
4,Endpoint,"{'Address': 'youtube-api-2023.clisu1wzhgfq.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
5,VpcId,vpc-04fe5451242d56083


In [23]:
DWH_ENPOINT = MyClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = MyClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = MyClusterProps['DBName']
DB_USER = MyClusterProps['MasterUsername']

DWH_ENPOINT,DB_NAME

('youtube-api-2023.clisu1wzhgfq.us-east-1.redshift.amazonaws.com', 'flight')

In [24]:
try:
    vpc = ec2.Vpc(id = MyClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName = defaultSg.group_name,
        CidrIp = '0.0.0.0/0',
        IpProtocl ='TCP',
        FromPort = int(DWH_PORT),
        Toport = int(DWH_PORT)
    )
except Exception as e:
    print(e)

'EC2' object has no attribute 'Vpc'


In [25]:
print('Connecting to the AWSRedshift database...')
print('Database Connected!!')
try:
    conn = psycopg2.connect(
                                host ='youtube-api-2023.clisu1wzhgfq.us-east-1.redshift.amazonaws.com',
                                database = "flight",
                                user = "awsuser",
                                password = "Olajide_1965",
                                port = 5439
                                )
    cur = conn.cursor()
    conn.set_session(autocommit=True)
        # Create an empty table
    cur.execute(
        """
    CREATE TABLE "FactCovid" (
    "index" INTEGER,
    "fips" INTEGER,
    "province_state" TEXT,
    "country_region" TEXT,
    "confirmed" REAL,
    "recovered" REAL,
    "active" REAL,
    "state" TEXT,
    "date" TEXT,
    "cases" INTEGER,
    "deaths" INTEGER);
                """
    )

    cur.execute(
        """
    CREATE TABLE "DimRegion" (
    "index" INTEGER,
    "fips" INTEGER,
    "province_state" TEXT,
    "country_region" TEXT,
    "latitude" REAL,
    "longitude" REAL,
    "county" TEXT,
    "state" TEXT);
    """
    )

    cur.execute(
        """
        CREATE TABLE "DimDate" (
    "index" INTEGER,
    "fips" INTEGER,
    "date" TEXT
)
    """   
)


# Copy Data from the transformed s3 Buckets to Redshift 
    # cur.execute("""
    #             COPY DimDate from 's3://analytics-buckets/Output/DimDate.csv'
    #                iam_role 'arn:aws:iam::807724986505:role/new-s3-redshift'
    #                 IGNOREHEADER 1
    #                 csv;
    #                 """)

# cur.execute(""" 
#             select * from DimDate; 
#             """)

    cur.execute("""
                COPY FactCovid from 's3://analytics-buckets/Output/FactCovid.csv'
                    iam_role 'arn:aws:iam::807724986505:role/new-s3-redshift'
                    IGNOREHEADER 1
                    csv;
                    """)
    
    # cur.execute("""
    #             SELECT *  FROM FactCovid;
    #             """)
    
    cur.execute("""
                COPY DimRegion from 's3://analytics-buckets/Output/DimRegion.csv'
                    iam_role 'arn:aws:iam::807724986505:role/new-s3-redshift'
                    IGNOREHEADER 1
                    csv;
                    """)
    # cur.execute("""
    #             SELECT *  FROM DimRegion;
    #             """)
    # cur.execute("""
    #            SELECT *  FROM stl_load_errors where query = '1111';
    #             """)
    conn.commit()
    conn.set_session(autocommit=True)

 # close the communication with the Redshift
    cur.close()
except (Exception, redshift_connector.DatabaseError) as error:
        print(error)
finally:
    if conn is not None:
        conn.close()
        print('Database connection closed.')

Connecting to the AWSRedshift database...
Database Connected!!
Load into table 'dimdate' failed.  Check 'stl_load_errors' system table for details.

Database connection closed.


In [26]:
cur.execute("""
create view loadview as
(select distinct tbl, trim(name) as table_name, query, starttime,
trim(filename) as input, line_number, colname, err_code,
trim(err_reason) as reason
from stl_load_errors sl, stv_tbl_perm sp
where sl.tbl = sp.id);
"""
)

InterfaceError: cursor already closed