In [38]:
import boto3
import pandas as pd
from io import StringIO
import time

In [26]:
AWS_ACCESS_KEY = "YOUR_ACCESS_KEY"
AWS_SECRET_KEY = "YOUR_SECRET_KEY"
AWS_REGION = "us-east-1"
SCHEMA_NAME = "covid_db"
S3_STAGING_DIR = "s3://athena-query-result/output/"
S3_BUCKET_NAME = "athena-query-result"
S3_OUTPUT_DIRECTORY = "output"

In [28]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

In [29]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.01)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client("s3",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)
    s3_client.download_file(S3_BUCKET_NAME,
                           f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                           temp_file_location,)
    return pd.read_csv(temp_file_location)

In [30]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)


In [31]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [39]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [40]:
nytimes_data_in_usa_us_county.head(5)

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1.0,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1.0,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1.0,0.0
3,2020-01-24,Cook,Illinois,17031.0,1.0,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1.0,0.0


In [41]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usaus_states",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

nytimes_data_in_usaus_states = download_and_load_query_results(athena_client, response)

In [43]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countypopulation",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

countypopulation = download_and_load_query_results(athena_client, response)

In [52]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [81]:
rearc_usa_hospital_beds[['latitude','longtitude']].head(2)

Unnamed: 0,latitude,longtitude
0,33.495498,-112.066157
1,32.181263,-110.965885


In [47]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM states_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

states_daily = download_and_load_query_results(athena_client, response)


In [71]:
states_daily.head(5)

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [49]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

us_daily = download_and_load_query_results(athena_client, response)

In [70]:
us_daily.head(5)

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [51]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_total_latest",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

us_total_latest = download_and_load_query_results(athena_client, response)

In [63]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM state_abv",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

state_abv = download_and_load_query_results(athena_client, response)

In [64]:
state_abv.head(5)

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [65]:
new_header = state_abv.iloc[0] #grab the first row of the header
state_abv = state_abv[1:] #take the data after header row
state_abv.columns = new_header

In [68]:
state_abv.head(5)

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [73]:
#Building Dimensional Model
factCovid_1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

In [75]:
factCovid.shape

(8435, 13)

In [76]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2,on='fips',how='inner')

In [83]:
dimHospital = rearc_usa_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

In [109]:
dimDate = states_daily[['fips','date']]

In [110]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [111]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [112]:
dimDate.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2685 entries, 0 to 2684
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   fips    2684 non-null   float64       
 1   date    2685 non-null   datetime64[ns]
dtypes: datetime64[ns](1), float64(1)
memory usage: 42.1 KB


In [113]:
dimDate.info()
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2685 entries, 0 to 2684
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   fips    2684 non-null   float64       
 1   date    2685 non-null   datetime64[ns]
dtypes: datetime64[ns](1), float64(1)
memory usage: 42.1 KB


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [115]:
dimDate['Year'] = dimDate['date'].dt.year

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['Year'] = dimDate['date'].dt.year


In [116]:
dimDate['Month'] = dimDate['date'].dt.month

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['Month'] = dimDate['date'].dt.month


In [117]:
dimDate.head()

Unnamed: 0,fips,date,day_of_week,Year,Month
0,2.0,2021-03-07,6,2021,3
1,1.0,2021-03-07,6,2021,3
2,5.0,2021-03-07,6,2021,3
3,60.0,2021-03-07,6,2021,3
4,4.0,2021-03-07,6,2021,3


In [120]:
#Loading to S3
bucket = 'abakash-de-covid-project'
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'NJ5C9S80MAMC19EY',
  'HostId': 'icVrZxbEjepiujCDrH/QfAir31caWzyu8+cH1U4tiVVTW3pBel7Vifx+5GxzjLqEGAzR9HfVNcI=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'icVrZxbEjepiujCDrH/QfAir31caWzyu8+cH1U4tiVVTW3pBel7Vifx+5GxzjLqEGAzR9HfVNcI=',
   'x-amz-request-id': 'NJ5C9S80MAMC19EY',
   'date': 'Thu, 02 Nov 2023 05:18:54 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"6ee424f7de879bc8a0c8a26f495d6e79"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"6ee424f7de879bc8a0c8a26f495d6e79"',
 'ServerSideEncryption': 'AES256'}

In [121]:
bucket = 'abakash-de-covid-project'
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'PDAY6J1M0CDYP825',
  'HostId': '6iAjI9S6ZoUxIDjkrc5sNdcVmKkla2pZVICkcbn0e2I06LenFmpzepH7LnUj63fIdiyghDY1oBA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '6iAjI9S6ZoUxIDjkrc5sNdcVmKkla2pZVICkcbn0e2I06LenFmpzepH7LnUj63fIdiyghDY1oBA=',
   'x-amz-request-id': 'PDAY6J1M0CDYP825',
   'date': 'Thu, 02 Nov 2023 05:23:06 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"3195fa217e14f434fdb397ee89ad9ff8"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 2},
 'ETag': '"3195fa217e14f434fdb397ee89ad9ff8"',
 'ServerSideEncryption': 'AES256'}

In [122]:
bucket = 'abakash-de-covid-project'
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3K7PCKMEP4QKSDGG',
  'HostId': 'N58e4tcIG64txYN3oCBbCGGkOXV8fp0RpfojZP+9NEDiEI925gW1tjZuhByte3GY3E2S03XZYQs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'N58e4tcIG64txYN3oCBbCGGkOXV8fp0RpfojZP+9NEDiEI925gW1tjZuhByte3GY3E2S03XZYQs=',
   'x-amz-request-id': '3K7PCKMEP4QKSDGG',
   'date': 'Thu, 02 Nov 2023 05:24:10 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"e81caded3358841b6b8f6a6b8920270b"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"e81caded3358841b6b8f6a6b8920270b"',
 'ServerSideEncryption': 'AES256'}

In [123]:
bucket = 'abakash-de-covid-project'
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'B4HRHRJEJ46BBJXA',
  'HostId': 'Nc9/ZIyTdmDDAOOkDzuqT5fD+Jjhbidj1coB0KiVfw0LfMIej56rvBajz0SAgZ/aXp06hBlibQo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Nc9/ZIyTdmDDAOOkDzuqT5fD+Jjhbidj1coB0KiVfw0LfMIej56rvBajz0SAgZ/aXp06hBlibQo=',
   'x-amz-request-id': 'B4HRHRJEJ46BBJXA',
   'date': 'Thu, 02 Nov 2023 06:03:07 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"68178b0eac7079bf1c8e4b41933e1798"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 3},
 'ETag': '"68178b0eac7079bf1c8e4b41933e1798"',
 'ServerSideEncryption': 'AES256'}

In [125]:
#Extracting Schema from dataset
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(dimDatesql)

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "day_of_week" INTEGER,
  "Year" INTEGER,
  "Month" INTEGER
)


In [128]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(factCovidsql)

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [129]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(dimRegionsql)


CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [130]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(),'dimHospital')
print(dimHospitalsql)

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
