In [226]:
import boto3 ## Library used to connect to AWS services
import time
import pandas as pd
from io import StringIO

In [227]:
## Create a New user and generate the Access and Secret key pair (currently removing it)
AWS_ACCESS_KEY = ""
AWS_SECRET_KEY = ""
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid_19_dataset"
S3_STAGING_DIR = "s3://covid-19-athena-query-results/Unsaved/"
S3_BUCKET_NAME = "covid-19-athena-query-results"
S3_OUTPUT_DIRECTORY = "Unsaved"

In [228]:
athena_client = boto3.client(
"athena",
aws_access_key_id = AWS_ACCESS_KEY,
aws_secret_access_key = AWS_SECRET_KEY,
region_name = AWS_REGION)

In [229]:
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(
                QueryExecutionId = query_response["QueryExecutionId"]
                )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
    "s3",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION,
    )
    s3_client.download_file(
    S3_BUCKET_NAME,
    f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
    temp_file_location,
    )
    return pd.read_csv(temp_file_location)

In [230]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM enigma_jhud LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

In [231]:
response

{'QueryExecutionId': 'e205e4c2-ad11-41b2-9295-5b73dc76cc7a',
 'ResponseMetadata': {'RequestId': '5e6395e5-afec-457d-83ff-53fc9bba619b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 17 Nov 2022 07:06:06 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '5e6395e5-afec-457d-83ff-53fc9bba619b'},
  'RetryAttempts': 0}}

In [232]:
enigma_jhud = download_and_load_query_results(athena_client,response)

In [233]:
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


In [234]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM nytimes_data_in_usa_us_county LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client,response)

In [235]:
nytimes_data_in_usa_us_county.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1,0
1,2020-01-22,Snohomish,Washington,53061.0,1,0
2,2020-01-23,Snohomish,Washington,53061.0,1,0
3,2020-01-24,Cook,Illinois,17031.0,1,0
4,2020-01-24,Snohomish,Washington,53061.0,1,0


In [236]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM nytimes_data_in_usa_us_states LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client,response)

In [237]:
nytimes_data_in_usa_us_states.head()

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


In [238]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM static_dataset_countrycode LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

static_dataset_countrycode = download_and_load_query_results(athena_client,response)

In [239]:
static_dataset_countrycode.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [240]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM static_dataset_countypopulation LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

static_dataset_countypopulation = download_and_load_query_results(athena_client,response)

In [241]:
static_dataset_countypopulation.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [242]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM static_dataset_countrycode LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

static_dataset_countrycode = download_and_load_query_results(athena_client,response)

In [243]:
static_dataset_countrycode.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [244]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM rearc_covid_19_testing_data_states_daily LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client,response)

In [245]:
rearc_covid_19_testing_data_states_daily.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [246]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_daily LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client,response)

In [247]:
rearc_covid_19_testing_data_us_daily.head()

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [248]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_total_latest LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client,response)

In [249]:
rearc_covid_19_testing_data_us_total_latest.head()

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


In [250]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM rearc_usa_hospital_beds LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client,response)

In [251]:
rearc_usa_hospital_beds.head()

Unnamed: 0,objectid,hospital_name,hospital_type,hq_address,hq_address1,hq_city,hq_state,hq_zip_code,county_name,state_name,...,num_licensed_beds,num_staffed_beds,num_icu_beds,adult_icu_beds,pedi_icu_beds,bed_utilization,avg_ventilator_usage,potential_increase_in_bed_capac,latitude,longtitude
0,1,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,650 E Indian School Rd,,Phoenix,AZ,85012,Maricopa,Arizona,...,129.0,129.0,0,0,,,0.0,0,33.495498,-112.066157
1,2,Southern Arizona VA Health Care System,VA Hospital,3601 S 6th Ave,,Tucson,AZ,85723,Pima,Arizona,...,295.0,295.0,2,2,,,2.0,0,32.181263,-110.965885
2,3,VA Central California Health Care System,VA Hospital,2615 E Clinton Ave,,Fresno,CA,93703,Fresno,California,...,57.0,57.0,2,2,,,2.0,0,36.773324,-119.779742
3,4,VA Connecticut Healthcare System - West Haven ...,VA Hospital,950 Campbell Ave,,West Haven,CT,6516,New Haven,Connecticut,...,216.0,216.0,1,1,,,2.0,0,41.2844,-72.95761
4,5,Wilmington VA Medical Center,VA Hospital,1601 Kirkwood Hwy,,Wilmington,DE,19805,New Castle,Delaware,...,60.0,60.0,0,0,,,1.0,0,39.740206,-75.606532


