In [None]:
import boto3
import pandas as pd
import psycopg2 
import json
import configparser
import os
from io import StringIO
import time
import redshift_connector
from pandas.api.types import is_numeric_dtype

In [None]:
config = configparser.ConfigParser()
# The config path can be overridden with COVID_CONFIG_PATH; the default keeps
# the original hard-coded location.
config_path = os.environ.get(
    'COVID_CONFIG_PATH',
    '/Users/toluwalopebabington/projects/DE/covid_data/config_file.config',
)
# Parse through a `with`-managed handle so the file is closed afterwards
# (a bare open() passed to read_file leaks the handle).
with open(config_path, encoding='utf-8') as config_file:
    config.read_file(config_file)

In [None]:
# AWS credentials and region used by the S3 and Athena clients.
key = config.get('AWS','key')
secret = config.get('AWS','secret')
region = config.get('AWS','region')
# Athena database the raw tables are queried from.
DB_name = config.get('DB','DB_name')
# S3 settings: raw-data bucket, local folder whose CSVs are uploaded, and the
# Athena result staging area (full S3 path, plus the folder/key prefix).
bucket_name = config.get('s3','Bucket_name')
folder_path = config.get('s3','folder_path')
staging_area_path  = config.get('s3','staging_area_path')
staging_area_folder  = config.get('s3','staging_area_folder')
# IAM role passed to the Redshift COPY command.
iam_role = config.get('s3','iam_role')
# Redshift Serverless login, workgroup, base database and region.
DB_user = config.get('DWH','DB_user')
DB_Password = config.get('DWH','Password')
Workgroup = config.get('DWH','Workgroup')
RedShift_DB_name = config.get('DWH','DB_name')
DWH_region = config.get('DWH','DWH_region')

In [None]:
# S3 client for the configured region, authenticated with the config keys.
s3 = boto3.client(
    service_name='s3',
    region_name=region,
    aws_access_key_id=key,
    aws_secret_access_key=secret,
)

In [None]:
# Athena client used to query the raw CSV tables.
athena = boto3.client(
    service_name='athena',
    region_name=region,
    aws_access_key_id=key,
    aws_secret_access_key=secret,
)

In [None]:
# Redshift Serverless control-plane client (note: uses the DWH region).
redshift_serverless = boto3.client(
    service_name='redshift-serverless',
    region_name=DWH_region,
    aws_access_key_id=key,
    aws_secret_access_key=secret,
)

In [None]:
# Resolve the endpoint host of the configured Redshift Serverless workgroup.
hostname = (
    redshift_serverless
    .get_workgroup(workgroupName=Workgroup)
    ['workgroup']['endpoint']['address']
)


In [None]:
'''
Create a new bucket to house the raw files, then upload only the CSV files in the specified local folder to the S3 bucket.
'''

def create_bucket(bucketname):
    """Create the S3 bucket *bucketname* in the module-level ``region``.

    Any error from S3 (for example, the bucket already exists) is printed
    rather than raised. On success a confirmation line is printed.
    """
    request = {'Bucket': bucketname}
    # S3 rejects an explicit LocationConstraint of 'us-east-1'. Buckets default
    # to us-east-1, so the constraint is only sent for other regions.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}
    try:
        s3.create_bucket(**request)
    except Exception as e:
        print(e)
    else:
        print(bucketname, " bucket successfully created")


def upload_folder():
    """Upload every ``.csv`` file under the local ``folder_path`` tree to ``bucket_name``.

    Each object key is ``<bucket_name>/<path relative to folder_path>``; the
    bucket name is repeated as a key prefix, as in the original layout. Keys
    always use '/' separators, so they do not depend on the local OS.
    Any error is printed and stops the remaining uploads. On success a
    completion line is printed.
    """
    try:
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                if not file.endswith('.csv'):
                    continue
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, folder_path)
                # S3 keys are '/'-separated; os.path.join/relpath would insert
                # '\\' on Windows and produce differently named objects.
                object_key = f"{bucket_name}/{relative_path.replace(os.sep, '/')}"
                s3.upload_file(local_path, bucket_name, object_key)
    except Exception as e:
        print(e)
    else:
        print('completed upload to ', bucket_name)

