Importing all required libraries

In [1]:
import boto3
import pandas as pd
from io import StringIO # python3; #python2: Bytes10

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [76]:
AWS_ACCESS_KEY = config.get("AWS", "KEY")
AWS_SECRET_KEY = config.get("AWS", "SECRET")
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid_19"
S3_STAGING_DIR = "s3://covid-de-output/output/"
S3_BUCKET_NAME = "covid-de-output"
S3_OUTPUT_DIRECTORY = "output"

Connecting to athena & quering data

In [4]:
athena_client = boto3.client(
    "athena",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION)

In [29]:
import time
Dict = {}
def download_and_load_query_results(
    client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            #this func only loads the first 1000 rows
            client.get_query_results(
                QueryExecutionId = query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
    "s3",
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION,
    )
    s3_client.download_file(
    S3_BUCKET_NAME,
    f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
    temp_file_location,
    )
    return pd.read_csv(temp_file_location)


In [6]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM enigma_jhud_csv",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [7]:
response

{'QueryExecutionId': 'd9e74cff-aa9c-435e-b9f2-81ece83800b7',
 'ResponseMetadata': {'RequestId': 'd1b79059-4a4c-463a-9ba0-4969e168f55b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 27 Oct 2022 09:09:52 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'd1b79059-4a4c-463a-9ba0-4969e168f55b'},
  'RetryAttempts': 0}}

In [8]:
enigma_jhud = download_and_load_query_results(athena_client, response)

In [9]:
enigma_jhud.head()

Unnamed: 0,fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
0,,,Anhui,China,2020-01-22T17:00:00,31.826,117.226,1.0,,,,"""Anhui"
1,,,Beijing,China,2020-01-22T17:00:00,40.182,116.414,14.0,,,,"""Beijing"
2,,,Chongqing,China,2020-01-22T17:00:00,30.057,107.874,6.0,,,,"""Chongqing"
3,,,Fujian,China,2020-01-22T17:00:00,26.079,117.987,1.0,,,,"""Fujian"
4,,,Gansu,China,2020-01-22T17:00:00,36.061,103.834,,,,,"""Gansu"


In [30]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_us_county",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)


In [144]:
nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)
nytimes_data_in_usa_us_county.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [32]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM nytimes_data_in_usa_us_states",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)


In [13]:
nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)
nytimes_data_in_usa_us_states

Unnamed: 0,date,state,fips,cases,deaths
0,2020-01-21,Washington,53,1,0
1,2020-01-22,Washington,53,1,0
2,2020-01-23,Washington,53,1,0
3,2020-01-24,Illinois,17,1,0
4,2020-01-24,Washington,53,1,0
...,...,...,...,...,...
3749,2020-05-09,Virginia,51,23196,827
3750,2020-05-09,Washington,53,17763,925
3751,2020-05-09,West Virginia,54,1347,53
3752,2020-05-09,Wisconsin,55,9939,398


In [53]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_states_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [54]:
rearc_covid_19_testing_data_states_daily = download_and_load_query_results(athena_client, response)
rearc_covid_19_testing_data_states_daily.head()

Unnamed: 0,date,state,positive,probablecases,negative,pending,totaltestresultssource,totaltestresults,hospitalizedcurrently,hospitalizedcumulative,...,dataqualitygrade,deathincrease,hospitalizedincrease,hash,commercialscore,negativeregularscore,negativescore,positivescore,score,grade
0,20210307,AK,56886,,,,totalTestsViral,1731628,33.0,1293.0,...,,0.0,0.0,dc4bccd4bb885349d7e94d6fed058e285d4be164,0.0,0.0,0.0,0.0,0.0,
1,20210307,AL,499819,107742.0,1931711.0,,totalTestsPeopleViral,2323788,494.0,45976.0,...,,-1.0,0.0,997207b430824ea40b8eb8506c19a93e07bc972e,0.0,0.0,0.0,0.0,0.0,
2,20210307,AR,324818,69092.0,2480716.0,,totalTestsViral,2736442,335.0,14926.0,...,,22.0,11.0,50921aeefba3e30d31623aa495b47fb2ecc72fae,0.0,0.0,0.0,0.0,0.0,
3,20210307,AS,0,,2140.0,,totalTestsViral,2140,,,...,,0.0,0.0,96d23f888c995b9a7f3b4b864de6414f45c728ff,0.0,0.0,0.0,0.0,0.0,
4,20210307,AZ,826454,56519.0,3073010.0,,totalTestsViral,7908105,963.0,57907.0,...,,5.0,44.0,0437a7a96f4471666f775e63e86923eb5cbd8cdf,0.0,0.0,0.0,0.0,0.0,


In [34]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_daily",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [35]:
rearc_covid_19_testing_data_us_daily = download_and_load_query_results(athena_client, response)
rearc_covid_19_testing_data_us_daily.head()

Unnamed: 0,date,states,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,...,lastmodified,recovered,total,posneg,deathincrease,hospitalizedincrease,negativeincrease,positiveincrease,totaltestresultsincrease,hash
0,20210307,56,28755524.0,74579770.0,11808.0,40212.0,878613.0,8137.0,45475.0,2801.0,...,2021-03-07T24:00:00Z,,0,0,839,726,130414,41265,1156241,8b26839690cd05c0cef69cb9ed85641a76b5e78e
1,20210306,56,28714259.0,74449356.0,11783.0,41401.0,877887.0,8409.0,45453.0,2811.0,...,2021-03-06T24:00:00Z,,0,0,1674,503,142201,59620,1409138,d0c0482ea549c9d5c04a7c86acb6fc6a8095a592
2,20210305,56,28654639.0,74307155.0,12213.0,42541.0,877384.0,8634.0,45373.0,2889.0,...,2021-03-05T24:00:00Z,,0,0,2221,2781,271917,68787,1744417,a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222
3,20210304,56,28585852.0,74035238.0,12405.0,44172.0,874603.0,8970.0,45293.0,2973.0,...,2021-03-04T24:00:00Z,,0,0,1743,1530,177957,65487,1590984,a19ad6379a653834cbda3093791ad2c3b9fab5ff
4,20210303,56,28520365.0,73857281.0,11778.0,45462.0,873073.0,9359.0,45214.0,3094.0,...,2021-03-03T24:00:00Z,,0,0,2449,2172,267001,66836,1406795,9e1d2afda1b0ec243060d6f68a7134d011c0cb2a


In [36]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_covid_19_testing_data_us_total_latest",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [37]:
rearc_covid_19_testing_data_us_total_latest = download_and_load_query_results(athena_client, response)
rearc_covid_19_testing_data_us_total_latest.head()

Unnamed: 0,positive,negative,pending,hospitalizedcurrently,hospitalizedcumulative,inicucurrently,inicucumulative,onventilatorcurrently,onventilatorcumulative,recovered,hash,lastmodified,death,hospitalized,total,totaltestresults,posneg,notes
0,1061101,5170081,2775,53793,111955,9486,4192,4712,373,153947,95064ba29ccbc20dbec397033dfe4b1f45137c99,2020-05-01T09:12:31.891Z,57266,111955,6233957,6231182,6231182,"""NOTE: """"total"""""


In [38]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM rearc_usa_hospital_beds",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [143]:
#rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)
#rearc_usa_hospital_beds.head()

In [40]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_countrycode",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [41]:
static_datasets_countrycode = download_and_load_query_results(athena_client, response)
static_datasets_countrycode.head()

Unnamed: 0,country,alpha-2 code,alpha-3 code,numeric code,latitude,longitude
0,Afghanistan,AF,AFG,4.0,33.0,65.0
1,Albania,AL,ALB,8.0,41.0,20.0
2,Algeria,DZ,DZA,12.0,28.0,3.0
3,American Samoa,AS,ASM,16.0,-14.3333,-170.0
4,Andorra,AD,AND,20.0,42.5,1.6


In [42]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_countypopulation",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [43]:
static_datasets_countypopulation = download_and_load_query_results(athena_client, response)
static_datasets_countypopulation.head()

Unnamed: 0,id,id2,county,state,population estimate 2018
0,0500000US01001,1001,Autauga,Alabama,55601
1,0500000US01003,1003,Baldwin,Alabama,218022
2,0500000US01005,1005,Barbour,Alabama,24881
3,0500000US01007,1007,Bibb,Alabama,22400
4,0500000US01009,1009,Blount,Alabama,57840


In [44]:
response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM static_datasets_state_abv",
    QueryExecutionContext = {"Database": SCHEMA_NAME},
    ResultConfiguration = {
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption" : "SSE_S3"},
    },
)

