In [1]:
import boto3
import pandas as pd
from io import StringIO

In [2]:
AWS_ACCESS_KEY = ""
AWS_SECRET_KEY = ""
AWS_REGION = ""
SCHEMA_NAME = ""
S3_STAGING_DIR = ""
S3_BUCKET_NAME = "covid-project-athena-output"
S3_OUTPUT_DIRECTORY = "output"

In [3]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION,
)

In [4]:
#Gets the data from athena and converts it into pandas dataframe
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(QueryExecutionId=query_response["QueryExecutionId"])
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.01)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client("s3",
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,)
    s3_client.download_file(S3_BUCKET_NAME,
                           f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                           temp_file_location,)
    return pd.read_csv(temp_file_location)

In [5]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [6]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [7]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)


In [8]:
nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [9]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usaus_states",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [10]:
nytimes_data_in_usaus_states = download_and_load_query_results(athena_client, response)

In [11]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_countypopulation",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [12]:
countypopulation = download_and_load_query_results(athena_client, response)

In [13]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [14]:
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [17]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_state_abv",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [18]:
state_abv = download_and_load_query_results(athena_client, response)

In [19]:
state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [20]:
new_header = state_abv.iloc[0]

In [21]:
state_abv = state_abv[1:]

In [22]:
state_abv.columns = new_header

In [23]:
state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [24]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [25]:
states_daily = download_and_load_query_results(athena_client, response)

In [26]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_daily",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [27]:
us_daily = download_and_load_query_results(athena_client, response)

In [28]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_us_total_latest",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [29]:
us_total_latest = download_and_load_query_results(athena_client, response)

In [30]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_data_countrycode",
    QueryExecutionContext={"Database":SCHEMA_NAME},
    ResultConfiguration={"OutputLocation":S3_STAGING_DIR,
                        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
                        },
)

In [31]:
country_code = download_and_load_query_results(athena_client, response)

In [32]:
## Dimensional Database ##

In [33]:
factCovid_1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

In [34]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1,dimRegion_2,on='fips',how='inner')

In [35]:
dimHospital = rearc_usa_hospital_beds[['fips','state_name','latitude','longtitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

In [36]:
dimDate = states_daily[['fips','date']]

In [37]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [38]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [39]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2,2021-03-07,2021,3,6
1,1,2021-03-07,2021,3,6
2,5,2021-03-07,2021,3,6
3,60,2021-03-07,2021,3,6
4,4,2021-03-07,2021,3,6


In [40]:
bucket = ''

In [41]:
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '03AKNMXCPF9KDSY5',
  'HostId': '873PWa5ft6Jm3jKXofdkU7gmHHb/DYCgEFSmtWfY8fc46JeyxfGuOBKqdIUzg2J5omujQlpkMlQ=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '873PWa5ft6Jm3jKXofdkU7gmHHb/DYCgEFSmtWfY8fc46JeyxfGuOBKqdIUzg2J5omujQlpkMlQ=',
   'x-amz-request-id': '03AKNMXCPF9KDSY5',
   'date': 'Fri, 23 Jun 2023 20:25:40 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"6372f86ce96858dcb69570e9500bc3c7"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"6372f86ce96858dcb69570e9500bc3c7"',
 'ServerSideEncryption': 'AES256'}

In [42]:
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'CFTF5WVRKTSWVD05',
  'HostId': 'n5ivUKXZqzvafEe+xmX1U2B0V+sD5PwHgLd3sr519PXg6/BzGRbrUOr7W9egW7QR2aKoTjh6gqo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'n5ivUKXZqzvafEe+xmX1U2B0V+sD5PwHgLd3sr519PXg6/BzGRbrUOr7W9egW7QR2aKoTjh6gqo=',
   'x-amz-request-id': 'CFTF5WVRKTSWVD05',
   'date': 'Fri, 23 Jun 2023 20:25:58 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"',
 'ServerSideEncryption': 'AES256'}

In [43]:
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'MQ2CSK6DKYFCSJ4G',
  'HostId': 'w8rWgKpPUSsdnS86ZHdm3JnT46vtPeqqKtWgOvtpzrSoKns4thVflaTjkYl1DdW+Dykn9ZtTIPVAY0OBVgrsGw==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'w8rWgKpPUSsdnS86ZHdm3JnT46vtPeqqKtWgOvtpzrSoKns4thVflaTjkYl1DdW+Dykn9ZtTIPVAY0OBVgrsGw==',
   'x-amz-request-id': 'MQ2CSK6DKYFCSJ4G',
   'date': 'Fri, 23 Jun 2023 20:26:10 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"d6eae31844ae905ae6342f60fffb5dc6"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"d6eae31844ae905ae6342f60fffb5dc6"',
 'ServerSideEncryption': 'AES256'}

In [44]:
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket,'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '0VWGP93TENVD77W1',
  'HostId': 'jkyfSQZtnWnWdrMT0EBQHJOCA9tpzQLbPBY7oalo9KpesFgOSuyCfOLCFSsw85Rc6tVJ+OpomE8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'jkyfSQZtnWnWdrMT0EBQHJOCA9tpzQLbPBY7oalo9KpesFgOSuyCfOLCFSsw85Rc6tVJ+OpomE8=',
   'x-amz-request-id': '0VWGP93TENVD77W1',
   'date': 'Fri, 23 Jun 2023 20:50:04 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"245ccb76e9ea4c1763ece831bc3922a4"',
   'server': 'AmazonS3',
   'content-length': '0',
   'connection': 'close'},
  'RetryAttempts': 2},
 'ETag': '"245ccb76e9ea4c1763ece831bc3922a4"',
 'ServerSideEncryption': 'AES256'}

In [None]:
#Extracting schema from dataset

In [45]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" INTEGER,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [46]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" REAL,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [47]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [48]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(),'dimHospital')
print(''.join(dimHospitalsql))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [49]:
#Redshift Connector

In [52]:
import redshift_connector

In [57]:
conn = redshift_connector.connect(
    host='',
    database= '',
    port = 0000,
    user='',
    password=''
)

In [58]:
conn.autocommit = True

In [59]:
cursor=redshift_connector.Cursor = conn.cursor()

In [62]:
cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
"fips" INTEGER,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>

In [64]:
cursor.execute("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"longitutde" REAL,
"latitude" REAL,
"hq_address" TEXT,
"hospital_name" TEXT,
"hospital_type" TEXT,
"hq_city" TEXT,
"hq_state" TEXT
)
""")

cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
""")

cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longititude" REAL,
"county" TEXT,
"state" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>

In [67]:
cursor.execute("""
copy dimDate from 's3://ajabcovidproject/output/dimDate.csv'
credentials ''
delimiter ','
region ''
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>

In [66]:
cursor.execute("""
copy dimHospital from 's3://ajabcovidproject/output/dimHospital.csv'
credentials ''
delimiter ','
region ''
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>

In [68]:
cursor.execute("""
copy factCovid from 's3://ajabcovidproject/output/factCovid.csv'
credentials ''
delimiter ','
region ''
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>

In [69]:
cursor.execute("""
copy dimRegion from 's3://ajabcovidproject/output/dimRegion.csv'
credentials ''
delimiter ','
region ''
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x205df2dcd90>