In [43]:
import boto3
import pandas as pd 
import configparser
import time

from io import StringIO 

#### Query to AWS Athena

In [44]:
config = configparser.ConfigParser()
config.read_file(open("config.ini"))

In [45]:
KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")
REGION = config.get("AWS", "REGION")
SCHEMA_NAME = config.get("ATHENA", "SCHEMA_NAME")
S3_BUCKET_NAME = config.get("S3", "BUCKET_NAME")
S3_STAGING_DIR = config.get("S3", "STAGING_DIR")
S3_OUTPUT_DIR = config.get("S3", "OUTPUT_DIR")

In [4]:
# Connect to athena service
athena_client = boto3.client(
    "athena",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name=REGION
)

In [5]:
def download_and_load_query_results(
        client: boto3.client,
        query_response: dict,
        retry: int = 3
) -> pd.DataFrame:
    while True:
        try:
            # This function only loads the first 1k rows 
            # Is this where the query execute?
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as e:
            if "not yet finished" in str(e):
                time.sleep(.001)
            else: 
                raise e 
    # Local tmp data 
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=KEY,
        aws_secret_access_key=SECRET,
        region_name=REGION
    )
    for i in range(retry):
        try:
            s3_client.download_file(
                S3_BUCKET_NAME,
                f"{S3_OUTPUT_DIR}/{query_response['QueryExecutionId']}.csv",
                temp_file_location
            )
            break
        except Exception as e:
            print(f"Some thing went wrong for try {i + 1}")
            print(e)
            time.sleep(5)
            
    return pd.read_csv(temp_file_location)

In [6]:
# Define the query input and config 
# Limit output to 10,000 for simplicity
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

In [7]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [8]:
# repeat the process for all table
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_states LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_daily LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_daily LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_total_latest LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_states_abv LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

static_data_states_abv = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_countrypopulation LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

static_data_countrypopulation = download_and_load_query_results(athena_client, response)

response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_countrycode LIMIT 10000;",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
    }
)

static_data_countrycode = download_and_load_query_results(athena_client, response)

In [9]:
# Handle header parsing error 
static_data_states_abv.columns = ["State", "Abbreviation", "partition_0"]
static_data_states_abv = static_data_states_abv[1:]

#### ETL jobs with python
- Implement star schema with python

In [10]:
# Combine confirmed case data and testing data
factCovid_left = enigma_jhud[[
    "fips", 
    "province_state",
    "country_region",
    "confirmed",
    "deaths",
    "recovered",
    "active",
]]

factCovid_right = rearc_covid_19_testing_data_states_daily[[
    "fips", 
    "date",
    "positive",
    "negative",
    "hospitalizedcurrently",
    "hospitalized",
    "hospitalizeddischarged",
]]
factCovid = pd.merge(
    factCovid_left,
    factCovid_right,
    on="fips",
    how="inner"
)

In [11]:
factCovid.shape

(1068, 13)

In [12]:
dimRegion_left = enigma_jhud[[
    "fips",
    "province_state",
    "country_region",
    "latitude",
    "longitude"
]]

dimRegion_right = nytimes_data_in_usa_us_county[[
    "fips",
    "county",
    "state"
]]

In [13]:
dimRegion = pd.merge(
    dimRegion_left,
    dimRegion_right,
    on="fips",
    how="inner"
)

In [14]:
rearc_usa_hospital_beds.columns

Index(['objectid', 'hospital_name', 'hospital_type', 'hq_address',
       'hq_address1', 'hq_city', 'hq_state', 'hq_zip_code', 'county_name',
       'state_name', 'state_fips', 'cnty_fips', 'fips', 'num_licensed_beds',
       'num_staffed_beds', 'num_icu_beds', 'adult_icu_beds', 'pedi_icu_beds',
       'bed_utilization', 'avg_ventilator_usage',
       'potential_increase_in_bed_capac', 'latitude', 'longtitude',
       'partition_0'],
      dtype='object')

In [15]:
dimHospital = rearc_usa_hospital_beds[[
    "fips",
    "latitude",
    "longtitude",
    "hq_address",
    "hospital_name",
    "hospital_type",
    "hq_city",
    "hq_state"
]]

In [16]:
dimDate = rearc_covid_19_testing_data_states_daily[["fips", "date"]].copy(deep=True)

In [17]:
dimDate["date"] = pd.to_datetime(dimDate["date"], format=r"%Y%m%d")

In [18]:
dimDate["year"] = dimDate.date.dt.year
dimDate["month"] = dimDate.date.dt.month
dimDate["day_of_week"] = dimDate.date.dt.dayofweek

#### store transformation result to s3 bucket 

In [19]:
def store_to_s3(data: pd.DataFrame, file_name: str):
    """Store the dataframe as csv in s3 bucket"""
    # Store the csv file into memory buffer in binary format
    csv_buffer = StringIO()
    data.to_csv(csv_buffer) # Index = None ?
    # Store them into s3 bucket 
    s3_resource = boto3.resource(
        "s3",
        aws_access_key_id=KEY,
        aws_secret_access_key=SECRET,
        region_name=REGION
    )
    s3_resource.Object(S3_BUCKET_NAME, f"covid/output/{file_name}.csv")\
        .put(Body=csv_buffer.getvalue())

