In [10]:
import boto3
import pandas as pd
from io import StringIO

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [1]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('proj.config'))

In [7]:
KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET") 
REGION = config.get("AWS", "REGION")
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR = "s3://proj-athena-op/output/"
S3_BUCKET_NAME = "proj-athena-op"
S3_OUTPUT_DIRECTORY = "output"
I_AM_ROLE = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [23]:
#Connecting to Athena
athena_client = boto3.client(
                             'athena',
                             aws_access_key_id = KEY,
                             aws_secret_access_key = SECRET,
                             region_name = REGION,
                             )

In [24]:
 #To query the data and store it onto s3 bucket
import time
Dict = {}
def download_and_load_query_results(client: boto3.client, query_response: Dict) -> pd.DataFrame:
    # Wait for the query to complete
    while True:
        try:
            #This function only loads the first 1000 rows
            client.get_query_results(
                QueryExecutionId = query_response["QueryExecutionId"]
            )
            break
        except Exception as e:
            if "not yet finished" in str(e):
                time.sleep(0.001)
            else:
                raise e
        
    temp_file_loc: str = "athena_query_results.csv"

    s3_client = boto3.client(
            "s3",
            aws_access_key_id=KEY,
            aws_secret_access_key=SECRET,
            region_name=REGION,
        )

    s3_client.download_file(
            S3_BUCKET_NAME,
            f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
            temp_file_loc,
        )
    return pd.read_csv(temp_file_loc)



In [25]:
#Defining Response
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud_csv",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

enigma_jhud_csv = download_and_load_query_results(athena_client, response)

In [26]:
enigma_jhud_csv.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [27]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [28]:
nytimes_data_in_usa_us_county.head()

Unnamed: 0,date,county,state,fips,cases,deaths
0,2020-01-21,Snohomish,Washington,53061.0,1.0,0.0
1,2020-01-22,Snohomish,Washington,53061.0,1.0,0.0
2,2020-01-23,Snohomish,Washington,53061.0,1.0,0.0
3,2020-01-24,Cook,Illinois,17031.0,1.0,0.0
4,2020-01-24,Snohomish,Washington,53061.0,1.0,0.0


In [29]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_states",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

In [30]:
nytimes_data_in_usa_us_states.head()

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


In [31]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_daily",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [32]:
rearc_covid_19_testing_data_states_daily.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [33]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_daily",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [34]:
rearc_covid_19_testing_data_us_daily.head()

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [35]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_total_latest",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [36]:
rearc_covid_19_testing_data_us_total_latest.head()

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


In [37]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [38]:
rearc_usa_hospital_beds.head()

Unnamed: 0,objectid,hospital_name,hospital_type,hq_address,hq_address1,hq_city,hq_state,hq_zip_code,county_name,state_name,...,num_licensed_beds,num_staffed_beds,num_icu_beds,adult_icu_beds,pedi_icu_beds,bed_utilization,avg_ventilator_usage,potential_increase_in_bed_capac,latitude,longtitude
0,1,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,650 E Indian School Rd,,Phoenix,AZ,85012,Maricopa,Arizona,...,129.0,129.0,0,0,,,0.0,0,33.495498,-112.066157
1,2,Southern Arizona VA Health Care System,VA Hospital,3601 S 6th Ave,,Tucson,AZ,85723,Pima,Arizona,...,295.0,295.0,2,2,,,2.0,0,32.181263,-110.965885
2,3,VA Central California Health Care System,VA Hospital,2615 E Clinton Ave,,Fresno,CA,93703,Fresno,California,...,57.0,57.0,2,2,,,2.0,0,36.773324,-119.779742
3,4,VA Connecticut Healthcare System - West Haven ...,VA Hospital,950 Campbell Ave,,West Haven,CT,6516,New Haven,Connecticut,...,216.0,216.0,1,1,,,2.0,0,41.2844,-72.95761
4,5,Wilmington VA Medical Center,VA Hospital,1601 Kirkwood Hwy,,Wilmington,DE,19805,New Castle,Delaware,...,60.0,60.0,0,0,,,1.0,0,39.740206,-75.606532


In [39]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countrycode",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_countrycode = download_and_load_query_results(athena_client, response)

In [40]:
static_datasets_countrycode.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [41]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countypopulation",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_countypopulation = download_and_load_query_results(athena_client, response)

