In [9]:
import boto3
import pandas as pd
from io import StringIO
import time

In [None]:
key = pd.read_csv('/Users/ha/Project/AWS/hahoang-local_accessKeys.csv')

In [None]:
AWS_ACCESS_KEY = key['Access key ID'][0]
AWS_SECRET_KEY = key['Secret access key'][0]
REDSHIFT_ENDPOINT = key['Endpoint Redshift'][0]
REDSHIFT_USERNAME = key['User Redshift'][0]
REDSHIFT_PASSWORD = key['Password Redshift'][0]
AWS_REGION = 'ap-southeast-1'
SCHEMA_NAME = 'covid_19_dataset'
S3_STAGING_DIR = 's3://covid19-project-result/output/'
S3_BUCKET_NAME = 'covid19-project-result'
S3_OUTPUT_DIRECTORY = 'output'

## Create session object

In [4]:
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
athena_client = session.client("athena",region_name = AWS_REGION,)
s3_client = session.client("s3",region_name = AWS_REGION,)

## Read data from response
- download file result query of athena that save into s3 and read this file by pandas

In [5]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}
def download_and_load_query_results(
    athena_client: boto3.client, s3_client:boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            athena_client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.01)
            else:
                raise err
    temp_file_location = "athena_query_results.csv"
    s3_client.download_file(S3_BUCKET_NAME,
                           f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                           temp_file_location,)
    return pd.read_csv(temp_file_location)

## Get data from tables in athena into dataframe

In [6]:
response = athena_client.start_query_execution(
    QueryString="Select * from enigma_jhub",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

enigma_jhud = download_and_load_query_results(athena_client, s3_client, response)

In [7]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_country",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, s3_client, response)

In [8]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_states",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

nytimes_data_in_usaus_states = download_and_load_query_results(athena_client, s3_client, response)

In [9]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countrypopulation",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

countypopulation = download_and_load_query_results(athena_client, s3_client, response)

In [10]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM reatc_usa_hospital_beds",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, s3_client, response)

In [11]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM state_abv",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

state_abv = download_and_load_query_results(athena_client, s3_client, response)

In [12]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM states_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

states_daily = download_and_load_query_results(athena_client, s3_client, response)

In [13]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

us_daily = download_and_load_query_results(athena_client, s3_client, response)

In [14]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM us_total_latest",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

us_total_latest = download_and_load_query_results(athena_client, s3_client, response)

## Building Dimensional Model

In [16]:
factCovid_1 = enigma_jhud[
    ['fips','province_state','country_region','confirmed','deaths','recovered','active']
]
factCovid_2 = states_daily[
    ['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']
]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

  if not (lk == lk.astype(rk.dtype))[~np.isnan(lk)].all():


In [17]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2,on='fips',how='inner')

In [18]:
dimHospital = rearc_usa_hospital_beds[
    ['fips','state_name','latitude','longtitude','hq_address','hospital_name',\
     'hospital_type','hq_city','hq_state']
    ]

In [19]:
dimDate = states_daily[['fips','date']]

In [20]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [21]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


## Saving to S3

In [22]:
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_resource = session.resource('s3')
s3_resource.Object(S3_BUCKET_NAME,'result/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'A8YJE9HGRZ4HG5BF',
  'HostId': 'bHx+HGFrIt+O0OVpTJn7CCqQxxg4wb4Tnskn9/msosuEyh9rVDJCnPGynkuwgGad8grPyGA6P8M=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'bHx+HGFrIt+O0OVpTJn7CCqQxxg4wb4Tnskn9/msosuEyh9rVDJCnPGynkuwgGad8grPyGA6P8M=',
   'x-amz-request-id': 'A8YJE9HGRZ4HG5BF',
   'date': 'Wed, 28 Jun 2023 09:50:07 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"8ec89299f8b8c8e218cebfa5c9af6b49"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"8ec89299f8b8c8e218cebfa5c9af6b49"',
 'ServerSideEncryption': 'AES256'}

In [28]:
csv_buffer.truncate(0)
dimRegion.to_csv(csv_buffer)
s3_resource.Object(S3_BUCKET_NAME,'result/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3KSD1ZDK1DJVF8YR',
  'HostId': '+2W3SqJ3ZqW//CBc8UM0TUMfrW6xD815GIr7n3odAoov8bQM+LRryrNWrmY7ZIqH2MuaVW35kFU=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '+2W3SqJ3ZqW//CBc8UM0TUMfrW6xD815GIr7n3odAoov8bQM+LRryrNWrmY7ZIqH2MuaVW35kFU=',
   'x-amz-request-id': '3KSD1ZDK1DJVF8YR',
   'date': 'Wed, 28 Jun 2023 09:55:38 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"ac0232f9ecdbf1cbc70875c8fd97fa8c"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 0},
 'ETag': '"ac0232f9ecdbf1cbc70875c8fd97fa8c"',
 'ServerSideEncryption': 'AES256'}

