In [1]:
import boto3
import pandas as pd
from io import StringIO
import configparser
import time

In [2]:
config=configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [3]:
AWS_KEY = config.get('AWS','KEY')
AWS_SECRET = config.get('AWS','SECRET')
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR ="s3://anupriyar-test-bucket/output/"
S3_BUCKET_NAME= "anupriyar-test-bucket"
S3_OUTPUT_DIR= "output"

In [4]:
athena_client = boto3.client("athena",
                            region_name=AWS_REGION,
                            aws_access_key_id=AWS_KEY,
                            aws_secret_access_key=AWS_SECRET)

In [8]:
Dict= {}
def download_and_load_query_results(
    client:boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as e:
            if "not yet finished" in str(e):
                time.sleep(0.01)
            else:
                raise e
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client("s3",
                            aws_access_key_id=AWS_KEY,
                            aws_secret_access_key=AWS_SECRET,
                            region_name=AWS_REGION,                        
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIR}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    
    return pd.read_csv(temp_file_location)

In [9]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM enigma_jhu",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

In [77]:
response

{'QueryExecutionId': '6ee907fc-7941-4f3d-812b-ffc5ecc35806',
 'ResponseMetadata': {'RequestId': '91f6b55c-410c-403b-a91c-2740b2e0b6bf',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',
   'date': 'Sun, 30 Jan 2022 19:43:05 GMT',
   'x-amzn-requestid': '91f6b55c-410c-403b-a91c-2740b2e0b6bf',
   'content-length': '59',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [10]:
enigma_jhu = download_and_load_query_results(athena_client, response)

In [11]:
enigma_jhu.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key,partition_0
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui",csv
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing",csv
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing",csv
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian",csv
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu",csv


In [12]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM countrycode",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

In [13]:
country_code = download_and_load_query_results(athena_client, response)

In [14]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM countypopulation",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

countypopulation = download_and_load_query_results(athena_client, response)

In [15]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [16]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM states_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

states_daily = download_and_load_query_results(athena_client, response)

In [17]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM us_county",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

us_county = download_and_load_query_results(athena_client, response)

In [18]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM us_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

us_daily = download_and_load_query_results(athena_client, response)

In [19]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM us_states",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

us_states = download_and_load_query_results(athena_client, response)


In [20]:
us_states.head()

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0


In [21]:
response = athena_client.start_query_execution(
    QueryString ="SELECT * FROM state_abv",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

state_abv = download_and_load_query_results(athena_client, response)

In [22]:
state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [23]:
new_header = state_abv.iloc[0]

In [24]:
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

In [25]:
state_abv = state_abv[1:]

In [26]:
state_abv.head()

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [27]:
state_abv.columns = new_header

In [28]:
state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [29]:
covid_fact1 = enigma_jhu[['fips','province_state','country_region','latitude','longitude','deaths','confirmed','recovered','active']]
covid_fact2= states_daily[['fips','date','positive','negative','hospitalized','hospitalizedcurrently','hospitalizeddischarged']]
covid_fact = pd.merge(covid_fact1, covid_fact2, on='fips', how="inner")

In [30]:
covid_fact.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,deaths,confirmed,recovered,active,date,positive,negative,hospitalized,hospitalizedcurrently,hospitalizeddischarged
0,72.0,Puerto Rico,US,18.221,-66.59,0.0,3.0,0.0,,20210307,101327.0,305972.0,,147.0,
1,72.0,Puerto Rico,US,18.221,-66.59,0.0,3.0,0.0,,20210306,101327.0,305972.0,,147.0,
2,72.0,Puerto Rico,US,18.221,-66.59,0.0,3.0,0.0,,20210305,101066.0,305972.0,,136.0,
3,72.0,Puerto Rico,US,18.221,-66.59,0.0,3.0,0.0,,20210304,100867.0,305972.0,,171.0,
4,72.0,Puerto Rico,US,18.221,-66.59,0.0,3.0,0.0,,20210303,100765.0,305972.0,,169.0,


In [34]:
region_dim1 = enigma_jhu[['fips','province_state','country_region','latitude','longitude']]
region_dim2 = us_county[['fips','county','state']]
region_dim = pd.merge(region_dim1, region_dim2, on='fips',how="inner")

In [35]:
date_dim =states_daily[['fips','date']]

In [36]:
date_dim.head()

Unnamed: 0,fips,date
0,2,20210307
1,1,20210307
2,5,20210307
3,60,20210307
4,4,20210307


In [38]:
date_dim['date'] =pd.to_datetime(date_dim['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [39]:
date_dim.head()

Unnamed: 0,fips,date
0,2,2021-03-07
1,1,2021-03-07
2,5,2021-03-07
3,60,2021-03-07
4,4,2021-03-07


In [40]:
date_dim['year'] = date_dim['date'].dt.year

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [41]:
date_dim['month'] = date_dim['date'].dt.month
date_dim['week_day'] = date_dim['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [43]:
date_dim.head()

Unnamed: 0,fips,date,year,month,week_day
0,2,2021-03-07,2021,3,6
1,1,2021-03-07,2021,3,6
2,5,2021-03-07,2021,3,6
3,60,2021-03-07,2021,3,6
4,4,2021-03-07,2021,3,6


In [44]:
region_dim.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,Anhui,China,31.826,117.226,New York City,New York
1,,Anhui,China,31.826,117.226,Unknown,Rhode Island
2,,Anhui,China,31.826,117.226,New York City,New York
3,,Anhui,China,31.826,117.226,Unknown,Rhode Island
4,,Anhui,China,31.826,117.226,New York City,New York


In [60]:
hospital_dim2 = rearc_usa_hospital_beds[['fips','state_name','hospital_name','hospital_type','hq_address','hq_city','hq_state','latitude','longtitude']]

In [50]:
hospital_dim.head()

Unnamed: 0,fips,province_state,hospital_name,hospital_type,hq_address,hq_city,hq_state,latitude,longtitude
0,,Anhui,LBJ Tropical Medical Center,Short Term Acute Care Hospital,Fagaalu Village,Pago Pago,AS,-14.29019,-170.685773
1,,Anhui,Guam Memorial Hospital Authority,Short Term Acute Care Hospital,850 Gov Carlos G Camacho Rd,Tamuning,GU,13.4976,144.7761
2,,Anhui,Commonwealth Health Center,Short Term Acute Care Hospital,1 Lower Navy Hill Rd,Saipan,MP,15.210586,145.72417
3,,Anhui,Gov Juan F Luis Hospital & Medical Center,Short Term Acute Care Hospital,4007 Estate Diamond Ruby Christiansted,St Croix,VI,17.733677,-64.751576
4,,Anhui,Schneider Regional Medical Center,Short Term Acute Care Hospital,9048 Sugar Est,St Thomas,VI,18.340328,-64.914671


In [51]:
bucket = 'anupriya-covid-project-bucket'

## Storing Facts and Dimensional tables in s3 bucket 

In [56]:
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/covid_fact.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '7AFJ8BG4RHP3NWZS',
  'HostId': 'I8MyJikIHro3JwzAbgOH3zK9XQZ3Chu6KOP5wuRHieqhdqDYmC9BUBmbjh3YPwjpxgV6y2ofrqs=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'I8MyJikIHro3JwzAbgOH3zK9XQZ3Chu6KOP5wuRHieqhdqDYmC9BUBmbjh3YPwjpxgV6y2ofrqs=',
   'x-amz-request-id': '7AFJ8BG4RHP3NWZS',
   'date': 'Mon, 31 Jan 2022 07:21:47 GMT',
   'etag': '"54cdb4878f07d85d9bbc269dd2483244"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"54cdb4878f07d85d9bbc269dd2483244"'}

In [57]:
csv_buffer = StringIO()
date_dim.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/date_dim.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '2MT204R49AT3XXPB',
  'HostId': 'ZWU5E/q7nj+nydK+0JwfBFk0yWCD+3u9iJDhiD/zQPBoPAn11PWcwrXWKiKNAhZ1EmKpqQkuLRo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ZWU5E/q7nj+nydK+0JwfBFk0yWCD+3u9iJDhiD/zQPBoPAn11PWcwrXWKiKNAhZ1EmKpqQkuLRo=',
   'x-amz-request-id': '2MT204R49AT3XXPB',
   'date': 'Mon, 31 Jan 2022 07:22:57 GMT',
   'etag': '"d3fe70cf82f87f493bb23acb3e32ef97"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"d3fe70cf82f87f493bb23acb3e32ef97"'}

In [61]:
csv_buffer = StringIO()
hospital_dim.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/hospital_dim.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'W9XTGNSXYFPZX0F3',
  'HostId': 'GLuT827WKIgitnUkwoeaycqDUfPbD0b3FuH0nDl+OS1rRGV3d4l1gva/2fYJ++4jBHrMFQFmk8g=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'GLuT827WKIgitnUkwoeaycqDUfPbD0b3FuH0nDl+OS1rRGV3d4l1gva/2fYJ++4jBHrMFQFmk8g=',
   'x-amz-request-id': 'W9XTGNSXYFPZX0F3',
   'date': 'Mon, 31 Jan 2022 08:15:55 GMT',
   'etag': '"93305e91471742e79161156d0ae38e3b"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"93305e91471742e79161156d0ae38e3b"'}

In [59]:
csv_buffer = StringIO()
region_dim.to_csv(csv_buffer)
s3_resource.Object(bucket,'output/region_dim.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'R553ZPMNSTBM4M4S',
  'HostId': 'WBAlh9hh7LP7LrAUT1lnoX6UCLtOiNXdrrhCNAtvhVjBEX3KIsLadVfsEJcDOscNkcaIAsJwntM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'WBAlh9hh7LP7LrAUT1lnoX6UCLtOiNXdrrhCNAtvhVjBEX3KIsLadVfsEJcDOscNkcaIAsJwntM=',
   'x-amz-request-id': 'R553ZPMNSTBM4M4S',
   'date': 'Mon, 31 Jan 2022 07:33:47 GMT',
   'etag': '"4f3519a476500a371cb37f3b9f7dc0f9"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 0},
 'ETag': '"4f3519a476500a371cb37f3b9f7dc0f9"'}

In [65]:
dateDimSql = pd.io.sql.get_schema(date_dim.reset_index(), 'date_dim')
print(dateDimSql)

CREATE TABLE "date_dim" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "week_day" INTEGER
)


In [68]:
regionDimSql = pd.io.sql.get_schema(region_dim.reset_index(), 'region_dim')
print(regionDimSql)

CREATE TABLE "region_dim" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [69]:
covidFactSql = pd.io.sql.get_schema(covid_fact.reset_index(), 'covid_fact')
print(covidFactSql)

CREATE TABLE "covid_fact" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "deaths" REAL,
  "confirmed" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalized" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalizeddischarged" REAL
)


In [70]:
hospitalDimSql = pd.io.sql.get_schema(hospital_dim.reset_index(), 'hospital_dim')
print(hospitalDimSql)

CREATE TABLE "hospital_dim" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_address" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT,
  "latitude" REAL,
  "longtitude" REAL
)


In [71]:
pip install redshift_connector

Collecting redshift_connector
  Downloading https://files.pythonhosted.org/packages/4a/d3/581137322d29984663aeb0d518ca0c3fe1cb0bb0918f187bdc75a62bb28e/redshift_connector-2.0.903-py3-none-any.whl (96kB)
Collecting pytz<2021.9,>=2020.1 (from redshift_connector)
  Downloading https://files.pythonhosted.org/packages/d3/e3/d9f046b5d1c94a3aeab15f1f867aa414f8ee9d196fae6865f1d6a0ee1a0b/pytz-2021.3-py2.py3-none-any.whl (503kB)
Collecting scramp<1.5.0,>=1.2.0 (from redshift_connector)
  Downloading https://files.pythonhosted.org/packages/27/31/80bfb02ba2daa9a0ca66f82650c411f1a2b21ce85164408f57e99aab4e4e/scramp-1.4.1-py3-none-any.whl
Collecting requests<2.27.2,>=2.23.0 (from redshift_connector)
  Downloading https://files.pythonhosted.org/packages/2d/61/08076519c80041bc0ffa1a8af0cbd3bf3e2b62af10435d269a9d0f40564d/requests-2.27.1-py2.py3-none-any.whl (63kB)
Collecting lxml>=4.6.5 (from redshift_connector)
  Downloading https://files.pythonhosted.org/packages/1d/b2/3ede6607742c5667f5bbf43abf159ff27

In [72]:
import redshift_connector as rc

In [95]:
conn = rc.connect(host='covi-dwh-cluster.cpnjdn2pqkpb.ap-south-1.redshift.amazonaws.com',
                 database='myfirstdb',
                  user='awsuser',
                  password='Passw0rd')

In [97]:
conn.autocommit=True

In [96]:
cursor=rc.Cursor = conn.cursor()

In [89]:
cursor.execute("""
CREATE TABLE "date_dim" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "week_day" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x1cb3b771e08>

In [90]:
cursor.execute(hospitalDimSql)

<redshift_connector.cursor.Cursor at 0x1cb3b771e08>

In [91]:
cursor.execute(regionDimSql)

<redshift_connector.cursor.Cursor at 0x1cb3b771e08>

In [92]:
cursor.execute(covidFactSql)

<redshift_connector.cursor.Cursor at 0x1cb3b771e08>

In [98]:
cursor.execute("""
copy date_dim from 's3://anupriya-covid-project-bucket/output/date_dim.csv'
credentials 'aws_iam_role=arn:aws:iam::898023266264:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x1cb37d3ee08>

In [100]:
cursor.execute("""
copy region_dim from 's3://anupriya-covid-project-bucket/output/region_dim.csv'
credentials 'aws_iam_role=arn:aws:iam::898023266264:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x1cb37d3ee08>

In [101]:
cursor.execute("""
copy hospital_dim from 's3://anupriya-covid-project-bucket/output/hospital_dim.csv'
credentials 'aws_iam_role=arn:aws:iam::898023266264:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x1cb37d3ee08>

In [102]:
cursor.execute("""
copy covid_fact from 's3://anupriya-covid-project-bucket/output/covid_fact.csv'
credentials 'aws_iam_role=arn:aws:iam::898023266264:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x1cb37d3ee08>