# Load data tables to the database

This stage should be run only after the metadata have been updated in the database in stage 03.



In [78]:
import os
import glob

import pandas as pd
import psycopg2 as pg
from sqlalchemy import create_engine

# Connection strings are kept outside the notebook (config/pg_conn.txt) so that
# credentials are not hard-coded here. First line: connection string used as
# conn_str_psyco; second line: SQLAlchemy URL used as conn_str_sqlalchemy.
with open('config/pg_conn.txt') as conn_details:
    # readline() keeps the trailing newline; strip it so the newline cannot end
    # up inside the parsed connection parameters (e.g. the database name).
    conn_str_psyco = conn_details.readline().strip()
    conn_str_sqlalchemy = conn_details.readline().strip()

In [79]:
# SQLAlchemy engine built from the URL read in the first cell.
# NOTE(review): no visible cell below uses `engine`; TableDataHelper is given
# the connection string directly. Consider removing this cell.
engine = create_engine(conn_str_sqlalchemy)

In [3]:
# Database schemas: one for the survey metadata (specs), one for the loaded data tables.
SPEC_SCHEMA = 'dhs_survey_specs'
DATA_SCHEMA = 'dhs_data_tables'

# Metadata tables inside SPEC_SCHEMA (the metadata that stage 03 updates).
TABLESPEC_TABLENAME = 'dhs_table_specs_flat'
VALUESPEC_TABLENAME = 'dhs_value_descs'
SURVEYLIST_TABLENAME = 'dhs_survey_listing'

# Schema-qualified "schema.table" names of the metadata tables.
TABLE_SPEC_TABLE = ".".join((SPEC_SCHEMA, TABLESPEC_TABLENAME))
VALUE_SPEC_TABLE = ".".join((SPEC_SCHEMA, VALUESPEC_TABLENAME))
SURVEYLIST_TABLE = ".".join((SPEC_SCHEMA, SURVEYLIST_TABLENAME))

# Root folder of the staged files; data tables are read from its `tables`
# sub-folder. Set the DHS_STAGING_FOLDER environment variable to run on another
# machine without editing the notebook; the default is the original local path.
STAGING_FOLDER = os.environ.get(
    "DHS_STAGING_FOLDER",
    "/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging",
)


In [4]:
# Every staged data table is a CSV file in the `tables` sub-folder.
_data_folder = os.path.join(STAGING_FOLDER, 'tables')
if not os.path.isdir(_data_folder):
    # Without this check a wrong STAGING_FOLDER silently yields an empty list
    # and every later cell quietly does nothing.
    raise FileNotFoundError("Staging tables folder not found: " + _data_folder)
# glob's result order depends on the filesystem; sort it so the load order
# (and its printed log) is the same on every run.
data_files = sorted(glob.glob(os.path.join(_data_folder, "*.csv")))

In [83]:
# Peek at the first few staged file paths to sanity-check the listing.
data_files[0:5]

['/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging/tables/1.ESIR01.REC01.csv',
 '/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging/tables/1.ESIR01.REC11.csv',
 '/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging/tables/1.ESIR01.REC21.csv',
 '/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging/tables/1.ESIR01.REC22.csv',
 '/mnt/c/Users/harry/OneDrive - Nexus365/Informal_Cities/DHS_Data_And_Prep/Staging/tables/1.ESIR01.REC31.csv']

In [84]:
# Total number of staged CSV files that the cells below will check / load.
len(data_files)

10760

Initialise a `TableDataHelper`, which handles all the schema checks and data loading. It is created in dry-run mode (`dry_run=True`).

In [51]:
# Helper that performs the schema checks and data loading, created in dry-run mode.
# NOTE(review): TableDataHelper is not imported or defined in any visible cell,
# so this raises NameError on a fresh kernel -- TODO add its import to the first cell.
db_helper = TableDataHelper(
    conn_str=conn_str_sqlalchemy,
    table_spec_table=TABLESPEC_TABLENAME,
    value_spec_table=VALUESPEC_TABLENAME,
    spec_schema=SPEC_SCHEMA,
    data_schema=DATA_SCHEMA,
    dry_run=True,
)

In [52]:
# Confirm the helper is still in dry-run mode (reads the helper's private flag).
db_helper._is_dry_run

True

Uncomment and execute the next cell when you're ready to ~~break things~~ run the updates for real; it switches the helper out of dry-run mode.

In [53]:
# Safety switch: the helper was created with dry_run=True. Uncomment the line
# below (it sets the helper's private flag directly) to leave dry-run mode
# before running the preparation and load cells.
#db_helper._is_dry_run = False

## Prepare the database 

Check that all necessary tables exist and have the required columns. 

This is only done once for each distinct destination table (`table_name`): `prepare_db_for_file` is a no-op for a table that has already been prepared.

