In [2]:
# 🔧 Data Handling
import pandas as pd
import numpy as np

# 📅 Date & Time
from datetime import datetime

import time


In [3]:
!pip install pycountry



In [4]:
# ☁️ AWS S3 Interaction
import boto3
from io import StringIO

# 🌍 Country Info (used for dimregion country column)
import pycountry

# 🧼 Optional: Logging if you want to trace/debug stuff
import logging

In [5]:
aws_access_key = 'AKIA3XAY6JXGPAT4BAER'
aws_secret_key = '******' 
aws_region = 'us-east-2'
schema_name = 'covid_database-diljot'
s3_staging_dir = 's3://covidproject-buc/output/'
s3_bucket_name = 'covidproject-buc'
s3_output_directory = 'output'


athena_client = boto3.client(
                "athena",
                aws_access_key_id = aws_access_key,
                aws_secret_access_key = aws_secret_key,
                region_name = aws_region
)

In [6]:
query_response = athena_client.start_query_execution(
    QueryString="SELECT * FROM hospital_beds LIMIT 10",  # Ensure the SQL query is correct
    QueryExecutionContext={"Database": schema_name},
    ResultConfiguration={
        "OutputLocation": s3_staging_dir,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

query_execution_id = query_response["QueryExecutionId"]

# Wait for the query to complete
while True:
    print("Results fetched:")
    response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    status = response["QueryExecution"]["Status"]["State"]
    print(status)
    
    if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
        break
    time.sleep(1)

# If the query failed, print the failure reason
if status == "FAILED":
    failure_reason = response["QueryExecution"]["Status"].get("StateChangeReason")
    print(f"Query failed. Reason: {failure_reason}")
else:
    # Proceed with fetching the results if the query succeeded
    results = []
    next_token = None

    while True:
        kwargs = {"QueryExecutionId": query_execution_id}
        if next_token:
            kwargs["NextToken"] = next_token

        print("loop running")
        result_response = athena_client.get_query_results(**kwargs)
        results.extend(result_response["ResultSet"]["Rows"])
        
        next_token = result_response.get("NextToken")
        if not next_token:
            break

    # Process the results
    for row in results:
        print(row)

Results fetched:
QUEUED
Results fetched:
SUCCEEDED
loop running
{'Data': [{'VarCharValue': 'objectid'}, {'VarCharValue': 'hospital_name'}, {'VarCharValue': 'hospital_type'}, {'VarCharValue': 'hq_address'}, {'VarCharValue': 'hq_address1'}, {'VarCharValue': 'hq_city'}, {'VarCharValue': 'hq_state'}, {'VarCharValue': 'hq_zip_code'}, {'VarCharValue': 'county_name'}, {'VarCharValue': 'state_name'}, {'VarCharValue': 'state_fips'}, {'VarCharValue': 'cnty_fips'}, {'VarCharValue': 'fips'}, {'VarCharValue': 'num_licensed_beds'}, {'VarCharValue': 'num_staffed_beds'}, {'VarCharValue': 'num_icu_beds'}, {'VarCharValue': 'adult_icu_beds'}, {'VarCharValue': 'pedi_icu_beds'}, {'VarCharValue': 'bed_utilization'}, {'VarCharValue': 'avg_ventilator_usage'}, {'VarCharValue': 'potential_increase_in_bed_capac'}, {'VarCharValue': 'latitude'}, {'VarCharValue': 'longtitude'}]}
{'Data': [{'VarCharValue': '1'}, {'VarCharValue': 'Phoenix VA Health Care System (AKA Carl T Hayden VA Medical Center)'}, {'VarCharValue':

In [8]:

# Initialize the S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=aws_region
)

def get_pd_frame_data(file_info):
    file_key = s3_client.get_object(Bucket=s3_bucket_name, Key=file_info['file_path'])
    csv_data = file_key['Body'].read().decode('utf-8')  # Decode the byte data into string
    file_info['data'] = pd.read_csv(StringIO(csv_data)) 

data_entities = [{'df_name': 'hospital_beds', 'file_path': 'covidproject_tables/hospital_beds/rearc_usa_hospital_beds.csv', 'data':None},
                 {'df_name': 'covid_data', 'file_path': 'covidproject_tables/covid-data/covid_testing_data_states_daily.csv', 'data':None},
                {'df_name': 'enigma_jhud', 'file_path': 'covidproject_tables/enigma-jhud/enigma_jhud.csv', 'data':None}]

for i in range(len(data_entities)):
    get_pd_frame_data(data_entities[i])

hospital_beds = data_entities[0]['data']
covid_data = data_entities[1]['data']
enigma_jhud = data_entities[2]['data']

        
# GET hospital_beds data and store it in a python dataframe
# s3_hospitalsData_key = 
# response_hospitals = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_hospitalsData_key)

# # Read the CSV content into a pandas DataFrame
# csv_data = response_hospitals['Body'].read().decode('utf-8')  # Decode the byte data into string
# hospital_beds = pd.read_csv(StringIO(csv_data))  # Load into DataFrame

# Verify the DataFrame

# lets create the hospital dimension table 
dimhospital = hospital_beds[['FIPS', 'HOSPITAL_NAME', 'HOSPITAL_TYPE', 'HQ_ADDRESS', 'HQ_ADDRESS1', 'HQ_CITY', 'HQ_STATE', 'COUNTY_NAME', 'latitude', 'longtitude']]

##############################################################################
# for the facts table
factscovid_daily = covid_data[['fips', 'date', 'state', 'positive', 'negative', 'pending', 'hospitalized', 'dateModified', 'recovered', 'deathConfirmed', 'hospitalizedDischarged']]
factscovid_enigma = enigma_jhud[['fips', 'active', 'recovered', 'confirmed']]
factscovid = pd.merge(factscovid_daily, factscovid_enigma, on = 'fips', how = 'inner')

print(factscovid.head())


#############################################################################
#for the region dimension table 
dimregion = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]

def get_country_name(state_name):
    if pd.isna(state_name):
        return None  # Return None or any default value you prefer for NaN entries
    state_name = str(state_name)  # Convert the state_name to a string
    for subdivision in pycountry.subdivisions:
        if subdivision.name.lower() == state_name.lower():
            return pycountry.countries.get(alpha_2=subdivision.country_code).name
    return None
dimregion['country'] = dimregion['province_state'].apply(get_country_name)


#############################################################################
# lastly, lets work on the dimension date table 
#first, I covert the date column to datetime format
covid_data['date'] = pd.to_datetime(covid_data['date'])
# now, lets get the month, year, weekend and day 
dimdate = pd.DataFrame({
    'date': covid_data['date'],
    'day': covid_data['date'].dt.day,
    'month': covid_data['date'].dt.month,
    'year': covid_data['date'].dt.year,
    'is_weekend': covid_data['date'].dt.weekday >= 5
})

   fips      date state  positive  negative  pending  hospitalized  \
0    72  20210307    PR  101327.0  305972.0      NaN           NaN   
1    72  20210307    PR  101327.0  305972.0      NaN           NaN   
2    72  20210307    PR  101327.0  305972.0      NaN           NaN   
3    72  20210307    PR  101327.0  305972.0      NaN           NaN   
4    72  20210307    PR  101327.0  305972.0      NaN           NaN   

           dateModified  recovered_x  deathConfirmed  hospitalizedDischarged  \
0  2021-03-06T00:00:00Z      91987.0          1749.0                     NaN   
1  2021-03-06T00:00:00Z      91987.0          1749.0                     NaN   
2  2021-03-06T00:00:00Z      91987.0          1749.0                     NaN   
3  2021-03-06T00:00:00Z      91987.0          1749.0                     NaN   
4  2021-03-06T00:00:00Z      91987.0          1749.0                     NaN   

   active  recovered_y  confirmed  
0     NaN          0.0        3.0  
1     NaN          0.0    

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimregion['country'] = dimregion['province_state'].apply(get_country_name)


In [11]:
bucket_name = 'covidproject-buc'
csv_buffer = StringIO()
s3_resource = boto3.resource('s3')

# Dimregion back to S3 bucket in Output directory
dimregion.to_csv(csv_buffer)
s3_resource.Object(bucket_name, 'output/dimregion.csv').put(Body=csv_buffer.getvalue())

# Dimdate back to S3 bucket in Output directory
dimdate.to_csv(csv_buffer)
s3_resource.Object(bucket_name, 'output/dimdate.csv').put(Body=csv_buffer.getvalue())

# Dimregion back to S3 bucket in Output directory
dimhospital.to_csv(csv_buffer)
s3_resource.Object(bucket_name, 'output/dimhospital.csv').put(Body=csv_buffer.getvalue())

# Factscovid back to S3 bucket in Output directory
factscovid.to_csv(csv_buffer)
s3_resource.Object(bucket_name, 'output/factscovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'NZ5A55SAYHHGA02C',
  'HostId': 'OI2LGjWw92Agm3J9vBaf0JdAyKYpaEWZb8T+NtNE2+Sjytvixrzi8KBDNndbs6Z/9+5ThO5nWL5BgLhoyoB+JA==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'OI2LGjWw92Agm3J9vBaf0JdAyKYpaEWZb8T+NtNE2+Sjytvixrzi8KBDNndbs6Z/9+5ThO5nWL5BgLhoyoB+JA==',
   'x-amz-request-id': 'NZ5A55SAYHHGA02C',
   'date': 'Tue, 15 Apr 2025 02:47:48 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"b7e9664f890a1936fa0fcfccbbbb6b57"',
   'x-amz-checksum-crc32': '4Q6Ocw==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"b7e9664f890a1936fa0fcfccbbbb6b57"',
 'ChecksumCRC32': '4Q6Ocw==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}