In [1]:
import boto3
import pandas as pd
import configparser
import time
import redshift_connector
from tables import FACT_COVID_TABLE, DIM_REGION_TABLE, DIM_DATE_TABLE, DIM_HOSPITAL_TABLE
from io import StringIO

In [2]:
config = configparser.ConfigParser()
config.read("aws.conf")

['aws.conf']

In [4]:
AWS_ACCESS_KEY = config["AWS"]["AWS_ACCESS_KEY"]
AWS_SECRET_KEY = config["AWS"]["AWS_SECRET_KEY"]
AWS_REGION = config["AWS"]["AWS_REGION"]
SCHEMA_NAME = config["S3"]["SCHEMA_NAME"]
S3_STAGING_DIR = config["S3"]["S3_STAGING_DIR"]
S3_BUCKET_NAME = config["S3"]["S3_BUCKET_NAME"]
S3_OUTPUT_DIRECTORY = config["S3"]["S3_OUTPUT_DIRECTORY"]

In [5]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

In [18]:
Dict = {}
def download_and_load_query_results(client: boto3.client, query_response: Dict) -> pd.DataFrame:
    while True:
        try:
            # This function only loads the first 1000 rows
            client.get_query_results(QueryExecutionId=query_response['QueryExecutionId'])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
        )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

# Response format:
{'QueryExecutionId': '3c111625-3fd8-4bcc-92f1-ac24c61116f2', <br>
 'ResponseMetadata': {'RequestId': '73ef74da-4e1d-41fb-984f-9216bf06f1d6', <br>
  'HTTPStatusCode': 200, <br>
  'HTTPHeaders': {'date': 'Mon, 12 Feb 2024 22:47:14 GMT', <br>
   'content-type': 'application/x-amz-json-1.1', <br>
   'content-length': '59', <br>
   'connection': 'keep-alive', <br>
   'x-amzn-requestid': '73ef74da-4e1d-41fb-984f-9216bf06f1d6'}, <br>
  'RetryAttempts': 0}}

In [20]:
table = ["countrycode", "countypopulation", "enigma_jhud", "rearc_usa_hospital_beds", "state_abv", "states_daily", "us_county", "us_daily", "us_states", "us_total_latest"]
result = {}
for table_name in table:
    response = athena_client.start_query_execution(
        QueryString="select * from " + table_name,
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
        },
    )
    result[table_name] = download_and_load_query_results(athena_client, response)

In [21]:
for key, val in result.items():
    print(f"Table name: {key}")
    print(val.head())
    print()

Table name: countrycode
          country alpha-2 code alpha-3 code  numeric code  latitude  longitude
0     Afghanistan           AF          AFG           4.0   33.0000       65.0
1         Albania           AL          ALB           8.0   41.0000       20.0
2         Algeria           DZ          DZA          12.0   28.0000        3.0
3  American Samoa           AS          ASM          16.0  -14.3333     -170.0
4         Andorra           AD          AND          20.0   42.5000        1.6

Table name: countypopulation
               id   id2   county    state  population estimate 2018
0  0500000US01001  1001  Autauga  Alabama                     55601
1  0500000US01003  1003  Baldwin  Alabama                    218022
2  0500000US01005  1005  Barbour  Alabama                     24881
3  0500000US01007  1007     Bibb  Alabama                     22400
4  0500000US01009  1009   Blount  Alabama                     57840

Table name: enigma_jhud
   fips admin2 province_state country_r

Looking at the above print of tables, the header of table "state_abv" is displayed as a record. 
So let's fix that using the following code.

In [22]:
new_header = result["state_abv"].iloc[0]

In [23]:
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

In [24]:
result["state_abv"].columns = new_header
result["state_abv"] = result["state_abv"].drop(0)
result["state_abv"].head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [25]:
result['enigma_jhud'].head() 

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


In [26]:
result['us_daily'].head()

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


# Now, from the above tables let's create the dataframes for our warehouse

