https://www.datacamp.com/community/tutorials/beginners-introduction-postgresql
<br>
#### TO DO 
1. Convert the states_info table to 5 columns: Date, State, Confirmed, Recovered, Deceased (primary key on the combination of Date and State) instead of the very wide format used so far. (Done! Still to document and refactor.)
2. Connect to Google Cloud SQL. (Online instance created.)
<br><b>GCP PostgreSQL Cloud Instance Details: </b><br>
covid19-data-server<br>
<Pass>
3. Convert the ingestion function for use with a Cloud Function.

### Accessing SQL using magic Commands

In [51]:
# load the SQL magic extension (e.g. from the ipython-sql package) in Jupyter Notebook
%load_ext sql

In [56]:
# connect to the local PostgreSQL server
%sql postgresql://postgres:<Pass>@localhost:5432/Covid19-India

In [58]:
%sql SELECT * FROM testing_stats

   postgresql://postgres:***@localhost:5432/Covid19-India
 * postgresql://postgres:***@localhost:5433/covid19-data
133 rows affected.


Date,TestingSamples
2020-03-13,6500
2020-03-18,13125
2020-03-19,14175
2020-03-20,15404
2020-03-21,15701
2020-03-22,16999
2020-03-23,20707
2020-03-24,20864
2020-03-25,25144
2020-03-27,27688


In [5]:
# Drop the table so it can be created again below

%sql DROP TABLE overall_stats

 * postgresql://postgres:***@localhost:5432/Covid19-India
Done.



In [6]:
%%sql
CREATE TABLE overall_stats(
date DATE PRIMARY KEY,
DailyConfirmed INT NOT NULL,
DailyDeceased INT NOT NULL,
DailyRecovered INT NOT NULL,
TotalConfirmed INT NOT NULL,
TotalDeceased INT NOT NULL,
TotalRecovered INT NOT NULL
);

 * postgresql://postgres:***@localhost:5432/Covid19-India
Done.



In [13]:
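def query_table(limit_rows):
    """Query overall_stats using the %sql magic.
    Magic commands can be embedded in Python functions, and Python variables
    can be inserted into the SQL within {}.
    """
    # assign the magic's result so the function actually returns the rows
    result = %sql SELECT * FROM overall_stats LIMIT {limit_rows}
    return result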

In [14]:
query_table(10)

 * postgresql://postgres:***@localhost:5432/Covid19-India
0 rows affected.
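The `{}` substitution in `query_table` pastes the Python value straight into the SQL text, which is convenient for a trusted value such as a row limit but unsafe for anything user-supplied. A minimal sketch of the bound-parameter alternative, using the standard-library `sqlite3` module and an in-memory database as a stand-in for PostgreSQL (with psycopg2 the placeholder would be `%s` rather than `?`). The two-column table is a cut-down assumption of `overall_stats`, filled with the first rows shown later in this notebook.

```python
import sqlite3


def query_table(conn, limit_rows):
    """Return up to `limit_rows` rows of overall_stats, passing the limit as a bound parameter."""
    # The value is sent separately from the SQL text, so it cannot change the query itself
    cur = conn.execute('SELECT * FROM overall_stats ORDER BY "Date" LIMIT ?', (limit_rows,))
    return cur.fetchall()


# In-memory SQLite database standing in for PostgreSQL
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE overall_stats ("Date" TEXT PRIMARY KEY, "DailyConfirmed" INTEGER NOT NULL)')
conn.executemany(
    "INSERT INTO overall_stats VALUES (?, ?)",
    [("2020-01-30", 1), ("2020-01-31", 0), ("2020-02-01", 0)],
)
rows = query_table(conn, 2)
print(rows)  # [('2020-01-30', 1), ('2020-01-31', 0)]
```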


### Accessing the Local Database from Python Code Using SQLAlchemy
https://docs.sqlalchemy.org/en/13/ <br>
SQLAlchemy provides an engine that is better suited than magic commands for talking to the RDBMS from ordinary Python code.
Supported dialects:
PostgreSQL | MySQL | SQLite | Oracle | Microsoft SQL Server
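SQLAlchemy engines are created from a database URL of the form `dialect+driver://user:password@host:port/database`, as in the `create_engine` calls below. A minimal sketch of building that URL from environment variables instead of hard-coding the password; `build_db_url` and the `DB_*` variable names are hypothetical. The password is escaped with `urllib.parse.quote_plus`, since characters such as `@` or `/` would otherwise break the URL (SQLAlchemy decodes the escaped password when it parses the URL).

```python
import os
from urllib.parse import quote_plus


def build_db_url(env=os.environ, dialect="postgresql"):
    """Build a SQLAlchemy database URL from (hypothetical) DB_* environment variables.

    The defaults mirror the local server used in this notebook.
    """
    user = env.get("DB_USER", "postgres")
    # quote_plus escapes characters such as '@' or '/' that would break the URL
    password = quote_plus(env.get("DB_PASS", ""))
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "5432")
    name = env.get("DB_NAME", "Covid19-India")
    return f"{dialect}://{user}:{password}@{host}:{port}/{name}"


# Usage (assumed): engine = create_engine(build_db_url())
url = build_db_url({"DB_PASS": "p@ss/word", "DB_PORT": "5433", "DB_NAME": "covid19-data"})
print(url)  # postgresql://postgres:p%40ss%2Fword@localhost:5433/covid19-data
```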

In [None]:
# Preparing DataFrames: the idea is to run the ingestion script with all fetching
# functions and the save option set to True.

In [1]:
from sqlalchemy import create_engine
import pandas as pd
from Covid19_india_org_api import make_dataframe, get_test_dataframe, make_state_dataframe
from psycopg2 import ProgrammingError, errors, IntegrityError
import subprocess

#### Creating Tables

In [2]:
def create_table_overall_stats(engine):
    """ Initial setup of the overall_stats table according to the schema.
    The schema is hard-coded, which is rigid and can cause problems; to be reviewed with others.
    """
    # Creating overall_stats table
    engine.execute(""" CREATE TABLE overall_stats(
                "Date" DATE PRIMARY KEY,
                "DailyConfirmed" INT NOT NULL,
                "DailyDeceased" INT NOT NULL,
                "DailyRecovered" INT NOT NULL,
                "TotalConfirmed" INT NOT NULL,
                "TotalDeceased" INT NOT NULL,
                "TotalRecovered" INT NOT NULL
                )""")

In [3]:
def create_table_testing_stats(engine):
    """ Initial setup of the testing_stats table.
    """
    # Creating testing_stats table; its "Date" references overall_stats, so create that first
    engine.execute(""" CREATE TABLE testing_stats(
                "Date" DATE PRIMARY KEY,
                "TestingSamples" INT NOT NULL,
                FOREIGN KEY("Date")
                    REFERENCES overall_stats("Date")
                )""")
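Because `testing_stats."Date"` references `overall_stats."Date"`, the parent table has to exist, and already contain a given date, before rows for that date can go into `testing_stats`. A small sketch of that constraint, using an in-memory SQLite database as a stand-in; SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`, whereas PostgreSQL always does. The table definitions are trimmed-down assumptions of the schema above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite only enforces foreign keys when this pragma is on; PostgreSQL always enforces them
conn.execute("PRAGMA foreign_keys = ON")
conn.execute('CREATE TABLE overall_stats ("Date" TEXT PRIMARY KEY)')
conn.execute(
    'CREATE TABLE testing_stats ("Date" TEXT PRIMARY KEY, "TestingSamples" INTEGER NOT NULL, '
    'FOREIGN KEY("Date") REFERENCES overall_stats("Date"))'
)

conn.execute("INSERT INTO overall_stats VALUES ('2020-03-13')")
conn.execute("INSERT INTO testing_stats VALUES ('2020-03-13', 6500)")  # parent row exists: OK

try:
    # No overall_stats row for this date yet, so the foreign key rejects the insert
    conn.execute("INSERT INTO testing_stats VALUES ('2020-03-18', 13125)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True
print(rejected)  # True
```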

In [4]:
def create_table_state_info(engine):
    """ Initial setup of the states_info table. The schema was generated with
    pandas.io.sql.get_schema (handy while the table had many columns) and the keys
    were added afterwards.
    """
    # Creating states_info table
    engine.execute("""CREATE TABLE "states_info" (
    "Date" DATE ,
    "State" TEXT,
    "Confirmed" INTEGER,
    "Deceased" INTEGER,
    "Recovered" INTEGER,
    PRIMARY KEY("Date", "State"),
    FOREIGN KEY("Date")
    REFERENCES overall_stats("Date")
    )
    """)

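The docstring mentions generating the schema with `pandas.io.sql.get_schema`. A minimal sketch, assuming a toy long-format frame with the `states_info` columns (placeholder values): the `keys` argument adds the composite primary key directly, so it need not be added by hand. Without a `con` argument pandas emits SQLite-flavoured DDL (e.g. `TIMESTAMP` for dates); passing the PostgreSQL engine as `con` should produce PostgreSQL types instead. The foreign key would still have to be added manually.

```python
import pandas as pd
from pandas.io.sql import get_schema

# Toy long-format frame with the states_info columns (values are placeholders)
states = pd.DataFrame({
    "Date": pd.to_datetime(["2020-04-01", "2020-04-01"]),
    "State": ["KL", "MH"],
    "Confirmed": [0, 0],
    "Deceased": [0, 0],
    "Recovered": [0, 0],
})

# keys= adds a composite primary key; with no connection, pandas generates SQLite DDL
ddl = get_schema(states, "states_info", keys=["Date", "State"])
print(ddl)
```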
#### Data Ingestion Function

In [5]:
# Known issue: appending can raise duplicate-key errors, which shouldn't happen with
# append, but here we are.
# Workaround: count the records already in the table and store only the rows after that
# position. This assumes the DataFrame only ever grows at the end, so it can be fragile.
# The table cannot be replaced instead, because other tables hold foreign keys to it.
import sqlalchemy.exc


def add_data_table(engine, tablename, df):
    """ Appends new data to a table if it exists.
    Takes an engine connected to the DB, the table name and the DataFrame to store.
    Fails if 1. the table doesn't exist, or 2. the table and DataFrame don't match.
    (Abstract this choice away from the user?)
    Problematic for testing_stats, which has duplicate dates. Possible solution:
    compare against the last stored date instead of the row count.
    """
    # COUNT(*) avoids fetching every row just to measure the table
    num_records = engine.execute(f'SELECT COUNT(*) FROM "{tablename}"').scalar()
    print(f'{num_records} Records in {tablename}')

    new_rows = df.iloc[num_records:]
    try:
        new_rows.to_sql(tablename, engine, if_exists='append')
        print(f'Added {len(new_rows)} Records to table')
    # pandas goes through SQLAlchemy, which wraps psycopg2's IntegrityError in its own
    # sqlalchemy.exc.IntegrityError, so catching the psycopg2 class never matches
    except sqlalchemy.exc.IntegrityError as e:
        print(e.orig)
        print('Key violation: update the master table (overall_stats) first '
              'and check for duplicate dates')
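The docstring suggests keying on the last stored date rather than the row count. A minimal sketch of that idea: read `MAX("Date")` from the table and append only the rows dated after it, so duplicates or gaps earlier in the DataFrame no longer shift the slice (revisions to already-stored dates are still not picked up). It uses `sqlite3` with pandas' built-in SQLite support as a stand-in for the PostgreSQL engine; `append_new_rows` is a hypothetical name, and it assumes the DataFrame has a `DatetimeIndex` and the table already exists.

```python
import sqlite3

import pandas as pd


def append_new_rows(conn, tablename, df):
    """Append only the rows of `df` dated after the newest "Date" already stored."""
    last = conn.execute(f'SELECT MAX("Date") FROM "{tablename}"').fetchone()[0]
    # MAX() returns NULL (None) on an empty table, so every row counts as new
    new_rows = df if last is None else df[df.index > pd.Timestamp(last)]
    new_rows.to_sql(tablename, conn, if_exists="append", index_label="Date")
    return len(new_rows)


# Toy data: the first two testing_stats rows from the query output at the top
test = pd.DataFrame(
    {"TestingSamples": [6500, 13125]},
    index=pd.DatetimeIndex(["2020-03-13", "2020-03-18"], name="Date"),
)
conn = sqlite3.connect(":memory:")
test.iloc[:1].to_sql("testing_stats", conn, index_label="Date")  # table holds one day

added_first = append_new_rows(conn, "testing_stats", test)  # only 2020-03-18 is new
added_again = append_new_rows(conn, "testing_stats", test)  # nothing newer on re-run
print(added_first, added_again)  # 1 0
```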

#### Main Function - Data Ingestion

In [84]:
# creating engine for executing sql queries
engine = create_engine('postgresql://postgres:<Pass>@localhost:5432/Covid19-India')

In [20]:
# Creating Tables 
create_table_overall_stats(engine)
create_table_testing_stats(engine)
create_table_state_info(engine)

In [104]:
# Ingesting overall stats data 
data = make_dataframe()

In [22]:
add_data_table(engine, 'overall_stats', data)

0 Records in overall_stats
Added 187 Records to table


In [23]:
# Ingesting Testing Data 

# The testing data has several entries for some dates, which would violate the primary key
# on "Date"; keep only the last entry per date.
test = get_test_dataframe()
test = test.loc[~test.index.duplicated(keep='last')]
add_data_table(engine, 'testing_stats', test[:-1])

0 Records in testing_stats
Added 131 Records to table
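`index.duplicated(keep='last')` flags every occurrence of a date except the last one, so negating it with `~` keeps exactly one row per date: the most recent report. A toy example (the repeated 2020-03-19 value is made up):

```python
import pandas as pd

# Toy testing data with two reports for 2020-03-19 (the second value is made up)
test = pd.DataFrame(
    {"TestingSamples": [13125, 14175, 14376]},
    index=pd.DatetimeIndex(["2020-03-18", "2020-03-19", "2020-03-19"], name="Date"),
)

# duplicated(keep="last") marks every repeat of a date except its last occurrence,
# so negating it keeps one row (the latest report) per date
mask = test.index.duplicated(keep="last")
deduped = test.loc[~mask]
print(deduped)
```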


In [24]:
# Ingesting state data

states = make_state_dataframe()

add_data_table(engine, 'states_info', states)

0 Records in states_info
Added 5577 Records to table


#### Dumping PostgreSQL DB
Backed up using GUI for now. <br>
https://www.postgresqltutorial.com/postgresql-backup-database/

In [25]:
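def backup_dB(path):
    """Dump Covid19-India to `path` as plain SQL. With --no-password, pg_dump reads the
    password from ~/.pgpass or PGPASSWORD instead of prompting."""
    subprocess.run(['pg_dump', '--host=localhost', '--dbname=Covid19-India',
                    '--username=postgres', '--no-password', '--format=p',
                    f'--file={path}'], check=True)  # raise if pg_dump fails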

In [26]:
backup_dB('/Users/apple/Desktop/DS/Covid19-Kaggle_and_End-End_project/Data/Cleaned/Covid19-India_backup.sql')
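An alternative sketch, assuming the password is kept in a hypothetical `DB_PASS` environment variable: the command is built by a separate helper so it can be inspected without running `pg_dump`, and the password is handed to `pg_dump` through libpq's `PGPASSWORD` variable in the child process environment. `pg_dump_command` and `backup_db` are hypothetical names; `check=True` turns a failed dump into a `CalledProcessError`.

```python
import os
import subprocess


def pg_dump_command(dbname, path, host="localhost", username="postgres"):
    """Build the pg_dump argument list for a plain-SQL (--format=p) dump of `dbname`."""
    return ["pg_dump", f"--host={host}", f"--dbname={dbname}",
            f"--username={username}", "--no-password", "--format=p",
            f"--file={path}"]


def backup_db(dbname, path, password_var="DB_PASS"):
    """Run pg_dump, passing the password (if set) through the PGPASSWORD variable."""
    env = dict(os.environ)
    # --no-password stops pg_dump prompting; libpq reads PGPASSWORD from its environment
    if password_var in os.environ:
        env["PGPASSWORD"] = os.environ[password_var]
    # check=True raises CalledProcessError if pg_dump exits with an error
    subprocess.run(pg_dump_command(dbname, path), env=env, check=True)


cmd = pg_dump_command("Covid19-India", "Covid19-India_backup.sql")
print(" ".join(cmd))
```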

### Connecting to Google Cloud SQL Server
1. Create an SQLAlchemy engine and connect to the Cloud SQL server. (Make sure the instance is running and the Cloud SQL Proxy is active.)
2. cloud_sql_proxy needs to be set up and running on the client machine. (Is there a less painful method? Granting access via the public IP was not working.)
3. Use pandas and SQLAlchemy to fetch the data.
4. Tables can be updated this way too.

In [3]:
from sqlalchemy import create_engine

In [6]:
engine = create_engine('postgresql://postgres:<pass>@localhost:5433/covid19-data')

In [16]:
# read_sql ignores parse_dates=True, so name the date column explicitly
states = pd.read_sql('SELECT * FROM states_info', engine, parse_dates=['Date'], index_col='Date')

In [9]:
testing = pd.read_sql('SELECT * FROM testing_stats', engine, parse_dates=['Date'], index_col='Date')

In [19]:
overall = pd.read_sql('SELECT * FROM overall_stats', engine, parse_dates=['Date'], index_col='Date')

In [141]:
overall.join(testing, how='left')

Date,DailyConfirmed,DailyDeceased,DailyRecovered,TotalConfirmed,TotalDeceased,TotalRecovered,TestingSamples
2020-01-30,1,0,0,1,0,0,
2020-01-31,0,0,0,1,0,0,
2020-02-01,0,0,0,1,0,0,
2020-02-02,1,0,0,2,0,0,
2020-02-03,1,0,0,3,0,0,
...,...,...,...,...,...,...,...
2020-08-02,52672,760,40355,1804857,38180,1187261,19821831.0
2020-08-03,50475,806,43070,1855332,38986,1230331,20202858.0
2020-08-04,51282,849,51220,1906627,39835,1281551,20864750.0
2020-08-05,56626,919,45583,1963253,40754,1327134,21484402.0
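With the `Date` index on both frames, `join` aligns index-on-index, and dates without a testing figure (all dates before 2020-03-13, for example) come out as `NaN`, which is why `TestingSamples` is shown as a float. A toy sketch of the same effect with placeholder testing values, and of converting the column to pandas' nullable `Int64` dtype to keep whole numbers:

```python
import pandas as pd

# Toy frames: overall has three dates, testing only the last one (placeholder value)
overall = pd.DataFrame(
    {"TotalConfirmed": [1, 1, 2]},
    index=pd.DatetimeIndex(["2020-01-30", "2020-01-31", "2020-02-01"], name="Date"),
)
testing = pd.DataFrame(
    {"TestingSamples": [100]},
    index=pd.DatetimeIndex(["2020-02-01"], name="Date"),
)

# Index-on-index left join: keep every overall date, NaN where no test count exists
joined = overall.join(testing, how="left")
print(joined["TestingSamples"].dtype)  # float64, because NaN cannot live in an int64 column

# The nullable Int64 dtype keeps whole numbers and shows missing values as <NA>
joined["TestingSamples"] = joined["TestingSamples"].astype("Int64")
print(joined)
```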


In [145]:
add_data_table(engine, 'overall_stats', make_dataframe())

190 Records in overall_stats
Added 0 Records to table


In [146]:
# Testing data has duplicate entries for some dates; keep the last one per date.
test = get_test_dataframe()
test = test.loc[~test.index.duplicated(keep='last')]

In [149]:
add_data_table(engine, 'testing_stats', test)

135 Records in testing_stats
Added 0 Records to table


In [148]:
add_data_table(engine, 'states_info', make_state_dataframe())

5694 Records in states_info
Added 0 Records to table


### Data Ingestion Cloud Function: Raw (Bucket) to Clean (Cloud SQL)
1. Create a connection to the bucket.
2. Download the files from the bucket.
3. (Optional) Validate the CSVs, e.g. with a CSV validator, before ingesting them into the clean store. (Skipped for now.)
4. Use pandas as an intermediary to clean the data.
5. Create an SQLAlchemy connection to the Cloud SQL server.
6. Upload the pandas DataFrames to Cloud SQL using SQLAlchemy.
7. For completely empty tables, ingesting all data to date (overall: 190, state: 5694, testing: 135 records):<br>Function execution took 22190 ms, finished with status code: 200
8. For fetching a single day's data:<br> A. Fetching raw data using fetch_raw_covid_api_data: "Function execution took 6836 ms, finished with status code: 200"<br>
B. Cleaning the data and ingesting it into Cloud SQL: "Function execution took 2147 ms, finished with status code: 200"
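Step 3 is skipped for now. A minimal sketch of what a pre-ingestion check could look like, assuming the national CSV carries the same columns as `overall_stats`; `validate_csv` and `EXPECTED_COLUMNS` are hypothetical. It flags the problems that would otherwise surface later as database errors: missing columns, duplicate dates (primary-key violations) and nulls (`NOT NULL` violations).

```python
import io

import pandas as pd

# Assumed schema for COVID_India_National.csv, mirroring the overall_stats columns
EXPECTED_COLUMNS = ["DailyConfirmed", "DailyDeceased", "DailyRecovered",
                    "TotalConfirmed", "TotalDeceased", "TotalRecovered"]


def validate_csv(source, expected_columns=EXPECTED_COLUMNS):
    """Return a list of problems found in a date-indexed CSV (empty list = passed)."""
    df = pd.read_csv(source, parse_dates=True, index_col=0)
    problems = []
    missing = [c for c in expected_columns if c not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")
    # Duplicate dates would violate the "Date" primary key on insert
    if df.index.duplicated().any():
        problems.append("duplicate dates")
    present = [c for c in expected_columns if c in df.columns]
    # The target columns are declared NOT NULL
    if df[present].isna().any().any():
        problems.append("null values")
    return problems


header = "Date," + ",".join(EXPECTED_COLUMNS)
good_result = validate_csv(io.StringIO(header + "\n2020-01-30,1,0,0,1,0,0\n2020-01-31,0,0,0,1,0,0\n"))
bad_result = validate_csv(io.StringIO("Date,DailyConfirmed\n2020-01-30,1\n2020-01-30,\n"))
print(good_result)  # []
print(bad_result)
```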

In [81]:
from google.cloud import storage
import sqlalchemy
import pandas as pd
import pg8000  # database driver
import os

In [117]:
def download_folder_bucket(bucket, bucket_folder, local_folder):
    """Download all files under a GCS bucket folder (prefix) to a local folder.
    """
    downloaded = 0
    # iterate over the blobs whose names start with the folder prefix
    for blob in bucket.list_blobs(prefix=bucket_folder):
        # skip the zero-byte "folder" placeholder object, if there is one
        if blob.name.endswith('/'):
            continue
        # local filename = last component of the blob name
        filename = blob.name.split('/')[-1]
        # os.path.join works whether or not local_folder ends with '/'
        blob.download_to_filename(os.path.join(local_folder, filename))
        downloaded += 1
    return f'Downloaded {downloaded} Files'

In [7]:
import sqlalchemy.exc


def add_data_table(engine, tablename, df):
    """ Appends new data to a table if it exists.
    Takes an engine connected to the DB, the table name and the DataFrame to store.
    Fails if 1. the table doesn't exist, or 2. the table and DataFrame don't match.
    (Abstract this choice away from the user?)
    Problematic for testing_stats, which has duplicate dates. Possible solution:
    compare against the last stored date instead of the row count.
    """
    try:
        # COUNT(*) avoids fetching every row just to measure the table
        num_records = engine.execute(f'SELECT COUNT(*) FROM "{tablename}"').scalar()
        print(f'{num_records} Records in {tablename}')

        new_rows = df.iloc[num_records:]
        new_rows.to_sql(tablename, engine, if_exists='append')
        print(f'Added {len(new_rows)} Records to table')

    # catch database errors explicitly (not a bare except) and report what went wrong
    except sqlalchemy.exc.SQLAlchemyError as e:
        print(f'Errored. Investigate: {e}')

In [135]:
# Create SQLAlchemy connection to CloudSQL Server.

# Storing secrets in plaintext is unsafe, so credentials are read from environment
# variables when set (DB_USER, DB_PASS, DB_NAME and CLOUD_SQL_CONNECTION_NAME are names
# chosen for this function; set them in the Cloud Function's configuration).

def connect_db():
    """ Connects to the Cloud SQL DB using the provided Unix socket.
    Falls back to the hard-coded defaults below when an environment variable is unset.
    """
    db_user = os.environ.get("DB_USER", "postgres")
    db_pass = os.environ.get("DB_PASS", "")
    db_name = os.environ.get("DB_NAME", "covid19-data")
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")
    cloud_sql_connection_name = os.environ.get(
        "CLOUD_SQL_CONNECTION_NAME",
        'covid19-india-analysis-284814:asia-south1:covid19-data-server')

    engine = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                         ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL(
            # "postgres" is a deprecated dialect alias, removed in SQLAlchemy 1.4
            drivername="postgresql+pg8000",
            username=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            database=db_name,  # e.g. "my-database-name"
            query={
                "unix_sock": "{}/{}/.s.PGSQL.5432".format(
                    db_socket_dir,  # e.g. "/cloudsql"
                    cloud_sql_connection_name)  # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
            }
        ),
        # ... Specify additional properties here.
    )

    return engine

In [115]:
def main(request):
    """ Driver function for the Cloud Function. The request argument is unused.
    """
    # Create GCS client 
    storage_client = storage.Client()
    
    # connect to a bucket 
    bucket = storage_client.get_bucket('covid19-india-analysis-bucket')
    
    # Download RAW CSVs from GCS Bucket to Cloud Function temp. storage.
    download_folder_bucket(bucket, 'Data/Raw/', '/tmp/')
    
    # Loading and Transforming data
    data = pd.read_csv('/tmp/COVID_India_National.csv', parse_dates=True, index_col=0)
    state = pd.read_csv('/tmp/COVID_India_State.csv', parse_dates=True, index_col=0)
    # Load and clean test data 
    test = pd.read_csv('/tmp/COVID_India_Test_data.csv', parse_dates=True, index_col=0)
    test = test.loc[~test.index.duplicated(keep='last')]
    
    # Connect to CloudSQL DB
    engine = connect_db()
    
    # Uploading Data to DB 
    add_data_table(engine, 'overall_stats', data)
    add_data_table(engine, 'states_info', state)
    add_data_table(engine, 'testing_stats', test)

    # an HTTP-triggered function has to return a response body
    return 'Ingestion complete'

### Misc.
Code used to reshape the pandas state DataFrame from its wide structure (stacked MultiIndex columns) into the long table format. Useful pandas MultiIndex, pivot table and melt examples.

In [30]:
states = make_state_dataframe()

In [27]:
# Melt the DataFrame with stacked (MultiIndex) columns into long format
states.reset_index(inplace=True)
new_data = states.melt(id_vars=['Date'])
# rename the unnamed (None) column that holds the state names
new_data.rename(axis = 1, mapper={None: 'State'}, inplace=True)

In [314]:
# Pivot so each Status value (Confirmed, Deceased, Recovered) becomes its own column
pivot_data = new_data.pivot_table(index = ['Date','State'],columns = 'Status', values = 'value')

In [286]:
# Move the (Date, State) index levels into columns, then index by Date only
final_data = pivot_data.reset_index().set_index('Date')

In [296]:
# The same data as a Series with a (Date, State, Status) MultiIndex
#new_data.groupby(['Date', 'State', 'Status'])['value'].sum()

In [322]:
# Recreates the original wide DataFrame with stacked (State, Status) columns
#new_data.pivot_table(values='value', index = 'Date', columns=['State', 'Status'])
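A self-contained round trip of the reshaping above on toy data: `pivot_table` builds the wide layout with stacked `(State, Status)` columns, `melt` turns it back into long format, and a second `pivot_table` gives the `states_info` layout. The counts and two-letter state codes are placeholders, and `ignore_index=False` in `melt` needs pandas 1.1 or later.

```python
import pandas as pd

# Toy long-format data: one row per (Date, State, Status); counts are placeholders
long_df = pd.DataFrame({
    "Date": pd.to_datetime(["2020-04-01"] * 4),
    "State": ["KL", "KL", "MH", "MH"],
    "Status": ["Confirmed", "Recovered", "Confirmed", "Recovered"],
    "value": [10, 2, 30, 5],
})

# Wide layout: one column per (State, Status) pair, i.e. stacked MultiIndex columns
wide = long_df.pivot_table(index="Date", columns=["State", "Status"], values="value")

# Back to long: melt uses the column level names (State, Status) as variable names;
# ignore_index=False keeps Date as the index so reset_index can restore the column
long_again = wide.melt(ignore_index=False).reset_index()

# Table layout used for states_info: one column per Status, keyed by (Date, State)
table = long_again.pivot_table(index=["Date", "State"], columns="Status", values="value")
table = table.reset_index().set_index("Date")
table.columns.name = None  # drop the leftover "Status" label on the columns
print(table)
```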