In [None]:
# Destination table for every staged file, de-duplicated while keeping
# first-seen order: each table only needs preparing once (prepare_db_for_file
# is a no-op for a table that has already been done).
tables_to_prepare = dict.fromkeys(
    TableDataHelper.parse_table_name(table_file)[-1] for table_file in data_files
)
for table_name in tables_to_prepare:
    # creates the table if it doesn't exist; otherwise checks that all
    # required columns exist and are wide enough (compared to the metadata)
    db_helper.prepare_db_for_file(table_name)

In [74]:
# Names of the destination tables the helper records as verified
# (presumably filled in by prepare_db_for_file above -- confirm in the helper).
db_helper._verified_tables

{'MREC01',
 'MREC11',
 'MREC22',
 'MREC31',
 'MREC32',
 'MREC41',
 'MREC51',
 'MREC61',
 'MREC71',
 'MREC75',
 'MREC80',
 'MREC83',
 'MREC84',
 'MREC85',
 'MREC91',
 'MREC92',
 'MREC93',
 'MREC94',
 'MREC97',
 'MREC98',
 'MREC99',
 'MRECDV',
 'MRECFG',
 'MRECGC',
 'REC01',
 'REC11',
 'REC21',
 'REC22',
 'REC31',
 'REC32',
 'REC33',
 'REC41',
 'REC42',
 'REC43',
 'REC44',
 'REC4A',
 'REC51',
 'REC61',
 'REC71',
 'REC75',
 'REC80',
 'REC81',
 'REC82',
 'REC83',
 'REC84',
 'REC85',
 'REC91',
 'REC92',
 'REC93',
 'REC94',
 'REC95',
 'REC96',
 'REC97',
 'REC98',
 'REC99',
 'REC9A',
 'REC9B',
 'REC9C',
 'RECDV',
 'RECECD',
 'RECFG',
 'RECG1',
 'RECG2',
 'RECGC',
 'RECH0',
 'RECH1',
 'RECH10',
 'RECH11',
 'RECH2',
 'RECH3',
 'RECH4',
 'RECH4A',
 'RECH5',
 'RECH5A',
 'RECH5B',
 'RECH5CS',
 'RECH5S',
 'RECH6',
 'RECH6A',
 'RECH6B',
 'RECH6CS',
 'RECH6S',
 'RECH7',
 'RECH7A',
 'RECH7B',
 'RECH7C',
 'RECH7D',
 'RECH8',
 'RECH9',
 'RECH9A',
 'RECHA',
 'RECHAA',
 'RECHAC',
 'RECHAN1',
 'RECHAN2',
 

## See what was modified

For any tables that had columns added / widened, you **may** want to reload all data into that table. It depends on how the table has been updated in the past and what files you're running against: is it possible that some data files already in the DB were loaded without all necessary columns being present? If the DB has been kept up to date using this code, then it shouldn't be an issue. 

If it hasn't (or you're not sure), set `RELOAD_ALL_MODIFIED` to `True` below. Files whose destination table was modified will then be dropped and reloaded.

In [75]:
# Tables whose schema the preparation step changed (columns added / widened);
# an empty result means nothing was modified.
tables_changed = db_helper.list_modified_tables()
tables_changed

set()

In [None]:
# If True, the load loop also drops and reloads any file whose destination table
# appears in db_helper.list_modified_tables(), even when the database already
# holds at least as many rows for that survey as the file contains.
RELOAD_ALL_MODIFIED = False

## Load the data!

In [None]:
def _count_csv_rows(path):
    """Return the number of data rows (header excluded) in the CSV file at `path`.

    Tries pandas' default (UTF-8) decoding first, then falls back to cp1252.
    Columns are read as strings because only the row count is needed: this
    skips dtype inference (and its mixed-type warnings) without changing the count.
    """
    try:
        return len(pd.read_csv(path, dtype=str))
    except UnicodeDecodeError:
        # You might need to keep an eye on this. So far, this is the only other
        # encoding I've seen.
        return len(pd.read_csv(path, dtype=str, encoding='cp1252'))


# For each staged file, compare the rows already in the database for its survey
# with the rows in the file, and load or drop-and-reload as needed.
for table_file in data_files:
    # check how many (if any) rows for this survey exist in this table
    surveyid, _, _, _, table_name = TableDataHelper.parse_table_name(table_file)
    n_in_db = db_helper.get_db_survey_table_rowcount(surveyid, table_name)
    print(os.path.basename(table_file) + "... ", end="")
    if n_in_db == 0:
        print("\n    ....File needs loading completely")
        db_helper.load_table(table_file)
        continue
    n_in_file = _count_csv_rows(table_file)
    if n_in_file > n_in_db:
        print("\n    ....File has more rows than db; drop and reload")
        db_helper.drop_and_reload(table_file)
    elif RELOAD_ALL_MODIFIED and table_name in db_helper.list_modified_tables():
        print("\n    ....DB table had schema modified; drop and reload")
        db_helper.drop_and_reload(table_file)
    else:
        # the filename was already printed followed by "... ", so only the verdict remains
        print("ok!")
    