In [14]:
fact_covid_1 = result['enigma_jhud'][['fips','province_state','country_region','confirmed','deaths','recovered','active']]
fact_covid_2 = result['states_daily'][['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
fact_covid = pd.merge(fact_covid_1, fact_covid_2, on='fips', how='inner')

In [15]:
fact_covid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,
1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,
2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,
3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,
4,,Gansu,China,,,,,20210119,289939,,1066.0,,


In [16]:
fact_covid.shape

(27992, 13)

In [19]:
dim_region_1 = result['enigma_jhud'][['fips','province_state','country_region','latitude','longitude']]
dim_region_2 = result['us_county'][['fips','county','state']]
dim_region = pd.merge(dim_region_1, dim_region_2, on='fips', how='inner')

In [20]:
dim_region.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,Anhui,China,31.826,117.226,New York City,New York
1,,Anhui,China,31.826,117.226,Unknown,Rhode Island
2,,Anhui,China,31.826,117.226,New York City,New York
3,,Anhui,China,31.826,117.226,Unknown,Rhode Island
4,,Anhui,China,31.826,117.226,New York City,New York


In [21]:
dim_region.shape

(11752274, 7)

In [27]:
dim_hospital = result['rearc_usa_hospital_beds'][['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

In [23]:
dim_hospital.head()

Unnamed: 0,fips,state_name,latitude,longtitude,hq_address,hospital_name,hospital_type,hq_city,hq_state
0,4013.0,Arizona,33.495498,-112.066157,650 E Indian School Rd,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,Phoenix,AZ
1,4019.0,Arizona,32.181263,-110.965885,3601 S 6th Ave,Southern Arizona VA Health Care System,VA Hospital,Tucson,AZ
2,6019.0,California,36.773324,-119.779742,2615 E Clinton Ave,VA Central California Health Care System,VA Hospital,Fresno,CA
3,9009.0,Connecticut,41.2844,-72.95761,950 Campbell Ave,VA Connecticut Healthcare System - West Haven ...,VA Hospital,West Haven,CT
4,10003.0,Delaware,39.740206,-75.606532,1601 Kirkwood Hwy,Wilmington VA Medical Center,VA Hospital,Wilmington,DE


In [28]:
dim_date = result['states_daily'][['fips','date']]

In [25]:
dim_date.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [29]:
dim_date.shape

(2685, 2)

In [None]:
dim_date['date'] = pd.to_datetime(dim_date['date'], format='%Y%m%d')

In [31]:
dim_date.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [None]:
dim_date['year'] = dim_date['date'].dt.year
dim_date['month'] = dim_date['date'].dt.month
dim_date['day_of_week'] = dim_date['date'].dt.dayofweek

In [33]:
dim_date.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


In [34]:
s3_resource = boto3.resource('s3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key= AWS_SECRET_KEY)

In [35]:
bucket = config["S3"]["FINAL_OUTPUT_BUCKET"]

In [37]:
csv_buffer = StringIO()

In [None]:
fact_covid.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/fact_covid/fact_covid.csv').put(Body=csv_buffer.getvalue())

In [None]:
dim_region.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dim_region/dim_region.csv').put(Body=csv_buffer.getvalue())

In [None]:
dim_hospital.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dim_hospital/dim_hospital.csv').put(Body=csv_buffer.getvalue())

In [None]:
dim_date.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dim_date/dim_date.csv').put(Body=csv_buffer.getvalue())

In [6]:
conn = redshift_connector.connect(
    host=config["DWH"]["DWH_HOST"]
    database=config["DWH"]["DWH_DB_NAME"],
    user=config["DWH"]["DWH_DB_USER"],
    password=config["DWH"]["DWH_DB_PASSWORD"]
)

In [7]:
conn.autocommit = True

In [8]:
curr = redshift_connector.Cursor = conn.cursor()

In [None]:
curr.execute(FACT_COVID_TABLE)

In [None]:
curr.execute(DIM_REGION_TABLE)

In [None]:
curr.execute(DIM_DATE_TABLE)

In [None]:
curr.execute(DIM_HOSPITAL_TABLE)

In [None]:
curr.execute(f"copy fact_covid \
             from '{config["S3"]["S3_STAGING_DIR"]}fact_covid/fact_covid.csv' \
             iam_role '{config["DWH"]["DWH_IAM_ROLE"]}' \ \
             maxerror 5 delimiter ',' \
             IGNOREHEADER 1")

In [None]:
curr.execute(f"copy dim_date \
             from '{config["S3"]["S3_STAGING_DIR"]}dim_date/dim_date.csv' \
             iam_role '{config["DWH"]["DWH_IAM_ROLE"]}' \ \
             maxerror 5 delimiter ',' \
             IGNOREHEADER 1")

In [None]:
curr.execute(f"copy dim_region \
             from '{config["S3"]["S3_STAGING_DIR"]}dim_region/dim_region.csv' \
             iam_role '{config["DWH"]["DWH_IAM_ROLE"]}' \ \
             maxerror 5 delimiter ',' \
             IGNOREHEADER 1")

In [None]:
curr.execute(f"copy dim_hospital \
             from '{config["S3"]["S3_STAGING_DIR"]}dim_hospital/dim_hospital.csv' \
             iam_role '{config["DWH"]["DWH_IAM_ROLE"]}' \
             maxerror 5 delimiter ',' \
             IGNOREHEADER 1")