In [1]:
#interact with AWS resources
import boto3
import json
import os
import sys
import time
import logging
import requests
import psycopg2
import pandas as pd 
import configparser
import datetime
import numpy as np

#setup config
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

print(config)
print(config.get('AWS', 'KEY'))

#bring the configuration from the config file and store in variables 
KEY                    = config.get('AWS', 'KEY')
SECRET                 = config.get('AWS', 'SECRET')
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB                 = config.get('DWH', 'DWH_DB')
DWH_DB_USER            = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD        = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT               = config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE_NAME       = config.get('DWH', 'DWH_IAM_ROLE_NAME')
DWH_CLUSTER_TYPE       = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES          = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE          = config.get('DWH', 'DWH_NODE_TYPE')

SCHEMA_NAME = 'covid-db'
S3_Staging = 's3://akash-covid-project-1/output'
S3_Bucket = 'akash-covid-project-1'
S3_Region = 'us-east-1'
S3_output_dir = 'output'

#create a df out of the above variables
config_df = pd.DataFrame({'Param':
                   ['DWH_CLUSTER_TYPE', 'DWH_NUM_NODES', 'DWH_NODE_TYPE', 'DWH_CLUSTER_IDENTIFIER', 'DWH_DB', 'DWH_DB_USER', 'DWH_DB_PASSWORD', 'DWH_PORT', 'DWH_IAM_ROLE_NAME'],
                   'Value':
                   [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
                  })

print(config_df)

#connect to the EC2 instance
ec2 = boto3.resource('ec2',region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

#connect to the S3 bucket
s3 = boto3.resource('s3', region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

#connect to the Redshift cluster
redshift = boto3.client('redshift', region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

#connect to the IAM role
iam = boto3.client('iam', region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

#connect to Athena 
athena = boto3.client('athena', region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)




<configparser.ConfigParser object at 0x1111b98a0>
AKIAQSKCN4UZMDWRMUBF
                    Param                   Value
0        DWH_CLUSTER_TYPE             single-node
1           DWH_NUM_NODES                       1
2           DWH_NODE_TYPE               dc2.large
3  DWH_CLUSTER_IDENTIFIER  covid-redshift-cluster
4                  DWH_DB                covid-db
5             DWH_DB_USER                 awsuser
6         DWH_DB_PASSWORD             Passw0rd123
7                DWH_PORT                    5439
8       DWH_IAM_ROLE_NAME      redshift-s3-access


In [43]:
s3_client = boto3.client('s3', region_name="us-east-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
s3_client.download_file(S3_Bucket, 'output/fact_final.csv', 'fact_final.csv')


In [44]:
fact_final = pd.read_csv('fact_final.csv')
#remove the first column as it is not needed
fact_final = fact_final.iloc[:, 1:]
print(fact_final.head())

   fips province_state country_region  confirmed  deaths  recovered  active  \
0  72.0    Puerto Rico             US        3.0     0.0        0.0     NaN   
1  72.0    Puerto Rico             US        3.0     0.0        0.0     NaN   
2  72.0    Puerto Rico             US        3.0     0.0        0.0     NaN   
3  72.0    Puerto Rico             US        3.0     0.0        0.0     NaN   
4  72.0    Puerto Rico             US        3.0     0.0        0.0     NaN   

       date  positive  negative  pending  hospitalized   death   total  
0  20210307  101327.0  305972.0      NaN           NaN  2059.0  407299  
1  20210306  101327.0  305972.0      NaN           NaN  2059.0  407299  
2  20210305  101066.0  305972.0      NaN           NaN  2056.0  407038  
3  20210304  100867.0  305972.0      NaN           NaN  2053.0  406839  
4  20210303  100765.0  305972.0      NaN           NaN  2048.0  406737  


In [45]:

fact_final_sql = pd.io.sql.get_schema(fact_final.reset_index(), 'fact_final')

In [46]:
print(fact_final_sql)

CREATE TABLE "fact_final" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "pending" REAL,
  "hospitalized" REAL,
  "death" REAL,
  "total" INTEGER
)


In [47]:
#similarly download the other tables and generate their schemas and then execute the sql to create the tables in redshift
s3_client.download_file(S3_Bucket, 'output/dim_region_final.csv', 'dim_region_final.csv')
s3_client.download_file(S3_Bucket, 'output/dim_date.csv', 'dim_date.csv')
s3_client.download_file(S3_Bucket, 'output/dim_hospital.csv', 'dim_hospital.csv')

dim_region_final = pd.read_csv('dim_region_final.csv').iloc[1,:1]
dim_date = pd.read_csv('dim_date.csv').iloc[1,:1]
dim_hospital = pd.read_csv('dim_hospital.csv').iloc[1,:1]


  dim_region_final = pd.read_csv('dim_region_final.csv').iloc[1,:1]


In [None]:
print(dim_region_final.head())
print(dim_date.head())
print(dim_hospital.head())


In [None]:
#generate th sql for the above tables
dim_region_final_sql = pd.io.sql.get_schema(dim_region_final.reset_index(), 'dim_region_final')
dim_date_sql = pd.io.sql.get_schema(dim_date.reset_index(), 'dim_date')
dim_hospital_sql = pd.io.sql.get_schema(dim_hospital.reset_index(), 'dim_hospital')

In [10]:
%pip install redshift_connector
print(dim_region_final_sql)
print(dim_date_sql)


Collecting redshift_connector
  Downloading redshift_connector-2.1.0-py3-none-any.whl.metadata (66 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.8/66.8 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting scramp<1.5.0,>=1.2.0 (from redshift_connector)
  Downloading scramp-1.4.4-py3-none-any.whl.metadata (19 kB)
Collecting asn1crypto>=1.5.1 (from scramp<1.5.0,>=1.2.0->redshift_connector)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Downloading redshift_connector-2.1.0-py3-none-any.whl (125 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.0/125.0 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hDownloading scramp-1.4.4-py3-none-any.whl (13 kB)
Downloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.0/105.0 kB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: asn1crypto, scramp, redshift_connector
Suc

In [11]:
import redshift_connector

In [12]:
import redshift_connector

methods = [method for method in dir(redshift_connector) if callable(getattr(redshift_connector, method))]
print(methods)




In [13]:
#we need the iam role arn to attach to the redshift cluster. to connect s3 and redshift. ARN is the unique identifier for the role
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

arn:aws:iam::039330899250:role/redshift-s3-access


In [37]:
#create the redshift cluster
try: 
        response = redshift.create_cluster(        
            #HW
            ClusterType=DWH_CLUSTER_TYPE,
            NodeType=DWH_NODE_TYPE,
            NumberOfNodes=int(DWH_NUM_NODES),

            #Identifiers & Credentials
            DBName=DWH_DB,
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            MasterUsername=DWH_DB_USER,
            MasterUserPassword=DWH_DB_PASSWORD,
            
            #Roles (for s3 access)
            IamRoles=[roleArn]  
        )
        print(response)
except Exception as e:
    print(e)

{'Cluster': {'ClusterIdentifier': 'covid-redshift-cluster', 'NodeType': 'dc2.large', 'ClusterStatus': 'creating', 'ClusterAvailabilityStatus': 'Modifying', 'MasterUsername': 'awsuser', 'DBName': 'covid-db', 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0557e607d10acd930', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-030ca89833b123009', 'PreferredMaintenanceWindow': 'sun:06:30-sun:07:00', 'PendingModifiedValues': {'MasterUserPassword': '****'}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True, 'Encrypted': False, 'Tags': [], 'EnhancedVpcRouting': False, 'IamRoles': [{'IamRoleArn': 'arn:aws:iam::039330899250:role/redshift-s3-access', 'ApplyStatus': 'adding'}], 'MaintenanceTrackName': 'current',

In [24]:
#connect to redshift cluster using redshift_connector
conn = redshift_connector.connect(
    host="covid-redshift-cluster.cw6wwmbewavp.us-east-1.redshift.amazonaws.com",
    database='covid-db',
    user='awsuser',
    password='Passw0rd123'
)

In [25]:
conn.autocommit = True

In [26]:
#create the cursor
cur = redshift_connector.Cursor = conn.cursor()

In [27]:
cur.execute(fact_final_sql)

<redshift_connector.cursor.Cursor at 0x14b1091b0>

In [41]:
print(dim_region_final_sql)

CREATE TABLE "dim_region_final" (
"index" TEXT,
  "1" INTEGER
)


In [30]:
#create the tables in redshift
cur.execute(dim_region_final_sql)
cur.execute(dim_date_sql)
cur.execute(dim_hospital_sql)

<redshift_connector.cursor.Cursor at 0x14b1091b0>

In [34]:
#copy command to copy the data from s3 to redshift
cur.execute("""
            copy fact_final from 's3://akash-covid-project-1/output/fact_final.csv' 
            credentials 'aws_iam_role=arn:aws:iam::039330899250:role/redshift-s3-access' 
            delimiter ',' 
            region 'us-east-1' 
            ignoreheader 1
            """)

<redshift_connector.cursor.Cursor at 0x14b1091b0>

In [35]:
#similarly copy the other tables
cur.execute("""
            copy dim_region_final from 's3://akash-covid-project-1/output/dim_region_final.csv' 
            credentials 'aws_iam_role=arn:aws:iam::039330899250:role/redshift-s3-access' 
            delimiter ',' 
            region 'us-east-1' 
            ignoreheader 1
            """)

ProgrammingError: {'S': 'ERROR', 'C': 'XX000', 'M': "Load into table 'dim_region_final' failed.  Check 'stl_load_errors' system table for details.", 'F': '../src/pg/src/backend/commands/commands_copy.c', 'L': '737', 'R': 'CheckMaxRowError'}

In [48]:
#delete the cluster
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'covid-redshift-cluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'covid-db',
  'Endpoint': {'Address': 'covid-redshift-cluster.cw6wwmbewavp.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 3, 18, 22, 58, 23, 468000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0557e607d10acd930',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-030ca89833b123009',
  'AvailabilityZone': 'us-east-1e',
  'PreferredMaintenanceWindow': 'sun:06:30-sun:07:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': Tr

In [2]:
#delete an ec2 instance. first list all the instances present
instances = ec2.instances.filter(Filters=[{'Name': 'instance-state-name', 'Values': ['running','terminated']}])

In [5]:
for instance in instances:
    print(instance.id, instance.state)
    ec2.instances.filter(InstanceIds=[instance.id]).terminate()

i-0ec9f23d51e9f4e9b {'Code': 48, 'Name': 'terminated'}


In [None]:
#airflow-standalone password on EC2 
wNzNuAvURRDfkmtY
