In [1]:
import psycopg2
import os
import glob
import json

In [2]:
# Detect whether we are running inside Google Colab; if so, mount Drive so the
# landing zone under MyDrive is reachable. Outside Colab the import fails and
# we fall back to local paths.
try:
    from google.colab import drive
except ModuleNotFoundError:
    is_local = True
else:
    drive.mount('/content/drive', force_remount=True)
    is_local = False

In [3]:
folder_landing = "./landing" if (is_local) else "/content/drive/MyDrive/ADSDB/landing"

folder_temporal = os.path.join(folder_landing, "temporal")
folder_persistent = os.path.join(folder_landing, "persistent")

extract_dir = os.path.join(folder_persistent, "extracted")

In [4]:
def _read_spec(spec_path):
    """Return the raw text of one SQL column-spec file."""
    with open(spec_path, 'r') as spec_handle:
        return spec_handle.read()

# Map each spec file's basename (the key create_table looks up, e.g. "MortICD")
# to the column definitions it splices into CREATE TABLE.
spec_dir = os.path.join(folder_landing, 'sql_spec')
table_spec = {
    os.path.basename(spec_path): _read_spec(spec_path)
    for spec_path in glob.glob(os.path.join(spec_dir, '*'))
}

In [5]:
spec_tables_path = os.path.join(folder_landing, 'spec_tables.json')
with open(spec_tables_path, 'r') as spec_tables_file:
    # filename -> {'spec': <table_spec key>, optional 'rev': ICD revision}, or None to ignore.
    table_equi = json.load(spec_tables_file)

In [6]:
# Inspect the filename -> spec mapping; entries mapped to None are skipped at ingest.
table_equi

{'Documentation_21June2021.doc': None,
 'list_ctry_yrs_21June2021.xlsx': None,
 'country_codes': {'spec': 'CountryCodes'},
 'notes': {'spec': 'Notes'},
 'pop': {'spec': 'Population'},
 'MortIcd7': {'spec': 'MortICD', 'rev': '7'},
 'Morticd8': {'spec': 'MortICD', 'rev': '8'},
 'Morticd9': {'spec': 'MortICD', 'rev': '9'},
 'Morticd10_part1': {'spec': 'MortICD', 'rev': '10_1'},
 'Morticd10_part2': {'spec': 'MortICD', 'rev': '10_2'},
 'Morticd10_part3': {'spec': 'MortICD', 'rev': '10_3'},
 'Morticd10_part4': {'spec': 'MortICD', 'rev': '10_4'},
 'Morticd10_part5': {'spec': 'MortICD', 'rev': '10_5'},
 'DEM_COUNTRY.csv': {'spec': 'DemographicCountry'},
 'DEM_LABEL.csv': {'spec': 'DemographicLabels'},
 'DEM_DATA_NATIONAL.csv': {'spec': 'Demographic'}}

In [10]:
def table_exists(cur, table_name, schema='formatted'):
    """Return True if ``schema.table_name`` exists in the currently connected database.

    Both names are compared in lowercase because the tables are created with
    unquoted identifiers, which PostgreSQL folds to lowercase.

    Args:
        cur: an open DB-API cursor (psycopg2).
        table_name: unqualified table name, any case.
        schema: schema to look in; defaults to ``'formatted'`` (the previous
            hard-coded value), so existing callers are unaffected.

    Returns:
        The boolean produced by ``SELECT EXISTS(...)``.
    """
    # information_schema only lists objects of the current database, so match
    # on current_database() instead of a hard-coded name that breaks on any
    # other dbname.
    cur.execute('''SELECT EXISTS(SELECT 1 FROM information_schema.tables 
              WHERE table_catalog=current_database() AND 
                    table_schema=%s AND 
                    table_name=%s);''', (schema.lower(), table_name.lower()))
    return cur.fetchone()[0]

def create_table(cursor, table_type, icd_rev, timestamp, overwritte = False):
    """Create ``formatted.<table_type>_<rev>_<timestamp>`` from ``table_spec[table_type]``.

    Args:
        cursor: open psycopg2 cursor.
        table_type: key into ``table_spec`` (column definitions for CREATE TABLE).
        icd_rev: ICD revision suffix; ``None`` is rendered as ``0``.
        timestamp: version suffix appended to the table name.
        overwritte: if True, an existing table is dropped (CASCADE) and recreated.

    Returns:
        The schema-qualified table name, or ``None`` when the table already
        exists and ``overwritte`` is False.
    """
    revision = 0 if icd_rev is None else icd_rev
    short_name = f"{table_type}_{revision}_{timestamp}"
    qualified_name = f"formatted.{short_name}"

    present = table_exists(cursor, short_name)
    if present and not overwritte:
        return None
    if present:
        print(f"Overwritting table {qualified_name}")
        cursor.execute(f"DROP TABLE {qualified_name} CASCADE;")

    cursor.execute(f'''CREATE TABLE {qualified_name} (
        {table_spec[table_type]}
    );
    ''')
    return qualified_name