In [20]:
store_to_s3(factCovid, "factCovid")
store_to_s3(dimDate, "dimDate")
store_to_s3(dimHospital, "dimHospital")
store_to_s3(dimRegion, "dimRegion")


#### Create Redshift Cluster 

In [48]:
DWH_CLUSTE_TYPE = config.get("DWH", "DWH_CLUSTE_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENIFIER = config.get("DWH", "DWH_CLUSTER_IDENIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [49]:
iam = boto3.client(
    'iam',
    region_name="ap-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

ec2 = boto3.resource(
    'ec2',
    region_name="ap-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

redshift = boto3.client(
    'redshift',
    region_name="ap-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

In [59]:
# Identifier for iam roles 
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)["Role"]["Arn"]

# Create redshift cluster with code  
try:
    response = redshift.create_cluster(
        ClusterType=DWH_CLUSTE_TYPE,
        NodeType=DWH_NODE_TYPE,
        # Credentials & Identifiers
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        # Role for s3 access 
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)


In [60]:
while True:
    try:
        # Loading the redshift cluster info
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENIFIER)["Clusters"][0]
        DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
        DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
        DWH_VPC = myClusterProps['VpcId']
        DB_NAME = myClusterProps['DBName']
        DB_USER = myClusterProps['MasterUsername']
        break
    except KeyError as e:
        print("Not ready yet.", e)
        time.sleep(1)

# Configure default security group (inbound role) for redshift cluster 
try:
    vpc = ec2.Vpc(id=DWH_VPC)
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready yet. 'Endpoint'
Not ready ye

#### Build table and insert data on Redshift 

In [None]:
# Get sql table creation command from pandas dataframe 
factCovidsql = "".join(pd.io.sql.get_schema(
    factCovid.reset_index(),
    "factCovid"
))
dimDatesql = "".join(pd.io.sql.get_schema(
    dimDate.reset_index(),
    "dimDate"
))
dimRegionsql = "".join(pd.io.sql.get_schema(
    dimRegion.reset_index(),
    "dimRegion"
))
dimHospitalsql = "".join(pd.io.sql.get_schema(
    dimHospital.reset_index(),
    "dimHospital"
))

In [None]:
print(dimHospitalsql)

In [62]:
#%pip install redshift_connector

Collecting redshift_connector
  Downloading redshift_connector-2.0.910-py3-none-any.whl (112 kB)
     ------------------------------------ 112.1/112.1 kB 926.4 kB/s eta 0:00:00
Collecting scramp<1.5.0,>=1.2.0
  Downloading scramp-1.4.4-py3-none-any.whl (13 kB)
Collecting asn1crypto>=1.5.1
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
     -------------------------------------- 105.0/105.0 kB 1.2 MB/s eta 0:00:00
Installing collected packages: asn1crypto, scramp, redshift_connector
Successfully installed asn1crypto-1.5.1 redshift_connector-2.0.910 scramp-1.4.4
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.2.2 -> 23.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [65]:
import redshift_connector

In [66]:
conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DB_NAME,
    user=DB_USER,
    password=DWH_DB_PASSWORD
)
conn.autocommit = True

In [68]:
cursor = redshift_connector.Cursor = conn.cursor()

In [99]:
# Create table on redshift
cursor.execute(factCovidsql)
cursor.execute(dimDatesql)
cursor.execute(dimHospitalsql)
cursor.execute(dimRegionsql)

<redshift_connector.cursor.Cursor at 0x210126f50f0>

In [101]:
# Copy data from s3 
try:
    cursor.execute(f"""
        copy dimDate from 's3://covid-de-project-thlawab/covid/output/dimDate.csv'
        credentials 'aws_iam_role={DWH_ROLE_ARN}'
        delimiter ','
        region 'ap-east-1'
        ignoreheader 1
    """)
    cursor.execute(f"""
        copy dimHospital from 's3://covid-de-project-thlawab/covid/output/dimHospital.csv'
        credentials 'aws_iam_role={DWH_ROLE_ARN}'
        delimiter ','
        region 'ap-east-1'
        ignoreheader 1;
    """)
    cursor.execute(f"""
        copy dimRegion from 's3://covid-de-project-thlawab/covid/output/dimRegion.csv'
        credentials 'aws_iam_role={DWH_ROLE_ARN}'
        delimiter ','
        region 'ap-east-1'
        ignoreheader 1;
    """)
    cursor.execute(f"""
        copy factCovid from 's3://covid-de-project-thlawab/covid/output/factCovid.csv'
        credentials 'aws_iam_role={DWH_ROLE_ARN}'
        delimiter ','
        region 'ap-east-1'
        ignoreheader 1;
    """)
except redshift_connector.Error as e:
    # Roll back the transaction
    conn.rollback()
    print("Error: Issue copying data to table")
    print(e)