In [1]:
# Enable IPython's autoreload extension in mode 2, which re-imports modules
# before each cell executes so edits to imported source files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys

import pandas as pd

# Make the parent of the current working directory importable.
# NOTE(review): presumably this is the project root that contains the `covid`
# package imported by later cells — confirm against the repository layout.
#
# The previous expression, abspath(join(dirname('covid'), pardir)), always
# reduced to abspath('..') because dirname('covid') is the empty string, so the
# same path is spelled out directly here.
PROJECT_ROOT = os.path.abspath(os.path.pardir)

# Only add the entry once, so re-running this cell does not pile up duplicates.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# Ingestion stage

## Upload the population table separately

In [23]:
from covid.connector import snowflake_con
from snowflake.connector.pandas_tools import write_pandas

# Read the local CSV before opening the connection: a missing or malformed file
# then fails fast without creating a Snowflake session, and the session is not
# held open during local file I/O.
df = pd.read_csv("Total_population.csv")

# Load the frame into the POPULATION_RAW table through a connector created for
# INGESTION_SCHEMA. write_pandas returns (success, nchunks, nrows, output);
# the raw COPY output is discarded.
with snowflake_con.create_connector(schema='INGESTION_SCHEMA') as con:
    success, nchunks, nrows, _ = write_pandas(
        conn=con,
        df=df,
        table_name='POPULATION_RAW',
        quote_identifiers=True,
    )

# Last expression of the cell: display whether the load succeeded.
success

