In [1]:
import os
import time
from io import StringIO  # python3; python2: BytesIO
from typing import Any, Dict

import boto3
import pandas as pd

In [2]:
# Notebook-wide AWS configuration.
#
# Credentials are read from the standard AWS environment variables instead of
# being written into the notebook. When a variable is unset the value is None,
# and boto3 then falls back to its default credential resolution.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = "us-east-1"

# Athena database that the queries run against.
SCHEMA_NAME = "datawarehouse"

# Athena writes query results to s3://<bucket>/<directory>/; the staging URI is
# derived from the two parts so the three values cannot drift apart.
S3_BUCKET_NAME = "test-etl-datawarehouse"
S3_OUTPUT_DIRECTORY = "output"
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

In [3]:
# Athena client shared by the query cells; it uses the notebook-level
# credentials and region defined in the configuration cell.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)


In [None]:
# Display the client object to confirm it was created.
athena_client

<botocore.client.Athena at 0x1626c8cd090>

In [None]:
def download_and_load_query_results(
    client: Any,
    query_response: Dict[str, Any],
    poll_interval: float = 1.0,
    s3_client: Any = None,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its CSV result as a DataFrame.

    Args:
        client: Athena client used to poll the query state.
        query_response: Response of ``start_query_execution``; only the
            ``"QueryExecutionId"`` key is read.
        poll_interval: Seconds to sleep between state checks.
        s3_client: Optional S3 client used for the download. When omitted, one
            is created from the notebook-level credentials and region.

    Returns:
        The query result parsed with ``pd.read_csv``.

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state.

    Side effects:
        The result file is downloaded to ``athena_query_results.csv`` in the
        current working directory and left there.
    """
    query_id = query_response["QueryExecutionId"]

    # Poll the execution state instead of repeatedly requesting result pages
    # and matching on error text; stop as soon as a terminal state is reached.
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)
        status = execution["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason reported")
            raise RuntimeError(
                f"Athena query {query_id} ended in state {state}: {reason}"
            )
        time.sleep(poll_interval)

    if s3_client is None:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY,
            region_name=AWS_REGION,
        )

    # The object key is built as <output directory>/<query execution id>.csv.
    temp_file_location: str = "athena_query_results.csv"
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_id}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)


In [None]:
# Run a full-table SELECT on enigma_jhud in Athena, writing SSE-S3 encrypted
# results to the staging location, then load the result file into a DataFrame.
result_configuration = {
    "OutputLocation": S3_STAGING_DIR,
    "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
}
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration=result_configuration,
)
enigma_jhud = download_and_load_query_results(athena_client, response)
enigma_jhud.head()
# 11

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [None]:
# Show (rows, columns) of the loaded enigma_jhud frame.
enigma_jhud.shape

(10002, 12)

In [None]:
# Fact table: case counts from enigma_jhud joined to the testing and
# hospitalization figures from states_daily on the shared 'fips' code.
# NOTE(review): states_daily is not defined in the cells visible here —
# confirm it is loaded earlier in the notebook.
jhu_fact_columns = [
    'fips', 'province_state', 'country_region',
    'confirmed', 'deaths', 'recovered', 'active',
]
testing_fact_columns = [
    'fips', 'date', 'positive', 'negative',
    'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged',
]
factCovid_1 = enigma_jhud[jhu_fact_columns]
factCovid_2 = states_daily[testing_fact_columns]
# Inner join: only fips codes present in both sources are kept.
factCovid = factCovid_1.merge(factCovid_2, on='fips', how='inner')



In [None]:
# Preview the first rows of the merged fact table.
factCovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210307,101327,305972.0,147.0,,
1,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210306,101327,305972.0,147.0,,
2,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210305,101066,305972.0,136.0,,
3,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210304,100867,305972.0,171.0,,
4,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210303,100765,305972.0,169.0,,


In [None]:
# Region dimension: location columns from enigma_jhud joined to the county and
# state names from nytimes_data_in_usa_us_county on 'fips'.
# NOTE(review): nytimes_data_in_usa_us_county is not defined in the cells
# visible here — confirm it is loaded earlier in the notebook.
jhu_region_columns = ['fips', 'province_state', 'country_region', 'latitude', 'longitude']
nyt_region_columns = ['fips', 'county', 'state']
dimRegion_1 = enigma_jhud[jhu_region_columns]
dimRegion_2 = nytimes_data_in_usa_us_county[nyt_region_columns]
# Inner join: only fips codes present in both sources are kept.
dimRegion = dimRegion_1.merge(dimRegion_2, on='fips', how='inner')




In [None]:
# Hospital dimension: a column subset of rearc_usa_hospital_beds.
# 'longtitude' is spelled the way the code selects it from the source frame.
# NOTE(review): rearc_usa_hospital_beds is not defined in the cells visible
# here — confirm it is loaded earlier in the notebook.
hospital_columns = [
    'fips', 'state_name', 'latitude', 'longtitude', 'hq_address',
    'hospital_name', 'hospital_type', 'hq_city', 'hq_state',
]
dimHospital = rearc_usa_hospital_beds[hospital_columns]


In [None]:
# Date dimension seed: the fips/date pairs from states_daily.
# .copy() makes dimDate an independent frame, so the column assignments in the
# following cells neither trigger SettingWithCopyWarning nor depend on whether
# pandas returned a view of states_daily.
dimDate = states_daily[['fips', 'date']].copy()
dimDate.head()


Unnamed: 0,fips,date
0,2,20210307
1,1,20210307
2,5,20210307
3,60,20210307
4,4,20210307


In [None]:
# Parse the YYYYMMDD values in dimDate['date'] into pandas datetimes.
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [None]:
# Same YYYYMMDD -> datetime conversion, written back through .loc.
# NOTE(review): the preceding cell already performs this conversion on the same
# column, so this cell looks redundant — confirm before removing it.
dimDate.loc[:, 'date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')


In [None]:
# Derive calendar parts from the parsed date column. The columns are added in
# the order month, year, day_of_week, which is also their order in any CSV
# written from this frame.
date_parts = dimDate['date'].dt
dimDate['month'] = date_parts.month
dimDate['year'] = date_parts.year
dimDate['day_of_week'] = date_parts.dayofweek


In [None]:
# Render the date column as 'YYYY-MM-DD' strings for the CSV export.
#
# pd.to_datetime passes an already-parsed datetime column through unchanged,
# and the '%Y-%m-%d' format also matches the strings this cell itself
# produces, so re-running the cell gives the same result instead of raising.
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y-%m-%d').dt.strftime('%Y-%m-%d')
dimDate.head()

In [None]:
# Preview dimDate again after the date formatting step.
dimDate.head()

In [None]:
# Destination bucket for the exported dimension/fact CSVs (already created on
# S3). It must be the bucket the Redshift COPY statements later read from; an
# empty name makes every upload below fail.
output_bucket = 'aws-session-datawarehousing-output'

# Scratch in-memory text buffer; each upload cell creates a fresh one.
csv_buffer = StringIO()
csv_buffer

In [None]:
# Upload dimDate as dimDate.csv: serialize the frame to an in-memory CSV
# (without the index) and PUT the text to the output bucket.
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer, index=False)
s3_resource.Object(bucket_name=output_bucket, key='dimDate.csv').put(
    Body=csv_buffer.getvalue()
)


In [None]:
# Upload dimRegion as dimRegion.csv: serialize the frame to an in-memory CSV
# (without the index) and PUT the text to the output bucket.
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer, index=False)
s3_resource.Object(bucket_name=output_bucket, key='dimRegion.csv').put(
    Body=csv_buffer.getvalue()
)


In [None]:
# Upload dimHospital as dimHospital.csv: serialize the frame to an in-memory
# CSV (without the index) and PUT the text to the output bucket.
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer, index=False)
s3_resource.Object(bucket_name=output_bucket, key='dimHospital.csv').put(
    Body=csv_buffer.getvalue()
)


In [None]:
# Upload factCovid as factCovid.csv: serialize the frame to an in-memory CSV
# (without the index) and PUT the text to the output bucket.
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
factCovid.to_csv(csv_buffer, index=False)
s3_resource.Object(bucket_name=output_bucket, key='factCovid.csv').put(
    Body=csv_buffer.getvalue()
)

In [None]:
# Print a CREATE TABLE statement inferred from each DataFrame's columns and
# dtypes, as a starting point for the warehouse DDL.
#
# The frames are passed as they are: the CSV exports are written with
# index=False, so turning the index into a column first would add an "index"
# column to the DDL that the exported files do not contain.
dimDatesql = pd.io.sql.get_schema(dimDate, 'dimDate')
print(dimDatesql)

factCovidsql = pd.io.sql.get_schema(factCovid, 'factCovid')
print(factCovidsql)

dimRegionsql = pd.io.sql.get_schema(dimRegion, 'dimRegion')
print(dimRegionsql)

dimHospitalsql = pd.io.sql.get_schema(dimHospital, 'dimHospital')
print(dimHospitalsql)


In [None]:
pip install redshift-connector

In [None]:
import redshift_connector
import logging

# logging.basicConfig(level=logging.DEBUG)

# Open the Redshift connection used by the DDL and COPY cells.
# The user name and password come from the REDSHIFT_USER / REDSHIFT_PASSWORD
# environment variables so they are not stored in the notebook; a missing
# variable raises KeyError before any connection attempt.
try:
    conn = redshift_connector.connect(
        host='redshift-cluster-2.czfgt1tbsuck.us-east-2.redshift.amazonaws.com',
        database='dev',
        user=os.environ["REDSHIFT_USER"],
        password=os.environ["REDSHIFT_PASSWORD"],
        ssl=True,
        timeout=60  # Increase the timeout if the connection takes longer
    )
    print(conn)
except redshift_connector.Error as e:
    print(f"Error: {e}")
    # Re-raise: continuing would leave `conn` undefined (or pointing at a stale
    # connection from an earlier run) for the cells that follow.
    raise


In [None]:
# Autocommit so each DDL statement takes effect without an explicit commit.
conn.autocommit = True

# Cursor used to send statements to Redshift.
cursor = conn.cursor()

# IF NOT EXISTS keeps this cell re-runnable: a second run no longer fails
# because the tables were already created.

# dimDate table.
# NOTE(review): the dimDate DataFrame is built with month before year, while
# this table declares year before month; a COPY without an explicit column
# list would load those two columns swapped.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimDate" (
    "fips" INTEGER,
    "date" TIMESTAMP,
    "year" INTEGER,
    "month" INTEGER,
    "day_of_week" INTEGER
)
""")