# Create the raw-data bucket, then push the local CSV files into it.
create_bucket(bucketname=bucket_name)
upload_folder()


In [None]:
def Download_and_load_query_results_upd(client:boto3.client,table) -> pd.DataFrame:
    """Run ``SELECT * FROM <table>`` in Athena and return the result as a DataFrame.

    The query runs in the Athena database ``DB_name`` and writes its result CSV
    (SSE-S3 encrypted) under ``staging_area_path``. Athena is polled once per
    second until the query reaches a terminal state. The result object is then
    downloaded from the output location Athena reports, into the local file
    ``athena_query_results.csv``, and parsed with ``pd.read_csv``.

    Args:
        client: boto3 Athena client.
        table: table name. It is concatenated into the SQL unescaped, so pass
            only trusted identifiers.

    Returns:
        The query result as a DataFrame.

    Raises:
        RuntimeError: if the query ends in state FAILED or CANCELLED.
    """
    # query the data in the csv files on s3 using Athena
    query = "SELECT * FROM " + table
    query_id = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': DB_name},
        ResultConfiguration={
            'OutputLocation': staging_area_path,
            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'},
        },
    )["QueryExecutionId"]

    # Poll until the query is terminal. Once it is SUCCEEDED the result file
    # has been written, so no extra get_query_results round-trip is needed.
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]
        state = execution["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ('FAILED', 'CANCELLED'):
            reason = execution["Status"].get("StateChangeReason", "no reason given")
            print("query execution for " + query_id + " " + state)
            # Stop here: there is no result file to download for a failed query.
            raise RuntimeError(
                f"Athena query {query_id} for table {table!r} ended {state}: {reason}"
            )
        time.sleep(1)

    # Download from the location Athena actually wrote to (s3://bucket/key).
    # This stays correct when the staging path is not in bucket_name, or when a
    # workgroup setting overrides the requested output location.
    output_uri = execution["ResultConfiguration"]["OutputLocation"]
    result_bucket, _, result_key = output_uri.partition("s3://")[2].partition("/")
    temp_file_location = "athena_query_results.csv"
    s3.download_file(result_bucket, result_key, temp_file_location)

    return pd.read_csv(temp_file_location)

# Pull each raw Athena table into its own DataFrame, in the original order.
stateabv = Download_and_load_query_results_upd(client=athena, table='state_abv')
uscounty = Download_and_load_query_results_upd(client=athena, table='us_county')
hospitalbed = Download_and_load_query_results_upd(client=athena, table='hospitalbed')
statesdaily = Download_and_load_query_results_upd(client=athena, table='states_daily')
usstates = Download_and_load_query_results_upd(client=athena, table='us_States')

In [None]:
# Normalise both 'date' columns to pandas datetimes. The states data stores
# dates in compact YYYYMMDD form (hence the explicit format); the county data
# is left for pandas to infer.
statesdaily['date'] = pd.to_datetime(arg=statesdaily['date'], format='%Y%m%d')
uscounty['date'] = pd.to_datetime(arg=uscounty['date'])

In [None]:
# Hospital dimension: selected hospitalbed columns, with 'objectid' renamed to
# 'hospital_id'. The rename is non-inplace: hospitalbed[[...]] is flagged by
# pandas as derived from hospitalbed, so an inplace rename on it can emit
# SettingWithCopyWarning.
# NOTE(review): 'longtitude' is kept as spelled; presumably it matches the
# source column name. Confirm.
DimHospital = hospitalbed[
    ['objectid', 'hospital_name', 'hospital_type', 'state_name', 'county_name',
     'hq_address', 'hq_city', 'hq_state', 'hq_zip_code', 'latitude', 'longtitude']
].rename(columns={'objectid': 'hospital_id'})

In [None]:
# Region dimension: distinct (state, fips) pairs from the state data, joined on
# 'state' to distinct (state, fips, county) rows from the county data. The two
# colliding 'fips' columns become statefips and countyfips.
DimRegion1 = usstates.loc[:, ['state', 'fips']].drop_duplicates()
DimRegion2 = uscounty.loc[:, ['state', 'fips', 'county']].drop_duplicates()
DimRegion = DimRegion1.merge(
    DimRegion2, left_on='state', right_on='state'
).rename(columns={'fips_x': 'statefips', 'fips_y': 'countyfips'})

In [None]:
# Date dimension: every distinct date appearing in the state or county data,
# with year/month/quarter columns derived from it.
DimDate1 = statesdaily[['date']]
DimDate2 = uscounty[['date']]
DimDate = (
    pd.concat([DimDate1, DimDate2], ignore_index=True)
    .drop_duplicates()
    .reset_index(drop=True)
    .assign(
        Year=lambda frame: frame['date'].dt.year,
        Month=lambda frame: frame['date'].dt.month,
        Quarter=lambda frame: frame['date'].dt.quarter,
    )
    .rename(columns={'date': 'event_date'})
)

In [None]:
# Define the fact tables for the state and county levels.
# NOTE(review): both fact tables are joined to hospitals on state only (inner
# merge). Each state/county row is therefore repeated once per hospital in that
# state, and rows for states with no matching hospital are dropped. Summing the
# measure columns across hospital_id will over-count. Confirm this grain is
# intended.
FactCovidState = statesdaily[['date','state','positive','negative','death','deathconfirmed','deathprobable','hospitalizedcurrently','hospitalizedcumulative','hospitalizeddischarged','recovered']]
FactCovidCounty = uscounty[['date','county','state','cases','deaths']]

# Create a df mapping hospital_id to its state, plus the columns needed to merge
# onto the fact tables. Presumably stateabv has 'state' (full name) and
# 'abbreviation' columns, inferred from the columns dropped below; confirm.
HospitalId = DimHospital[['hospital_id','state_name','county_name']].merge(stateabv, left_on = 'state_name', right_on = 'state')

# Add hospital_id to the county fact table. Both frames carry a 'state' column,
# so pandas suffixes them: 'state_x' (the county table's own state) survives and
# 'state_y' (from stateabv) is dropped.
FactCovidCounty = FactCovidCounty.merge(HospitalId, left_on ='state',right_on ='state_name').drop(columns = ['state_name','county_name','state_y','abbreviation'])
FactCovidCounty.rename(columns={'date':'event_date'},inplace = True)

# Add hospital_id to the state fact table. This join uses 'abbreviation', so
# statesdaily.state presumably holds state codes (TODO confirm). Both suffixed
# 'state' columns are dropped; the full name survives as 'state_name'.
FactCovidState = FactCovidState.merge(HospitalId, left_on ='state',right_on ='abbreviation').drop(columns = ['county_name','state_y','abbreviation','state_x'])
FactCovidState.rename(columns={'date':'event_date'},inplace = True)


In [None]:
# save the final dataframes to S3
def savefiletos3(df, key, prefix="final-tables"):
    """Write *df* as CSV (header, no index) to ``s3://{bucket_name}/{prefix}/{key}.csv``.

    Args:
        df: DataFrame to serialise.
        key: object base name, without the ``.csv`` extension.
        prefix: key prefix (folder) inside the bucket. The default matches the
            path read back by ``populate_table``.

    Returns:
        True if the upload succeeded. False if any error occurred; the error is
        printed together with the target key, so a failed table is identifiable.
    """
    object_key = f"{prefix}/{key}.csv"
    try:
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False)
        s3.put_object(Bucket=bucket_name, Body=csv_buffer.getvalue(), Key=object_key)
    except Exception as e:
        print(f"failed to save {object_key} to {bucket_name}: {e}")
        return False
    return True