2022-05-13 09:09:15,583 - Snowflake Connector for Python Version: 2.7.7, Python Version: 3.9.0, Platform: Linux-5.15.0-27-generic-x86_64-with-glibc2.35
2022-05-13 09:09:15,584 - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2022-05-13 09:09:16,406 - query: [create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas()...]
2022-05-13 09:09:16,618 - query execution done
2022-05-13 09:09:16,630 - query: [PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ 'file:///tmp/tm...]
2022-05-13 09:09:16,915 - query execution done
2022-05-13 09:09:18,406 - query: [COPY INTO "POPULATION_RAW" /* Python:snowflake.connector.pandas_tools.write_pand...]
2022-05-13 09:09:19,736 - query execution done
2022-05-13 09:09:19,738 - query: [COMMIT]
2022-05-13 09:09:20,202 - query execution done

True

In [4]:
from covid.ingestion import IngestionStage
# Source CSV files to ingest: the global data file plus the vaccination data
# and vaccination metadata files, all hosted on covid19.who.int.
link_list = [
    "https://covid19.who.int/WHO-COVID-19-global-data.csv",
    "https://covid19.who.int/who-data/vaccination-data.csv",
    "https://covid19.who.int/who-data/vaccination-metadata.csv",
]

# Stage object whose run() is invoked on link_list in the next cell.
ingestion = IngestionStage()

In [5]:
# Run the ingestion stage over the URLs in link_list. The captured log below
# shows an "Extraction..." step followed by a "Load..." step that writes to
# Snowflake via write_pandas.
# NOTE(review): the structure of the returned value is not visible here —
# confirm before relying on `extract` downstream.
extract = ingestion.run(link_list)

2022-05-13 08:33:33,605 - Starting Ingestion stage
2022-05-13 08:33:33,606 - Extraction...
2022-05-13 08:33:36,903 - Load...
2022-05-13 08:33:36,904 - Snowflake Connector for Python Version: 2.7.7, Python Version: 3.9.0, Platform: Linux-5.15.0-27-generic-x86_64-with-glibc2.35
2022-05-13 08:33:36,905 - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2022-05-13 08:33:36,905 - Setting use_openssl_only mode to False
2022-05-13 08:33:38,352 - query: [create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas()...]
2022-05-13 08:33:38,616 - query execution done
2022-05-13 08:33:38,804 - query: [PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ 'file:///tmp/tm...]
2022-05-13 08:33:39,133 - query execution done
2022-05-13 08:33:40,992 - query: [COPY INTO GLOBAL_RAW /* Pyt

# Preprocessing stage

In [54]:
from covid.preprocessing import PreprocessingStage
from covid.connector import snowflake_con
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
# Stage object for the preprocessing step; its run() is called in the next cell.
prep = PreprocessingStage()

In [51]:
# Run the preprocessing stage. The captured log below shows it selecting from
# the raw tables (POPULATION_RAW, GLOBAL_RAW) in Snowflake.
# NOTE(review): the structure of the returned value is not visible here —
# confirm before relying on `variable_prep` downstream.
variable_prep = prep.run()

2022-05-13 13:31:16,342 - Starting Preprocessing stage
2022-05-13 13:31:16,343 - Extraction...
2022-05-13 13:31:16,344 - Snowflake Connector for Python Version: 2.7.7, Python Version: 3.9.0, Platform: Linux-5.15.0-27-generic-x86_64-with-glibc2.35
2022-05-13 13:31:16,344 - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2022-05-13 13:31:17,430 - query: [ROLLBACK]
2022-05-13 13:31:17,607 - query execution done
2022-05-13 13:31:17,609 - query: [SELECT * FROM POPULATION_RAW]
2022-05-13 13:31:17,926 - query execution done
2022-05-13 13:31:17,929 - query: [ROLLBACK]
2022-05-13 13:31:18,092 - query execution done
2022-05-13 13:31:18,099 - query: [SELECT * FROM GLOBAL_RAW]
2022-05-13 13:31:18,403 - query execution done
2022-05-13 13:31:20,941 - query: [ROLLBACK]
2022-05-13 13:31:21,106 - query execution done

In [62]:
# Build the calendar table: one row per day from 2018-01-01 to 2022-12-31
# inclusive.
#
# The bounds are written in ISO format (YYYY-MM-DD). The earlier literals
# '1/1/2018' and '31/12/2022' mixed month-first and day-first notation, which
# pandas can only resolve by guessing (and warns about).
calendar_days = pd.date_range(start="2018-01-01", end="2022-12-31", freq="D")

# Derive every column from the same DatetimeIndex in a single construction, so
# re-running the cell always yields the same frame.
date = pd.DataFrame(
    {
        # Calendar day as a 'YYYY-MM-DD' string.
        "date": calendar_days.strftime("%Y-%m-%d"),
        "year": calendar_days.year,
        "month": calendar_days.month,
        "day": calendar_days.day,
        # Compact 'YYYYMMDD' string key for the day.
        "date_key": calendar_days.strftime("%Y%m%d"),
    }
)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [61]:
# Load the `date` frame into the DATE table through a connector created for
# PREPROCESSED_SCHEMA. write_pandas returns (success, nchunks, nrows, output);
# the raw COPY output is discarded.
with snowflake_con.create_connector(schema='PREPROCESSED_SCHEMA') as con:
    success, nchunks, nrows, _ = write_pandas(
        conn=con, df=date, table_name='DATE', quote_identifiers=False
    )

# Last expression of the cell: display whether the load succeeded.
success

2022-05-13 14:08:03,982 - Snowflake Connector for Python Version: 2.7.7, Python Version: 3.9.0, Platform: Linux-5.15.0-27-generic-x86_64-with-glibc2.35
2022-05-13 14:08:03,983 - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2022-05-13 14:08:04,850 - query: [create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas()...]
2022-05-13 14:08:05,063 - query execution done
2022-05-13 14:08:05,067 - query: [PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ 'file:///tmp/tm...]
2022-05-13 14:08:05,366 - query execution done
2022-05-13 14:08:06,375 - query: [COPY INTO DATE /* Python:snowflake.connector.pandas_tools.write_pandas() */ (dat...]
2022-05-13 14:08:07,228 - query execution done
2022-05-13 14:08:07,230 - query: [COMMIT]
2022-05-13 14:08:07,669 - query execution done

True