In [1]:
import boto3 #used to connect and work with aws services
import pandas as pd
from io import StringIO  #encode data into binary form
import time

In [2]:
AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''
AWS_REGION = 'us-east-1'
SCHEMA_NAME = 'covid_etl_dataset'
S3_STAGING_DIR = 's3://aws-covid-outputs/output/'
S3_BUCKET_NAME = 'aws-covid-outputs'
S3_OUTPUT_DIRECTORY = 'output'

In [3]:
athena_client = boto3.client("athena",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)

In [4]:
athena_client

<botocore.client.Athena at 0x1f581cb0f90>

In [5]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.01)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client("s3",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)
    s3_client.download_file(S3_BUCKET_NAME,
                           f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                           temp_file_location,)
    return pd.read_csv(temp_file_location)

In [6]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)


In [7]:
response

{'QueryExecutionId': '40054fd9-8357-49e7-a064-453418b1d525',
 'ResponseMetadata': {'RequestId': '68639e47-ee46-4031-ab0c-2fdef32c2135',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 17 Aug 2024 22:59:05 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '68639e47-ee46-4031-ab0c-2fdef32c2135'},
  'RetryAttempts': 0}}

In [8]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [9]:
enigma_jhud.head(5)

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [10]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "enigma-nytimes-data-inus_states"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [11]:
nytimes_data_in_us_states = download_and_load_query_results(athena_client, response)

In [12]:
nytimes_data_in_us_states.head(5)

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


In [13]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "enigma-nytimes-data-inus_county"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_us_county = download_and_load_query_results(athena_client, response)

In [14]:
nytimes_data_in_us_county.head(5)

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1.0,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1.0,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1.0,0.0
3,2020-01-24,Cook,Illinois,17031.0,1.0,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1.0,0.0


In [15]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "rearc-covid-19-testing-data-states_daily"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_states_daily = download_and_load_query_results(athena_client, response)

In [16]:
pd.set_option('display.max_columns', None)
rearc_states_daily.head(5)

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,lastupdateet,datemodified,checktimeet,death,hospitalized,hospitalizeddischarged,datechecked,totaltestsviral,positivetestsviral,negativetestsviral,positivecasesviral,deathconfirmed,deathprobable,totaltestencountersviral,totaltestspeopleviral,totaltestsantibody,positivetestsantibody,negativetestsantibody,totaltestspeopleantibody,positivetestspeopleantibody,negativetestspeopleantibody,totaltestspeopleantigen,positivetestspeopleantigen,totaltestsantigen,positivetestsantigen,fips,positiveincrease,negativeincrease,total,totaltestresultsincrease,posneg,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,,,2.0,,,3/5/2021 03:59,2021-03-05T03:59:00Z,03/04 22:59,305.0,1293.0,,2021-03-05T03:59:00Z,1731628.0,68693.0,1660758.0,,,,,,,,,,,,,,,,2.0,0.0,0.0,56886.0,0.0,56886.0,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,,2676.0,,1515.0,295690.0,3/7/2021 11:00,2021-03-07T11:00:00Z,03/07 06:00,10148.0,45976.0,,2021-03-07T11:00:00Z,,,,392077.0,7963.0,2185.0,,2323788.0,,,,119757.0,,,,,,,1.0,408.0,2087.0,2431530.0,2347.0,2431530.0,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,141.0,,65.0,1533.0,315517.0,3/7/2021 00:00,2021-03-07T00:00:00Z,03/06 19:00,5319.0,14926.0,,2021-03-07T00:00:00Z,2736442.0,,2480716.0,255726.0,4308.0,1011.0,,,,,,,,,481311.0,81803.0,,,5.0,165.0,3267.0,2805534.0,3380.0,2805534.0,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,,,,,,12/1/2020 00:00,2020-12-01T00:00:00Z,11/30 19:00,0.0,,,2020-12-01T00:00:00Z,2140.0,,,,,,,,,,,,,,,,,,60.0,0.0,0.0,2140.0,0.0,2140.0,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,273.0,,143.0,,,3/7/2021 00:00,2021-03-07T00:00:00Z,03/06 19:00,16328.0,57907.0,118932.0,2021-03-07T00:00:00Z,7908105.0,,,769935.0,14403.0,1925.0,,3842945.0,580569.0,,,444089.0,,,,,,,4.0,1335.0,13678.0,3899464.0,45110.0,3899464.0,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [17]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "rearc-covid-19-testing-data-us_daily"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_us_daily = download_and_load_query_results(athena_client, response)

In [18]:
rearc_us_daily.head(5)

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,datechecked,death,hospitalized,totaltestresults,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,4281.0,2021-03-07T24:00:00Z,515142.0,878613.0,363789451,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,4280.0,2021-03-06T24:00:00Z,514303.0,877887.0,362633210,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,4275.0,2021-03-05T24:00:00Z,512629.0,877384.0,361224072,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,4267.0,2021-03-04T24:00:00Z,510408.0,874603.0,359479655,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,4260.0,2021-03-03T24:00:00Z,508665.0,873073.0,357888671,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [19]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "rearc-covid-19-testing-data-us_total_latest"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_us_total_latest = download_and_load_query_results(athena_client, response)

