# COVID-19 Data Engineering Project

In [223]:
import boto3
import pandas as pd
from io import StringIO
import time

## Initialize AWS Config

In [224]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('aws.config'))

In [225]:
AWS_ACCESS_KEY = config.get("AWS", "AWS_ACCESS_KEY")
AWS_SECRET_KEY = config.get("AWS", "AWS_SECRET_KEY")
AWS_REGION = config.get("AWS", "AWS_REGION")
SCHEMA_NAME = config.get("AWS", "SCHEMA_NAME")
S3_STAGING_DIR = config.get("AWS", "S3_STAGING_DIR")
S3_BUCKET_NAME = config.get("AWS", "S3_BUCKET_NAME")
S3_OUTPUT_DIRECTORY = config.get("AWS", "S3_OUTPUT_DIRECTORY")
DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME = config.get("DWH","DWH_IAM_ROLE_NAME")

## Connect to Athena

In [131]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

## Function to get Query results, store as csv in S3, and return the csv file as DataFrame

In [155]:
Dict = {}
def download_and_load_query_results(client:boto3.client, query_response: Dict) -> pd.DataFrame:
    count = 1
    while True:
        try:
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "FAILED" in str(err) or "QUEUED" in str(err):
                pass
                print(f"{count=}")
                count+=1
                time.sleep(10)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name='us-east-1'    
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location, 
    )
    return pd.read_csv(temp_file_location)

## Store all the tables data into Dataframes

In [134]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM enigma_jhud",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)


In [135]:
response

{'QueryExecutionId': '7f17075e-9943-44b2-a6e6-5d96310302b7',
 'ResponseMetadata': {'RequestId': '5d10eae0-09c2-43b4-8920-01f43254737c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 21 Feb 2024 01:42:27 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': '5d10eae0-09c2-43b4-8920-01f43254737c'},
  'RetryAttempts': 0}}

In [138]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [141]:
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


In [147]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

count=1


In [148]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM nytimes_data_in_usa_us_states",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

count=1


In [149]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM rearc_covid_19_testing_data_states_dailystates_daily",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
rearc_covid_19_testing_data_states_dailystates_daily = download_and_load_query_results(athena_client, response)

count=1


In [150]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM rearc_covid_19_testing_data_us_daily",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

count=1


In [159]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM rearc_usa_hospital_beds",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

count=1


In [157]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM static_dataset_countrycode",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
static_dataset_countrycode = download_and_load_query_results(athena_client, response)

count=1


In [160]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM static_dataset_countypopulation",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
static_dataset_countypopulation = download_and_load_query_results(athena_client, response)

count=1


In [161]:
response = athena_client.start_query_execution(
        QueryString="SELECT * FROM static_dataset_state_abv",
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        }
)
static_dataset_state_abv = download_and_load_query_results(athena_client, response)

count=1


In [162]:
static_dataset_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [165]:
new_header_data = static_dataset_state_abv.iloc[0]
static_dataset_state_abv = static_dataset_state_abv[1:]
static_dataset_state_abv.columns = new_header_data

In [166]:
static_dataset_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


## Building Dimensional Model Dataframes

In [171]:
rearc_covid_19_testing_data_states_dailystates_daily.columns