In [252]:
response = athena_client.start_query_execution(
QueryString = "SELECT * FROM static_dataset_state_abv LIMIT 10000",
QueryExecutionContext = {"Database": SCHEMA_NAME},
ResultConfiguration = {"OutputLocation": S3_STAGING_DIR,
                      "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
                      },
)

static_dataset_state_abv = download_and_load_query_results(athena_client,response)

In [253]:
static_dataset_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [254]:
new_header = static_dataset_state_abv.iloc[0] ## Grab the first row of header

In [255]:
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

In [256]:
static_dataset_state_abv = static_dataset_state_abv[1:] ## Take the data less than header row

In [257]:
static_dataset_state_abv

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA
6,Colorado,CO
7,Connecticut,CT
8,Delaware,DE
9,District of Columbia,DC
10,Florida,FL


In [258]:
static_dataset_state_abv.columns = new_header ## set the header row as the df header

In [259]:
static_dataset_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [260]:
static_dataset_state_abv.columns

Index(['State', 'Abbreviation'], dtype='object', name=0)

In [261]:
## Build the tables with attributes that has to be placed inside the Redshift database
factCovid_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

In [262]:
factCovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,
1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,
2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,
3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,
4,,Gansu,China,,,,,20210119,289939,,1066.0,,


In [263]:
factCovid.shape

(5281, 13)

In [264]:
dimRegion_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