In [20]:
rearc_us_total_latest.head(5)

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


In [21]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "rearc-covid-19-testing-data-rearc_usa_hospital_beds"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_us_hospital_beds = download_and_load_query_results(athena_client, response)

In [22]:
rearc_us_hospital_beds.head(5)

Unnamed: 0,objectid,hospital_name,hospital_type,hq_address,hq_address1,hq_city,hq_state,hq_zip_code,county_name,state_name,state_fips,cnty_fips,fips,num_licensed_beds,num_staffed_beds,num_icu_beds,adult_icu_beds,pedi_icu_beds,bed_utilization,avg_ventilator_usage,potential_increase_in_bed_capac,latitude,longtitude
0,1,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,650 E Indian School Rd,,Phoenix,AZ,85012,Maricopa,Arizona,4.0,13.0,4013.0,129.0,129.0,0,0,,,0.0,0,33.495498,-112.066157
1,2,Southern Arizona VA Health Care System,VA Hospital,3601 S 6th Ave,,Tucson,AZ,85723,Pima,Arizona,4.0,19.0,4019.0,295.0,295.0,2,2,,,2.0,0,32.181263,-110.965885
2,3,VA Central California Health Care System,VA Hospital,2615 E Clinton Ave,,Fresno,CA,93703,Fresno,California,6.0,19.0,6019.0,57.0,57.0,2,2,,,2.0,0,36.773324,-119.779742
3,4,VA Connecticut Healthcare System - West Haven ...,VA Hospital,950 Campbell Ave,,West Haven,CT,6516,New Haven,Connecticut,9.0,9.0,9009.0,216.0,216.0,1,1,,,2.0,0,41.2844,-72.95761
4,5,Wilmington VA Medical Center,VA Hospital,1601 Kirkwood Hwy,,Wilmington,DE,19805,New Castle,Delaware,10.0,3.0,10003.0,60.0,60.0,0,0,,,1.0,0,39.740206,-75.606532


In [23]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "static-dataset-countrycode"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_dataset_countrycode = download_and_load_query_results(athena_client, response)

In [24]:
static_dataset_countrycode.head(5)

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [25]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "static-dataset-state_abv"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [26]:
static_dataset_state_abv = download_and_load_query_results(athena_client, response)

In [27]:
new_header = static_dataset_state_abv.iloc[0]
static_dataset_state_abv = static_dataset_state_abv[1:]
static_dataset_state_abv.columns = new_header

In [28]:
static_dataset_state_abv.head(5)

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [29]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "static-dataset-countypopulation"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_dataset_countypopulation  = download_and_load_query_results(athena_client, response)

In [30]:
static_dataset_countypopulation.head(5)

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [31]:
#Transformation - Converting ER model to Dimensional model
factCovid_1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

In [32]:
factCovid.shape

(27992, 13)

In [33]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2,on='fips',how='inner')

In [34]:
dimRegion.shape

(11752274, 7)

In [35]:
dimHospital = rearc_us_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

In [36]:
dimHospital.shape

(6637, 9)

In [37]:
dimDate = rearc_states_daily[['fips','date']]

In [38]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [39]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [40]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [41]:
dimDate = pd.DataFrame(dimDate)

# Convert the 'date' column to datetime format
dimDate['date'] = pd.to_datetime(dimDate['date'])

# Extract year, month, and day
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day'] = dimDate['date'].dt.day

In [42]:
dimDate.head(5)

Unnamed: 0,fips,date,year,month,day
0,2.0,2021-03-07,2021,3,7
1,1.0,2021-03-07,2021,3,7
2,5.0,2021-03-07,2021,3,7
3,60.0,2021-03-07,2021,3,7
4,4.0,2021-03-07,2021,3,7


In [43]:
#Saving fact and dim csv to s3

bucket = 'aws-covid-elt-pipeline'  

s3_resource = boto3.resource(
    's3',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION
)

In [44]:
csv_buffer = StringIO()  #stores actual information in binary format
factCovid.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '22QWT1DFYJMTK3NN',
  'HostId': 'bM7e6TICZElIMiSpjhVfwh27EUvzjF5UTodngsJLfOlog55iPq/WH5hQnoiPyyhbgzaNBF5oAQIFr2go3trYrsc7zprYY6FL',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'bM7e6TICZElIMiSpjhVfwh27EUvzjF5UTodngsJLfOlog55iPq/WH5hQnoiPyyhbgzaNBF5oAQIFr2go3trYrsc7zprYY6FL',
   'x-amz-request-id': '22QWT1DFYJMTK3NN',
   'date': 'Sat, 17 Aug 2024 23:02:49 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"fc9be4f5fc50864df1f4ed534cf29988"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"fc9be4f5fc50864df1f4ed534cf29988"',
 'ServerSideEncryption': 'AES256'}