# Persist the fact and dimension tables to S3, one CSV object per table.
savefiletos3(df=FactCovidState, key='FactCovidState')
savefiletos3(df=FactCovidCounty, key='FactCovidCounty')
savefiletos3(df=DimDate, key='DimDate')
savefiletos3(df=DimRegion, key='DimRegion')
savefiletos3(df=DimHospital, key='DimHospital')

In [None]:
'''Connect to Redshift, create the database and tables, then load the data into the tables.'''


def connect_db(dbname):
    """Open an autocommit Redshift connection to *dbname* and return ``(conn, cur)``.

    Connection parameters come from the module-level ``hostname``, ``DB_user``
    and ``DB_Password``. Autocommit is enabled because callers issue
    CREATE DATABASE, which Redshift does not allow inside a transaction block.

    Returns:
        ``(conn, cur)`` on success. ``(None, None)`` if connecting or creating
        the cursor raises ``redshift_connector.Error``; the error is printed.
        Returning a pair keeps ``conn, cur = connect_db(...)`` from failing to
        unpack.
    """
    try:
        conn = redshift_connector.connect(host=hostname, database=dbname, user=DB_user, password=DB_Password)
        conn.autocommit = True
    except redshift_connector.Error as e:
        print(e)
        return None, None
    try:
        cur = conn.cursor()
    except redshift_connector.Error as e:
        print(e)
        # Don't leak the session when no cursor can be handed back.
        conn.close()
        return None, None
    return conn, cur

