In [30]:
%pip install boto3
%pip install pandas
%pip install redshift_connector

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting redshift_connector
  Obtaining dependency information for redshift_connector from https://files.pythonhosted.org/packages/dc/9a/75304eee1b65f1f206022220c6377fbad2f22669d52a3a8943a2851e3113/redshift_connector-2.0.914-py3-none-any.whl.metadata
  Downloading redshift_connector-2.0.914-py3-none-any.whl.metadata (68 kB)
     ---------------------------------------- 0.0/68.1 kB ? eta -:--:--
     ------ --------------------------------- 10.2/68.1 kB ? eta -:--:--
     -------------------------------------- 68.1/68.1 kB 741.9 kB/s eta 0:00:00
Collecting scramp<1.5.0,>=1.2.0 (from redshift_connector)
  Downloading scramp-1.4.4-py3-none-any.whl (13 kB)
Collecting beautifulsoup4<5.0.0,>=4.7.0 (from redshift_connector)
  Downloading beautifulsoup4-4.12.2-py3-none-any.whl (142 kB)
     ---------------------------------------- 0.0/143.0 kB ? eta -:--:--
    

In [1]:
import boto3
import pandas as pd
import time
from io import StringIO



In [2]:
keys_variables = {}
with open('../credentials.tfvars', 'r') as file :
    tvars_content = file.read()

for line in tvars_content.splitlines():
    if '=' in line:
        key, value = line.split('=')
        key = key.strip()
        value = value.strip()
        keys_variables[key] = value

In [3]:
AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''
AWS_REGION = 'us-east-1'
SCHEMA_NAME = 'covid_19'
S3_STAGING = "s3://de-atilla-staging/output" 
S3_BUCKET_NAME = "de-atilla-staging"
S3_BUCKET_DIRECTORY = "output"


In [4]:
athena_client = boto3.client(
    'athena', 
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION
)

In [5]:
Dict = {}
def download_and_load_results(
        client:boto3.client, query_response:Dict
        ) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(
                QueryExecutionId = query_response['QueryExecutionId']
            )
            break
        except Exception as err:
            if 'not yet finished' in str(err):
                time.sleep(0.001)

            else:
                raise err
            
    temp_file_location: str = 'athena_query_results.csv'    
    s3_client = boto3.client(
    's3', 
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_BUCKET_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location
    )
    return pd.read_csv(temp_file_location)

        

In [6]:
def get_table(table:str) -> pd.DataFrame:
    response = athena_client.start_query_execution(
    QueryString= f'SELECT * FROM {table}',
    QueryExecutionContext = {'Database': SCHEMA_NAME},
    ResultConfiguration = {
        'OutputLocation' : S3_STAGING,
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'} 
    },
    )
    df_table = download_and_load_results(athena_client, response)
    return df_table
    

In [13]:
enigma_jhud = get_table('enigma_jhud')
nytimes_data_in_usa_us_states = get_table('nytimes_data_in_usa_us_states')
nytimes_data_in_usa_us_county = get_table('nytimes_data_in_usa_us_county')
rearc_covid_19_testing_us_daily = get_table('rearc_covid_19_testing_us_daily')
rearc_covid_19_testing_us_states_daily = get_table('rearc_covid_19_testing_us_states_states_daily')
rearc_covid_19_testing_us_total_latest = get_table('rearc_covid_19_testing_us_total_latest')
rearc_usa_hospital_beds = get_table('rearc_usa_hospital_beds')
static_datasets_countrycode = get_table('static_datasets_countrycode')
static_datasets_countypopulation = get_table('static_datasets_countypopulation')
static_datasets_state_abv = get_table('static_datasets_state_abv')

In [14]:

static_datasets_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


In [15]:
new_header = static_datasets_state_abv.iloc[0]
static_datasets_state_abv = static_datasets_state_abv[1:]
static_datasets_state_abv.columns = new_header
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [16]:
fact_covid_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed' ,'deaths', 'recovered', 'active'  ]]
fact_covid_2 = rearc_covid_19_testing_us_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizedcumulative']]
fact_covid = pd.merge(fact_covid_1, fact_covid_2, on='fips', how='inner')
fact_covid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizedcumulative
0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,
1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,
2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,
3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,
4,,Gansu,China,,,,,20210119,289939,,1066.0,,


