In [None]:
import os
import time
from io import StringIO
from typing import Any, Dict

import boto3
import pandas as pd

In [None]:
# Credentials are read from the environment so that no secret is stored in the
# notebook. When a variable is unset the value is None; boto3 is then expected
# to fall back to its default credential chain (shared config, IAM role, ...).
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = "ap-south-1"
# Athena database that every query in this notebook runs against.
SCHEMA_NAME = "covid19_2003"
# Bucket and key prefix where Athena writes query results.
S3_BUCKET_NAME = "test-bucket-2003"
S3_OUTPUT_DIRECTORY = "output"
# Derived from the two values above so the three settings cannot drift apart.
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

In [None]:
# Athena client used to submit and poll every query in this notebook.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [None]:
def download_and_load_query_results(
    client: Any, query_response: Dict[str, Any], poll_interval: float = 1.0
) -> pd.DataFrame:
    """Wait for an Athena query to finish, then load its CSV result from S3.

    Parameters
    ----------
    client
        Athena client (as returned by ``boto3.client("athena", ...)``) used to
        poll the query status.
    query_response
        Response of ``start_query_execution``; only ``"QueryExecutionId"`` is
        read from it.
    poll_interval
        Seconds to sleep between two status checks.

    Returns
    -------
    pd.DataFrame
        The object ``{S3_OUTPUT_DIRECTORY}/{QueryExecutionId}.csv`` from
        ``S3_BUCKET_NAME``, parsed with ``pd.read_csv``.

    Raises
    ------
    RuntimeError
        If the query ends in the ``FAILED`` or ``CANCELLED`` state.
    """
    execution_id = query_response["QueryExecutionId"]

    # Poll the execution status instead of fetching (and discarding) result
    # pages, and pause between polls so the Athena API is not hammered.
    while True:
        status = client.get_query_execution(QueryExecutionId=execution_id)[
            "QueryExecution"
        ]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Athena query {execution_id} {state}: {reason}")
        time.sleep(poll_interval)

    # The result file is downloaded into the working directory and overwritten
    # on every call.
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{execution_id}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)
    

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM enigma_jhud ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)


In [None]:
response

In [None]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [None]:
enigma_jhud.head()

In [None]:
enigma_jhud.info()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_dataus_county ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

nytimes_data_in_us_county = download_and_load_query_results(athena_client, response)

In [None]:
nytimes_data_in_us_county.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM nytimes_dataus_states ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

nytimes_data_in_us_states = download_and_load_query_results(athena_client, response)

In [None]:
nytimes_data_in_us_states.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM testing_data_states_dailystates_daily",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [None]:
rearc_covid_19_testing_data_states_daily.head()

In [None]:
rearc_covid_19_testing_data_states_daily.info()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM testing_data_us_dailyus_daily ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [None]:
rearc_covid_19_testing_data_us_daily.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM testing_data_us_total_latestus_total_latest ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [None]:
rearc_covid_19_testing_data_us_total_latest.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM rearc_rearc_usa_hospital_beds",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [None]:
rearc_usa_hospital_beds.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM static_datasets_countrycodecountrycode ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

static_dataset_country_code = download_and_load_query_results(athena_client, response)

In [None]:
static_dataset_country_code.head()


In [None]:
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM countypopulation ",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

static_dataset_county_population = download_and_load_query_results(athena_client, response)

In [None]:
static_dataset_county_population.head()

In [None]:
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "static_dataset_state-abvstate_abv"',
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation" : S3_STAGING_DIR,
        "EncryptionConfiguration":{"EncryptionOption":"SSE_S3"},
    },
)

static_dataset_state_abv = download_and_load_query_results(athena_client, response)

In [None]:
static_dataset_state_abv.head()

In [None]:
new_header = static_dataset_state_abv.iloc[0]

In [None]:
static_dataset_state_abv = static_dataset_state_abv[1:]
static_dataset_state_abv.columns = new_header

In [None]:
static_dataset_state_abv.head()

In [None]:
factCovid_1 = enigma_jhud[['fips','province_state','country_region','confirmed','deaths','recovered','active']]
factCovid_2 = rearc_covid_19_testing_data_states_daily[['fips','date','positive','negative','hospitalizedcurrently','hospitalized','hospitalizeddischarged']]
factCovid = pd.merge(factCovid_1,factCovid_2,on='fips',how='inner')