In [45]:
static_datasets_state_abv = download_and_load_query_results(athena_client, response)
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR


Data cleaning

In [46]:
new_header = static_datasets_state_abv.iloc[0] #grab first row for the header

In [47]:
new_header

col0           State
col1    Abbreviation
Name: 0, dtype: object

In [48]:
static_datasets_state_abv = static_datasets_state_abv[1:] #storing data starting from 2nd row

In [49]:
static_datasets_state_abv.head()

Unnamed: 0,col0,col1
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


In [50]:
static_datasets_state_abv.columns = new_header #setting the header row as the df header

In [51]:
static_datasets_state_abv.head()

Unnamed: 0,State,Abbreviation
1,Alabama,AL
2,Alaska,AK
3,Arizona,AZ
4,Arkansas,AR
5,California,CA


ETL job in python

In [55]:
factCovid1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed','deaths', 'recovered', 'active']]
factCovid2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged']]
factCovid = pd.merge(factCovid1, factCovid2, on = 'fips', how = 'inner')

In [56]:
factCovid.shape

(9063, 13)

In [57]:
dimRegion1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dimRegion = pd.merge(dimRegion1, dimRegion2, on = 'fips', how = 'inner')

In [58]:
dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']]

In [59]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,20210307
1,1.0,20210307
2,5.0,20210307
3,60.0,20210307
4,4.0,20210307


