# aws covid project

# boto3 is used to connect to AWS services.
# StringIO provides an in-memory text buffer; it holds the CSV output before it is uploaded to S3.
# pandas is used for data manipulation and analysis in Python.


In [None]:
import boto3
import pandas as pd
from io import StringIO
import time

In [None]:
# Credentials and region used by the AWS clients in this notebook.
# Replace the placeholders locally; never commit real keys.
AWS_ACCESS_KEY = "YOUR_AWS_ACCESS_KEY"
AWS_SECRET_KEY = "YOUR_AWS_SECRET_KEY"
AWS_REGION = "YOUR_AWS_REGION"

# Athena database (schema) that the queries run against.
SCHEMA_NAME = "covid_19"

# Athena writes query results to S3_STAGING_DIR, and the results are later
# downloaded from S3_BUCKET_NAME / S3_OUTPUT_DIRECTORY. The staging URI is
# derived from those two values so the locations cannot drift apart.
S3_BUCKET_NAME = "your-bucket-name"
S3_OUTPUT_DIRECTORY = "output"
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

# This code creates the client used to connect to Athena.

In [None]:
# Athena client authenticated with the access key pair and region configured above.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

# This takes a boto3 Athena client and a query-response dictionary, waits for the query to finish, then downloads the result CSV from S3 and loads it into a DataFrame.

In [None]:
# Loose stand-in used only as an annotation below (an empty dict, not typing.Dict).
Dict = {}