In [None]:
factCovid.head()

In [None]:
factCovid.shape


In [None]:
dimRegion_1 = enigma_jhud[['fips','province_state','country_region','latitude','longitude']]
dimRegion_2 = nytimes_data_in_us_county[['fips','county','state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips',how='inner')

In [None]:
dimRegion.head()

In [None]:
dimHospitalBeds = rearc_usa_hospital_beds[['fips','state_name','latitude','longitude','hq_address','hospital_name','hospital_type','hq_city','hq_state']]

In [None]:
dimHospitalBeds.head()

In [None]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips','date']]

In [None]:
dimDate.head()

In [None]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')

In [None]:
dimDate.head()

In [None]:
dimDate['year'] = dimDate['date'].dt.year

In [None]:
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

In [None]:
dimDate.head()

In [None]:
bucket = 'mycovidproject'
csv_buffer = StringIO()


In [None]:
csv_buffer

In [None]:
dimRegion.to_csv(csv_buffer)


In [None]:
s3_resource = boto3.resource(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name = AWS_REGION,# I assume you know how to provide credentials etc.
)


s3_resource.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

In [None]:
s3_resource.Object(bucket, 'output/dimHospitalBeds.csv').put(Body=csv_buffer.getvalue())
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

In [None]:
dimRegion.head()

In [None]:
dimRegion.info()

In [None]:
dimRegion.shape

In [None]:
print(type(dimRegion))

In [None]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(),'dimDate')
print(''.join(dimDatesql))

In [None]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(),'dimRegion')
print(''.join(dimRegionsql))

In [None]:
dimHospitalBedssql = pd.io.sql.get_schema(dimHospitalBeds.reset_index(),'dimHospitalBeds')
print(''.join(dimHospitalBedssql))

In [None]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(),'factCovid')
print(''.join(factCovidsql))

In [None]:
!pip install redshift_connector

In [None]:
import redshift_connector

In [None]:


# Redshift Connection Details
HOST = "default-workgroup.009160055743.ap-south-1.redshift-serverless.amazonaws.com"
PORT = 5439
DATABASE = "dev"  # Or another database name
USER = "awsuser"  # Redshift username
# The password is read from the environment so it is never stored in the
# notebook; a missing variable fails loudly with a KeyError. Any password that
# was previously committed in plaintext should be rotated.
PASSWORD = os.environ["REDSHIFT_PASSWORD"]

# Connect to Redshift (PORT is passed explicitly so the constant above is
# actually honoured).
conn = redshift_connector.connect(
    host=HOST,
    port=PORT,
    database=DATABASE,
    user=USER,
    password=PASSWORD,
)
    


In [None]:
conn.autocommit = True

In [None]:
cursor = redshift_connector.Cursor = conn.cursor()

In [None]:
# IF NOT EXISTS keeps this cell re-runnable once the table has been created.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
""")

In [None]:
# IF NOT EXISTS keeps this cell re-runnable once the table has been created.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

In [None]:
# IF NOT EXISTS keeps this cell re-runnable once the table has been created.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "dimHospitalBeds" (
"index" INTEGER,
  "fips" INTEGER,
  "state_name" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)
""")

In [None]:
# IF NOT EXISTS keeps this cell re-runnable once the table has been created.
# Note: "date" is INTEGER here while dimDate stores "date" as TIMESTAMP.
cursor.execute("""
CREATE TABLE IF NOT EXISTS "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

In [None]:


# Redshift Connection Details (second workgroup)
HOST = "second-workgrp.009160055743.ap-south-1.redshift-serverless.amazonaws.com"
PORT = 5439
DATABASE = "dev"  # Or another database name
USER = "akash"  # Redshift username
# The password is read from the environment so it is never stored in the
# notebook; a missing variable fails loudly with a KeyError. Any password that
# was previously committed in plaintext should be rotated.
PASSWORD = os.environ["REDSHIFT_SECOND_PASSWORD"]

# Connect to Redshift (PORT is passed explicitly so the constant above is
# actually honoured). This rebinds `conn` to the second workgroup.
conn = redshift_connector.connect(
    host=HOST,
    port=PORT,
    database=DATABASE,
    user=USER,
    password=PASSWORD,
)
    


In [None]:
conn.autocommit = True

In [None]:
cursor = redshift_connector.Cursor = conn.cursor()