In [25]:
csv_buffer.truncate(0)
dimHospital.to_csv(csv_buffer)
s3_resource.Object(S3_BUCKET_NAME,'result/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '3B1199E66F263GR4',
  'HostId': 'AenRUDAoIoDcJGc/gFWW/sDPfsYnJ47PXG15d7n3D/6HMsiglhnl0BODKUE1LPc7G2hk6LeTzUM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'AenRUDAoIoDcJGc/gFWW/sDPfsYnJ47PXG15d7n3D/6HMsiglhnl0BODKUE1LPc7G2hk6LeTzUM=',
   'x-amz-request-id': '3B1199E66F263GR4',
   'date': 'Wed, 28 Jun 2023 09:50:54 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"be6edb2409a04232f1d25e65b116e5e3"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"be6edb2409a04232f1d25e65b116e5e3"',
 'ServerSideEncryption': 'AES256'}

In [27]:
csv_buffer.truncate(0)
dimDate.to_csv(csv_buffer)
s3_resource.Object(S3_BUCKET_NAME,'result/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'EFE0DK1FB241PADG',
  'HostId': 'mhgwfUsw9w1/N+zRKH/1m54OiLLp8X0PmnM/3mqlmx/WbEiIw4rO+9hq8QzK5oPMYnjWMAEiusE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'mhgwfUsw9w1/N+zRKH/1m54OiLLp8X0PmnM/3mqlmx/WbEiIw4rO+9hq8QzK5oPMYnjWMAEiusE=',
   'x-amz-request-id': 'EFE0DK1FB241PADG',
   'date': 'Wed, 28 Jun 2023 09:51:08 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"b56d7bf50acaee462b4ab749379ac0d8"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"b56d7bf50acaee462b4ab749379ac0d8"',
 'ServerSideEncryption': 'AES256'}

## Extracting schema from dataset

In [29]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [30]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [31]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [32]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(),'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [36]:
dimHospitalsql.replace('\n','')

'CREATE TABLE "dimHospital" ("index" INTEGER,  "fips" REAL,  "state_name" TEXT,  "latitude" REAL,  "longtitude" REAL,  "hq_address" TEXT,  "hospital_name" TEXT,  "hospital_type" TEXT,  "hq_city" TEXT,  "hq_state" TEXT)'

## Redshift Connector

In [20]:
## if connect timeout -> enable publicly accessible in network and security of redshift group 
# then edit VPC security group -> add inbound rules for type Redshift and source 0.0.0.0/0
 
import redshift_connector
conn = redshift_connector.connect(
    host='default.693798565576.ap-southeast-1.redshift-serverless.amazonaws.com',
    database='dev',
    port=5439,
    user='admin',
    password='Admin123.'
)

In [21]:
conn.autocommit = True
cursor=redshift_connector.Cursor = conn.cursor()

In [22]:
cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
"fips" INTEGER,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x13c3d9000>

In [25]:
cursor.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"longitutde" REAL,
"latitude" REAL,
"hq_address" TEXT,
"hospital_name" TEXT,
"hospital_type" TEXT,
"hq_city" TEXT,
"hq_state" TEXT
)
""")
               
cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
""")
               
cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longititude" REAL,
"county" TEXT,
"state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x13c3d9000>

## Copy data from S3 to Redshift

In [30]:
cursor.execute("""
copy dimDate from 's3://covid19-project-result/result/dimDate.csv' 
credentials 'aws_iam_role=arn:aws:iam::693798565576:role/redshift' 
delimiter ',' 
region 'ap-southeast-1' 
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x13c3d9000>

In [31]:
cursor.execute("""
copy dimHospital from 's3://covid19-project-result/result/dimHospital.csv' 
credentials 'aws_iam_role=arn:aws:iam::693798565576:role/redshift' 
delimiter ',' 
region 'ap-southeast-1' 
IGNOREHEADER 1
""")
               
cursor.execute("""
copy factCovid from 's3://covid19-project-result/result/factCovid.csv' 
credentials 'aws_iam_role=arn:aws:iam::693798565576:role/redshift' 
delimiter ',' 
region 'ap-southeast-1' 
IGNOREHEADER 1
""")
               
cursor.execute("""
copy dimRegion from 's3://covid19-project-result/result/dimRegion.csv' 
credentials 'aws_iam_role=arn:aws:iam::693798565576:role/redshift' 
delimiter ',' 
region 'ap-southeast-1' 
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x13c3d9000>