Index(['date', 'state', 'positive', 'probablecases', 'negative', 'pending',
       'totaltestresultssource', 'totaltestresults', 'hospitalizedcurrently',
       'hospitalizedcumulative', 'inicucurrently', 'inicucumulative',
       'onventilatorcurrently', 'onventilatorcumulative', 'recovered',
       'lastupdateet', 'datemodified', 'checktimeet', 'death', 'hospitalized',
       'hospitalizeddischarged', 'datechecked', 'totaltestsviral',
       'positivetestsviral', 'negativetestsviral', 'positivecasesviral',
       'deathconfirmed', 'deathprobable', 'totaltestencountersviral',
       'totaltestspeopleviral', 'totaltestsantibody', 'positivetestsantibody',
       'negativetestsantibody', 'totaltestspeopleantibody',
       'positivetestspeopleantibody', 'negativetestspeopleantibody',
       'totaltestspeopleantigen', 'positivetestspeopleantigen',
       'totaltestsantigen', 'positivetestsantigen', 'fips', 'positiveincrease',
       'negativeincrease', 'total', 'totaltestresultsincrease', 

In [173]:
factCovid1 = enigma_jhud[['fips','province_state','country_region','confirmed', 'deaths', 'recovered', 'active']]
factCovid2 = rearc_covid_19_testing_data_states_dailystates_daily[['fips','date','positive','negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]
factCovid = pd.merge(factCovid1, factCovid2, on='fips', how='inner')

In [174]:
factCovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,
1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,
2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,
3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,
4,,Gansu,China,,,,,20210119,289939,,1066.0,,


In [175]:
factCovid.shape

(8729, 13)

In [177]:
dimRegion1 = enigma_jhud[['fips','province_state','country_region','latitude', 'longitude']]
dimRegion2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion1, dimRegion2, on='fips', how='inner')

In [178]:
rearc_usa_hospital_beds.columns

Index(['objectid', 'hospital_name', 'hospital_type', 'hq_address',
       'hq_address1', 'hq_city', 'hq_state', 'hq_zip_code', 'county_name',
       'state_name', 'state_fips', 'cnty_fips', 'fips', 'num_licensed_beds',
       'num_staffed_beds', 'num_icu_beds', 'adult_icu_beds', 'pedi_icu_beds',
       'bed_utilization', 'avg_ventilator_usage',
       'potential_increase_in_bed_capac', 'latitude', 'longtitude'],
      dtype='object')

In [182]:
dimHospital = rearc_usa_hospital_beds[['fips','state_name', 'latitude', 'longtitude','hq_address','hq_city', 'hq_state', 'hospital_name', 'hospital_type']]

In [183]:
dimDate = rearc_covid_19_testing_data_states_dailystates_daily[['fips','date']]

In [184]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [186]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [187]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [189]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [190]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


## Storing Dimensional Model data as CSV on S3

In [191]:
bucket = 'covid-19-datasets'

In [192]:
csv_buffer = StringIO()

In [193]:
csv_buffer

<_io.StringIO at 0x7f39c7e79a20>

In [194]:
factCovid.to_csv(csv_buffer)

In [196]:
s3_resource = boto3.resource(
    's3',region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY)
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'T24W53M1KX7PM5Q7',
  'HostId': 'X4sxRZTDroNa3Sj+PkS/i7/IaA76//VWVEvpjUYZWMm5Rbv3hcCBi5u0eOTSnNlTqbybZ1kVaA+EhLI4A/YHC+6Mv1NmpX0f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'X4sxRZTDroNa3Sj+PkS/i7/IaA76//VWVEvpjUYZWMm5Rbv3hcCBi5u0eOTSnNlTqbybZ1kVaA+EhLI4A/YHC+6Mv1NmpX0f',
   'x-amz-request-id': 'T24W53M1KX7PM5Q7',
   'date': 'Wed, 21 Feb 2024 04:46:02 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"142c1e29702a1de73dd01b170574b0bd"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"142c1e29702a1de73dd01b170574b0bd"',
 'ServerSideEncryption': 'AES256'}

In [197]:
csv_buffer.getvalue()

',fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged\n0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,\n1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,\n2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,\n3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,\n4,,Gansu,China,,,,,20210119,289939,,1066.0,,\n5,,Guangdong,China,26.0,,,,20210119,289939,,1066.0,,\n6,,Guangxi,China,2.0,,,,20210119,289939,,1066.0,,\n7,,Guizhou,China,1.0,,,,20210119,289939,,1066.0,,\n8,,Hai,China,4.0,,,,20210119,289939,,1066.0,,\n9,,Hebei,China,1.0,,,,20210119,289939,,1066.0,,\n10,,Heilongjiang,China,,,,,20210119,289939,,1066.0,,\n11,,He,China,5.0,,,,20210119,289939,,1066.0,,\n12,,Hong Kong,Hong Kong,,,,,20210119,289939,,1066.0,,\n13,,Hubei,China,444.0,17.0,28.0,,20210119,289939,,1066.0,,\n14,,Hu,China,4.0,,,,20210119,289939,,1066.0,,\n15,,Inner Mongolia,China,,,,,20210119,289939,,1066.0,,\n16,,Jiangsu,China,1.0,,,,2

In [198]:
dimDate.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3PEH1SENGY402XP6',
  'HostId': 'UowzpQMYvFw0PmobBDYvYZxqaZxL/34wOdF4PWs7y9iRDmkDZb9UYVEj2BB9PIqD0kWUnumsm7I=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'UowzpQMYvFw0PmobBDYvYZxqaZxL/34wOdF4PWs7y9iRDmkDZb9UYVEj2BB9PIqD0kWUnumsm7I=',
   'x-amz-request-id': '3PEH1SENGY402XP6',
   'date': 'Wed, 21 Feb 2024 04:50:28 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"85a0cdbb340da371ec24367c083b7879"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"85a0cdbb340da371ec24367c083b7879"',
 'ServerSideEncryption': 'AES256'}

In [199]:
dimHospital.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'JZVB5241JHJ9Z9MH',
  'HostId': 'CEa1b6zNjFd5C3IZfs8XqbAB8P5kHdPJ4Za1LmPV3wAeRbB/CfnHPcnfxFpNHhGWVGA/qbzaWRw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'CEa1b6zNjFd5C3IZfs8XqbAB8P5kHdPJ4Za1LmPV3wAeRbB/CfnHPcnfxFpNHhGWVGA/qbzaWRw=',
   'x-amz-request-id': 'JZVB5241JHJ9Z9MH',
   'date': 'Wed, 21 Feb 2024 04:50:40 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"6d797db6e45fe9992982df363d205563"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"6d797db6e45fe9992982df363d205563"',
 'ServerSideEncryption': 'AES256'}