In [61]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format = '%Y%M%d')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['date'] = pd.to_datetime(dimDate['date'], format = '%Y%M%d')


In [62]:
dimDate.head()

Unnamed: 0,fips,date
0,2.0,2021-01-07 00:03:00
1,1.0,2021-01-07 00:03:00
2,5.0,2021-01-07 00:03:00
3,60.0,2021-01-07 00:03:00
4,4.0,2021-01-07 00:03:00


In [63]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['year'] = dimDate['date'].dt.year
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['month'] = dimDate['date'].dt.month
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dimDate['day_of_week'] = dimDate['date'].dt.dayofweek


In [64]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-01-07 00:03:00,2021,1,3
1,1.0,2021-01-07 00:03:00,2021,1,3
2,5.0,2021-01-07 00:03:00,2021,1,3
3,60.0,2021-01-07 00:03:00,2021,1,3
4,4.0,2021-01-07 00:03:00,2021,1,3


Saving results in s3

In [65]:
bucket = 'shyan-covid19-de-project' 

In [87]:
csv_buffer = StringIO()

In [88]:
csv_buffer

<_io.StringIO at 0x1c8981613f0>

In [73]:
factCovid.to_csv(csv_buffer)

In [77]:
s3_resource = boto3.resource(
    's3',
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION)
     
s3_resource.Object(bucket, 'output/factCovid.csv').put(Body = csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'C58WB5KS0NXES06Q',
  'HostId': 'hvcbEcHsJogl+UXtrd68xk1Km5E9/Tg6Jk0LAgBdX5Lr46KYOeBVlWUbi5n3E/b2vGVzZCZmEi3GjKLz+4CmCQ==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'hvcbEcHsJogl+UXtrd68xk1Km5E9/Tg6Jk0LAgBdX5Lr46KYOeBVlWUbi5n3E/b2vGVzZCZmEi3GjKLz+4CmCQ==',
   'x-amz-request-id': 'C58WB5KS0NXES06Q',
   'date': 'Thu, 27 Oct 2022 10:12:20 GMT',
   'etag': '"be18bd30222ad38773ae3bdbc4481a3f"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"be18bd30222ad38773ae3bdbc4481a3f"'}

In [72]:
csv_buffer.getvalue()

',fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged\r\n0,,Anhui,China,1.0,,,,20210119,289939,,1066.0,,\r\n1,,Beijing,China,14.0,,,,20210119,289939,,1066.0,,\r\n2,,Chongqing,China,6.0,,,,20210119,289939,,1066.0,,\r\n3,,Fujian,China,1.0,,,,20210119,289939,,1066.0,,\r\n4,,Gansu,China,,,,,20210119,289939,,1066.0,,\r\n5,,Guangdong,China,26.0,,,,20210119,289939,,1066.0,,\r\n6,,Guangxi,China,2.0,,,,20210119,289939,,1066.0,,\r\n7,,Guizhou,China,1.0,,,,20210119,289939,,1066.0,,\r\n8,,Hai,China,4.0,,,,20210119,289939,,1066.0,,\r\n9,,Hebei,China,1.0,,,,20210119,289939,,1066.0,,\r\n10,,Heilongjiang,China,,,,,20210119,289939,,1066.0,,\r\n11,,He,China,5.0,,,,20210119,289939,,1066.0,,\r\n12,,Hong Kong,Hong Kong,,,,,20210119,289939,,1066.0,,\r\n13,,Hubei,China,444.0,17.0,28.0,,20210119,289939,,1066.0,,\r\n14,,Hu,China,4.0,,,,20210119,289939,,1066.0,,\r\n15,,Inner Mongolia,China,,,,,20210119,289939,,1066

In [91]:
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimDate.csv').put(Body = csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'FJ5KGQXZTJ7H1SSR',
  'HostId': 'Pys+S/JgDKga5nlW1vrVVnCSnjA0wrdOcpTYQHpeVkKbZTlELod6F37hzagYGvEuJVS1J5cmltQ=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Pys+S/JgDKga5nlW1vrVVnCSnjA0wrdOcpTYQHpeVkKbZTlELod6F37hzagYGvEuJVS1J5cmltQ=',
   'x-amz-request-id': 'FJ5KGQXZTJ7H1SSR',
   'date': 'Thu, 27 Oct 2022 10:28:18 GMT',
   'etag': '"51523cb7724fb82b9a8358671b738531"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"51523cb7724fb82b9a8358671b738531"'}

In [92]:
csv_buffer.getvalue()

',fips,date,year,month,day_of_week\r\n0,2.0,2021-01-07 00:03:00,2021,1,3\r\n1,1.0,2021-01-07 00:03:00,2021,1,3\r\n2,5.0,2021-01-07 00:03:00,2021,1,3\r\n3,60.0,2021-01-07 00:03:00,2021,1,3\r\n4,4.0,2021-01-07 00:03:00,2021,1,3\r\n5,6.0,2021-01-07 00:03:00,2021,1,3\r\n6,8.0,2021-01-07 00:03:00,2021,1,3\r\n7,9.0,2021-01-07 00:03:00,2021,1,3\r\n8,11.0,2021-01-07 00:03:00,2021,1,3\r\n9,10.0,2021-01-07 00:03:00,2021,1,3\r\n10,12.0,2021-01-07 00:03:00,2021,1,3\r\n11,13.0,2021-01-07 00:03:00,2021,1,3\r\n12,66.0,2021-01-07 00:03:00,2021,1,3\r\n13,15.0,2021-01-07 00:03:00,2021,1,3\r\n14,19.0,2021-01-07 00:03:00,2021,1,3\r\n15,16.0,2021-01-07 00:03:00,2021,1,3\r\n16,17.0,2021-01-07 00:03:00,2021,1,3\r\n17,18.0,2021-01-07 00:03:00,2021,1,3\r\n18,20.0,2021-01-07 00:03:00,2021,1,3\r\n19,21.0,2021-01-07 00:03:00,2021,1,3\r\n20,22.0,2021-01-07 00:03:00,2021,1,3\r\n21,25.0,2021-01-07 00:03:00,2021,1,3\r\n22,24.0,2021-01-07 00:03:00,2021,1,3\r\n23,23.0,2021-01-07 00:03:00,2021,1,3\r\n24,26.0,2021-01-07 

