# DHS recode generator / table joiner

This notebook contains code for producing custom "recode" output tables from DHS tables that have first been split out into individual files, one per survey and table (record type).

Essentially it provides a means of "joining" tables that are stored in flat CSV files, just like executing a join query on data held in a DB. In fact the joining is done by loading each of the input tables to an in-memory SQLite database, building the output table there, and dumping the output table back to a CSV file.
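The round trip described above (CSV files in, in-memory SQLite join, CSV file out) can be sketched with the standard library alone. This is a minimal illustration in Python 3, not the DHSTableManagement implementation. The table names (`children`, `mothers`) and columns (`SEX`, `AGE`) are hypothetical; only `CASEID`, `BIDX` and the output table name `outputTbl` echo names used elsewhere in this notebook.

```python
import csv
import io
import sqlite3


def load_csv_table(cursor, table, csv_text):
    """Create `table` with one TEXT column per CSV header field and insert every row."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    col_defs = ", ".join('"{0}" TEXT'.format(c) for c in header)
    cursor.execute('CREATE TABLE "{0}" ({1})'.format(table, col_defs))
    placeholders = ", ".join("?" for _ in header)
    cursor.executemany(
        'INSERT INTO "{0}" VALUES ({1})'.format(table, placeholders), data)


def join_csvs(children_csv, mothers_csv):
    """LEFT JOIN children to mothers on CASEID in an in-memory DB; return CSV text."""
    db = sqlite3.connect(":memory:")
    cursor = db.cursor()
    load_csv_table(cursor, "children", children_csv)
    load_csv_table(cursor, "mothers", mothers_csv)
    # Build the output table inside the database, then dump it back out as CSV
    cursor.execute(
        "CREATE TABLE outputTbl AS "
        "SELECT c.CASEID AS CASEID, c.BIDX AS BIDX, c.SEX AS SEX, m.AGE AS AGE "
        "FROM children AS c LEFT JOIN mothers AS m ON c.CASEID = m.CASEID")
    cursor.execute("SELECT * FROM outputTbl ORDER BY CASEID, BIDX")
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow([d[0] for d in cursor.description])
    writer.writerows(cursor)  # NULLs from unmatched rows become empty fields
    db.close()
    return out.getvalue()
```

Every row of the left ("master") table survives the LEFT JOIN. A child whose mother is missing simply gets an empty `AGE`.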

The code that actually constructs the SQL for building the in-memory database and performing the joins is implemented in an external module (DHSTableManagement), which should be in the same directory as this notebook. This notebook contains only the code needed to read the input requirements and data, and to write the output files.

This was developed by Harry Gibson for extracting information (potentially) related to Under-5 Mortality for Donal Bisanzio. However, it should be applicable to creating any "flat" joined output tables from DHS data that has been parsed into separate tables from the CSPro format.

## Usage

The main input is a list of the tables / variables that should go into the output, specified in a CSV file. This has one row for each output column, and it must have the columns "Name", "RecordName" and "Len", which specify the variable name, source table name and field length respectively.

A second input CSV file provides a list of survey IDs (in a "DHS_id" column) that data should be extracted from, or for just a few surveys you could specify these manually in the notebook.

Finally a directory path to the parsed DHS survey tables (produced using the DHS survey parsing code) must be provided. The survey id and record name from the above inputs will be used to select CSV files from this directory.

In [None]:
import csv
import glob
from collections import defaultdict
import sqlite3
import os

In [None]:
from DHSTableManagement import *
from UnicodeWriter import UnicodeWriter

# Define inputs

### 1. Specify input file locations / patterns

You need to specify:
* varsFile - must contain columns Name, RecordName, and Len, defining the columns (and their source tables) that should appear in the joined output table
* svyFile - must contain a column "DHS_id" giving the numeric survey ID for each survey we want to run against, to find the survey files (unless you just specify the survey IDs manually later on)
* outDir and outFNPattern - for generating the name and path of the output file for each survey
* a list of glob patterns for the survey files, with string format placeholders for the survey id and the table name. If all files are in one folder then this list will contain only one pattern
* a list of glob patterns for the survey spec files (record spec), with a string format placeholder for the survey id (the spec files are per survey, so there is no table name placeholder). If all files are in one folder then this list will contain only one pattern
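To make the placeholder convention concrete, here is a sketch of how a single table file can be resolved from a list of table patterns. It is meant to mirror the lookup inside `processSurvey`: the patterns are tried in order, the first one that matches anything is used, and it must match exactly one file. `find_table_file` is a hypothetical helper name, not part of the notebook.

```python
import glob


def find_table_file(patterns, svy_id, tbl_name):
    """Return the path of the one file for (svy_id, tbl_name), or None.

    {0} in each pattern is filled with the survey id and {1} with the table name.
    """
    for pattern in patterns:
        matches = glob.glob(pattern.format(svy_id, tbl_name))
        if matches:
            # The first pattern with any hits decides; it must be unambiguous
            return matches[0] if len(matches) == 1 else None
    return None
```

With the two `tablePatterns` below, a survey that only exists in the 2016 update folder is found through the second pattern. A survey/table pair with no file at all yields `None`, which corresponds to the "does not exist or is not well specified" message in `processSurvey`.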

In [None]:
infoDir = r'\\path\to\folder\with\specification\files'  # template only; overridden on the next line
infoDir = r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\Processing\U5M_TheUniverse_And_Everything_201510\Info'
# csv file containing columns for variable names and their source tables required in output
# Must have columns Name, RecordName, and Len
varsFile = os.path.join(infoDir, 'variables_chosen_with_lengthandtype.csv')
# list of numerical survey ids in column DHS_id (unless entering list manually)
svyFile = os.path.join(infoDir, 'survey_db_list.csv')

outDir = r'\\output\folder\path'  # template only; overridden on the next line
outDir = r'C:\Temp\test'

# identifier for the output filenames
outputFilenameTag = "U5M_All_Final_Surveys"
outFNPattern = os.path.join(outDir,outputFilenameTag+".{0!s}.csv")

# Glob pattern(s) for all parsed DHS tables (assuming we're reading from CSVs, not DB)
# string format placeholder {0} should be survey number and {1} should be table id name
tablePatterns = [
    r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\DataExtraction\20150626_FullSiteScrape\ProcessedTables_PartTrimmed\{0}.*.{1}.csv',
    r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\DataExtraction\20160307_Updates\ProcessedTables_PartTrimmed\{0}.*.{1}.csv'
]
# Glob pattern(s) for all FlatRecordSpec files
# String format placeholder {0} should be survey number (spec files are per survey, so there is no table placeholder)
specFilePatterns = [
    r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\DataExtraction\20150626_FullSiteScrape\ParsedSpecs\{0}.*.FlatRecordSpec.csv',
    r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\DataExtraction\20160307_Updates\ParsedSpecs\{0}.*.FlatRecordSpec.csv'
]

### 2. Specify master table

The output will be expressed in terms of this table, i.e. there will be one output row for each row of this input table.

All other tables must be capable of joining to this table either 1:1 or M:1. Any that join 
1:M would result in duplicate rows from the left outer join, possibly exponentially many if
there are several such tables. The user needs to check this. If the join process seems to be taking a long 
time or using a lot of memory then this is probably what has gone wrong.
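A small SQLite experiment shows how quickly this goes wrong. Each hypothetical `detailN` table below holds several rows per master `id`, so every LEFT JOIN multiplies the row count by that number. With k such tables the output has n·m^k rows instead of n. The function and table names are illustrative only.

```python
import sqlite3


def joined_row_count(n_master, rows_per_key, n_detail_tables):
    """Count rows from LEFT JOINing a master table to several detail tables on `id`."""
    db = sqlite3.connect(":memory:")
    cur = db.cursor()
    cur.execute("CREATE TABLE master (id INTEGER)")
    cur.executemany("INSERT INTO master VALUES (?)", [(i,) for i in range(n_master)])
    joins = []
    for t in range(n_detail_tables):
        name = "detail{0}".format(t)
        cur.execute("CREATE TABLE {0} (id INTEGER, val INTEGER)".format(name))
        # every master id matches `rows_per_key` rows in this table
        cur.executemany(
            "INSERT INTO {0} VALUES (?, ?)".format(name),
            [(i, k) for i in range(n_master) for k in range(rows_per_key)])
        joins.append("LEFT JOIN {0} ON master.id = {0}.id".format(name))
    count = cur.execute("SELECT COUNT(*) FROM master " + " ".join(joins)).fetchone()[0]
    db.close()
    return count
```

`joined_row_count(100, 1, 3)` stays at 100 (three 1:1 joins), whereas `joined_row_count(100, 3, 3)` gives 2700 rows.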

In [None]:
# The Reproduction and Birth History table: all children (not just those under 5)
masterTable = "REC21" #"RECH1" #"REC21"

## Get the tasking - the required output columns

i.e. which columns from which tables should be copied into the joined output table

We will parse the input tasking file and build a dictionary mapping the name of the input table to a list of the columns (each specified as a ColumnInfo instance) required from that table. We will save this to the notebook global **tableVars**

In [None]:
# Build a dictionary of the columns that have been requested for each table.
# Key is the table name, and value is a list (because order matters) of
# ColumnInfo objects, each holding one column's name and length.
tableVars = defaultdict(list)
with open(varsFile) as varfile:
    reader = csv.DictReader(varfile, delimiter=',') # delim ; in original
    for row in reader:
        varname = row['Name']
        recname = row['RecordName']
        varlength = row['Len']
        tableVars[recname].append(ColumnInfo({"Name": varname, "Length": varlength}))

## Function to guesstimate the table ID (join) columns

Some of the join / identification columns are specified in the .DCF; these are the "normal" joins for things like a single-level (e.g. child tables only) extraction. Others aren't, and we have to infer them. This can't always be done automatically, so the resulting join SQL should always be checked before the results are trusted.

Call the function after defining it and save the results to a notebook global **tableIdsFull**



In [None]:
def getTableIDs(surveySpecFile, interestingTables):
    '''Pass a table spec file from the DCF parser, and a list of table names to get IDs for.
    
    We need to figure out the join fields for each table so we can guess, as far as possible,
    how to join the tables to one another.
    We can't do a perfect job because not everything is always specified in the DCF files, so 
    for any given extraction schema we must check the SQL that is generated before pulling 
    the trigger.
    
    We may wish to extend this to read the relationship spec file too, to pull out
    for example HA0 and HC0 as IDs in RECH5/RECH6
    
    Note that the relative order of the id columns between the tables is important
    as it is used by the joiner code to figure out which columns match to which
    The fieldnames in the parsed files do give them in a consistent order,
    but it would be safer to check that explicitly here (CASEID first, then
    BIDX or whatever)
    '''
    tableIds = defaultdict(list)
    with open(surveySpecFile) as specfile:
        reader = csv.DictReader(specfile, delimiter = ',')
        for row in reader:
            recname = row['RecordName']
            if (recname in interestingTables and
                    row['ItemType'] in ('IdItem', 'JoinableItem')):
                # other interesting things include "line number" or "Index" 
                # in the row label, but these give false positives.
                varname = row['Name']
                varlength = row['Len']
                # index columns (names containing IDX, e.g. BIDX) are given length 2
                if 'IDX' in varname:
                    varlength = 2
                colInfo = ColumnInfo({"Name": varname, "Length":varlength})
                if colInfo not in tableIds[recname]:
                    tableIds[recname].append(colInfo)
    return tableIds

# Read the ID and joinable columns for every requested table, so they are always included.
# Note: this 2015 full-site-scrape spec file is also used for the newer surveys;
# there is no separate input for them.
fullSpecFile = os.path.join(
    r'\\map-fs1.ndph.ox.ac.uk\map_data\DHS_Automation\DataExtraction\20150626_FullSiteScrape',
    'SchemaMappingSupport',
    'SchemaMapperTableSpecs_AllTables_AllSurveys_InclTypes.csv'
)
tableIdsFull = getTableIDs(fullSpecFile, tableVars.keys())
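The docstring above notes that the joiner matches ID columns between tables by their relative order. The real logic lives in `MultiTableJoiner` in DHSTableManagement and is not reproduced here. The hypothetical helper below only illustrates why the order matters when columns are paired by position. It also ignores the column transformations the real joiner can emit, such as the `substr(REC21.CASEID, ...)` seen in the commented-out SQL in `processSurvey`.

```python
def positional_join_condition(left_table, left_ids, right_table, right_ids):
    """Build an SQL ON clause by pairing the two tables' ID columns by position.

    Hypothetical illustration: zip() pairs the i-th ID column of one table with
    the i-th ID column of the other, and stops at the shorter list.
    """
    pairs = zip(left_ids, right_ids)
    return " AND ".join(
        "{0}.{1} = {2}.{3}".format(left_table, lcol, right_table, rcol)
        for lcol, rcol in pairs)
```

With `["CASEID", "BIDX"]` against `["CASEID", "XIDX"]` this gives `REC21.CASEID = OTHER.CASEID AND REC21.BIDX = OTHER.XIDX` (`OTHER` and `XIDX` are made-up names). If one list were in the wrong order, CASEID would silently be compared with an index column instead.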

### If any further manipulation of the parsed information is required then do it here

e.g.

In [None]:
tableVars.pop('RECH1', None)  # drop RECH1 from the requested tables; no error if already removed


In [None]:
tableIdsFull['REC01'][0].Name

In [None]:
tableIdsFull['REC01'].append(ColumnInfo({"Name":"V003", "Length":3}))

In [None]:
[c.Name for c in tableIdsFull['REC01']]

In [None]:
[c.Name for c in tableIdsFull['RECH1']]

### Read the list of surveys we need 

(Or just enter them manually)

In [None]:
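# read the survey IDs from survey_db_list, as ints so that they compare equal to any
# IDs added manually below (otherwise set() would keep both '393' and 393)
with open(svyFile) as svyfile:
    reader = csv.DictReader(svyfile)
    svys = [int(row['DHS_id']) for row in reader]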


In [None]:
svys.extend([393, 421,239,311,425,437,451,473,457,450])

In [None]:
len(set(svys))

In [None]:
#svys = [450]
#svys = [393, 421,239,311,425,437,451,473,457,450]
# The last assignment wins: only the surveys listed here are processed
svys=[248]
# or do all that are available
# svys = sorted(set(os.path.basename(f).split('.')[0]
#                   for p in tablePatterns
#                   for f in glob.glob(p.format("*", "RECH0"))))

# Process the surveys

In [None]:
# Only for developing the table joining code: if True, the TableInfo objects are built
# but nothing is loaded into the database, so no output is produced
skipDB = False

# Should we skip any table/survey pairs where none of the requested (non-ID) columns are present?
skipBlanks = False

# Should we just print the output-generation SQL (for testing) instead of running it?
isDebug = False
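The `skipBlanks` test applied inside `processSurvey` (defined below) can be read as the standalone function sketched here. `has_requested_data` is a hypothetical name written to mirror the inline loop there. Each row holds values for all the table's columns, ID columns included, with "N/A" standing in for requested columns that this survey lacks.

```python
def has_requested_data(rows, n_id_cols, missing="N/A"):
    """Return True if any row has a real value in at least one non-ID column."""
    for row in rows:
        # Fewer `missing` values than non-ID columns means at least one is present
        if row.count(missing) < len(row) - n_id_cols:
            return True
    return False
```

A table is skipped only when this returns False, i.e. every row is "N/A" in every non-ID column.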

***For each survey:***
* Create a new in-memory DB
* Load all required CSV files for that survey into individual tables in the in-memory DB 
* Create indexes on the join columns in the DB
* Create an output table that is the result of joining them all
* Write that table to disk as CSV

***Function for processing one survey***

In [None]:
def processSurvey(svyID, tblPatterns, tblVars, allTblIdCols, masterTable, outFile, verbose = True ):
    # this creates the new temporary db
    db = sqlite3.connect(':memory:')
    cursor = db.cursor()
    srcTableInfos = {}
    
    for tblName, tblCols in tblVars.iteritems():
        tblIdCols = allTblIdCols[tblName]
        # Load one table of this survey into the database
        # Find the individual file required
        for tblPattern in tblPatterns:
            tblFiles = glob.glob(tblPattern.format(svyID, tblName))
            if len(tblFiles) > 0:
                break
        if len(tblFiles) != 1:
            print ("Survey "+str(svyID)+" table "+tblName+" does not exist or is not well specified!")
            continue
        print tblName +"... ",
        tblFile = tblFiles[0]
        with open(tblFile) as tbl:
            reader = csv.DictReader(tbl)
            
            # Create a tableinfo object which will handle building the sql necessary
            # for interacting with this table in the database
            srcTable = TableInfo(tblName, tblIdCols, tblCols)
            if (skipDB):
                # For debugging of TableInfo
                continue
                
            # Get the sql to create the table in the database
            createSql = srcTable.GetCreateTableSQL()
            orderedCols = srcTable.AllColumns()
            
            # Get the "insert into xx(yy,bb) VALUES..." part of the SQL to populate the
            # data into the DB from the CSV reader
            insertSql = srcTable.GetInsertSQLTemplate()
            
            # Read the data from the CSV file into a list of lists, each correctly 
            # ordered for the columns that are in the DB table insert statement.
            # Use value "N/A" for any columns that are not present in this survey
            data = [([row.get(i, 'N/A') for i in orderedCols]) for row in reader ]
            
            # if an incoming CSV table has none of the columns we asked for except IDs
            # (e.g. we only wanted some non-standard survey specific columns and this survey 
            # doesn't have them)
            # then don't just include its ID columns, just skip dealing with it altogether
            gotData = False
            for i in data:
                if i.count('N/A') < (len(i) - len(tblIdCols)):
                    gotData = True
                    break
            if skipBlanks and not gotData:
                print "Skipping table {0} as none of the required cols are present".format(
                    tblName)
                continue
            
            # otherwise save the tableinfo
            srcTableInfos[tblName] = srcTable
            # and create and populate the table into the db
            cursor.execute(createSql)
            cursor.executemany(insertSql, data)
            # and create indexes in the DB on the relevant join columns
            idxSql = srcTable.GetCreateIndexSQL()
            cursor.executescript(idxSql)
        db.commit()
    
    # Now the in-memory database is populated for this survey we can continue.
    # Get a list of all the table names, with the master table
    # - i.e. the left one on the left outer join - at the start of 
    # the list as required by MultiTableJoiner and the rest sorted after it
    tblNames = [i for i in sorted(srcTableInfos) if i != masterTable]
    if masterTable in srcTableInfos:
        # insert (rather than overwrite element 0) so that no other table is dropped
        tblNames.insert(0, masterTable)
    else:
        print "Warning: requested master table {0} isn't present! Join may fail!".format(masterTable)
    
    if len(tblNames) == 0:
        print "Nothing for survey " + str(svyID)
        db.close()
        return
    
    # Note that we also don't actually check here if the join is appropriate. 
    # For example from a Child master table we can join to its parents table and the household 
    # table. But we shouldn't do the reverse as for each household there are many children.
    # If we tried, we'd get repeated rows (probably) on the left join.
    # If there was more than one such table then we would get an exploding number of rows.
    # The table joiner code makes some basic effort to check this based on the number and length
    # of join columns.
    
    # Instantiate the object to write the join SQL
    multi = MultiTableJoiner("outputTbl", [srcTableInfos[n] for n in tblNames] )
    # and get the SQL. This is just returning the SQL string, not running it.
    # Use GetCreateIntoSQL(QualifyFieldNames=True) to name output fields like 
    # RECH2_HV270 rather than just HV270
    joinEmAllSQL = multi.GetCreateIntoSQL(QualifyFieldNames=True)
   
    # One-off bodge for Donal's data where we want to join the household schedule table 
    # to the child table. This uses a different join column in the child tables, i.e. normally 
    # child tables are joined to mother tables based on BIDX, but the child tables also contain a column 
    # for household-schedule joining called B16. Thanks to that unhelpful name (REC21.B16) there is 
    # no way that I can see of automatically inferring this from the .DCF specification files.
    #joinEmAllSQL = joinEmAllSQL.replace (
    #    'LEFT JOIN RECH1 ON substr(REC21.CASEID, 1, length(REC21.CASEID)-3) = RECH1.HHID and REC21.BIDX = RECH1.HVIDX',
    #    'LEFT JOIN RECH1 ON substr(REC21.CASEID, 1, length(REC21.CASEID)-3) = RECH1.HHID and REC21.B16 = RECH1.HVIDX'
    #)
    
    if isDebug:
        print joinEmAllSQL
        return
    
    # now execute the SQL, thus creating outputTbl in the in-memory database
    cursor.execute(joinEmAllSQL)
    # and dump the results out to CSV
    cursor.execute("select * from outputTbl")
    colNames = [description[0] for description in cursor.description]
    
    # TODO a given column should always appear in the same table but occasionally 
    # this is not the case. So we have to specify in the input file all the places it 
    # could come from, which will generate multiple columns in the output.
    # e.g. some surveys have HV270 in RECH3 rather than RECH2 and so we need to specify 
    # both if we are running for all surveys.
    # Ideally we would check here for these duplicates and write out only the one which 
    # doesn't have "N/A" in the values. But that would need to inspect each row, and thus 
    # would be much slower. So for now it's best to just use the QualifyFieldNames option 
    # above to ensure they all get written somehow.
    with open(outFile, "wb") as f:
        writer = UnicodeWriter(f)
        writer.writerow(colNames)
        writer.writerows(cursor)
    db.close()
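The TODO above assumes that picking the non-"N/A" value among duplicate columns needs a Python loop over every row. One possible approach, sketched here and not tried in this notebook, is to let SQLite do it with `COALESCE` and `NULLIF` in a SELECT over `outputTbl`. `first_present_expr` is a hypothetical helper. It assumes the duplicates carry qualified names such as `RECH2_HV270` / `RECH3_HV270` (as produced by `QualifyFieldNames=True`) and that missing values are the "N/A" written by the loader. Every listed column must exist in `outputTbl`, which will not be true for a survey that lacks one of the tables.

```python
def first_present_expr(columns, missing="N/A"):
    """SQL expression giving the first of `columns` that is neither NULL nor `missing`."""
    # NULLIF turns the missing marker into NULL so that COALESCE skips over it;
    # the final argument keeps `missing` when none of the columns has a value.
    parts = ", ".join("NULLIF({0}, '{1}')".format(c, missing) for c in columns)
    return "COALESCE({0}, '{1}')".format(parts, missing)
```

For example, `"SELECT " + first_present_expr(["RECH2_HV270", "RECH3_HV270"]) + " AS HV270 FROM outputTbl"` yields one HV270 column. Unmatched LEFT JOIN rows (NULL) are handled the same way as "N/A".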


***Run the above function for all the surveys***

Here is where we actually do the work and create the output

In [None]:

for svyID in svys:
    print "Survey "+str(svyID)
    outname = outFNPattern.format(svyID)
    processSurvey(svyID, tablePatterns, tableVars, tableIdsFull, masterTable, outname)

### Workings below this point - all redundant

Note on the methodology: the table joiner code creates a new table that is the joined result, i.e. 

```SQL
CREATE TABLE outputTbl AS SELECT ... LEFT JOIN ...
```

Originally the code was written with the intention of first generating an output table as a copy of the relevant parts of the master input table, and then joining each of the other input tables in turn to populate the remaining columns.
For example:

In [None]:
inTable = srcTableInfos['REC43']
cp = TableToTableFieldCopier(outTable, inTable, inTable.OutputColumns())

update = cp.GetUpdateSQL_Join()
cursor.execute(update)
db.commit()

But that doesn't work because it turns out SQLite ***doesn't support a join in an update query*** (at least in the version used at the time; `UPDATE ... FROM` was only added in SQLite 3.33.0). Nice to know.


Instead we might try REPLACE INTO. But that doesn't work because it adds duplicate rows.

In [None]:
inTable = srcTableInfos['REC43']
cp = TableToTableFieldCopier(outTable, inTable, inTable.OutputColumns())

update = cp.GetUpdateSQL_Replace()
cursor.execute(update)
db.commit()
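We don't have the SQL that `GetUpdateSQL_Replace` generated, but the duplicates are consistent with how `REPLACE INTO` behaves in SQLite. It is shorthand for `INSERT OR REPLACE` and only replaces an existing row when the new row violates a PRIMARY KEY or UNIQUE constraint. Without such a constraint it is a plain insert. Even with one, the old row is deleted before the new one is inserted, so any columns not supplied by the REPLACE are lost. The self-contained demonstration below uses hypothetical table and column names.

```python
import sqlite3


def rows_after_replace(with_primary_key):
    """Return all rows after an INSERT for id 1 followed by a REPLACE INTO for id 1."""
    db = sqlite3.connect(":memory:")
    key = "PRIMARY KEY" if with_primary_key else ""
    db.execute("CREATE TABLE outputTbl (id INTEGER {0}, a TEXT, b TEXT)".format(key))
    db.execute("INSERT INTO outputTbl (id, a) VALUES (1, 'from master')")
    # Meant as an "update" of row 1 with a value taken from another table
    db.execute("REPLACE INTO outputTbl (id, b) VALUES (1, 'from other')")
    rows = db.execute("SELECT id, a, b FROM outputTbl ORDER BY rowid").fetchall()
    db.close()
    return rows
```

Without a key, `rows_after_replace(False)` returns two rows for id 1. With a key, `rows_after_replace(True)` returns one row, but the master value in `a` has been replaced by NULL. Neither result is the column-wise update we wanted.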


So instead we ended up with the one-step approach demonstrated above.