def download_and_load_query_results(
    client: boto3.client, query_response: Dict, poll_interval: float = 1.0
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its CSV result from S3.

    Args:
        client: boto3 Athena client used to poll the query state.
        query_response: Response returned by ``start_query_execution``; only
            its ``"QueryExecutionId"`` entry is read.
        poll_interval: Seconds to wait between two state checks.

    Returns:
        The query result, parsed with ``pandas.read_csv``.

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state.
    """
    query_id = query_response["QueryExecutionId"]

    # Poll the execution state instead of repeatedly requesting (and then
    # discarding) result rows; this also avoids classifying errors by message.
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason reported")
            raise RuntimeError(f"Athena query {query_id} {state}: {reason}")
        time.sleep(poll_interval)

    # Fixed local file name: it is overwritten on every call.
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    # The result object is looked up as <output directory>/<QueryExecutionId>.csv.
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_id}.csv",
        temp_file_location,
    )

    return pd.read_csv(temp_file_location)





# This code starts an Athena query, passing the database (schema) name and the S3 location where the results are written.

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM enigma_jhud",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

In [None]:
response


In [None]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [None]:
enigma_jhud.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM hospitalorearc_usa_hospital_beds",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)

hospitalorearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [None]:
hospitalorearc_usa_hospital_beds.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "enigma-nytimes-data-in-usaus_county"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
enigma_nytimes_data_in_usaus_county = download_and_load_query_results(athena_client, response)

In [None]:
enigma_nytimes_data_in_usaus_county.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "enigma-nytimes-data-in-usaus_states"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
enigma_nytimes_data_in_usaus_states = download_and_load_query_results(athena_client, response)

In [None]:
enigma_nytimes_data_in_usaus_states.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "states_daily"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
states_daily = download_and_load_query_results(athena_client, response)

In [None]:
states_daily.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "rearc-covid-19-testing-data_states-us-total-latestus_total_latest"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_covid_19_testing_data_states_us_total_latestus_total_latest = download_and_load_query_results(athena_client, response)

In [None]:
rearc_covid_19_testing_data_states_us_total_latestus_total_latest.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "rearc-covid-19-testing-data_states-us_dailyus_daily"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
rearc_covid_19_testing_data_states_us_dailyus_daily = download_and_load_query_results(athena_client, response)

In [None]:
rearc_covid_19_testing_data_states_us_dailyus_daily.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "data-countrycodecountrycode"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
data_countrycodecountrycode = download_and_load_query_results(athena_client, response)

In [None]:
data_countrycodecountrycode.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "data-countrypopulationcountypopulation"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
data_countrypopulationcountypopulation = download_and_load_query_results(athena_client, response)

In [None]:
data_countrypopulationcountypopulation.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString = 'SELECT * FROM "data-state-abvstate_abv"',
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
data_state_abvstate_abv = download_and_load_query_results(athena_client, response)

In [None]:
data_state_abvstate_abv.head()

# We did this for all tables; now we need to fix the last table, whose column names were read in as its first data row.

In [None]:
# Promote the first data row to the column header, then drop that row.
new_header = data_state_abvstate_abv.iloc[0]
data_state_abvstate_abv = data_state_abvstate_abv.iloc[1:]
data_state_abvstate_abv.columns = new_header

In [None]:
data_state_abvstate_abv.head()

In [None]:
# Fact table: selected columns of enigma_jhud inner-joined with selected
# columns of states_daily on the shared `fips` column.
factCovid_1 = enigma_jhud[
    [
        'fips',
        'province_state',
        'country_region',
        'confirmed',
        'deaths',
        'recovered',
        'active',
    ]
]
factCovid_2 = states_daily[
    [
        'fips',
        'date',
        'positive',
        'negative',
        'hospitalizedcurrently',
        'hospitalized',
        'hospitalizeddischarged',
    ]
]
factCovid = factCovid_1.merge(factCovid_2, how='inner', on='fips')

In [None]:
factCovid.shape

In [None]:
factCovid.head()

In [None]:
# Region dimension: location columns of enigma_jhud inner-joined with the
# county/state columns of the NYTimes county table on `fips`.
dimRegion_1 = enigma_jhud[
    ['fips', 'province_state', 'country_region', 'latitude', 'longitude']
]
dimRegion_2 = enigma_nytimes_data_in_usaus_county[['fips', 'county', 'state']]
dimRegion = dimRegion_1.merge(dimRegion_2, how='inner', on='fips')

In [None]:
# Hospital dimension: a column subset of the hospital-beds table.
# 'longtitude' is spelled this way in the column name that is selected.
dimHospital = hospitalorearc_usa_hospital_beds[
    [
        'fips',
        'state_name',
        'latitude',
        'longtitude',
        'hq_address',
        'hospital_name',
        'hospital_type',
        'hq_city',
        'hq_state',
    ]
]

In [None]:
dimDate = states_daily[['fips','date']]

In [None]:
dimDate.head()

In [None]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format = '%Y%m%d')

In [None]:
dimDate.head()

In [None]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate["day_of_week"] = dimDate['date'].dt.dayofweek

In [None]:
dimDate.head()

In [None]:
bucket = 'covid-19-project-dato' 

In [None]:
csv_buffer = StringIO()

In [None]:
csv_buffer

In [None]:
factCovid.to_csv(csv_buffer)

In [None]:
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body = csv_buffer.getvalue())

In [None]:
csv_buffer.getvalue()

In [None]:
!pip install awscli

In [None]:
!aws --version

In [None]:
!aws configure list

# I had an error uploading to S3, so I decided to upload the files manually.

In [None]:
pip install redshift-connector

In [None]:
import redshift_connector

In [None]:
# Connection settings for the Redshift cluster (placeholders to be filled in).
_redshift_settings = {
    "host": "<REDSHIFT_HOST>",
    "database": "<DATABASE_NAME>",
    "user": "<REDSHIFT_USER>",
    "password": "<REDSHIFT_PASSWORD>",
}
conn = redshift_connector.connect(**_redshift_settings)

In [None]:
pip install psycopg2-binary

# I tried connecting to AWS Redshift a different way, because my earlier attempts did not work.

# Hardcoding credentials again.

In [None]:
redshift_endpoint1 = "<REDSHIFT_HOST>"
redshift_user1 = "<REDSHIFT_USER>"
redshift_pass1 = "<REDSHIFT_PASSWORD>"
port1 = 5439
dbname1 = "<DATABASE_NAME>"

# SQL code

In [None]:
sql = """
SELECT top 10 *
FROM
pg_user
;
"""

# creating the sql engine

In [None]:
from sqlalchemy import create_engine
from sqlalchemy import text
engine_string = "postgresql+psycopg2://%s:%s@%s:%d/%s" \
% (redshift_user1, redshift_pass1, redshift_endpoint1, port1, dbname1)
engine1 = create_engine(engine_string)

# Creating the dataframe using the Redshift Database

In [None]:
# Run the query stored in `sql` through the SQLAlchemy engine and load the rows.
df1 = pd.read_sql_query(sql=text(sql), con=engine1)

In [None]:
df1.head()

In [None]:
conn.autocommit = True

In [None]:
# Annotate the cursor's type. The previous chained assignment
# (`cursor = redshift_connector.Cursor = ...`) rebound the library's Cursor
# class to this cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# First version of the dimDate table, including an "index" column.
create_dim_date_sql = """
CREATE TABLE "dimDate"(
"index" INTEGER,
"fips" INTEGER,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)

"""
cursor.execute(create_dim_date_sql)

In [None]:
# First versions of the dimHospital and factCovid tables, each including an
# "index" column. The statements run in the order listed.
dim_hospital_ddl = """
CREATE TABLE "dimHospital"(
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"latitude" REAL,
"longtitude" REAL,
"hq_address" TEXT,
"hospital_name" TEXT,
"hospital_type" TEXT,
"hq_city" TEXT,
"hq_state" TEXT
)
"""
fact_covid_ddl = """
CREATE TABLE "factCovid"(
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL

)

"""
for ddl_statement in (dim_hospital_ddl, fact_covid_ddl):
    cursor.execute(ddl_statement)


In [None]:
# First version of the dimRegion table, including an "index" column.
# Columns follow the dimRegion DataFrame: "latitude" was missing and
# "county" had been misspelled as "country".
cursor.execute("""
    CREATE TABLE "dimRegion"(
    "index" INTEGER,
    "fips" REAL,
    "province_state" TEXT,
    "country_region" TEXT,
    "latitude" REAL,
    "longitude" REAL,
    "county" TEXT,
    "state" TEXT
    )"""
)

In [None]:
# Load dimDatecp.csv from S3 into dimDate, skipping the header line.
copy_dim_date_sql = """
COPY dimDate
FROM 's3://covid-19-project-dato/output/dimDatecp.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<ACCOUNT_ID>:role/redshift-s3-access'
DELIMITER ','
REGION 'eu-central-1'
IGNOREHEADER 1
"""
cursor.execute(copy_dim_date_sql)

In [None]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

In [None]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

In [None]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

In [None]:
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))

In [None]:
cursor.execute("""
CREATE TABLE "dimDate" (
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
""")

In [None]:
cursor.execute("""
CREATE TABLE "factCovid" (
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

In [None]:
cursor.execute("""
CREATE TABLE "dimRegion" (

  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

In [None]:
cursor.execute("""
CREATE TABLE "dimHospital" (
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

In [None]:
cursor.execute("""
COPY dimDate
FROM 's3://covid-19-project-dato/output/dimDate.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<ACCOUNT_ID>:role/redshift-s3-access'
DELIMITER ','
REGION 'eu-central-1'
IGNOREHEADER 1
""")

In [None]:
cursor.execute("""
COPY dimHospital
FROM 's3://covid-19-project-dato/output/dimHospital.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<ACCOUNT_ID>:role/redshift-s3-access'
DELIMITER ','
REGION 'eu-central-1'
IGNOREHEADER 1
""")

In [None]:
cursor.execute("""
COPY dimRegion
FROM 's3://covid-19-project-dato/output/dimRegion.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<ACCOUNT_ID>:role/redshift-s3-access'
DELIMITER ','
REGION 'eu-central-1'
IGNOREHEADER 1
""")


In [None]:
cursor.execute("""
COPY factCovid
FROM 's3://covid-19-project-dato/output/factCovid.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<ACCOUNT_ID>:role/redshift-s3-access'
DELIMITER ','
REGION 'eu-central-1'
IGNOREHEADER 1
""")