In [88]:
import boto3
import pandas as pd
from io import StringIO  #python3; python2: BytesIO
import time
from typing import Dict

In [28]:
AWS_ACCESS_KEY = " "
AWS_SECRET_KEY = " "
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR = "s3://himanshu-athena-output-covid/output/"
S3_BUCKET_NAME = "himanshu-athena-output-covid"
S3_OUTPUT_DIRECTORY = "output"

In [29]:
athena_client = boto3.client("athena",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)

In [89]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
             if "not yet finished" in str(err):
                 time.sleep(0.001)
             else:
                  print(f"An exception occurred: {err}")
                  raise err
            # if "not yet finished" in str(err):
            #     time.sleep(0.001)
            # else:
            #     raise err
    temp_file_location = "athena_query_results.csv"
    s3_client = boto3.client("s3",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)
    s3_client.download_file(S3_BUCKET_NAME,
                           f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                           temp_file_location,)
    return pd.read_csv(temp_file_location)


In [94]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigmaup_jhu_csv",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR ,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)


In [95]:
response

{'QueryExecutionId': 'b0cfd5af-90d8-4ff4-bb89-8958f77081c3',
 'ResponseMetadata': {'RequestId': 'eae23b96-36f7-4ed8-b234-08a5f587d011',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 02 Feb 2024 21:32:23 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'eae23b96-36f7-4ed8-b234-08a5f587d011'},
  'RetryAttempts': 0}}

In [96]:
enigmaup_jhu_csv = download_and_load_query_results(athena_client, response)

In [97]:
enigmaup_jhu_csv.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [46]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_nytimes_data_in_usaus_county",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

enigma_nytimes_data_in_usaus_county = download_and_load_query_results(athena_client, response)

In [47]:
enigma_nytimes_data_in_usaus_county.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1.0,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1.0,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1.0,0.0
3,2020-01-24,Cook,Illinois,17031.0,1.0,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1.0,0.0


In [48]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_nytimes_dataus_states",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

enigma_nytimes_dataus_states = download_and_load_query_results(athena_client, response)

In [49]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countypopulation",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

static_datasets_countypopulation = download_and_load_query_results(athena_client, response)


In [58]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_bedsusa_hospital_beds_geojson_4cca297c",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_usa_hospital_bedsusa_hospital_beds_geojson_4cca297c = download_and_load_query_results(athena_client, response)

In [51]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_state_abv",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

static_datasets_state_abv = download_and_load_query_results(athena_client, response)

In [53]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [54]:
new_header = static_datasets_state_abv.iloc[0] #grab the first row for the header

In [55]:
static_datasets_state_abv = static_datasets_state_abv[1:] #slicing , take the data less than the header row

In [57]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [63]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_datastates_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_covid_19_testing_datastates_daily = download_and_load_query_results(athena_client, response)

In [62]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_covid_19_testing_dataus_daily = download_and_load_query_results(athena_client, response)

In [64]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_total_latest",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR ,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_covid_19_testing_dataus_total_latest = download_and_load_query_results(athena_client, response)


In [None]:
us_total_latest.head() #Checking the dataframe

In [99]:
factCovid_1 = enigmaup_jhu_csv[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_covid_19_testing_datastates_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

In [101]:
factCovid.shape

(27992, 13)

In [103]:
dimRegion_1 = enigmaup_jhu_csv[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = enigma_nytimes_data_in_usaus_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2,on='fips',how='inner')

In [105]:
dimHospital = rearc_usa_hospital_bedsusa_hospital_beds_geojson_4cca297c[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]


In [107]:
dimDate = rearc_covid_19_testing_datastates_daily[['fips','date']]

In [108]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [109]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [119]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,1970-01-01 00:00:00.000000003,1970,1,3
1,1.0,1970-01-01 00:00:00.000000003,1970,1,3
2,5.0,1970-01-01 00:00:00.000000003,1970,1,3
3,60.0,1970-01-01 00:00:00.000000003,1970,1,3
4,4.0,1970-01-01 00:00:00.000000003,1970,1,3


In [122]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate["day_of_week"] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate["day_of_week"] = dimDate['date'].dt.dayofweek


In [123]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,1970-01-01 00:00:00.000000003,1970,1,3
1,1.0,1970-01-01 00:00:00.000000003,1970,1,3
2,5.0,1970-01-01 00:00:00.000000003,1970,1,3
3,60.0,1970-01-01 00:00:00.000000003,1970,1,3
4,4.0,1970-01-01 00:00:00.000000003,1970,1,3


# saving to s3

In [None]:
bucket = 'oovk-covid-project-output-buck'

In [None]:
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/factCovid.csv').put(Body=csv_buffer.getvalue())

In [None]:
csv_buffer.getvalue()

In [None]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimDatesql))

In [None]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(''.join(factCovidsql))


In [None]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimRegionsql))


In [None]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(),'dimHospital')
print(''.join(dimHospitalsql))


## TO connect & create table in Redshift 

In [None]:
import redshift_connector


In [None]:
conn = redshift_connector.connect(
    host='',
    databse='dev',
    user='awsuser',
    password='Passw0rd123'
)

In [None]:
conn.autocommit = True

In [None]:
cursor=redshift_connector.Cursor = conn.cursor()


In [None]:
cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
"fips" INTEGER,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)
""")

In [None]:
cursor.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"longitutde" REAL,
"latitude" REAL,
"hq_address" TEXT,
"hospital_name" TEXT,
"hospital_type" TEXT,
"hq_city" TEXT,
"hq_state" TEXT,
)
""")

cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
""")

In [None]:
cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longititude" REAL,
"county" TEXT,
"state" TEXT
)
""")

In [None]:
cursor.execute("""
copy dimDate from 's3_uri'
credentials 'aws_iam_role=arn:aws:iam:iamrole'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")