In [265]:
dimHospital = rearc_usa_hospital_beds[['fips', 'state_name', 'latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]

In [266]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']]

In [267]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [268]:
## Covert the data to date time format
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [269]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [270]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [271]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


In [272]:
## Create the S3 bucket to store the output 
bucket = 'godwin-covid19-pipeline'

In [273]:
csv_buffer = StringIO()

In [274]:
csv_buffer

<_io.StringIO at 0x209bd43c0d0>

In [275]:
factCovid.to_csv(csv_buffer)

In [276]:
s3_resource = boto3.resource('s3',
                            aws_access_key_id = AWS_ACCESS_KEY,
aws_secret_access_key = AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'V91D9Y5VCWE8NXVN',
  'HostId': 'EjAThSYyXuNq3rCATGoQtAiEyRjoIRzvCxU3WM+mQRSTATprDTUkItEcEkWjocE2y+eKHif5fvs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'EjAThSYyXuNq3rCATGoQtAiEyRjoIRzvCxU3WM+mQRSTATprDTUkItEcEkWjocE2y+eKHif5fvs=',
   'x-amz-request-id': 'V91D9Y5VCWE8NXVN',
   'date': 'Thu, 17 Nov 2022 07:06:53 GMT',
   'etag': '"1090713e48da75e6d0e3f99efa4b61ee"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"1090713e48da75e6d0e3f99efa4b61ee"'}

In [277]:
csv_buffer.getvalue()

",fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged\r\n0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,\r\n1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,\r\n2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,\r\n3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,\r\n4,,Gansu,China,,,,,20210119,289939,,1066.0,,\r\n5,,Guangdong,China,26.0,,,,20210119,289939,,1066.0,,\r\n6,,Guangxi,China,2.0,,,,20210119,289939,,1066.0,,\r\n7,,Guizhou,China,1.0,,,,20210119,289939,,1066.0,,\r\n8,,Hai,China,4.0,,,,20210119,289939,,1066.0,,\r\n9,,Hebei,China,1.0,,,,20210119,289939,,1066.0,,\r\n10,,Heilongjiang,China,,,,,20210119,289939,,1066.0,,\r\n11,,He,China,5.0,,,,20210119,289939,,1066.0,,\r\n12,,Hong Kong,Hong Kong,,,,,20210119,289939,,1066.0,,\r\n13,,Hubei,China,444.0,17.0,28.0,,20210119,289939,,1066.0,,\r\n14,,Hu,China,4.0,,,,20210119,289939,,1066.0,,\r\n15,,Inner Mongolia,China,,,,,20210119,289939,,1066

In [278]:
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)

In [279]:
s3_resource = boto3.resource('s3',
                            aws_access_key_id = AWS_ACCESS_KEY,
aws_secret_access_key = AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'KEF5RR8P3MMR86ED',
  'HostId': '7uZcYtPy4EpUdnXphObI2rxV5anM3N5Yfac3J9IqNCFtrqibIb1Sbcdm/Y1Vk3voQwi1KtDns1Y=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '7uZcYtPy4EpUdnXphObI2rxV5anM3N5Yfac3J9IqNCFtrqibIb1Sbcdm/Y1Vk3voQwi1KtDns1Y=',
   'x-amz-request-id': 'KEF5RR8P3MMR86ED',
   'date': 'Thu, 17 Nov 2022 07:06:55 GMT',
   'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"'}

In [280]:
csv_buffer.getvalue()

",fips,state_name,latitude,longtitude,hq_address,hospital_name,hospital_type,hq_city,hq_state\r\n0,4013.0,Arizona,33.49549780000007,-112.06615689999995,650 E Indian School Rd,Phoenix VA Health Care System (AKA Carl T Hayden VA Medical Center),VA Hospital,Phoenix,AZ\r\n1,4019.0,Arizona,32.181263400000034,-110.96588519999996,3601 S 6th Ave,Southern Arizona VA Health Care System,VA Hospital,Tucson,AZ\r\n2,6019.0,California,36.77332350000006,-119.77974209999996,2615 E Clinton Ave,VA Central California Health Care System,VA Hospital,Fresno,CA\r\n3,9009.0,Connecticut,41.28440040000004,-72.95761029999994,950 Campbell Ave,VA Connecticut Healthcare System - West Haven Campus (AKA West Haven VA Medical Center),VA Hospital,West Haven,CT\r\n4,10003.0,Delaware,39.74020630000007,-75.60653249999996,1601 Kirkwood Hwy,Wilmington VA Medical Center,VA Hospital,Wilmington,DE\r\n5,11001.0,District of Columbia,38.93068230000006,-77.01119479999994,50 Irving St Nw,Washington DC VA Medical Center,VA Hospital,W

In [281]:
csv_buffer = StringIO()

In [282]:
dimRegion.to_csv(csv_buffer)

In [286]:
s3_resource = boto3.resource('s3',
                            aws_access_key_id = AWS_ACCESS_KEY,
aws_secret_access_key = AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'AJ48AEYJQ36R4N2F',
  'HostId': '0x3YpFRdNVJSchpJMq1QJToMYLUGlZjsBBcoBrrvyFj7W4jaPXg9koBpqydZOrjYlXiV/fjbxNWoAXjIsuYNpA==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '0x3YpFRdNVJSchpJMq1QJToMYLUGlZjsBBcoBrrvyFj7W4jaPXg9koBpqydZOrjYlXiV/fjbxNWoAXjIsuYNpA==',
   'x-amz-request-id': 'AJ48AEYJQ36R4N2F',
   'date': 'Thu, 17 Nov 2022 07:07:33 GMT',
   'etag': '"19eb0b77e7f7441c686829bc3fd1a906"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"19eb0b77e7f7441c686829bc3fd1a906"'}

In [287]:
csv_buffer = StringIO()

In [288]:
dimDate.to_csv(csv_buffer)

In [289]:
s3_resource = boto3.resource('s3',
                            aws_access_key_id = AWS_ACCESS_KEY,
aws_secret_access_key = AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'JR42ME6HTA7YB7SB',
  'HostId': '8DNhwpc+YEYWdYyJwgUxwMq4mXm6Fo2AnOCVW/KoTpacDq2hjsgWQVIMbfpVqVLM7vMFQY+F+7A=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '8DNhwpc+YEYWdYyJwgUxwMq4mXm6Fo2AnOCVW/KoTpacDq2hjsgWQVIMbfpVqVLM7vMFQY+F+7A=',
   'x-amz-request-id': 'JR42ME6HTA7YB7SB',
   'date': 'Thu, 17 Nov 2022 07:07:39 GMT',
   'etag': '"19eb0b77e7f7441c686829bc3fd1a906"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"19eb0b77e7f7441c686829bc3fd1a906"'}

In [290]:
csv_buffer.getvalue()

',fips,date,year,month,day_of_week\r\n0,2.0,2021-03-07,2021,3,6\r\n1,1.0,2021-03-07,2021,3,6\r\n2,5.0,2021-03-07,2021,3,6\r\n3,60.0,2021-03-07,2021,3,6\r\n4,4.0,2021-03-07,2021,3,6\r\n5,6.0,2021-03-07,2021,3,6\r\n6,8.0,2021-03-07,2021,3,6\r\n7,9.0,2021-03-07,2021,3,6\r\n8,11.0,2021-03-07,2021,3,6\r\n9,10.0,2021-03-07,2021,3,6\r\n10,12.0,2021-03-07,2021,3,6\r\n11,13.0,2021-03-07,2021,3,6\r\n12,66.0,2021-03-07,2021,3,6\r\n13,15.0,2021-03-07,2021,3,6\r\n14,19.0,2021-03-07,2021,3,6\r\n15,16.0,2021-03-07,2021,3,6\r\n16,17.0,2021-03-07,2021,3,6\r\n17,18.0,2021-03-07,2021,3,6\r\n18,20.0,2021-03-07,2021,3,6\r\n19,21.0,2021-03-07,2021,3,6\r\n20,22.0,2021-03-07,2021,3,6\r\n21,25.0,2021-03-07,2021,3,6\r\n22,24.0,2021-03-07,2021,3,6\r\n23,23.0,2021-03-07,2021,3,6\r\n24,26.0,2021-03-07,2021,3,6\r\n25,27.0,2021-03-07,2021,3,6\r\n26,29.0,2021-03-07,2021,3,6\r\n27,69.0,2021-03-07,2021,3,6\r\n28,28.0,2021-03-07,2021,3,6\r\n29,30.0,2021-03-07,2021,3,6\r\n30,37.0,2021-03-07,2021,3,6\r\n31,38.0,2021-03-07

In [291]:
## Extracting the schema from the tables(the dataframe created now using pandas)
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [292]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [293]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [294]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [360]:
import redshift_connector

In [361]:
conn = redshift_connector.connect(
host = 'redshift-cluster-1.cbrhbgnfrhzr.ap-south-1.redshift.amazonaws.com',
database = 'dev',
user = 'awsuser',
password = 'AWSuser1'
)


In [362]:
conn.autocommit = True

In [363]:
cursor = redshift_connector.Cursor = conn.cursor()

In [305]:
cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)""")

<redshift_connector.cursor.Cursor at 0x209bb7d7e50>

In [312]:
cursor.execute("""
CREATE TABLE "factCovid" (
  "index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)""")