def run_query(cursor_name, sql_statement):
    """Execute *sql_statement* on *cursor_name*.

    Returns the string 'success' when the statement executes. If it raises a
    ``redshift_connector.Error``, the error is printed and None is returned.
    """
    try:
        cursor_name.execute(sql_statement)
    except redshift_connector.Error as err:
        print(err)
        return None
    return 'success'
        
def close_connection(cursor_name, connection):
    """Close *cursor_name* and then *connection*.

    The connection is closed even if closing the cursor raises, so the session
    is never leaked. A ``redshift_connector.Error`` from either close is
    printed. If both close cleanly, a confirmation line is printed.
    """
    try:
        try:
            cursor_name.close()
        finally:
            connection.close()
    except redshift_connector.Error as e:
        print(e)
    else:
        print("database connection successfully closed")

def create_table(tables):
    """Return one CREATE TABLE statement per ``(DataFrame, table_name)`` pair in *tables*.

    The DDL comes from ``pandas.io.sql.get_schema`` with no connection, so
    column types follow pandas' SQLite-style mapping (e.g. INTEGER, REAL,
    TEXT, TIMESTAMP).
    NOTE(review): Redshift treats TEXT as VARCHAR(256). Confirm no string column
    (e.g. hospital addresses) exceeds that length.
    """
    # Iterate the argument; the old code read the module-level `table_list`
    # global and ignored `tables`.
    return [pd.io.sql.get_schema(table[0], table[1]) for table in tables]

def populate_table(cursor_name, table_name, delimiter):
    """COPY ``s3://{bucket_name}/final-tables/{table_name}.csv`` into *table_name*.

    The CSV header row is skipped. Timestamps are parsed with ``timeformat 'auto'``.
    Access is through the module-level ``iam_role`` and ``region``.

    Returns:
        'success' if the COPY executes. None if it raises
        ``redshift_connector.Error``; the error is printed.
    """
    s3uri = f"s3://{bucket_name}/final-tables/{table_name}.csv"
    # CSV makes COPY honour the double quotes that pandas.to_csv puts around
    # fields containing the delimiter (e.g. addresses with commas). Without it,
    # such rows are split on the embedded delimiter and the load fails.
    query = f'''copy {table_name}
    from '{s3uri}'
    iam_role '{iam_role}'
    csv
    delimiter '{delimiter}'
    timeformat 'auto'
    region '{region}'
    ignoreheader 1'''

    try:
        cursor_name.execute(query)
    except redshift_connector.Error as e:
        print(e)
    else:
        return 'success'