In [93]:
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_resource.Object(bucket, 'output/dimRegion.csv').put(Body = csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '2M6FBZVAWE54XXZJ',
  'HostId': 'Q3dym+Ip9P77rg/Mt4pJCO76TCxHVG6ym4+ZAfCXXkncDFr8Ll/W0vSdIB1/s0yog9Hpg4gWHMw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Q3dym+Ip9P77rg/Mt4pJCO76TCxHVG6ym4+ZAfCXXkncDFr8Ll/W0vSdIB1/s0yog9Hpg4gWHMw=',
   'x-amz-request-id': '2M6FBZVAWE54XXZJ',
   'date': 'Thu, 27 Oct 2022 10:31:12 GMT',
   'etag': '"31e9deac71e4d9ee244e709a45b5edf5"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"31e9deac71e4d9ee244e709a45b5edf5"'}

In [96]:
dimRegion

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,Anhui,China,31.826,117.226,New York City,New York
1,,Anhui,China,31.826,117.226,Unknown,Rhode Island
2,,Anhui,China,31.826,117.226,New York City,New York
3,,Anhui,China,31.826,117.226,Unknown,Rhode Island
4,,Anhui,China,31.826,117.226,New York City,New York
...,...,...,...,...,...,...,...
3714008,56043.0,Wyoming,US,43.905,-107.680,Washakie,Wyoming
3714009,56043.0,Wyoming,US,43.905,-107.680,Washakie,Wyoming
3714010,56043.0,Wyoming,US,43.905,-107.680,Washakie,Wyoming
3714011,56043.0,Wyoming,US,43.905,-107.680,Washakie,Wyoming


Getting schema to create tables in redshift

In [100]:
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [106]:
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [107]:
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [109]:
!pip install redshift_connector

Collecting redshift_connector
  Downloading redshift_connector-2.0.909-py3-none-any.whl (112 kB)
     ------------------------------------ 112.1/112.1 kB 934.8 kB/s eta 0:00:00
Collecting requests<3.0.0,>=2.23.0
  Downloading requests-2.28.1-py3-none-any.whl (62 kB)
     ---------------------------------------- 62.8/62.8 kB 3.5 MB/s eta 0:00:00
Collecting scramp<1.5.0,>=1.2.0
  Downloading scramp-1.4.3-py3-none-any.whl (12 kB)
Collecting lxml>=4.6.5
  Downloading lxml-4.9.1-cp310-cp310-win_amd64.whl (3.6 MB)
     ---------------------------------------- 3.6/3.6 MB 3.6 MB/s eta 0:00:00
Collecting idna<4,>=2.5
  Downloading idna-3.4-py3-none-any.whl (61 kB)
     ---------------------------------------- 61.5/61.5 kB 1.1 MB/s eta 0:00:00
Collecting certifi>=2017.4.17
  Downloading certifi-2022.9.24-py3-none-any.whl (161 kB)
     -------------------------------------- 161.1/161.1 kB 3.2 MB/s eta 0:00:00
Collecting charset-normalizer<3,>=2
  Downloading charset_normalizer-2.1.1-py3-none-any.


[notice] A new release of pip available: 22.2.2 -> 22.3
[notice] To update, run: python.exe -m pip install --upgrade pip


Connecting to redshift using redshift_connector and then building tables on redshift
followed by Copying data to redshift from s3.

Uploading this entire script to glue.

In [125]:
import redshift_connector
import psycopg2
conn = redshift_connector.connect(
    host='my-first-redshift-cluster.cpsgscpuauoo.ap-south-1.redshift.amazonaws.com',
    database = 'dev',
    user = 'awsuser',
    password = 'Geforce1050',
    port = 5439
)
conn.autocommit = True
cursor = conn.cursor()


cursor.execute("""
copy dimDate from 's3://shyan-covid19-de-project/output/dimDate.csv'
credentials 'aws_iam_role=arn:aws:iam::470048303105:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

cursor.execute("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)
""")

cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)
""")

cursor.execute("""
CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)
""")


cursor.execute("""
copy dimRegion from 's3://shyan-covid19-de-project/output/dimRegion.csv'
credentials 'aws_iam_role=arn:aws:iam::470048303105:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")

cursor.execute("""
copy factCovid from 's3://shyan-covid19-de-project/output/factCovid.csv'
credentials 'aws_iam_role=arn:aws:iam::470048303105:role/redshift-s3-access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""")