In [200]:
dimRegion.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'KV2WYQBHTA0E51H3',
  'HostId': 'MBL4FOYvVRklFQzRjCcsWYxOKjh0L+bRSNRWJVzAB7u9ECAzRlEwD4AQTyeJxf0RYaDDCKGT8zb178IkDKCSAQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'MBL4FOYvVRklFQzRjCcsWYxOKjh0L+bRSNRWJVzAB7u9ECAzRlEwD4AQTyeJxf0RYaDDCKGT8zb178IkDKCSAQ==',
   'x-amz-request-id': 'KV2WYQBHTA0E51H3',
   'date': 'Wed, 21 Feb 2024 04:50:59 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"c6e4276ec6541b28997a5937760d7cc5"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"c6e4276ec6541b28997a5937760d7cc5"',
 'ServerSideEncryption': 'AES256'}

## Extract Schema from Dataframe

In [201]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [202]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT
)


In [203]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [204]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [227]:
redshift = boto3.client('redshift',
                   region_name='us-east-1',
                   aws_access_key_id=AWS_ACCESS_KEY,
                   aws_secret_access_key=AWS_SECRET_KEY
                   )

In [230]:
iam = boto3.client('iam',
                   region_name='us-east-1',
                   aws_access_key_id=AWS_ACCESS_KEY,
                   aws_secret_access_key=AWS_SECRET_KEY
                   )

In [231]:
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

In [232]:
try:
    response = redshift.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        
        #identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #roles
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)
    

In [238]:
redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)

{'Clusters': [{'ClusterIdentifier': 'my-first-redshift',
   'NodeType': 'dc2.large',
   'ClusterStatus': 'available',
   'ClusterAvailabilityStatus': 'Unavailable',
   'MasterUsername': 'awsuser',
   'DBName': 'myfirstdb',
   'Endpoint': {'Address': 'my-first-redshift.cccyn6m0cjco.us-east-1.redshift.amazonaws.com',
    'Port': 5439},
   'ClusterCreateTime': datetime.datetime(2024, 2, 21, 6, 32, 49, 531000, tzinfo=tzutc()),
   'AutomatedSnapshotRetentionPeriod': 1,
   'ManualSnapshotRetentionPeriod': -1,
   'ClusterSecurityGroups': [],
   'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-03796813f026f3722',
     'Status': 'active'}],
   'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
     'ParameterApplyStatus': 'in-sync'}],
   'ClusterSubnetGroupName': 'default',
   'VpcId': 'vpc-06a71eb2adf418c95',
   'AvailabilityZone': 'us-east-1c',
   'PreferredMaintenanceWindow': 'thu:04:30-thu:05:00',
   'PendingModifiedValues': {},
   'ClusterVersion': '1.0',
   'AllowVe

In [239]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

In [240]:
myClusterProps

{'ClusterIdentifier': 'my-first-redshift',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'ClusterAvailabilityStatus': 'Unavailable',
 'MasterUsername': 'awsuser',
 'DBName': 'myfirstdb',
 'Endpoint': {'Address': 'my-first-redshift.cccyn6m0cjco.us-east-1.redshift.amazonaws.com',
  'Port': 5439},
 'ClusterCreateTime': datetime.datetime(2024, 2, 21, 6, 32, 49, 531000, tzinfo=tzutc()),
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-03796813f026f3722',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-06a71eb2adf418c95',
 'AvailabilityZone': 'us-east-1c',
 'PreferredMaintenanceWindow': 'thu:04:30-thu:05:00',
 'PendingModifiedValues': {},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes': 1,
 'PubliclyAccess

In [241]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_NAME = myClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [243]:
ec2 = boto3.resource('ec2',
                        region_name='us-east-1',
                   aws_access_key_id=AWS_ACCESS_KEY,
                   aws_secret_access_key=AWS_SECRET_KEY
                   )

In [244]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-03796813f026f3722')


In [245]:
import psycopg2
try:
    conn = psycopg2.connect(host=DWH_ENDPOINT, dbname=DB_NAME, user=DB_USER, password=DWH_DB_PASSWORD, port=5439)
except Exception as e:
    print("Error Could not make connection to the postgres DB")
    print(e)

conn.set_session(autocommit=True)

In [246]:
try:
    cur = conn.cursor()
except Exception as e:
    print("Error Could not get cursor to the DB")
    print(e)

In [248]:
cur.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
""")

In [249]:
cur.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT
)
""")

In [250]:
cur.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

In [251]:
cur.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

In [None]:
try:
    cur.execute("""
        COPY dimDate from 's3://covid-19-datasets/output/dimDate.csv' 
        credentials 'aws_iam_role=arn:aws:iam::992382696383:role/redshift-s3-access'
        delimiter ','
        region 'us-east-1'
        IGNOREHEADER 1
        
    """)
except psycopg2.Error as e:
    print("Error: Issue Copying the data from s3 to redshift tables")
    print(e)