# Connect to the base database, create a new database for the dataset, then
# close the connection.
# `or (None, None)` keeps the unpacking safe if connect_db returns None.
conn, cur = connect_db(RedShift_DB_name) or (None, None)

if conn is not None and cur is not None:
    response = run_query(cur, "CREATE DATABASE coviddata;")
    # close_connection takes (cursor, connection). Close even when CREATE
    # DATABASE fails (e.g. the database already exists) so the session is
    # not left open.
    close_connection(cur, conn)

# Connect to the coviddata database, create the tables, then load them from S3.
# `or (None, None)` keeps the unpacking safe if connect_db returns None.
conn2, cur2 = connect_db('coviddata') or (None, None)

if conn2 is not None and cur2 is not None:
    table_list = [(FactCovidState, 'FactCovidState'), (FactCovidCounty, 'FactCovidCounty'),
                  (DimDate, 'DimDate'), (DimRegion, 'DimRegion'), (DimHospital, 'DimHospital')]

    Queries = create_table(table_list)

    # Require every CREATE TABLE to succeed, not just the last one.
    ddl_responses = [run_query(cur2, query) for query in Queries]
    response2 = 'success' if all(r == 'success' for r in ddl_responses) else None

    # If all the tables were created successfully, populate them.
    if response2 == 'success':
        load_responses = [populate_table(cur2, table[1], ",") for table in table_list]
        response3 = 'success' if all(r == 'success' for r in load_responses) else None

    # close_connection takes (cursor, connection). Always release the session,
    # including after a partial failure.
    close_connection(cur2, conn2)

In [None]:
#Glue script to copy the data from S3 to redshift

import configparser
import redshift_connector
import pandas as pd
import boto3


s3 = boto3.client('s3')

# Fetch the config file and the CREATE TABLE statements from S3.
file1, file2 = "configfile.config", "create_statements.txt"
s3.download_file("covid-tolub", "config_file.config", file1)
s3.download_file("covid-tolub", "create_statements.txt", file2)

config = configparser.ConfigParser()
# ConfigParser.read silently skips unreadable files. Fail here rather than with
# a confusing NoSectionError on the first config.get.
if not config.read(file1):
    raise FileNotFoundError(f"could not read config file {file1!r}")

region = config.get('AWS', 'region')
bucket_name = config.get('s3', 'Bucket_name')
iam_role = config.get('s3', 'iam_role')
DB_user = config.get('DWH', 'DB_user')
DB_Password = config.get('DWH', 'Password')
RedShift_DB_name = config.get('DWH', 'DB_name')
hostname = config.get('DWH', 'host')

# One SQL statement per line. This is an assumption about create_statements.txt:
# statements spanning several lines would be split incorrectly. Blank lines are
# dropped so they are not sent to Redshift as empty queries.
with open(file2, 'r', encoding='utf-8') as file:
    queries = [line.strip() for line in file if line.strip()]

table_list = ['FactCovidState', 'FactCovidCounty', 'DimDate', 'DimRegion', 'DimHospital']
    

def connect_db(dbname):
    """Open an autocommit Redshift connection to *dbname* and return ``(conn, cur)``.

    Connection parameters come from the module-level ``hostname``, ``DB_user``
    and ``DB_Password``. Autocommit is enabled because callers issue
    CREATE DATABASE, which Redshift does not allow inside a transaction block.

    Returns:
        ``(conn, cur)`` on success. ``(None, None)`` if connecting or creating
        the cursor raises ``redshift_connector.Error``; the error is printed.
        Returning a pair keeps ``conn, cur = connect_db(...)`` from failing to
        unpack.
    """
    try:
        conn = redshift_connector.connect(host=hostname, database=dbname, user=DB_user, password=DB_Password)
        conn.autocommit = True
    except redshift_connector.Error as e:
        print(e)
        return None, None
    try:
        cur = conn.cursor()
    except redshift_connector.Error as e:
        print(e)
        # Don't leak the session when no cursor can be handed back.
        conn.close()
        return None, None
    return conn, cur

