# Purpose
A series of steps to test an existing project for data leakage.

The example below is applied to the [Dirty Duck EIS Tutorial](https://github.com/dssg/dirtyduck), using City of Chicago data. We assume that you have already followed the steps in the tutorial up to and including the Inspections section. The variables in ALL CAPS are things that need to be customized for each project, so make a copy of this notebook and personalize it for your project.

# Process



In [1]:
import sys
import os
import math
import warnings

import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import psycopg2
from sqlalchemy import create_engine

from sklearn import linear_model
from sklearn import metrics 
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from IPython.core.interactiveshell import InteractiveShell

from dd_credentials import * # credentials file containing database login variables (dbname, user, host, password, port)

warnings.filterwarnings(action='once')
InteractiveShell.ast_node_interactivity = "all"

In [2]:
def execute_sql(statement, dbname, user, host, password, port, isolation = False, results = True):
    """
    Use psycopg2 to execute PostgreSQL queries
    
    Input:
        statement (str): SQL statement to run in database
        dbname, user, host, password, port (str): database credentials
        isolation (bool): indicator for whether to change the isolation level to autocommit; True for statements that cannot
            be run inside a transaction (see https://wiki.postgresql.org/wiki/Psycopg2_Tutorial) and for statements whose
            changes should be committed immediately, e.g. CREATE SCHEMA or CREATE TABLE
        results (bool): indicator for whether the query is expected to output results;
            for example, True for SELECT statements and False for CREATE TABLE statements
    
    Output:
        relation (dataframe): query results or empty dataframe if results = False
    """
    conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(dbname, user, host, password, port))
    cur = conn.cursor()
    if isolation:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur.execute(statement)
    relation = pd.DataFrame()
    if results:
        results = cur.fetchall()
        colnames = [desc[0] for desc in cur.description]
        relation = pd.DataFrame(results, columns=colnames)
    conn.commit()  # persist changes when autocommit is off; harmless in autocommit mode
    cur.close()
    conn.close()
    return relation


In [3]:
def randomize(df, do_not_randomize = None):
    """
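    Randomize the column values of a dataframe. Each column is shuffled independently, which keeps each
    column's values but breaks the relationships between columns (including between features and labels).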
    
    Inputs:
        df (dataframe): dataframe to randomize
        do_not_randomize (list): optional list of strings indicating names of 
            columns that should not be randomized
    Outputs:
        df (dataframe): dataframe of randomized data
    """     
    df_random = df.copy()
    if do_not_randomize:
        cols = [c for c in df.columns if c not in do_not_randomize]
    else:
        cols = df.columns
        
    for col in cols:
        #print('\t\tRandomizing column ' + col)
        df_random[col] = np.random.permutation(df_random[col])

    return df_random

In [4]:
# source: https://overlaid.net/2016/02/08/replace-words-in-files-or-strings-using-python/
def do_replacement(base_text, word_map):
    """
    Helper function for replace_words_in_file. Make the replacements indicated in word_map
    in base_text, applying one key at a time in dictionary order.
    """
    for key, val in word_map.items():
        base_text = base_text.replace(key, val)
    return base_text

def replace_words_in_file(read_from, write_to, word_map):
    """
    Create copy of a file with certain words replaced
    
    Inputs
        read_from: name of file to read from
        write_to: name of new file to be created
        word_map: dictionary of mappings between words and their replacements
            (e.g. {'old_word': 'new_word'})
    Outputs
        None. Will create a new file with the name given in write_to
        
    """
    # Read the whole file into a string
    with open(read_from, 'r') as f:
        tempstr = f.read()

    # Apply every replacement in word_map to the file contents
    output = do_replacement(tempstr, word_map)

    # Write out the edited copy
    with open(write_to, 'w') as fout:
        fout.write(output)

In [5]:
def get_new_filename(filename, suffix):
    """
    Output a new filename (str) given a filename and suffix to append. Assumes file extension at the end is separated by a period.
    """
    k = filename.rfind(".")
    new_filename = filename[:k] + suffix + '.' + filename[k+1:]
    return new_filename

In [6]:
def generate_binary_pred(y_scores, k):
    """
    Generate binary predictions, where the top k proportion (0 < k <= 1) of y_scores receive class 1
    """
    y_scores = np.asarray(y_scores)
    cutoff = int(len(y_scores) * k)
    y_pred = np.zeros(y_scores.shape[0])
    if cutoff > 0:  # guard: with cutoff == 0 the slice [-0:] would select every index
        ind = np.argpartition(y_scores, -cutoff)[-cutoff:]  # indices of the top k proportion of scores
        y_pred[ind] = 1
    return y_pred

## I. Create a randomized version of the raw schema

Pull the list of tables from the database's information schema.

In [7]:
# name of the database schema containing raw data to be randomized
RAW_SCHEMA = 'raw' 

In [8]:
statement = "SELECT * FROM information_schema.tables;"
tables = execute_sql(statement, dbname, user, host, password, port)

In [9]:
tables

| | table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | food | inspections | active_facilities | BASE TABLE | | | | | | YES | NO | |
| 1 | food | postgis | geography_columns | VIEW | | | | | | NO | NO | |
| 2 | food | postgis | geometry_columns | VIEW | | | | | | YES | NO | |
| 3 | food | postgis | spatial_ref_sys | BASE TABLE | | | | | | YES | NO | |
| 4 | food | postgis | raster_columns | VIEW | | | | | | NO | NO | |
| 5 | food | postgis | raster_overviews | VIEW | | | | | | NO | NO | |
| 6 | food | cleaned | inspections | BASE TABLE | | | | | | YES | NO | |
| 7 | food | inspections | failed | BASE TABLE | | | | | | YES | NO | |
| 8 | food | inspections | failed_major_violation | BASE TABLE | | | | | | YES | NO | |
| 9 | food | cleaned_randomized | inspections | BASE TABLE | | | | | | YES | NO | |


Select tables from the raw schema; they will be randomized. In this tutorial, there is only one (raw.inspections).

In [10]:
raw = tables[tables.table_schema == RAW_SCHEMA]
raw.head()

| | table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 18 | food | raw | inspections | BASE TABLE | | | | | | YES | NO | |


First, create the _randomized schema (here, raw_randomized) if it doesn't exist yet.

In [11]:
statement = "CREATE SCHEMA IF NOT EXISTS {}_randomized;".format(RAW_SCHEMA)
output = execute_sql(statement, dbname, user, host, password, port, isolation=True, results = False)

Randomize every table in the raw schema and write the output to a table of the same name in the _randomized schema; tables that already contain data are skipped.
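Shuffling each column independently is what makes this a leakage test: every column keeps its values (and hence its distribution), but any relationship to the other columns, including the label, is broken, so a model trained on the randomized data should do no better than chance. The toy sketch below uses made-up data in which `label` is an exact function of `feature`, and shows both properties.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)

# Made-up toy table: `label` is fully determined by `feature`
toy = pd.DataFrame({"feature": np.arange(1000)})
toy["label"] = (toy["feature"] > 500).astype(int)

# Shuffle each column independently, as randomize() does
shuffled = toy.copy()
for col in shuffled.columns:
    shuffled[col] = rng.permutation(shuffled[col].to_numpy())

# Each column keeps exactly the same values (same sorted contents)...
same_values = all(
    np.array_equal(np.sort(toy[c].to_numpy()), np.sort(shuffled[c].to_numpy()))
    for c in toy.columns
)

# ...but the feature-label association disappears
corr_before = toy["feature"].corr(toy["label"])
corr_after = shuffled["feature"].corr(shuffled["label"])
print(same_values, round(corr_before, 2), round(corr_after, 2))
```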

In [12]:
for table_name in raw.table_name:
    print("Working on table {}".format(table_name))
    
    # Pull the table from original schema
    print("\tPulling table")
    statement = "SELECT * FROM {}.{};".format(RAW_SCHEMA, table_name)
    table = execute_sql(statement, dbname, user, host, password, port)
    
    # Randomize the table
    print("\tRandomizing")
    randomized_table = randomize(table)
    
    # Make a new table in _randomized schema
    print("\tUploading randomized version")
    statement = "CREATE TABLE IF NOT EXISTS {}_randomized.{} (LIKE {}.{});".format(RAW_SCHEMA, table_name, RAW_SCHEMA, table_name)
    output = execute_sql(statement, dbname, user, host, password, port, isolation = True, results = False)
    
    # Write results into new table
    statement = "SELECT COUNT(*) FROM {}_randomized.{};".format(RAW_SCHEMA, table_name)
    output = execute_sql(statement, dbname, user, host, password, port)
    if output.iloc[0,0]>0: #  do nothing if new table already contains data
        print("\t*****SKIPPING TABLE {} -- it already has data".format(table_name))
    else:
        engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(user, password, host, port, dbname))
        randomized_table.to_sql(table_name, engine, schema = '{}_randomized'.format(RAW_SCHEMA), index = False, if_exists='append')
    

Working on table inspections
	Pulling table
	Randomizing
	Uploading randomized version
	*****SKIPPING TABLE inspections -- it already has data


## II. Edit + execute preprocessing and config files so that they point to the newly randomized raw schema

New versions of the preprocessing and config files with an "_edited" suffix will be produced.

Make a list of the preprocessing files that get RAW_SCHEMA ready for triage experiments. The list should be in execution order, and every file name must have an extension (e.g., .sql), because get_new_filename inserts the suffix before the last period.

NOTE: The code below currently works only for .sql files; support for .py files would need to be added if necessary.
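Since get_new_filename and replace_words_in_file work on any text file, only the execution step depends on the file type. One possible way to add .py support is sketched below with a hypothetical helper, `run_preprocessing_file`; it assumes an edited .py script can simply be run as a separate process with the current Python interpreter.

```python
import subprocess
import sys
from pathlib import Path


def run_preprocessing_file(filename, run_sql):
    """
    Hypothetical dispatcher for one (edited) preprocessing file.

    .sql files: the file contents are passed to `run_sql`, a callable that executes SQL text.
    .py files: the script is run in a separate Python process with the current interpreter.
    """
    path = Path(filename)
    suffix = path.suffix.lower()
    if suffix == ".sql":
        run_sql(path.read_text())
    elif suffix == ".py":
        # check=True raises subprocess.CalledProcessError if the script exits with an error
        subprocess.run([sys.executable, str(path)], check=True)
    else:
        raise ValueError("Unsupported preprocessing file type: {}".format(filename))
```

In this notebook, `run_sql` could wrap the existing helper, e.g. `lambda sql: execute_sql(sql, dbname, user, host, password, port, isolation=True, results=False)`.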

In [13]:
PREPROC_FILES = ['/home/ubuntu/dirtyduck/sql/create_eis_schema.sql']

Also define the experiment config file name

In [14]:
EXP_CONFIG = "/home/ubuntu/dirtyduck/triage/experiment_config/eis_01.yaml"

Make a dictionary of text replacements to apply across every preprocessing & config file. This should include:
- Names of schemas that are produced during preprocessing. Add a _randomized suffix to each of them.
- The "purpose" model group key from the experiment config file. In order to avoid overwriting existing model groups, the purpose must be changed to indicate that this experiment is for leakage detection.



In [15]:
WORD_MAP = {RAW_SCHEMA+'.': RAW_SCHEMA+'_randomized.', 
    'cleaned.': 'cleaned_randomized.', 
    'semantic.': 'semantic_randomized.',
    'inspections.': 'inspections_randomized.',
    'eis.': 'eis_randomized.',
    "purpose: 'exploring'": "purpose: 'leakage-detection'"}

Apply automated find+replace to every preprocessing file.  
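do_replacement applies the WORD_MAP entries one after another with str.replace, so a key can also match at the end of a longer name (e.g. 'raw.' inside a hypothetical 'draw.'), and text produced by one replacement could be rewritten again by a later key. A sketch of a single-pass alternative, `replace_whole_words` (a hypothetical helper, not part of the original notebook), requires each key to start at a word boundary and never rescans replaced text. It assumes every key begins with a letter, digit, or underscore.

```python
import re


def replace_whole_words(text, word_map):
    """
    Hypothetical single-pass alternative to do_replacement.
    Each key must start at a word boundary; replaced text is not scanned again.
    """
    # Try longer keys first so a shorter key sharing the same start cannot pre-empt them
    keys = sorted(word_map, key=len, reverse=True)
    pattern = re.compile(r"\b(?:" + "|".join(re.escape(k) for k in keys) + ")")
    return pattern.sub(lambda m: word_map[m.group(0)], text)


demo_map = {"raw.": "raw_randomized.", "cleaned.": "cleaned_randomized."}
print(replace_whole_words("FROM raw.t JOIN draw.x", demo_map))
```

To use it, do_replacement could simply return `replace_whole_words(base_text, word_map)`.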

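**NOTE**: Be sure to open the output files (with the "_edited" suffix) to check that no unexpected replacements were made and that all necessary replacements were made. In this case, we had to edit some schema creation statements manually (for example, every key in WORD_MAP ends with a period, so a statement like `CREATE SCHEMA IF NOT EXISTS cleaned;` is not caught).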
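Part of that check can be automated by diffing each original file against its edited copy, so that every changed line can be reviewed in one place. A minimal sketch using the standard library's difflib; `preview_replacements` is a hypothetical helper name.

```python
import difflib


def preview_replacements(original_text, edited_text, name="file"):
    """Return a unified diff listing every line changed by the find+replace."""
    diff = difflib.unified_diff(
        original_text.splitlines(keepends=True),
        edited_text.splitlines(keepends=True),
        fromfile=name,
        tofile=name + " (edited)",
        n=0,  # no context lines: show only the changed lines
    )
    return "".join(diff)


print(preview_replacements("SELECT * FROM raw.t;\nSELECT 1;\n",
                           "SELECT * FROM raw_randomized.t;\nSELECT 1;\n", "demo.sql"))
```

After running the next cell, the real files could be compared with, e.g., `print(preview_replacements(open(PREPROC_FILES[0]).read(), open(preproc_files_edited[0]).read(), PREPROC_FILES[0]))`. Lines that should have changed but do not appear in the diff still need manual edits.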

In [16]:
preproc_files_edited = []
for filename in PREPROC_FILES:
    new_filename = get_new_filename(filename, "_edited")
    replace_words_in_file(filename, new_filename, WORD_MAP)
    preproc_files_edited.append(new_filename)

Execute the edited preprocessing files

In [17]:
preproc_files_edited

['/home/ubuntu/dirtyduck/sql/create_eis_schema_edited.sql']

In [19]:
for filename in preproc_files_edited:
    print("Executing file ", filename)
    conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(dbname, user, host, password, port))
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)  # commit each statement as it runs
    try:
        with conn.cursor() as cursor, open(filename, 'r') as f:
            cursor.execute(f.read())
    finally:
        conn.close()  # release the connection even if the file fails partway through

Executing file  /home/ubuntu/dirtyduck/sql/create_eis_schema_edited.sql


OperationalError: could not write to hash-join temporary file: No space left on device


Now edit the experiment config file in a similar fashion by applying the automated find+replace to it.  

**NOTE**: Again, be sure to open the output file (with the "_edited" suffix) to check that no unexpected replacements were made and that all necessary replacements were made.  
In this case, we increased test_durations and test_label_timespans from 1month to 3month to obtain a wider test window and therefore more variation in the predicted results. The same change was made to the original, non-randomized experiment as well.

In [20]:
new_exp_config = get_new_filename(EXP_CONFIG, '_edited')
replace_words_in_file(EXP_CONFIG, new_exp_config, WORD_MAP)

## III. Run an experiment with this new setup

Now that we've set up randomized versions of the original schemas in the database and created a new config file for a randomized experiment, it's time to run the actual triage experiment.

**NOTE**: This must be done from the command line!

- First, validate the new config file. For this tutorial: `./tutorial.sh triage --config_file inspections_test_edited.yaml validate`
- Then, run the experiment. For this tutorial: `./tutorial.sh triage --config_file inspections_test_edited.yaml run`

Replace inspections_test_edited.yaml with the name of your own edited config file.

In [None]:
# Running these commands from within Jupyter did not work, so they are left commented out
#!!~/dirtyduck/tutorial.sh triage --config_file inspections_test_edited.yaml validate

In [None]:
#!!~/dirtyduck/tutorial.sh triage --config_file inspections_test_edited.yaml run

## IV. Align the results of randomized experiment against original experiment

First, we need to map the model groups from our new, randomized experiment to the model groups from the original experiment.

Confirm that we now see a set of model groups where purpose = "leakage-detection".

In [None]:
statement = "SELECT * FROM results.model_groups;"
model_groups = execute_sql(statement, dbname, user, host, password, port)
model_groups

Recall that a model group is defined by the classifier (model_type), its parameters (model_parameters), the features (feature_list), and the model_config. Since the randomized experiment differs from the original only in the "purpose" key of model_config, all other model group properties, including the remaining keys in model_config, should be used as index keys for matching the two experiments.
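An alternative way to build such a matching key, sketched here on a made-up two-row table: serialize everything that defines a model group except the purpose into one canonical JSON string, then pivot on purpose. `model_group_key` is a hypothetical helper; `json.dumps(..., sort_keys=True)` makes the key independent of the order in which dictionary keys are stored.

```python
import json

import pandas as pd


def model_group_key(row, ignore=("purpose",)):
    """
    Hypothetical helper: canonical string for a model group, leaving out the
    model_config keys listed in `ignore`.
    """
    config = {k: v for k, v in row["model_config"].items() if k not in ignore}
    # sort_keys=True gives the same string regardless of dictionary key order
    return json.dumps(
        [row["model_type"], row["model_parameters"], row["feature_list"], config],
        sort_keys=True,
    )


# Made-up long-format table: one original and one randomized model group
toy_groups = pd.DataFrame({
    "model_group_id": [1, 7],
    "model_type": ["sklearn.tree.DecisionTreeClassifier"] * 2,
    "model_parameters": [{"max_depth": 5, "criterion": "gini"}, {"criterion": "gini", "max_depth": 5}],
    "feature_list": [["feature_a", "feature_b"], ["feature_a", "feature_b"]],
    "model_config": [{"purpose": "test"}, {"purpose": "leakage-detection"}],
})
toy_groups["key"] = toy_groups.apply(model_group_key, axis=1)
toy_groups["purpose"] = toy_groups["model_config"].apply(lambda c: c.get("purpose"))

# Wide format: one row per configuration, one column of model_group_ids per purpose
wide = toy_groups.pivot(index="key", columns="purpose", values="model_group_id")
print(wide)
```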

In [None]:
# pull out all model group keys from model_config column and make new columns from them
list_of_lists = list(model_groups.model_config.apply(lambda x: list(x.keys())))
keys = set([key for x in list_of_lists for key in x])

for key in keys:
    model_groups[key] = model_groups.model_config.apply(lambda x: x.get(key, None))

In [None]:
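# convert model_parameters and feature_list columns to strings so they can be used as index keys
# (parameters are written as sorted "name=value" pairs so that key order and adjacent names/values cannot cause mismatches)
model_groups['model_parameters'] = model_groups.model_parameters.apply(lambda x: ', '.join('{}={}'.format(key, val) for key, val in sorted(x.items())))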
model_groups['feature_list'] = model_groups.feature_list.apply(lambda x: ', '.join(x))

In [None]:
index_cols = [col for col in model_groups.columns if col not in ['purpose', 'model_group_id', 'model_config']]
index_cols

In [None]:
model_groups = model_groups.set_index(index_cols)

Join the original and randomized model groups to create a wide (rather than long) version of the model_groups table. Each row of model_groups_wide corresponds to a particular model group configuration and gives the model group ids of its original and randomized versions.

In [None]:
# define the "purpose" model group key used in the original experiment (must match the original config file)
ORIG_PURPOSE = 'test'

In [None]:
orig = model_groups[model_groups.purpose == ORIG_PURPOSE]
rand = model_groups[model_groups.purpose == 'leakage-detection']
model_groups_wide = pd.merge(orig[['model_group_id']], rand[['model_group_id']], how='right', left_index=True, right_index=True, suffixes = ['_orig', '_rand'])

In [None]:
model_groups_wide

Pick a model group configuration on which to test for leakage. Let's go with the first row, where model_group_id_orig = 1 and model_group_id_rand = 7.

In [None]:
# create variables for the model_group id's you selected
GROUP_ID_ORIG = 1
GROUP_ID_RAND = 7

Look at the models relevant to each model group

In [None]:
statement = "SELECT * FROM results.models where model_group_id = {};".format(GROUP_ID_ORIG)
models_orig = execute_sql(statement, dbname, user, host, password, port)
models_orig

In [None]:
statement = "SELECT * FROM results.models where model_group_id = {};".format(GROUP_ID_RAND)
models_rand = execute_sql(statement, dbname, user, host, password, port)
models_rand

Pick a pair of comparable models on which to test for leakage, i.e., models with the same train_end_time. Let's go with the first row of each table, i.e., model_ids 1 (original) and 13 (randomized).  
Grab the predictions made by each of these models.
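Rather than choosing the pair by eye, the two models tables can be matched on train_end_time with a merge. The sketch below uses made-up stand-ins for models_orig and models_rand, and assumes each model group has at most one model per train_end_time (otherwise more columns would be needed in `on`).

```python
import pandas as pd

# Made-up stand-ins for the models_orig and models_rand tables
toy_orig = pd.DataFrame({"model_id": [1, 2],
                         "train_end_time": pd.to_datetime(["2016-01-01", "2016-07-01"])})
toy_rand = pd.DataFrame({"model_id": [13, 14],
                         "train_end_time": pd.to_datetime(["2016-07-01", "2016-01-01"])})

# One row per train_end_time, with the original and randomized model ids side by side
pairs = pd.merge(toy_orig[["model_id", "train_end_time"]],
                 toy_rand[["model_id", "train_end_time"]],
                 on="train_end_time", suffixes=("_orig", "_rand")).sort_values("train_end_time")
print(pairs)
```

With the real tables, the same call on `models_orig` and `models_rand` lists every comparable pair at once.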

In [None]:
# create variables for the model id's you selected
MODEL_ID_ORIG = 1
MODEL_ID_RAND = 13

In [None]:
statement = "SELECT * FROM results.predictions where model_id = {};".format(MODEL_ID_ORIG)
predictions_orig = execute_sql(statement, dbname, user, host, password, port)
predictions_orig

In [None]:
predictions_orig_sorted = predictions_orig.sort_values(by=['score', 'label_value'], ascending=[False, True])

In [None]:
statement = "SELECT * FROM results.predictions where model_id = {};".format(MODEL_ID_RAND)
predictions_rand = execute_sql(statement, dbname, user, host, password, port)
predictions_rand

In [None]:
predictions_rand_sorted = predictions_rand.sort_values(by=['score', 'label_value'], ascending=[False, True])

Grab the evaluation metrics for each of these models.

In [None]:
statement = "SELECT * FROM results.evaluations where model_id = {};".format(MODEL_ID_ORIG)
evaluations_orig = execute_sql(statement, dbname, user, host, password, port)
evaluations_orig

In [None]:
statement = "SELECT * FROM results.evaluations where model_id = {};".format(MODEL_ID_RAND)
evaluations_rand = execute_sql(statement, dbname, user, host, password, port)
evaluations_rand

## V. Perform statistical tests on the randomized model to see whether it performs better than chance (which would suggest data leakage)

Next steps for this tutorial:
- Possibly bootstrap the predictions (scores together with their labels) to obtain a distribution of selected evaluation metrics, e.g. precision at the top 10%.  
- Perform statistical tests for leakage on those evaluation metrics.  

NOTE: 
- For simplicity, we may skip bootstrapping and perform a chi-squared test on a single result.
- The current Dirty Duck model needs more complexity and variation: it currently produces the same score for almost every entity.
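For the single-result case, another option (a swapped-in alternative to the chi-squared test, not the tutorial's stated method) is an exact one-sided hypergeometric test. If the randomized model carries no real signal, the n entities it flags are in effect a random draw without replacement from N labeled entities, K of which are positive, so the number of true positives among them is hypergeometric. The helper `top_k_hypergeom_pvalue` below is hypothetical and assumes 0/1 labels with no missing values.

```python
import numpy as np
from scipy import stats


def top_k_hypergeom_pvalue(y_true, y_pred):
    """
    One-sided p-value for "the flagged entities contain more positives than a
    random draw of the same size would". y_true and y_pred are 0/1 arrays.
    """
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)
    population = len(y_true)              # N: all labeled entities
    positives = int(y_true.sum())         # K: entities with label 1
    flagged = int(y_pred.sum())           # n: entities predicted 1
    hits = int((y_true & y_pred).sum())   # observed true positives
    # sf(hits - 1) = P(X >= hits) for X ~ Hypergeometric(N, K, n)
    return stats.hypergeom.sf(hits - 1, population, positives, flagged)
```

For example, `top_k_hypergeom_pvalue(predictions_rand.label_value.astype(int), generate_binary_pred(predictions_rand.score.values, 0.1))`; a small p-value for the randomized model would point to leakage.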

Create bootstrapped samples of the predictions

In [None]:
# bootstrap_n (int): number of bootstrap samples to run
bootstrap_n = 100
bootstrap_samples = []
for i in range(bootstrap_n):
    # resample whole rows with replacement so each score stays paired with its label_value
    sample = predictions_rand.sample(n=predictions_rand.shape[0], replace=True, random_state=i)
    bootstrap_samples.append(sample)

In [None]:
bootstrap_samples

Generate binary predictions at a specific threshold (e.g., the top 10% of scores) for each bootstrapped sample set.

Calculate the confusion matrix for each bootstrapped sample set.
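These two steps could look like the following sketch. It runs on made-up random data standing in for predictions_rand, and `top_k_pred` is a self-contained copy of the generate_binary_pred logic (including a guard for a zero cutoff). Whole rows are resampled so that each score stays paired with its label.

```python
import numpy as np
import pandas as pd
from sklearn import metrics

rng = np.random.default_rng(42)

# Made-up stand-in for predictions_rand: scores and labels with no relationship
toy_preds = pd.DataFrame({"score": rng.random(500), "label_value": rng.integers(0, 2, 500)})


def top_k_pred(scores, k):
    """Flag the top k proportion of scores as 1 (ties broken arbitrarily)."""
    scores = np.asarray(scores)
    cutoff = int(len(scores) * k)
    y_pred = np.zeros(len(scores), dtype=int)
    if cutoff > 0:
        y_pred[np.argpartition(scores, -cutoff)[-cutoff:]] = 1
    return y_pred


confusion_matrices = []
for b in range(100):
    # Resample whole rows with replacement
    sample = toy_preds.sample(n=len(toy_preds), replace=True, random_state=b)
    y_pred = top_k_pred(sample["score"], 0.1)
    confusion_matrices.append(
        metrics.confusion_matrix(sample["label_value"], y_pred, labels=[0, 1]))

# Precision at the top 10% for each bootstrap sample: TP / (TP + FP)
precisions = np.array([cm[1, 1] / cm[:, 1].sum() for cm in confusion_matrices])
print(precisions.mean(), np.percentile(precisions, [2.5, 97.5]))
```

Since the toy labels are independent of the scores, the precision distribution should sit near the base rate of positives; for the real randomized model, a distribution clearly above the base rate would be a warning sign.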

Perform a chi-squared test of whether the randomized model's predictions are independent of the true labels.

In [None]:
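f_obs = metrics.confusion_matrix(predictions_rand.label_value.astype(int), generate_binary_pred(predictions_rand.score.values, 0.1), labels=[0, 1])  # observed 2x2 counts (true label x top-10% prediction)
stats.chisquare(f_obs.ravel(), stats.contingency.expected_freq(f_obs).ravel(), ddof=2)  # expected counts assume independence; 4 cells - 1 - ddof = 1 degree of freedom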