# dimHospital table.
# NOTE(review): Redshift stores TEXT as VARCHAR(256) — confirm no address or
# hospital name exceeds that length.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimHospital" (
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")



<redshift_connector.cursor.Cursor at 0x29859daa490>

In [None]:
# Autocommit so each DDL statement takes effect without an explicit commit.
conn.autocommit = True

# Cursor used to send statements to Redshift.
cursor = conn.cursor()

# IF NOT EXISTS keeps this cell re-runnable: a second run no longer fails
# because the tables were already created.

# factCovid table. Columns are declared in the same order as the factCovid
# DataFrame columns selected earlier in the notebook.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "factCovid" (
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalized_currently" REAL,
  "hospitalized" REAL,
  "hospitalized_discharged" REAL
)
""")

# dimRegion table.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimRegion" (
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")



<redshift_connector.cursor.Cursor at 0x29859da9e10>

In [None]:
# Bulk-load the four CSV exports from S3 into their Redshift tables.
#
# Each entry is (target table, CSV object name, optional column list).
# dimDate.csv is written with month before year, whereas the dimDate table
# declares year before month; the explicit column list maps the file's fields
# to the right columns instead of relying on position.
copy_jobs = [
    ("factcovid", "factCovid.csv", ""),
    ("dimregion", "dimRegion.csv", ""),
    ("dimhospital", "dimHospital.csv", ""),
    ("dimdate", "dimDate.csv", ' ("fips", "date", "month", "year", "day_of_week")'),
]

for table_name, csv_name, column_list in copy_jobs:
    # `csv` (comma-delimited by default) makes COPY honour the double quotes
    # that pandas puts around fields containing commas; a bare
    # `delimiter ','` would split such fields into extra columns.
    cursor.execute(f"""
copy {table_name}{column_list} from 's3://aws-session-datawarehousing-output/{csv_name}'
credentials 'aws_iam_role=arn:aws:iam::975050334693:role/service-role/AmazonRedshift-CommandsAccessRole-20240625T125922'
csv
region 'us-east-2'
IGNOREHEADER 1;
""")