In [2]:
'''

@Author: Vighnesh Harish Bilgi
@Date: 2022-12-12
@Last Modified by: Vighnesh Harish Bilgi
@Last Modified time: 2022-12-12
@Title : 2 - Athena Data Modelling

'''

'\n\n@Author: Vighnesh Harish Bilgi\n@Date: 2022-12-12\n@Last Modified by: Vighnesh Harish Bilgi\n@Last Modified time: 2022-12-12\n@Title : 2 - Athena Data Modelling\n\n'

In [3]:
import boto3
import pandas as pd
import json
import time
from io import StringIO

In [4]:
import os
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('test1_access_key')
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('test1_secret_access_key')

In [5]:
DATABASE_NAME = 'covid_db'
OUTPUT_BUCKET_NAME = "covid19-athena-output-bucket"
RESULT_OUTPUT_LOCATION = f"s3://{OUTPUT_BUCKET_NAME}/"
COVID_BUCKET = "covid19-schema"

### Custom function to connect to athena service

In [6]:
def connect_to_athena():
    """

    Description:
        To connect to AWS Athena service.
    Parameter:
        No parameters
    Return:
        ServiceResource athena

    """
    athena =  boto3.client('athena')
    return athena

### Custom function to download query results and return them as pandas dataframe

In [7]:
Dict = {}
def download_and_load_query_results(
    athena:boto3.client, query_response : Dict
)-> pd.DataFrame:
    while True:
        try:
            # This fuction only loads first 1000 rows
            athena.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location:str = "athena_query_results.csv"
    s3_client = boto3.client("s3")
    s3_client.download_file(OUTPUT_BUCKET_NAME,f"{query_response['QueryExecutionId']}.csv",
    temp_file_location,)

    return pd.read_csv(temp_file_location)

### connecting to athena service

In [8]:
athena = connect_to_athena()


### get all records from 'enigma_jhud'

In [9]:
table_name = 'enigma_jhud'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

enigma_jhud_df = download_and_load_query_results(athena,response)
enigma_jhud_df.head()


Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


### get all records from 'us_total_latest'

In [11]:
table_name = 'us_total_latest'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

us_total_latest_df = download_and_load_query_results(athena,response)
us_total_latest_df.head()


Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


### get all records from 'us_states'

In [12]:
table_name = 'us_states'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

us_states_df = download_and_load_query_results(athena,response)
us_states_df.head()

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


### get all records from 'us_daily'

In [13]:
table_name = 'us_daily'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )
    
us_daily_df = download_and_load_query_results(athena,response)
us_daily_df.head()


Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


### get all records from 'us_county'

In [14]:
table_name = 'us_county'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )
us_county_df = download_and_load_query_results(athena,response)
us_county_df.head()


Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-03-22,St. Charles,Missouri,29183.0,3.0,0.0
1,2020-03-22,St. Louis,Missouri,29189.0,55.0,1.0
2,2020-03-22,St. Louis city,Missouri,29510.0,14.0,0.0
3,2020-03-22,Unknown,Missouri,,1.0,0.0
4,2020-03-22,Broadwater,Montana,30007.0,1.0,0.0


### get all records from 'states_daily'

In [15]:
table_name = 'states_daily'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

states_daily_df = download_and_load_query_results(athena,response)
states_daily_df.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


### get all records from 'state_abv'

In [17]:
table_name = 'state_abv'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

state_abv_df = download_and_load_query_results(athena,response)
state_abv_df.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


#### Selecting 1st record of the record as header of the dataframe

In [18]:
header = state_abv_df.iloc[0]
records = state_abv_df.iloc[1:]
state_abv_df.columns = header
state_abv_df.head()

Unnamed: 0,State,Abbreviation
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


### get all records from 'state_abv'

In [19]:
table_name = 'rearc_usa_hospital_beds'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

rearc_usa_hospital_beds_df = download_and_load_query_results(athena,response)
rearc_usa_hospital_beds_df.head()

Unnamed: 0,objectid,hospital_name,hospital_type,hq_address,hq_address1,hq_city,hq_state,hq_zip_code,county_name,state_name,...,num_licensed_beds,num_staffed_beds,num_icu_beds,adult_icu_beds,pedi_icu_beds,bed_utilization,avg_ventilator_usage,potential_increase_in_bed_capac,latitude,longtitude
0,1,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,650 E Indian School Rd,,Phoenix,AZ,85012,Maricopa,Arizona,...,129.0,129.0,0,0,,,0.0,0,33.495498,-112.066157
1,2,Southern Arizona VA Health Care System,VA Hospital,3601 S 6th Ave,,Tucson,AZ,85723,Pima,Arizona,...,295.0,295.0,2,2,,,2.0,0,32.181263,-110.965885
2,3,VA Central California Health Care System,VA Hospital,2615 E Clinton Ave,,Fresno,CA,93703,Fresno,California,...,57.0,57.0,2,2,,,2.0,0,36.773324,-119.779742
3,4,VA Connecticut Healthcare System - West Haven ...,VA Hospital,950 Campbell Ave,,West Haven,CT,6516,New Haven,Connecticut,...,216.0,216.0,1,1,,,2.0,0,41.2844,-72.95761
4,5,Wilmington VA Medical Center,VA Hospital,1601 Kirkwood Hwy,,Wilmington,DE,19805,New Castle,Delaware,...,60.0,60.0,0,0,,,1.0,0,39.740206,-75.606532


