In [None]:
import boto3
import pandas as pd
from io import StringIO # python3; # python2: BytesIO
import time
import redshift_connector

In [None]:
import os

# Credentials are taken from the standard AWS environment variables so real keys
# never get saved in the notebook; the placeholder strings are only fallbacks.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", " PROVIDE YOUR ACCESS KEY ")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", " PROVIDE YOUR SECRET KEY ")
AWS_REGION = "ca-central-1"
SCHEMA_NAME = "covid_19"  # Athena database that holds the source tables
S3_BUCKET_NAME = "pushpak-test-bucket"
S3_OUTPUT_DIRECTORY = "output"
# Athena query-result location. Derived from the two constants above so the
# staging location and the bucket/prefix constants can never disagree.
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

In [None]:
# Athena client used to start every query below and to poll for its results.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [None]:
# Generic mapping type for annotations. Previously this name was bound to an
# empty dict *instance*, which is not a type and shadowed typing.Dict.
from typing import Dict

In [None]:
def download_and_load_query_results(
    client: boto3.client,
    query_response: Dict,
    poll_interval: float = 1.0,
    s3_client=None,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and return its full result as a DataFrame.

    Parameters
    ----------
    client:
        Athena client the query was started with.
    query_response:
        Response of ``start_query_execution``; only ``"QueryExecutionId"`` is read.
    poll_interval:
        Seconds to sleep between status checks while the query is still queued
        or running. The previous 1 ms retry loop could flood the Athena API and
        trigger throttling errors.
    s3_client:
        Optional S3 client to reuse across calls. When omitted, one is built
        from the module-level AWS credentials and region.

    Returns
    -------
    pd.DataFrame
        Parsed contents of the result CSV that Athena wrote to S3. Reading the
        CSV (instead of ``get_query_results``) returns every row, not only the
        first page of results.

    Raises
    ------
    RuntimeError
        If the query ends in the ``FAILED`` or ``CANCELLED`` state.
    """
    from io import BytesIO

    query_id = query_response["QueryExecutionId"]

    # Poll the execution status rather than retrying get_query_results and
    # string-matching its error message.
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]
        state = execution["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = execution["Status"].get("StateChangeReason", "no reason reported")
            raise RuntimeError(f"Athena query {query_id} ended in state {state}: {reason}")
        time.sleep(poll_interval)

    # Use the result location Athena reports (s3://<bucket>/<prefix>/<id>.csv)
    # instead of rebuilding the key from S3_BUCKET_NAME / S3_OUTPUT_DIRECTORY.
    output_location = execution["ResultConfiguration"]["OutputLocation"]
    bucket_name, _, object_key = output_location[len("s3://"):].partition("/")

    if s3_client is None:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY,
            region_name=AWS_REGION,
        )
    # Read the object into memory instead of writing a fixed-name temp file
    # (athena_query_results.csv) into the working directory on every call.
    result_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    return pd.read_csv(BytesIO(result_object["Body"].read()))

In [None]:
# Pull the NYT county-level table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
nytimes_data_in_usa_us_county = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the Rearc per-state daily testing table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_data_states_dailystates_daily",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
rearc_covid_19_testing_data_states_dailystates_daily = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the Rearc US-level daily testing table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_daily",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
rearc_covid_19_testing_dataus_daily = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the Rearc US "total latest" testing table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_covid_19_testing_dataus_total_latest",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
rearc_covid_19_testing_dataus_total_latest = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the NYT state-level table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_data_in_usa_us_states",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
nytimes_data_in_usa_us_states = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the state-abbreviation lookup table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datastate_abv",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
static_datastate_abv = download_and_load_query_results(athena_client, response)
# The table's real column names arrive as the first data row: promote that row
# to the column labels and drop it from the data.
header = static_datastate_abv.iloc[0]
static_datastate_abv = static_datastate_abv.iloc[1:]
static_datastate_abv.columns = header

In [None]:
# Pull the county population lookup table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datacountypopulation",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
static_datacountypopulation = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the country-code lookup table from Athena into a DataFrame.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datacountrycode",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
static_datacountrycode = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Pull the JHU table from Athena. NOTE: LIMIT 1000 (with no ORDER BY) keeps only
# an arbitrary 1000 rows, and every table built from enigma_jhud inherits that cap.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud LIMIT 1000",
    QueryExecutionContext=dict(Database=SCHEMA_NAME),
    ResultConfiguration=dict(
        OutputLocation=S3_STAGING_DIR,
        EncryptionConfiguration=dict(EncryptionOption="SSE_S3"),
    ),
)
enigma_jhud = download_and_load_query_results(
    athena_client, response
)

In [None]:
# Build the star schema: one fact table plus region / date / hospital dimensions.

# Fact table: JHU case counts inner-joined to the daily per-state testing and
# hospitalisation figures on the FIPS code.
factCovid_1 = enigma_jhud[
    ['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']
]
factCovid_2 = rearc_covid_19_testing_data_states_dailystates_daily[
    ['fips', 'date', 'positive', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']
]
factCovid = pd.merge(factCovid_1, factCovid_2, on='fips', how='inner')

# dimRegion table: JHU location columns inner-joined to NYT county/state names on FIPS.
dimRegion_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')

# dimDate table: parse the YYYYMMDD 'date' values and derive calendar parts.
# .assign returns a new frame, so nothing is written into a column slice of the
# source DataFrame (the previous in-place writes raised SettingWithCopyWarning).
dimDate = rearc_covid_19_testing_data_states_dailystates_daily[['fips', 'date']].assign(
    date=lambda frame: pd.to_datetime(frame['date'], format='%Y%m%d')
)
dimDate = dimDate.assign(
    year=dimDate['date'].dt.year,
    month=dimDate['date'].dt.month,
    day_of_week=dimDate['date'].dt.dayofweek,  # Monday=0 ... Sunday=6
)

# dimHospital table
dimHospital = enigma_jhud[['fips', 'latitude', 'longitude']]

In [None]:
bucket = "pushpak-covid-de-project"  # destination bucket for the exported fact/dimension CSVs

In [None]:
csv_buffer = StringIO("")  # empty in-memory text buffer that receives CSV output

In [None]:
factCovid.to_csv(path_or_buf=csv_buffer)  # serialise the fact table (index included) into the buffer

In [None]:
# Upload each fact/dimension table to the output bucket as its own CSV.
# Every table is serialised into a fresh buffer. Previously a single buffer
# holding factCovid was uploaded under all four keys, so the three dimension
# files silently contained fact-table data.
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
for table_name, table_frame in {
    'factCovid': factCovid,
    'dimRegion': dimRegion,
    'dimDate': dimDate,
    'dimHospital': dimHospital,
}.items():
    table_buffer = StringIO()
    # The DataFrame index is written as the first column, matching the
    # "index" column declared in the Redshift tables.
    table_frame.to_csv(table_buffer)
    s3_resource.Object(bucket, f'output/{table_name}.csv').put(Body=table_buffer.getvalue())

In [None]:
dimDatesql = pd.io.sql.get_schema(frame=dimDate.reset_index(), name='dimDate')  # CREATE TABLE text for dimDate (index as a column)

In [None]:
factCovidsql = pd.io.sql.get_schema(frame=factCovid.reset_index(), name='factCovid')  # CREATE TABLE text for factCovid (index as a column)

In [None]:
dimRegionsql = pd.io.sql.get_schema(frame=dimRegion.reset_index(), name='dimRegion')  # CREATE TABLE text for dimRegion (index as a column)

In [None]:
dimHospitalsql = pd.io.sql.get_schema(frame=dimHospital.reset_index(), name='dimHospital')  # CREATE TABLE text for dimHospital (index as a column)

In [None]:
import redshift_connector

In [None]:
import os
from getpass import getpass

# Connection details come from the environment so the password is never stored
# in the notebook (it used to be hard-coded here and should be rotated).
# Host/database/user fall back to the previous values; the password is prompted
# for interactively when REDSHIFT_PASSWORD is not set.
conn = redshift_connector.connect(
    host=os.environ.get("REDSHIFT_HOST", " PROVIDE YOUR REDSHIFT HOST ID "),
    database=os.environ.get("REDSHIFT_DATABASE", "dev"),
    user=os.environ.get("REDSHIFT_USER", "awsuser"),
    password=os.environ.get("REDSHIFT_PASSWORD") or getpass("Redshift password: "),
)

In [None]:
# Commit every statement as it runs, so the DDL and COPY cells below need no explicit commit.
conn.autocommit = True

In [None]:
# Annotate, don't assign: the previous `cursor = redshift_connector.Cursor = conn.cursor()`
# was a chained assignment that overwrote the library's Cursor class with this instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# Fact table: the factCovid columns preceded by the DataFrame index column.
cursor.execute(
    """
CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
"""
)

In [None]:
# Region dimension: the dimRegion columns preceded by the DataFrame index column.
cursor.execute(
    """
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
"""
)

In [None]:
# Hospital dimension: the dimHospital columns preceded by the DataFrame index column.
cursor.execute(
    """
CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "latitude" REAL,
  "longitude" REAL
)
"""
)

In [None]:
# Date dimension: the dimDate columns preceded by the DataFrame index column.
cursor.execute(
    """
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
"""
)

In [None]:
# Load each exported CSV from S3 into the Redshift table of the same name.
# Fixes relative to the previous version of this cell:
#   * every COPY targeted dimDate, regardless of which file it read;
#   * the credentials key is `aws_iam_role` (singular), not `aws_iam_roles`;
#   * CSV format is used so fields pandas quoted (e.g. text containing commas)
#     are parsed correctly instead of being split on the embedded delimiter;
#   * TIMEFORMAT 'auto' accepts the date-only strings pandas writes for dimDate.date.
# Table names are double-quoted to match how the CREATE TABLE cells declare them.
REDSHIFT_IAM_ROLE_ARN = " PROVIDE YOUR ARN REDSHIFT ID "
for table_name in ("dimHospital", "dimDate", "dimRegion", "factCovid"):
    cursor.execute(f"""
copy "{table_name}" from 's3://{bucket}/output/{table_name}.csv'
credentials 'aws_iam_role={REDSHIFT_IAM_ROLE_ARN}'
csv
region '{AWS_REGION}'
IGNOREHEADER 1
timeformat 'auto'
""")