In [45]:
csv_buffer = StringIO()  #stores actual information in binary format
dimDate.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '6DBHSXYX44486P1J',
  'HostId': 'PLuuJGwKNMy7cdEQT2bewqAloMFRiVItK+1oz0ElfmMtaHq5M5wWi0O4DnHVTKASb5YygPKnDtkHa2rZ9mMH4dhjDKt1VhRc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'PLuuJGwKNMy7cdEQT2bewqAloMFRiVItK+1oz0ElfmMtaHq5M5wWi0O4DnHVTKASb5YygPKnDtkHa2rZ9mMH4dhjDKt1VhRc',
   'x-amz-request-id': '6DBHSXYX44486P1J',
   'date': 'Sat, 17 Aug 2024 23:02:55 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"ef22e778a939fe0c2a9be66dfb933ffa"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"ef22e778a939fe0c2a9be66dfb933ffa"',
 'ServerSideEncryption': 'AES256'}

In [46]:
csv_buffer = StringIO()  #stores actual information in binary format
dimHospital.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'A5HW1KXWWBV4ETYT',
  'HostId': 'hBOCpSVIA/4Zgw4bNQZGIX/NgNpmkZQM+kHqfw+17cv0XLps/mZrrd/iu/9WqWFqrOcnm1p6S7Sq9plroR00xIFvxIrxUQUR',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'hBOCpSVIA/4Zgw4bNQZGIX/NgNpmkZQM+kHqfw+17cv0XLps/mZrrd/iu/9WqWFqrOcnm1p6S7Sq9plroR00xIFvxIrxUQUR',
   'x-amz-request-id': 'A5HW1KXWWBV4ETYT',
   'date': 'Sat, 17 Aug 2024 23:03:00 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"',
 'ServerSideEncryption': 'AES256'}

In [47]:
csv_buffer = StringIO()  #stores actual information in binary format
dimRegion.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'S3KZFSW67YN47JWS',
  'HostId': 'EBuiiaFrTXLKyiLFw6KaytdEVJoo3Bh7W8SoEksc1sPSS1jnC9yYsl8powhZK4io85lN2q1ixsG9b/e7I8PdUxIALPmpAzM4uKRkomhGeDk=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'EBuiiaFrTXLKyiLFw6KaytdEVJoo3Bh7W8SoEksc1sPSS1jnC9yYsl8powhZK4io85lN2q1ixsG9b/e7I8PdUxIALPmpAzM4uKRkomhGeDk=',
   'x-amz-request-id': 'S3KZFSW67YN47JWS',
   'date': 'Sat, 17 Aug 2024 23:05:07 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 0},
 'ETag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
 'ServerSideEncryption': 'AES256'}

In [49]:
#Extracting schema from fact and dimension tables
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER
)


In [50]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [51]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [52]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(),'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [53]:
#Now, using the DDL commands, we can create tables in Redshift
!pip install redshift-connector



In [54]:
import redshift_connector

In [55]:
import redshift_connector
conn = redshift_connector.connect(
    host='redshift-cluster-1.cjzcckpx94ge.us-east-1.redshift.amazonaws.com',
    port=5439,
    database='dev',
    user='awsuser',
    password=''
)


In [56]:
conn.autocommit = True

In [57]:
cursor=redshift_connector.Cursor = conn.cursor()

In [71]:
cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [61]:
cursor.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")


<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [62]:
cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [63]:
cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longititude" REAL,
"county" TEXT,
"state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [64]:
cursor.execute("""
COPY factCovid
FROM 's3://aws-covid-elt-pipeline/output/factCovid.csv'
IAM_ROLE 'arn:aws:iam::440744222547:role/redshift_s3_access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1
CSV;
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [66]:
cursor.execute("""
COPY dimRegion
FROM 's3://aws-covid-elt-pipeline/output/dimRegion.csv'
IAM_ROLE 'arn:aws:iam::440744222547:role/redshift_s3_access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1
CSV;
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [72]:
cursor.execute("""
COPY dimDate
FROM 's3://aws-covid-elt-pipeline/output/dimDate.csv'
IAM_ROLE 'arn:aws:iam::440744222547:role/redshift_s3_access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1
CSV;
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>

In [68]:
cursor.execute("""
COPY dimHospital
FROM 's3://aws-covid-elt-pipeline/output/dimHospital.csv'
IAM_ROLE 'arn:aws:iam::440744222547:role/redshift_s3_access'
DELIMITER ','
REGION 'us-east-1'
IGNOREHEADER 1
CSV;
""")

<redshift_connector.cursor.Cursor at 0x1f582b27110>