### get all records from 'countrycode'

In [20]:
table_name = 'countrycode'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )
countrycode_df = download_and_load_query_results(athena,response)
countrycode_df.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


### get all records from 'countrypopulation'

In [21]:
table_name = 'countypopulation'
response = athena.start_query_execution(
    QueryString=f"SELECT * from {DATABASE_NAME}.{table_name}",
    QueryExecutionContext = {"Database":DATABASE_NAME},
    ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )
countypopulation_df = download_and_load_query_results(athena,response)
countypopulation_df.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


### Creating Fact Table : fact_covid

In [None]:
factCovid1 = enigma_jhud_df[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid2 = states_daily_df[['fips','date','positive','negative','hospitalized','hospitalizedcurrently','hospitalizeddischarged']]
factCovid = pd.merge(factCovid1,factCovid2, on='fips', how='inner')
factCovid['date'] = pd.to_datetime(factCovid['date'], format = '%Y%m%d')
factCovid['year'] = factCovid['date'].dt.year
factCovid['month'] = factCovid['date'].dt.month
factCovid['day_of_week'] = factCovid['date'].dt.day_of_week

In [43]:
factCovidSchema = pd.io.sql.get_schema(factCovid.reset_index(drop=True),'factCovid')
print(''.join(factCovidSchema))

CREATE TABLE "factCovid" (
"fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" TIMESTAMP,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalized" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalizeddischarged" REAL,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


### Creating Dimesion Table : dim_region

In [None]:
dimRegion_1 = enigma_jhud_df[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = us_county_df[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2, on='fips', how='inner')

In [50]:
dimRegionSchema = pd.io.sql.get_schema(dimRegion.reset_index(drop=True),'dimRegion')
print(''.join(dimRegionSchema))

CREATE TABLE "dimRegion" (
"fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


### Creating Dimesion Table : dim_hospital

In [None]:
dimHospital = rearc_usa_hospital_beds_df[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hq_city','hq_state']]

In [39]:
dimHospitalSchema = pd.io.sql.get_schema(dimHospital.reset_index(drop=True),'dimHospital')
print(''.join(dimHospitalSchema))

CREATE TABLE "dimHospital" (
"fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


### Creating Dimesion Table : dim_date

In [None]:
dimDate = states_daily_df[['fips','date']]
dimDate['date'] = pd.to_datetime(dimDate['date'], format = '%Y%m%d')
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.day_of_week

In [40]:
dimDateSchema = pd.io.sql.get_schema(dimDate.reset_index(drop=True),'dimDate')
print(''.join(dimDateSchema))

CREATE TABLE "dimDate" (
"fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


### Uploading 'fact_covid' table to S3 bucket

In [46]:
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer, index= False)
s3_resource.Object(COVID_BUCKET,'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'YT7TJP25NV9HFG89',
  'HostId': 'LprNI/VCl81wB0ick+Qo68qzwNDLtEAYm8d7PfFLi4tWDBERZUCFzWd7XmWyxa3hK4GovfhGLiw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'LprNI/VCl81wB0ick+Qo68qzwNDLtEAYm8d7PfFLi4tWDBERZUCFzWd7XmWyxa3hK4GovfhGLiw=',
   'x-amz-request-id': 'YT7TJP25NV9HFG89',
   'date': 'Tue, 13 Dec 2022 17:29:01 GMT',
   'etag': '"8da7058c66685c6ac60686cea06159e3"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"8da7058c66685c6ac60686cea06159e3"'}

### Uploading 'dim_date' table to S3 bucket

In [30]:
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer, index= False)
s3_resource.Object(COVID_BUCKET,'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'V9V97K3ZXC4ZFQQP',
  'HostId': 'FTTJHwi/rEg8noURr8YVZOBdSCbmPdOT8h2dSzMGJJ6SnN9TAtZtxu4S6OYUqpLy416QRmCCqwY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'FTTJHwi/rEg8noURr8YVZOBdSCbmPdOT8h2dSzMGJJ6SnN9TAtZtxu4S6OYUqpLy416QRmCCqwY=',
   'x-amz-request-id': 'V9V97K3ZXC4ZFQQP',
   'date': 'Tue, 13 Dec 2022 12:51:55 GMT',
   'etag': '"88c47e1713849caf09d79713c65110b4"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"88c47e1713849caf09d79713c65110b4"'}

### Uploading 'dim_hospital' table to S3 bucket

In [31]:
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer, index= False)
s3_resource.Object(COVID_BUCKET,'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'N30EY33KREP1XQSD',
  'HostId': 'YoKuChiVhgdcLSWPq8lgKHY7wcMPZijeKAk7NLkRvRXq43blhisPDVaY+cxyYh/q3WcxI5rSB6o=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'YoKuChiVhgdcLSWPq8lgKHY7wcMPZijeKAk7NLkRvRXq43blhisPDVaY+cxyYh/q3WcxI5rSB6o=',
   'x-amz-request-id': 'N30EY33KREP1XQSD',
   'date': 'Tue, 13 Dec 2022 12:51:57 GMT',
   'etag': '"775f754fde5f0d023b01f97220833b55"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"775f754fde5f0d023b01f97220833b55"'}

### Uploading 'dim_region' table to S3 bucket

In [54]:
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer, index= False)
s3_resource.Object(COVID_BUCKET,'output/dimRegion.csv').put(Body=csv_buffer.getvalue())