In [42]:
static_datasets_countypopulation.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [43]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_state_abv",
    QueryExecutionContext={"Database": "covid_19"},
    ResultConfiguration={"OutputLocation": S3_STAGING_DIR, "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

static_datasets_state_abv = download_and_load_query_results(athena_client, response)

In [46]:
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [45]:
#Updating column Names of static_datasets_state_abv

updated_header = static_datasets_state_abv.iloc[0] #To grab the first row
static_datasets_state_abv = static_datasets_state_abv[1:]   #to remove the first row

#Adding new header to dataframe
static_datasets_state_abv.columns = updated_header

**ETL using Python**

In [153]:
#Building Fact Table

factCovid1 = enigma_jhud_csv[['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]
factCovid2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]

#Merging the above tables
factCovid = pd.merge(factCovid1, factCovid2, on='fips', how='inner')

In [154]:
factCovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,
1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,
2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,
3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,
4,,Gansu,China,,,,,20210119,289939,,1066.0,,


In [148]:
factCovid.shape

(27992, 13)

In [139]:
#Building dimRegion Table
dimRegion1 = enigma_jhud_csv[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]

#Merging the above Tables
dimRegion = pd.merge(dimRegion1, dimRegion2, on='fips', how='inner')

In [140]:
dimRegion.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,Anhui,China,31.826,117.226,New York City,New York
1,,Anhui,China,31.826,117.226,Unknown,Rhode Island
2,,Anhui,China,31.826,117.226,New York City,New York
3,,Anhui,China,31.826,117.226,Unknown,Rhode Island
4,,Anhui,China,31.826,117.226,New York City,New York


In [133]:
#Building dimHospital

dimHospital = rearc_usa_hospital_beds[['fips', 'state_name', 'latitude', 'longtitude', 'hq_address', 'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]

In [134]:
dimHospital.head()

Unnamed: 0,fips,state_name,latitude,longtitude,hq_address,hospital_name,hospital_type,hq_city,hq_state
0,4013,Arizona,33.495498,-112.066157,650 E Indian School Rd,Phoenix VA Health Care System (AKA Carl T Hayd...,VA Hospital,Phoenix,AZ
1,4019,Arizona,32.181263,-110.965885,3601 S 6th Ave,Southern Arizona VA Health Care System,VA Hospital,Tucson,AZ
2,6019,California,36.773324,-119.779742,2615 E Clinton Ave,VA Central California Health Care System,VA Hospital,Fresno,CA
3,9009,Connecticut,41.2844,-72.95761,950 Campbell Ave,VA Connecticut Healthcare System - West Haven ...,VA Hospital,West Haven,CT
4,10003,Delaware,39.740206,-75.606532,1601 Kirkwood Hwy,Wilmington VA Medical Center,VA Hospital,Wilmington,DE


In [115]:
#Building dimDate

dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']]

In [116]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [117]:
#Formatting date from dimDate

dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [118]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [119]:
#Exctracting Year, month and date individually from date column

dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day'] = dimDate['date'].dt.dayofweek


In [120]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


**Storing Output onto S3**

In [155]:
bucket = 'manoj-covid-de-project'

#Creating StringIO Buffer object to put our value into binary format
csv_buffer = StringIO()

In [156]:
csv_buffer

<_io.StringIO at 0x15ba0dea0>

In [157]:
#Writing data onto buffer
factCovid.to_csv(csv_buffer)

In [123]:
dimDate.to_csv(csv_buffer)

In [137]:
dimHospital.to_csv(csv_buffer)

In [143]:
dimRegion.to_csv(csv_buffer)

In [158]:
# Creating s3 object
s3_resource = boto3.resource('s3',
                            aws_access_key_id=KEY,
                            aws_secret_access_key=SECRET,
                            region_name=REGION,
                            )

#Storing the buffer data onto s3
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'QR1VJPFWB1NRWN4M',
  'HostId': '0TZhfYPwT9ekfhnQEsLzxqVbJ17S1EzUawhd9dmR6fU8LkJYlsS7v6HtyE3ao7yauK+630fL0R91jsU3Tlxa7QkCc3N+4FwVr+wxNhi4cUE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '0TZhfYPwT9ekfhnQEsLzxqVbJ17S1EzUawhd9dmR6fU8LkJYlsS7v6HtyE3ao7yauK+630fL0R91jsU3Tlxa7QkCc3N+4FwVr+wxNhi4cUE=',
   'x-amz-request-id': 'QR1VJPFWB1NRWN4M',
   'date': 'Thu, 29 Feb 2024 00:50:58 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"ceb661db241606c22d4ba00f83bc9639"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"ceb661db241606c22d4ba00f83bc9639"',
 'ServerSideEncryption': 'AES256'}

**Extracting Schema from stored data**

In [125]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER
)


In [107]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [108]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [109]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


**Connecting to Redshift**

In [55]:
#installing redshift connector
!pip install redshift-connector



In [4]:
import redshift_connector

In [6]:
rs_conn = redshift_connector.connect(
    host = '{host_URL}',
    port= '{port_number}',
    database = '{DB_NAME}',
    user = '{USERNAME}',
    password = '{PASSWORD}'
)

In [162]:
rs_conn.autocommit = True

In [163]:
rs_cursor = redshift_connector.Cursor = rs_conn.cursor()

**Table Creation**

In [164]:
rs_cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day" INTEGER
)
""")

rs_cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

rs_cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

rs_cursor.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")


<redshift_connector.cursor.Cursor at 0x13a9886d0>

**Using COPY command to  copy files from one s3 to Redshift**

In [165]:
rs_cursor.execute("""
                copy dimDate from 's3://manoj-covid-de-project/output/dimDate.csv'
                credentials 'aws_iam_role=arn:aws:iam::590183649199:role/redshift-s3-access'
                delimiter ','
                region 'us-east-1'
                IGNOREHEADER 1                  
""")

<redshift_connector.cursor.Cursor at 0x13a9886d0>

In [166]:
rs_cursor.execute("""
                copy dimHospital from 's3://manoj-covid-de-project/output/dimHospital.csv'
                credentials 'aws_iam_role=arn:aws:iam::590183649199:role/redshift-s3-access'
                delimiter ','
                region 'us-east-1'
                IGNOREHEADER 1                  
""")

<redshift_connector.cursor.Cursor at 0x13a9886d0>

In [167]:
rs_cursor.execute("""
                copy dimRegion from 's3://manoj-covid-de-project/output/dimRegion.csv'
                credentials 'aws_iam_role=arn:aws:iam::590183649199:role/redshift-s3-access'
                delimiter ','
                region 'us-east-1'
                IGNOREHEADER 1                  
""")

<redshift_connector.cursor.Cursor at 0x13a9886d0>

In [168]:
rs_cursor.execute("""
                copy factCovid from 's3://manoj-covid-de-project/output/factCovid.csv'
                credentials 'aws_iam_role=arn:aws:iam::590183649199:role/redshift-s3-access'
                delimiter ','
                region 'us-east-1'
                IGNOREHEADER 1                  
""")

<redshift_connector.cursor.Cursor at 0x13a9886d0>