def run_query(cursor_name, sql_statement):
    """Execute *sql_statement* on *cursor_name*.

    Returns the string 'success' when the statement executes. If it raises a
    ``redshift_connector.Error``, the error is printed and None is returned.
    """
    try:
        cursor_name.execute(sql_statement)
    except redshift_connector.Error as err:
        print(err)
        return None
    return 'success'
        
def close_connection(cursor_name, connection):
    """Close *cursor_name* and then *connection*.

    The connection is closed even if closing the cursor raises, so the session
    is never leaked. A ``redshift_connector.Error`` from either close is
    printed. If both close cleanly, a confirmation line is printed.
    """
    try:
        try:
            cursor_name.close()
        finally:
            connection.close()
    except redshift_connector.Error as e:
        print(e)
    else:
        print("database connection successfully closed")

def populate_table(cursor_name, table_name, delimiter):
    """COPY ``s3://{bucket_name}/final-tables/{table_name}.csv`` into *table_name*.

    The CSV header row is skipped. Timestamps are parsed with ``timeformat 'auto'``.
    Access is through the module-level ``iam_role`` and ``region``.

    Returns:
        'success' if the COPY executes. None if it raises
        ``redshift_connector.Error``; the error is printed.
    """
    s3uri = f"s3://{bucket_name}/final-tables/{table_name}.csv"
    # CSV makes COPY honour the double quotes that pandas.to_csv puts around
    # fields containing the delimiter (e.g. addresses with commas). Without it,
    # such rows are split on the embedded delimiter and the load fails.
    query = f'''copy {table_name}
    from '{s3uri}'
    iam_role '{iam_role}'
    csv
    delimiter '{delimiter}'
    timeformat 'auto'
    region '{region}'
    ignoreheader 1'''

    try:
        cursor_name.execute(query)
    except redshift_connector.Error as e:
        print(e)
    else:
        return 'success'
        
        
# Connect to the base database, create a new database for the dataset, then
# close the connection.
# `or (None, None)` keeps the unpacking safe if connect_db returns None.
conn, cur = connect_db(RedShift_DB_name) or (None, None)

if conn is not None and cur is not None:
    response = run_query(cur, "CREATE DATABASE coviddata;")
    # close_connection takes (cursor, connection). Close even when CREATE
    # DATABASE fails (e.g. the database already exists) so the session is
    # not left open.
    close_connection(cur, conn)

# Connect to the coviddata database, run the downloaded CREATE TABLE statements,
# then COPY each table's CSV from S3.
# `or (None, None)` keeps the unpacking safe if connect_db returns None.
conn2, cur2 = connect_db('coviddata') or (None, None)

if conn2 is not None and cur2 is not None:
    # Require every DDL statement to succeed, not just the last one. Blank
    # entries are skipped rather than sent as empty queries.
    ddl_responses = [run_query(cur2, query) for query in queries if query.strip()]
    response2 = 'success' if all(r == 'success' for r in ddl_responses) else None

    # If all the tables were created successfully, populate them.
    if response2 == 'success':
        # table_list holds plain table-name strings here, so pass each name as
        # is; `table[1]` would pass its second character (e.g. 'a').
        load_responses = [populate_table(cur2, table, ",") for table in table_list]
        response3 = 'success' if all(r == 'success' for r in load_responses) else None

    # close_connection takes (cursor, connection). Always release the session,
    # including after a partial failure.
    close_connection(cur2, conn2)