cursor.execute("""
CREATE TABLE "dimRegion" (
  "index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)""")

cursor.execute("""
CREATE TABLE "dimHospital" (
  "index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)""")


<redshift_connector.cursor.Cursor at 0x209b9147010>

In [339]:
## Copy the S3 bucket to redshift
cursor.execute("""
copy dimDate from 's3://godwin-covid19-pipeline/output/dimDate.csv'
credentials 'aws_iam_role=arn:aws:iam::916018974910:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x209bae6f040>

In [345]:
cursor.execute("""
copy factCovid from 's3://godwin-covid19-pipeline/output/factCovid.csv'
credentials 'aws_iam_role=arn:aws:iam::916018974910:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x209bae6d150>

In [364]:
cursor.execute("""
copy dimHospital from 's3://godwin-covid19-pipeline/output/dimHospital.csv'
credentials 'aws_iam_role=arn:aws:iam::916018974910:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x209bb440f10>

In [None]:
### There was a error while loading Region data from S3 to Redshift(Need to Debug)
cursor.execute("""
copy dimRegion from 's3://godwin-covid19-pipeline/output/dimRegion.csv'
credentials 'aws_iam_role=arn:aws:iam::916018974910:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

In [None]:
## Finally configure the Redshift code on AWS glue and create a JOB