def load_csv(cursor, table_name, filename):
    """Bulk-load a comma-delimited CSV file (with a header row) into ``table_name``.

    Streams the file through PostgreSQL ``COPY ... FROM STDIN`` via
    ``cursor.copy_expert``; nothing is committed here.
    """
    copy_sql = f'''
            COPY {table_name}
            FROM STDIN
            DELIMITER ','
            CSV HEADER;
        '''
    with open(filename, 'r') as source:
        cursor.copy_expert(copy_sql, source)
        
def ingest_folder(folder_path, cursor=None):
    """Create and populate one ``formatted`` table per recognised file in an extracted folder.

    The folder basename is split from the right as ``<name>-<sha>-<timestamp>``;
    the table version suffix is ``<first 4 chars of sha>_<timestamp before any '.'>``.
    Each file is looked up in ``table_equi`` by basename:

    * mapped files get a table via ``create_table`` and are loaded with ``load_csv``
      (logged as ``LOAD``),
    * files whose table already exists are logged as ``SKIP ALREADY_IN_DB``,
    * unmapped files and sub-directories are logged as ``SKIP IGNORED``.

    Args:
        folder_path: path to one extracted dataset folder containing ``metadata.json``.
        cursor: psycopg2 cursor to use. Defaults to the notebook-level ``cur`` so
            existing ``ingest_folder(path)`` calls keep working.

    Raises:
        FileNotFoundError: if ``metadata.json`` is missing from the folder.
    """
    if cursor is None:
        cursor = cur  # backward-compatible fallback to the notebook-level cursor

    # Loaded (so a missing/malformed metadata.json fails fast) but not otherwise used here.
    with open(os.path.join(folder_path, "metadata.json"), 'r') as f:
        metadata = json.load(f)

    # normpath strips a trailing separator, which would otherwise make basename ''.
    folder_base = os.path.basename(os.path.normpath(folder_path))
    name_sha, _, timestamp = folder_base.rpartition("-")
    _, _, sha = name_sha.rpartition("-")
    version = sha[:4] + "_" + timestamp.partition(".")[0]

    # Sorted so load order and log output are reproducible across runs.
    for path in sorted(glob.glob(os.path.join(folder_path, "*"))):
        filename = os.path.basename(path)
        if filename == "metadata.json":
            continue

        # A directory can never be COPY'd, so treat it as unrecognised.
        table = table_equi.get(filename) if os.path.isfile(path) else None
        if table is None:
            print("SKIP IGNORED", filename)
            continue

        target_table = create_table(cursor, table['spec'], table.get('rev'), version)
        if target_table is None:
            print("SKIP ALREADY_IN_DB", filename)
            continue

        load_csv(cursor, target_table, path)
        print("LOAD", filename, "==>", target_table)

In [11]:
# Connection settings come from the standard libpq environment variables so
# credentials need not live in the notebook; the fallbacks reproduce the
# previous local-development values.
conn = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "adsdb"),
    user=os.environ.get("PGUSER", "adsdb"),
    password=os.environ.get("PGPASSWORD", "adsdb"),
)

In [12]:
cur = conn.cursor()

# `with conn:` commits when the block succeeds and rolls back on any exception,
# so a failed ingest does not leave the connection stuck in an aborted
# transaction for later cells. It does not close the connection.
with conn:
    cur.execute('''CREATE SCHEMA IF NOT EXISTS formatted''')

    # One folder per extracted dataset; sorted for a deterministic load order.
    for meta in sorted(glob.glob(os.path.join(extract_dir, "*", "metadata.json"))):
        ingest_folder(os.path.dirname(meta))

SKIP IGNORED list_ctry_yrs_21June2021.xlsx
LOAD country_codes ==> formatted.CountryCodes_0_8c41_1642852775
SKIP IGNORED Documentation_21June2021.doc
LOAD notes ==> formatted.Notes_0_a67b_1642852775
LOAD pop ==> formatted.Population_0_3b8f_1642852775
LOAD MortIcd7 ==> formatted.MortICD_7_22b4_1642852775
LOAD Morticd8 ==> formatted.MortICD_8_9366_1642852775
LOAD Morticd9 ==> formatted.MortICD_9_71c9_1642852775
LOAD Morticd10_part1 ==> formatted.MortICD_10_1_f695_1642852776
LOAD Morticd10_part2 ==> formatted.MortICD_10_2_ffb1_1642852776
LOAD Morticd10_part3 ==> formatted.MortICD_10_3_1c6a_1642852776
LOAD Morticd10_part4 ==> formatted.MortICD_10_4_453e_1642852776
LOAD Morticd10_part5 ==> formatted.MortICD_10_5_8f17_1642852776
SKIP IGNORED list_ctry_yrs_21June2021.xlsx
LOAD country_codes ==> formatted.CountryCodes_0_8c41_1642868814
SKIP IGNORED Documentation_21June2021.doc
LOAD notes ==> formatted.Notes_0_a67b_1642868814
LOAD pop ==> formatted.Population_0_3b8f_1642868814
LOAD MortIcd7 ==> 