## Covid19 - Google Cloud Platform Data Pipeline
Helper code and functions used to build the ETL Data Pipeline for this project, written as a commented tutorial. Follow along step by step, or reuse the functions when modifying the pipeline.
<div>
    <img src="Covid_19_Data_Pipeline.png" width="600"/>
</div>

In [6]:
# Essential Imports
from sqlalchemy import create_engine
import pandas as pd
# SQLAlchemy re-raises psycopg2 errors as its own exception classes, so import those
from sqlalchemy.exc import ProgrammingError, IntegrityError
import subprocess

# Covid19.org API Data ingestion helpers :
from Covid19_india_org_api import make_dataframe, get_test_dataframe, make_state_dataframe

# Cloud SQL Server Credentials
db_user = 'postgres'
db_password = '***REMOVED***'
cloud_sql_ip = 'localhost:5432'  # connecting through CloudSQL proxy
db_name = 'covid19-india'

### Sections
[1.](#CN) Connecting to CloudSQL PostgreSQL Instance.

[2.](#MOD) Creating and Modifying Tables 

[3.](#ADD) Adding Records from cleaned CSVs/Pandas DataFrames to Tables using Pandas

[4.](#BKP) Database Backup

[5.](#DVV) Data Validation Before Ingestion

[6.](#CFN) Cloud Functions (these must be updated whenever the Data Pipeline is modified). <br> Notebook versions of:
1. Google Cloud Function to fetch the raw data - GCP_Cloud_function.py <br> 
2. Google Cloud Function to load the raw data, transform it and move it to the CloudSQL Server - Cloud_function_Ingestion_SQL.py

### <a name="CN"></a>Connecting to Google Cloud SQL Server
1. Connect to the CloudSQL instance through the CloudSQL proxy and a SQLAlchemy engine:
   * Requires a GCP account with a Cloud SQL role (e.g. Cloud SQL Client or Cloud SQL Admin).
   * cloud_sql_proxy must be set up and running on your local machine, and your credentials must be accessible to it.
   * Lets the user update and modify tables according to the permissions granted.
   * Obtain the CloudSQL instance public IP and your database username and password.
   * Navigate to CloudSQL in the GCP Console to look up the instance details.
   * Use SQLAlchemy to connect.
   
Refer : https://cloud.google.com/sql/docs/postgres/connect-external-app
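The notebook builds the connection URL with a plain f-string and a hard-coded password. A minimal sketch of a safer variant, assuming the proxy listens on localhost:5432 as above: the password is read from an environment variable and percent-encoded so characters like '@' or ':' don't break the URL. make_proxy_url, DB_USER and DB_PASS are hypothetical names, not part of the original pipeline.

```python
import os
from urllib.parse import quote_plus


def make_proxy_url(user, password, host="localhost:5432", db_name="covid19-india"):
    """Build a postgresql:// URL for a database reached through a local Cloud SQL proxy.

    quote_plus percent-encodes characters such as '@', ':' and '/' that would
    otherwise break the user:password@host part of the URL.
    """
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}/{db_name}"


# DB_USER / DB_PASS are hypothetical environment variable names; export them in the
# shell that starts the notebook instead of hard-coding the password.
url = make_proxy_url(os.environ.get("DB_USER", "postgres"), os.environ.get("DB_PASS", ""))
```

The resulting string can be passed to create_engine(url) exactly like the f-string used in the cells below.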

In [8]:
# Create SQLAlchemy Engine and connect to the CloudSQL instance through the CloudSQL proxy

# Credentials
db_user = 'postgres'
db_password = '***REMOVED***'
cloud_sql_ip = 'localhost:5432'  # connecting through proxy
db_name = 'covid19-india'

engine = create_engine(f'postgresql://{db_user}:{db_password}@{cloud_sql_ip}/{db_name}')

In [9]:
# List available tables 
engine.table_names()

['overall_stats', 'states_info', 'testing_stats']

### <a name="MOD"></a>Creating and Modifying SQL Database using SQLAlchemy
SQLAlchemy provides an easy-to-use engine for interfacing with your RDBMS; use it to pass queries to the connected database.

Refer : https://docs.sqlalchemy.org/en/13/ <br>

#### Helper Functions - Creating Tables
After connecting SQLAlchemy to a database, you can create tables directly through the engine.<br> The tables here are created according to the current data dictionary and schema:
<div>
<img src="DB_ERD.png" width="300"/>
</div>

In [12]:
def create_table_overall_stats(engine):
    """ Initial setup of overall_stats table according to the schema.
    Rigid and hard-coded, so it can cause problems; consult others before changing it.
    """
    # Creating Overall_stats table
    engine.execute(""" CREATE TABLE overall_stats(
                "Date" DATE PRIMARY KEY,
                "DailyConfirmed" INT NOT NULL,
                "DailyDeceased" INT NOT NULL,
                "DailyRecovered" INT NOT NULL,
                "TotalConfirmed" INT NOT NULL,
                "TotalDeceased" INT NOT NULL,
                "TotalRecovered" INT NOT NULL
                )""")

In [13]:
def create_table_testing_stats(engine):
    """ Initial setup of testing_stats table.
    """
    # Creating testing stats table
    engine.execute(""" CREATE TABLE testing_stats(
                "Date" DATE PRIMARY KEY,
                "TestingSamples" INT NOT NULL,
                FOREIGN KEY("Date")
                    REFERENCES overall_stats("Date")
                )""")

In [14]:
def create_table_state_info(engine):
    """ Initial setup of the states_info table. The schema was generated with pandas.io.sql.get_schema
    because of the number of columns, and the keys were added afterwards.
    """
    # Creating state_info table
    engine.execute("""CREATE TABLE "states_info" (
    "Date" DATE ,
    "State" TEXT,
    "Confirmed" INTEGER,
    "Deceased" INTEGER,
    "Recovered" INTEGER,
    PRIMARY KEY("Date", "State"),
    FOREIGN KEY("Date")
    REFERENCES overall_stats("Date")
    )
    """)
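The three helpers above raise an error if their table already exists, so the setup cell can only run once. A minimal sketch of an idempotent variant using CREATE TABLE IF NOT EXISTS, which PostgreSQL also accepts. It runs against an in-memory sqlite3 database purely so the example executes without a server; DDL and create_tables are hypothetical names, and SQLite does not enforce the column types the way PostgreSQL does.

```python
import sqlite3

# Same three tables, written with IF NOT EXISTS so re-running the setup is a no-op.
# Parents come first because testing_stats and states_info reference overall_stats.
DDL = [
    """CREATE TABLE IF NOT EXISTS overall_stats(
        "Date" DATE PRIMARY KEY,
        "DailyConfirmed" INT NOT NULL,
        "DailyDeceased" INT NOT NULL,
        "DailyRecovered" INT NOT NULL,
        "TotalConfirmed" INT NOT NULL,
        "TotalDeceased" INT NOT NULL,
        "TotalRecovered" INT NOT NULL)""",
    """CREATE TABLE IF NOT EXISTS testing_stats(
        "Date" DATE PRIMARY KEY,
        "TestingSamples" INT NOT NULL,
        FOREIGN KEY("Date") REFERENCES overall_stats("Date"))""",
    """CREATE TABLE IF NOT EXISTS states_info(
        "Date" DATE,
        "State" TEXT,
        "Confirmed" INTEGER,
        "Deceased" INTEGER,
        "Recovered" INTEGER,
        PRIMARY KEY("Date", "State"),
        FOREIGN KEY("Date") REFERENCES overall_stats("Date"))""",
]


def create_tables(conn):
    """Run every CREATE statement; existing tables are left untouched."""
    for statement in DDL:
        conn.execute(statement)
    conn.commit()


conn = sqlite3.connect(":memory:")
create_tables(conn)
create_tables(conn)  # the second call does nothing instead of failing
tables = sorted(row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'"))
```

Against CloudSQL, the same statements could be passed to engine.execute one by one.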

In [2]:
# Make sure engine is connected to SQL Server.

engine = create_engine(f'postgresql://{db_user}:{db_password}@{cloud_sql_ip}/{db_name}')

In [20]:
# Creating Tables 

create_table_overall_stats(engine)
create_table_testing_stats(engine)
create_table_state_info(engine)

# List available tables 
engine.table_names()

### <a name="ADD"></a>Adding Data to SQL Server : Data Ingestion Functions
Uses the Covid19.org API data ingestion functions, which return cleaned DataFrames matching the schema.<br>
The ingestion function only needs a clean pandas DataFrame that matches the schema of the target table.<br>
Uses DataFrame.to_sql()

In [5]:
# Append problem: to_sql(if_exists='append') inserts every row of the DataFrame, so rows that are
# already in the table violate the primary key and raise an IntegrityError.
# Workaround: count the existing records in the table and only store the rows after that.
# This assumes df holds the full history in the same order as the table, which can be problematic.
# 'replace' is not an option: it drops the table, which other tables reference through foreign keys.
from sqlalchemy.exc import IntegrityError  # SQLAlchemy wraps psycopg2's IntegrityError in its own class

def add_data_table(engine, tablename, df):
    """ Appends new data to a table if it exists.
    Takes an engine connected to the DB, the table name and the DataFrame to store.
    Raises an error if 1. the table doesn't exist, 2. the table and DataFrame don't match
    (abstract this choice away from the user?).
    Problematic for testing_stats as its source has duplicates. Possible solution: find the last index, not the length.
    """

    try:
        num_records = engine.execute(f'SELECT COUNT(*) FROM {tablename}').scalar()
        print(f'{num_records} Records in {tablename}')

        new_rows = df[num_records:]
        new_rows.to_sql(tablename, engine, if_exists='append')
        print(f'Added {len(new_rows)} Records to table')

    # Only integrity errors are caught; there is no logging/fault tolerance yet.
    except IntegrityError as e:
        print(e)
        # Usually a foreign key pointing at a Date that is not yet in overall_stats
        print('Update Master Table first')
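The row-count workaround depends on the table and the DataFrame having identical ordering. A minimal sketch of an alternative, assuming PostgreSQL 9.5+ (or SQLite 3.24+, used here so the example runs in memory): let the primary key decide what is new with INSERT ... ON CONFLICT DO NOTHING. insert_new_rows and row_count are hypothetical helpers, and the dates and counts are dummy values.

```python
import sqlite3


def row_count(conn):
    """Number of rows currently stored in testing_stats."""
    return conn.execute('SELECT COUNT(*) FROM testing_stats').fetchone()[0]


def insert_new_rows(conn, rows):
    """Insert (Date, TestingSamples) rows, silently skipping dates already stored.

    Returns how many rows were actually added. The result no longer depends on
    how many rows the incoming data has or in which order they arrive.
    """
    before = row_count(conn)
    conn.executemany(
        'INSERT INTO testing_stats ("Date", "TestingSamples") VALUES (?, ?) '
        'ON CONFLICT ("Date") DO NOTHING',
        rows,
    )
    conn.commit()
    return row_count(conn) - before


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE testing_stats ("Date" DATE PRIMARY KEY, "TestingSamples" INT NOT NULL)')
first = insert_new_rows(conn, [("2020-03-13", 100), ("2020-03-18", 200)])
second = insert_new_rows(conn, [("2020-03-18", 200), ("2020-03-19", 300)])  # one old, one new date
```

With pandas, a statement like this can be plugged into DataFrame.to_sql through its method argument, which accepts a callable; that wiring is not shown here.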

In [None]:
# Make sure engine is connected to SQL Server.

engine = create_engine(f'postgresql://{db_user}:{db_password}@{cloud_sql_ip}/{db_name}')

# List available tables 
engine.table_names()

# Check current Data in Tables 

#pd.read_sql('SELECT * FROM states_info', engine , parse_dates = True, index_col ='Date')

In [19]:
# Add Data to the tables

add_data_table(engine, 'overall_stats', make_dataframe())

195 Records in overall_stats
Added 4 Records to table


In [22]:
# The testing data has duplicate dates; keep only the last entry for each date.
test = get_test_dataframe()
test = test.loc[~test.index.duplicated(keep='last')]
add_data_table(engine, 'testing_stats', test)

138 Records in testing_stats
Added 6 Records to table
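index.duplicated(keep='last') flags every occurrence of a date except the final one, so negating it keeps the last row per date at that row's original position. A pandas-free sketch of the same rule on (key, value) pairs, for readers checking what the one-liner does; keep_last is a hypothetical helper and the values are dummies.

```python
def keep_last(rows):
    """Keep only the last (key, value) pair for each key, in original row order.

    Mirrors df.loc[~df.index.duplicated(keep='last')] where key plays the index.
    """
    # Position of the final occurrence of every key
    last_pos = {key: i for i, (key, _) in enumerate(rows)}
    # A row survives only if it is that final occurrence
    return [row for i, row in enumerate(rows) if last_pos[row[0]] == i]


print(keep_last([("2020-03-13", 1), ("2020-03-18", 2), ("2020-03-13", 3)]))
```

Note that a plain dict(rows) would keep the same values but place each key at its first position, which is not what pandas does.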


In [23]:
add_data_table(engine, 'states_info', make_state_dataframe())

5889 Records in states_info
Added 156 Records to table


### <a name="BKP"></a> Dumping PostgreSQL DB
Backed up using a GUI for now; the backup_dB function below scripts the same dump with pg_dump. <br>
https://www.postgresqltutorial.com/postgresql-backup-database/

In [25]:
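def backup_dB(path, dbname='covid19-india'):
    """Plain-text pg_dump through the local CloudSQL proxy; dbname is case-sensitive.
    --no-password never prompts, so the password must come from ~/.pgpass or PGPASSWORD."""
    # check=True raises CalledProcessError if pg_dump fails
    subprocess.run(['pg_dump', '--host=localhost', '--port=5432', f'--dbname={dbname}',
                    '--username=postgres', '--no-password', '--format=p',
                    f'--file={path}'], check=True)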

In [26]:
backup_dB('Covid19-India_backup.sql')
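Overwriting a single backup file loses older snapshots. A minimal sketch that only builds a pg_dump argument list with a timestamped output file, so it can be inspected before handing it to subprocess.run(cmd, check=True). pg_dump_command is a hypothetical helper; the flags are the same ones backup_dB uses, and host, port and user default to the proxy setup above.

```python
import os
from datetime import datetime


def pg_dump_command(dbname, out_dir=".", host="localhost", port=5432, user="postgres", now=None):
    """Return a pg_dump argument list writing <dbname>_<YYYYmmdd_HHMMSS>.sql into out_dir.

    --no-password means pg_dump never prompts, so the password has to come
    from ~/.pgpass or the PGPASSWORD environment variable.
    """
    now = now or datetime.now()
    path = os.path.join(out_dir, f"{dbname}_{now:%Y%m%d_%H%M%S}.sql")
    return ["pg_dump", f"--host={host}", f"--port={port}", f"--dbname={dbname}",
            f"--username={user}", "--no-password", "--format=p", f"--file={path}"]


cmd = pg_dump_command("covid19-india", now=datetime(2020, 8, 1, 9, 30, 0))
# subprocess.run(cmd, check=True)  # uncomment to actually run the dump
```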

#### Main Function - Data Ingestion

In [3]:
# Ingesting overall stats data 
data = make_dataframe()

In [6]:
add_data_table(engine, 'overall_stats', data)

205 Records in overall_stats
Added 2 Records to table


In [10]:
# Ingesting Testing Data 

# The testing data has duplicate rows for some dates, which would violate the primary key, so drop them first (keeping the last one).
test = get_test_dataframe()
test = test.loc[~test.index.duplicated(keep='last')]
add_data_table(engine, 'testing_stats', test[:-1])

151 Records in testing_stats
Added 1 Records to table


In [9]:
# Ingesting state data

states = make_state_dataframe()

add_data_table(engine, 'states_info', states)

6279 Records in states_info
Added 78 Records to table


### <a name="DVV"></a> Data Validation Before Ingestion to SQL Server
CSVValidator was considered (an outdated project, not that useful). Data validation is used here instead of assertion tests (which would only check the data type of a Python object, not the data in the CSV, e.g. value ranges) and unit tests (which test code and outputs, not data). <br> <br>
PandasSchema is another open-source project that supports schema validation, unique constraints, date formats and list validation.<br> The idea is to keep a stored reference schema and use it to check incoming DataFrames. Not deployed.
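A pandas-free sketch of the kind of data-level checks described above (date format, uniqueness, value range), assuming rows arrive as dicts of strings such as csv.DictReader produces. validate_rows is a hypothetical helper, not part of PandasSchema or CSVValidator, and only cumulative totals are checked for non-negativity since daily deltas may legitimately be corrected downwards.

```python
from datetime import datetime


def validate_rows(rows, date_key="Date", int_keys=("TotalConfirmed",)):
    """Return a list of human-readable problems found in a list of dict rows."""
    problems, seen = [], set()
    for i, row in enumerate(rows):
        date = row.get(date_key)
        # Dates must be ISO formatted, like DateFormatValidation('%Y-%m-%d')
        try:
            datetime.strptime(date, "%Y-%m-%d")
        except (TypeError, ValueError):
            problems.append(f"row {i}: bad date {date!r}")
        # Dates must be unique, like IsDistinctValidation()
        if date in seen:
            problems.append(f"row {i}: duplicate date {date}")
        seen.add(date)
        # Counts must be non-negative integers (a range check dtype checks miss)
        for key in int_keys:
            value = row.get(key)
            if value is None or not str(value).isdecimal():
                problems.append(f"row {i}: {key}={value!r} is not a non-negative integer")
    return problems


rows = [
    {"Date": "2020-03-13", "TotalConfirmed": "6"},
    {"Date": "2020-03-13", "TotalConfirmed": "-1"},
    {"Date": "13/03/2020", "TotalConfirmed": "7"},
]
problems = validate_rows(rows)
```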

In [89]:
from pandas_schema import Column, Schema
import numpy as np
from pandas_schema.validation import InListValidation, IsDtypeValidation, IsDistinctValidation, DateFormatValidation

In [228]:
main_data = make_dataframe().reset_index()
state_data = make_state_dataframe().reset_index()
testing_data = get_test_dataframe().reset_index()

In [201]:
# Also validate that there is no null data and that the final date is today.
def test_missing(df):
    """Print and return the columns of df that contain null values."""
    missing_columns = [col for col in df.columns if df[col].isnull().any()]
    if missing_columns:
        print(f'Missing values in DF columns: {missing_columns}')
    return missing_columns
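The comment above also asks for a check that the final date is current. A minimal sketch, assuming the caller passes the last Date of each DataFrame as datetime.date objects (e.g. main_data.Date.iloc[-1].date()); is_fresh and the one-day default lag are hypothetical choices, since the data may be published the day after.

```python
from datetime import date, timedelta


def is_fresh(last_dates, today=None, max_lag_days=1):
    """True if all sources end on the same date and it is at most max_lag_days old."""
    today = today or date.today()
    unique = set(last_dates)
    if len(unique) != 1:
        return False  # sources disagree on the latest date (or none were given)
    (latest,) = unique
    return today - latest <= timedelta(days=max_lag_days)


print(is_fresh([date(2020, 8, 1)] * 3, today=date(2020, 8, 2)))
```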

In [206]:
# Validate schema 

schema_main = Schema([
    Column('Date', [DateFormatValidation('%Y-%m-%d'), IsDistinctValidation(), IsDtypeValidation(np.dtype('<M8[ns]'))]),
    Column('DailyConfirmed',[IsDtypeValidation(np.dtype('int64'))]),
    Column('DailyDeceased',[IsDtypeValidation(np.dtype('int64'))]),
    Column('DailyRecovered',[IsDtypeValidation(np.dtype('int64'))]),
    Column('TotalConfirmed',[IsDtypeValidation(np.dtype('int64'))]),
    Column('TotalDeceased',[IsDtypeValidation(np.dtype('int64'))]),
    Column('TotalRecovered',[IsDtypeValidation(np.dtype('int64'))])
])


In [207]:
errors_main = schema_main.validate(main_data)

In [226]:
for error in errors_main:
    print(error)

In [223]:
# Validate schema 

schema_state = Schema([
    Column('Date', [DateFormatValidation('%Y-%m-%d'), IsDtypeValidation(np.dtype('<M8[ns]'))]),
    Column('State',[IsDtypeValidation(np.dtype('O')), InListValidation(['Andaman and Nicobar Islands', 'Andhra Pradesh',
       'Arunachal Pradesh', 'Assam', 'Bihar', 'Chandigarh',
       'Chhattisgarh', 'DD', 'Dadra and Nagar Haveli and Daman and Diu',
       'Delhi', 'Goa', 'Gujarat', 'Haryana', 'Himachal Pradesh',
       'Jammu and Kashmir', 'Jharkhand', 'Karnataka', 'Kerala', 'Ladakh',
       'Lakshadweep', 'Madhya Pradesh', 'Maharashtra', 'Manipur',
       'Meghalaya', 'Mizoram', 'Nagaland', 'Odisha', 'Puducherry',
       'Punjab', 'Rajasthan', 'Sikkim', 'State Unassigned', 'Tamil Nadu',
       'Telangana', 'Total', 'Tripura', 'Uttar Pradesh', 'Uttarakhand',
       'West Bengal'])]),
    Column('Confirmed',[IsDtypeValidation(np.dtype('int64'))]),
    Column('Deceased',[IsDtypeValidation(np.dtype('int64'))]),
    Column('Recovered',[IsDtypeValidation(np.dtype('int64'))])
])

In [224]:
errors_state = schema_state.validate(state_data)

In [225]:
for error in errors_state:
    print(error)

In [227]:
test_missing(state_data)

In [234]:
testing_data.head(2)

|   | Date | TestingSamples |
|---|------|----------------|
| 0 | 2020-03-13 | 6500 |
| 1 | 2020-03-18 | 13125 |


In [236]:
# Validate schema 

schema_test = Schema([
    Column('Date', [DateFormatValidation('%Y-%m-%d'), IsDtypeValidation(np.dtype('<M8[ns]'))]),
    Column('TestingSamples',[IsDtypeValidation(np.dtype('int64'))])
])

In [238]:
errors_test = schema_test.validate(testing_data)

In [239]:
for error in errors_test:
    print(error)

In [240]:
test_missing(testing_data)

In [245]:
# Check whether all three DataFrames end on the same (latest update) date

In [272]:
main_state = main_data.Date[-1:].dt.to_pydatetime() == state_data.Date[-1:].dt.to_pydatetime()
main_test = main_data.Date[-1:].dt.to_pydatetime() == testing_data.Date[-1:].dt.to_pydatetime()

# & is True only if both comparisons hold (== would also be True if both were False)
main_state & main_test

array([ True])

### <a name="CFN"></a> Cloud Functions
These might not be consistent with the scripts in the Data_Pipeline folder.

#### Data Ingestion (Raw - Bucket) to (Clean - Cloud SQL) Cloud Function
1. Create a connection to the bucket.
2. Download the files from the bucket.
3. (Optional) Validate the CSVs with CSV validator before ingestion into the clean store (skipped for now).
4. Use pandas as an intermediary to clean the data.
5. Create a SQLAlchemy connection to the CloudSQL Server.
6. Upload the pandas DataFrames to CloudSQL using SQLAlchemy.
7. For completely empty tables, ingesting all data to date (overall - 190, state - 5694, testing - 135 records):<br>Function execution took 22190 ms, finished with status code: 200
8. For fetching a single day's data:<br> A. Fetch raw data using fetch_raw_covid_api_data: "Function execution took 6836 ms, finished with status code: 200"<br>
B. Clean the data and ingest it into CloudSQL: "Function execution took 2147 ms, finished with status code: 200"

In [81]:
from google.cloud import storage
import sqlalchemy
import pandas as pd
import pg8000  # database driver
import os

In [117]:
def download_folder_bucket(bucket, bucket_folder, local_folder):
    """Download all files from a GCS bucket folder to a local folder.
    """
    # Blobs under bucket_folder; skip "folder" placeholder objects whose names end in '/'
    blobs = [blob for blob in bucket.list_blobs(prefix=bucket_folder)
             if not blob.name.endswith('/')]

    # iterate over blobs and download each one to local folder + filename
    for blob in blobs:
        # filename by splitting name by '/' and keeping last item
        filename = blob.name.split('/')[-1]
        # download to local folder
        blob.download_to_filename(os.path.join(local_folder, filename))
    return f'Downloaded {len(blobs)} Files'
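GCS has no real folders; a folder created in the console may exist as an empty object whose name ends in '/', such as 'Data/Raw/'. Its last path component is empty, so downloading it would target the folder path itself. A small, testable sketch of the name-to-path mapping that skips such placeholders; local_paths is a hypothetical helper that needs no GCS client.

```python
import os
import posixpath


def local_paths(blob_names, local_folder):
    """Map GCS object names to local file paths, skipping folder placeholders."""
    paths = []
    for name in blob_names:
        filename = posixpath.basename(name)  # '' for names ending in '/'
        if filename:
            paths.append(os.path.join(local_folder, filename))
    return paths


print(local_paths(["Data/Raw/", "Data/Raw/COVID_India_National.csv"], "/tmp"))
```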

In [7]:
from sqlalchemy.exc import SQLAlchemyError

def add_data_table(engine, tablename, df):
    """ Appends new data to a table if it exists.
    Takes an engine connected to the DB, the table name and the DataFrame to store.
    Skips the first N rows of df, where N is the number of rows already in the table.
    Problematic for testing_stats as it has duplicates. Possible solution: find the last index, not the length.
    """

    try:
        num_records = engine.execute(f'SELECT COUNT(*) FROM {tablename}').scalar()
        print(f'{num_records} Records in {tablename}')

        new_rows = df[num_records:]
        new_rows.to_sql(tablename, engine, if_exists='append')
        print(f'Added {len(new_rows)} Records to table')

    # SQLAlchemy re-raises driver (pg8000) errors as SQLAlchemyError subclasses, e.g.
    # IntegrityError (duplicate key / missing overall_stats Date) or ProgrammingError (missing table)
    except SQLAlchemyError as e:
        print(f'Errored while adding to {tablename}. Investigate: {e}')

In [135]:
# Create SQLAlchemy connection to CloudSQL Server.

# Remember - storing secrets in plaintext is potentially unsafe.

def connect_db():
    """ Connects to the Cloud SQL DB through the provided Unix socket.
    Username, password etc. are hard-coded, which is problematic.
    """
    db_user = 'postgres'
    db_pass = ''
    db_name = 'covid19-data'
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")
    cloud_sql_connection_name = 'covid19-india-analysis-284814:***REMOVED***:covid19-data-server'

    engine = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL(
            drivername="postgresql+pg8000",  # the "postgres" dialect alias was removed in SQLAlchemy 1.4
            username=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            database=db_name,  # e.g. "my-database-name"
            query={
                "unix_sock": "{}/{}/.s.PGSQL.5432".format(
                    db_socket_dir,  # e.g. "/cloudsql"
                    cloud_sql_connection_name)  # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
            }
        ),
        # ... Specify additional properties here.
    )
    
    return engine
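The unix_sock query parameter above points at a socket that Cloud Functions exposes under DB_SOCKET_DIR. A small sketch that builds and sanity-checks that path from a connection name, assuming the usual PROJECT:REGION:INSTANCE form (domain-scoped projects add one more colon-separated part, so only a minimum of three parts is required). unix_socket_path and the example connection name are hypothetical.

```python
import os


def unix_socket_path(connection_name, socket_dir=None):
    """Return <socket_dir>/<connection_name>/.s.PGSQL.5432 for a Cloud SQL instance."""
    parts = connection_name.split(":")
    if len(parts) < 3 or not all(parts):
        raise ValueError(f"expected PROJECT:REGION:INSTANCE, got {connection_name!r}")
    socket_dir = socket_dir or os.environ.get("DB_SOCKET_DIR", "/cloudsql")
    return f"{socket_dir}/{connection_name}/.s.PGSQL.5432"


print(unix_socket_path("my-project:asia-south1:my-instance", "/cloudsql"))
```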

In [115]:
def main(request):
    """ Driver function for the Cloud Function (HTTP trigger). The request argument is not used.
    """
    # Create GCS client 
    storage_client = storage.Client()
    
    # connect to a bucket 
    bucket = storage_client.get_bucket('covid19-india-analysis-bucket')
    
    # Download RAW CSVs from GCS Bucket to Cloud Function temp. storage.
    download_folder_bucket(bucket, 'Data/Raw/', '/tmp/')
    
    # Loading and Transforming data
    data = pd.read_csv('/tmp/COVID_India_National.csv', parse_dates=True, index_col=0)
    state = pd.read_csv('/tmp/COVID_India_State.csv', parse_dates=True, index_col=0)
    # Load and clean test data 
    test = pd.read_csv('/tmp/COVID_India_Test_data.csv', parse_dates=True, index_col=0)
    test = test.loc[~test.index.duplicated(keep='last')]
    
    # Connect to CloudSQL DB
    engine = connect_db()
    
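    # Uploading data to the DB; overall_stats first because the other tables reference its Date
    add_data_table(engine, 'overall_stats', data)
    add_data_table(engine, 'states_info', state)
    add_data_table(engine, 'testing_stats', test)

    # HTTP Cloud Functions should return a response body
    return 'Data ingestion finished'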

### Misc.
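Code used to reshape the pandas state DataFrame (stacked State/Status columns) into the wide structure of the states_info table (one row per Date and State, one column per Status). Useful examples of pandas MultiIndex, pivot_table and melt.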

In [19]:
import os 
os.listdir()

['.DS_Store',
 'Data_Ingestion.py',
 'Data_Ingestion.ipynb',
 '__pycache__',
 'Covid19_india_org_api.py',
 '.ipynb_checkpoints',
 'Cloud_function_Ingestion_SQL.py',
 'tmp',
 'GCP_Cloud_function.py']

In [30]:
states = make_state_dataframe()

In [27]:
# Melt the DataFrame with stacked (MultiIndex) columns into long format
states.reset_index(inplace=True)
new_data = states.melt(id_vars=['Date'])
# The unnamed column level holds the state names; melt names that column None, so rename it to 'State'
new_data.rename(axis = 1, mapper={None: 'State'}, inplace=True)

In [314]:
# Pivot the Status values (Confirmed, Deceased, Recovered) back into separate columns
pivot_data = new_data.pivot_table(index = ['Date','State'],columns = 'Status', values = 'value')

In [286]:
# Turn the (Date, State) index into columns, then index by Date only
final_data = pivot_data.reset_index().set_index('Date')
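For readers new to melt and pivot_table, a pandas-free sketch of the same two reshapes, assuming a (State, Status) column MultiIndex represented here as tuple keys in a dict row. melt and pivot_status are hypothetical helpers and the counts are dummy values; unlike pivot_table, which averages duplicate entries by default, this sketch keeps the last value seen.

```python
def melt(wide_rows):
    """Wide -> long: {'Date': d, (state, status): value, ...} -> (Date, State, Status, value)."""
    long_rows = []
    for row in wide_rows:
        for key, value in row.items():
            if key == "Date":
                continue
            state, status = key  # two-level column index: (State, Status)
            long_rows.append((row["Date"], state, status, value))
    return long_rows


def pivot_status(long_rows):
    """Long -> {(Date, State): {Status: value}}, i.e. one record per (Date, State)."""
    table = {}
    for date, state, status, value in long_rows:
        table.setdefault((date, state), {})[status] = value
    return table


wide = [{"Date": "2020-03-14", ("Kerala", "Confirmed"): 10, ("Kerala", "Deceased"): 1}]
long_rows = melt(wide)
pivoted = pivot_status(long_rows)
```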

In [296]:
# The same data as a Series indexed by (Date, State, Status)
#new_data.groupby(['Date', 'State', 'Status'])['value'].sum()

In [322]:
# Recreates the original DataFrame with stacked (State, Status) columns
#new_data.pivot_table(values='value', index = 'Date', columns=['State', 'Status'])