In [17]:
dim_region_1 = enigma_jhud[['fips', 'province_state', 'country_region','latitude', 'longitude']]
dim_region_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dim_region = pd.merge(fact_covid_1, fact_covid_2, on='fips', how='inner')
dim_region.dropna(subset=['fips'],inplace=True)
dim_region

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizedcumulative
24440,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210307,101327,305972.0,147.0,,
24441,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210306,101327,305972.0,147.0,,
24442,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210305,101066,305972.0,136.0,,
24443,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210304,100867,305972.0,171.0,,
24444,72.0,Puerto Rico,US,3.0,0.0,0.0,,20210303,100765,305972.0,169.0,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
27987,72.0,Puerto Rico,US,3647.0,132.0,0.0,3515.0,20210123,90073,305972.0,325.0,,
27988,72.0,Puerto Rico,US,3647.0,132.0,0.0,3515.0,20210122,89282,305972.0,341.0,,
27989,72.0,Puerto Rico,US,3647.0,132.0,0.0,3515.0,20210121,88728,305972.0,344.0,,
27990,72.0,Puerto Rico,US,3647.0,132.0,0.0,3515.0,20210120,88513,305972.0,331.0,,


In [18]:
dim_hospital = rearc_usa_hospital_beds[['fips', 'state_name', 'latitude', 'longtitude','hq_address', 'hospital_name', 'hospital_type', 'hq_city','hq_state']]

In [19]:
dim_date = rearc_covid_19_testing_us_states_daily[['fips', 'date']]

In [20]:
dim_date['date'] = pd.to_datetime(dim_date['date'], format='%Y%m%d')
dim_date.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['date'] = pd.to_datetime(dim_date['date'], format='%Y%m%d')


Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [21]:
dim_date['year'] = dim_date['date'].dt.year
dim_date['month']= dim_date['date'].dt.month
dim_date['day_of_week']  = dim_date['date'].dt.dayofweek
dim_date.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['year'] = dim_date['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['month']= dim_date['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dim_date['day_of_week']  = dim_date['date'].dt.dayofweek


Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


In [24]:
def put_in_bucket(df:pd.DataFrame, df_name:str):
    output_bucket = 'de-atilla-covid-data-v2'
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3',aws_access_key_id= AWS_ACCESS_KEY, aws_secret_access_key= AWS_SECRET_KEY)
    s3_resource.Object(output_bucket, f'output/{df_name}.csv').put(Body=csv_buffer.getvalue())



In [25]:
put_in_bucket(fact_covid, 'fact_covid')
put_in_bucket(dim_region, 'dim_region')
put_in_bucket(dim_hospital, 'dim_hospital')
put_in_bucket(dim_date, 'dim_date')

In [26]:
dim_date_sql  = pd.io.sql.get_schema(dim_date.reset_index(), 'dim_date')
dim_date_sql = str(''.join(dim_date_sql))
print(dim_date_sql)

CREATE TABLE "dim_date" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [27]:
fact_covid_sql  = pd.io.sql.get_schema(fact_covid.reset_index(), 'fact_covid')
fact_covid_sql = str(''.join(fact_covid_sql))
print(fact_covid_sql)

CREATE TABLE "fact_covid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizedcumulative" REAL
)


In [28]:
dim_region_sql  = pd.io.sql.get_schema(dim_region.reset_index(), 'dim_region')
dim_region_sql = str(''.join(dim_region_sql))
print(dim_region_sql)

CREATE TABLE "dim_region" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizedcumulative" REAL
)


In [43]:
dim_hospital_sql  = pd.io.sql.get_schema(dim_hospital.reset_index(), 'dim_hospital')
dim_hospital_sql = str(''.join(dim_hospital_sql))
print(dim_hospital_sql)

CREATE TABLE "dim_hospital" (
"index" INTEGER,
  "fips" REAL,
  "state_name" TEXT,
  "latitude" REAL,
  "longtitude" REAL,
  "hq_address" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT
)


In [53]:
import redshift_connector

conn = redshift_connector.connect(
    host = 'de-covid-cluster.coe8joravz9x.us-east-1.redshift.amazonaws.com',
    database='dev',
    port=5439,
    user='awsuser',
    password='Passw0rd123!', 

)


In [54]:
conn.autocommit = True
cursor = redshift_connector.Cursor = conn.cursor()

In [39]:
cursor.execute(dim_date_sql)

<redshift_connector.cursor.Cursor at 0x277953e5690>

In [40]:
cursor.execute(fact_covid_sql)

<redshift_connector.cursor.Cursor at 0x277953e5690>

In [41]:
cursor.execute(dim_region_sql)

<redshift_connector.cursor.Cursor at 0x277953e5690>

In [44]:
cursor.execute(dim_hospital_sql)

<redshift_connector.cursor.Cursor at 0x277953e5690>

In [55]:
def fill_tables(table:str):
    cursor.execute(f"""
    COPY  {table} from 's3://de-atilla-covid-data-v2/output/{table}.csv'
    CREDENTIALS  'aws_iam_role=arn:aws:iam::750431063106:role/redshift-s3'
    DELIMITER  ','
    REGION  'us-east-1'
    IGNOREHEADER 1
    """)



In [56]:
fill_tables('dim_date')
fill_tables('fact_covid')
fill_tables('dim_region')
fill_tables('dim_hospital')