# Fairness-Aware Instrumentation of ML-Pipelines

## Preparations

In [1]:
from collections import defaultdict
import inspect
import pandas as pd
import numpy as np
from scipy import stats
import re
from graphviz import Digraph

from sklearn.preprocessing import OneHotEncoder, StandardScaler, label_binarize, KBinsDiscretizer
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

from utils import *

import warnings
warnings.filterwarnings('ignore')

## Pipeline Functions

### German Dataset

In [2]:
def german_pipeline_easy(f_path = '../data/german_titled.csv'):
    data = pd.read_csv(f_path)
    # projection
    data = data[['status_of_existing_account', 'duration_in_month', 'credit_his', 'purpose', 'credit_amt', 'saving_account', 'preset_emp', 'installment_rate', 'personal_status_and_sex', 'guarantors', 'present_residence', 
                 'property', 'age','label']]
    # filtering
    data = data[data.credit_amt>=4000]

    #start sklearn pipeline
    one_hot_and_impute = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder())
    ])

    featurizer = ColumnTransformer(transformers=[
        ('onehot', OneHotEncoder(), ['status_of_existing_account', 'credit_his','purpose', 'saving_account', 'preset_emp']),
        ('impute_onehot', one_hot_and_impute, ['personal_status_and_sex', 'guarantors', 'property']),
        ('std_scaler', StandardScaler(), ['duration_in_month', 'credit_amt', 'present_residence', 'age'])
    ])
    pipeline = Pipeline([
        ('features', featurizer),
        ('learner', RandomForestClassifier())
    ])
    return pipeline
    

In [38]:
def german_pipeline_normal(f_path = '../data/german_titled.csv'):
    f_path_1 = '../data/german_titled_split_1.csv'
    f_path_2 = '../data/german_titled_split_2.csv'

    # load data
    data_split_1 = pd.read_csv(f_path_1, index_col = 0)
    data_split_2 = pd.read_csv(f_path_2, index_col = 0)

    # join
    data = pd.merge(data_split_1, data_split_2, on='identifier')

    # drop first col
    data.drop(data.columns[0], axis=1, inplace = True)

    # projection
    data = data[['status_of_existing_account', 'duration_in_month', 'credit_his', 'purpose', 'credit_amt', 'saving_account', 'preset_emp', 'installment_rate', 'personal_status_and_sex', 'guarantors', 'present_residence', 
                 'property', 'age','label']]
    # filtering
    data = data[data.credit_amt>=4000]

    #start sklearn pipeline
    one_hot_and_impute = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder())
    ])

    featurizer = ColumnTransformer(transformers=[
        ('onehot', OneHotEncoder(), ['status_of_existing_account', 'credit_his','purpose', 'saving_account', 'preset_emp']),
        ('impute_onehot', one_hot_and_impute, ['personal_status_and_sex', 'guarantors', 'property']),
        ('std_scaler', StandardScaler(), ['duration_in_month', 'credit_amt', 'present_residence', 'age'])
    ])
    pipeline = Pipeline([
        ('features', featurizer),
        ('learner', RandomForestClassifier())
    ])
    return pipeline


### Compass Dataset

In [4]:
def compass_pipeline(f1_path = '../data/compass/demographic.csv',f2_path = '../data/compass/jailrecord1.csv',f3_path = '../data/compass/jailrecord2.csv'):
    #read csv files
    df = pd.read_csv(f1_path)
    df1 = pd.read_csv(f2_path)
    df2 = pd.read_csv(f3_path)
    
    #drop columns inplace
    df.drop(columns=['Unnamed: 0','age_cat'],inplace=True)
    df1.drop(columns=['Unnamed: 0'],inplace=True)
    df2.drop(columns=['Unnamed: 0'],inplace=True)

    #JOIN dataframes column-wise and row-wise
    data = pd.concat([df1,df2],ignore_index=True)
    data = pd.merge(df, data, on=['id','name'])

    #drop rows that miss a few important features
    data = data.dropna(subset=['id', 'name','is_recid','days_b_screening_arrest','c_charge_degree','c_jail_out','c_jail_in'])

    #generate a new column conditioned on existed column
    data['age_cat'] = data.apply(lambda row:'<25' if row['age'] < 25 else '>45' if row['age']>45 else '25-45', axis=1)

    #PROJECTION
    data = data[['sex', 'dob','age','c_charge_degree', 'age_cat', 'race','score_text','priors_count','days_b_screening_arrest',
                 'decile_score','is_recid','two_year_recid','c_jail_in','c_jail_out']]

    #SELECT based on some conditions
    data = data.loc[(data['days_b_screening_arrest'] <= 30)]
    data = data.loc[(data['days_b_screening_arrest'] >= -30)]
    data = data.loc[(data['is_recid'] != -1)]
    data = data.loc[(data['c_charge_degree'] != "O")]
    data = data.loc[(data['score_text'] != 'N/A')]
    # create a new feature 
    data['c_jail_out'] = pd.to_datetime(data['c_jail_out']) 
    data['c_jail_in'] = pd.to_datetime(data['c_jail_in']) 
#     data['length_of_stay'] = data['c_jail_out'] - data['c_jail_in']
    #specify categorical and numeric features
    categorical = ['sex', 'c_charge_degree', 'age_cat', 'race', 'score_text', 'is_recid',
           'two_year_recid']
    numeric1 = ['age','priors_count', 'decile_score']
    numeric2 = ['days_b_screening_arrest','length_of_stay']

    #sklearn pipeline
    impute1_and_onehot = Pipeline([('imputer1', SimpleImputer(strategy='most_frequent')), 
                                   ('onehot', OneHotEncoder(handle_unknown='ignore'))])
    impute2_and_bin = Pipeline([('imputer2', SimpleImputer(strategy='mean')), 
                                ('bin_discretizer', KBinsDiscretizer(n_bins=4, encode='uniform', strategy='uniform'))])
    featurizer = ColumnTransformer(transformers=[
            ('impute1_and_onehot', impute1_and_onehot, categorical),
            ('impute2_and_bin', impute2_and_bin, numeric1),
            ('std_scaler', StandardScaler(), numeric2),
        ])
                               
    pipeline = Pipeline([
        ('features', featurizer),
        ('learner', LogisticRegression())
    ])
    return pipeline


### Adult Sample

In [5]:
def adult_pipeline_easy(f_path = '../pipelines/adult-sample.csv'):
   
    raw_data = pd.read_csv(f_path, na_values='?')
    data = raw_data.dropna()

    labels = label_binarize(data['income-per-year'], ['>50K', '<=50K'])

    feature_transformation = ColumnTransformer(transformers=[
        ('categorical', OneHotEncoder(handle_unknown='ignore'), ['education', 'workclass']),
        ('numeric', StandardScaler(), ['age', 'hours-per-week'])
    ])

        
    income_pipeline = Pipeline([
      ('features', feature_transformation),
      ('classifier', DecisionTreeClassifier())])
    
    return income_pipeline

In [6]:
def adult_pipeline_normal(f_path = '../pipelines/adult-sample_missing.csv'):
    raw_data = pd.read_csv(f_path, na_values='?')
    data = raw_data.dropna()

    labels = label_binarize(data['income-per-year'], ['>50K', '<=50K'])

    nested_categorical_feature_transformation = Pipeline(steps=[
        ('impute', SimpleImputer(missing_values=np.nan, strategy='most_frequent')),
        ('encode', OneHotEncoder(handle_unknown='ignore'))
    ])

    nested_feature_transformation = ColumnTransformer(transformers=[
        ('categorical', nested_categorical_feature_transformation, ['education', 'workclass']),
        ('numeric', StandardScaler(), ['age', 'hours-per-week'])
    ])

    nested_pipeline = Pipeline([
      ('features', nested_feature_transformation),
      ('classifier', DecisionTreeClassifier())])

    return nested_pipeline

### Loan Dataset

In [7]:
def loan_pipeline(f_path = '../pipelines/loan_train.csv'):
    data = pd.read_csv(f_path)

    # Loan_ID is not needed in training or prediction
    data = data.drop('Loan_ID', axis=1)

#     data = data.drop('Loan_Status', axis=1)

    numeric_features = data.select_dtypes(include=['int64', 'float64']).columns
    categorical_features = data.select_dtypes(include=['object']).drop(['Loan_Status'], axis=1).columns
    # do transformer on numeric & categorical data respectively
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)])

    # classifier
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                          ('classifier', RandomForestClassifier())])
    return pipeline

## Logs Part

In [32]:
# Current Version
def describe_ver(pipeline_to_test, cat_col = ['race', 'occupation', 'education'], numerical_col = ['age', 'hours-per-week']):
    
    raw_func = inspect.getsource(pipeline_to_test)


    input_args, executable_list, outputs = func_aggregation(raw_func)
    
    for line in input_args:
        exec(line)
    
    print()
    print('####################### Start Pandas Opeation #######################')
    print()
    
    ######################################
    # Initialization
    ######################################
    prev = {}
    
    numerical_metric_list = ['count', 'missing_count', 'median', 'mad', 'range']
    numerical_df = pd.DataFrame(np.inf, index = numerical_col, columns = numerical_metric_list)
    
    cat_metric_list = ['missing_count', 'num_class', 'class_count', 'class_percent']
    cat_df = pd.DataFrame(np.inf, index = cat_col, columns = cat_metric_list)

    
    ######################################
    # Execution
    ######################################     
    for cur_line in executable_list:
        print_bool = False
        exec(cur_line)
        if '#' in cur_line:
            continue
        try: 
            if str(eval(f"type({cur_line.split('=')[0].strip()})")) == "<class 'pandas.core.frame.DataFrame'>":

                target_df = cur_line.split('=')[0].strip()
                
                col_list = eval(target_df).columns.tolist()
                numerical_col_sub = [i for i in numerical_col if i in col_list]
                cat_col_sub = [j for j in cat_col if j in col_list]
                
                if len(numerical_col_sub) != 0:
                    ######################################################################################
                    # numerical features & metrices
                    # counts, missing values, Median and MAD, range/scaling
                    ######################################################################################
                    for numeric_feature in numerical_col_sub:

                        numerical_df = cal_numerical(eval(target_df), numeric_feature, numerical_df)
                
                if len(cat_col_sub) != 0:
                    ######################################################################################
                    # categorical features & metrices
                    # missing values, number of classes, counts for each group, percentage for each group
                    ######################################################################################
                    for cat_feature in cat_col_sub:

                        cat_df = cal_categorical(eval(target_df), cat_feature, cat_df)
                    

                ######################################################################################
                # Comparison occurs here! 
                ######################################################################################
                if len(prev) != 0:
                    numerical_dif = numerical_df - prev['numerical']
                    if (numerical_dif.values != 0).any():
                        # print(f'Metrics: {mat} changed in {col} with value {dif}')
                        print('*'*10)
                        print('Changes in numerical features!')
                        print(numerical_dif)
                        print('*'*10)
                        print()
                    
                ################################## 
                # ⬆️ numerical
                # ⬇️ categorical                
                ##################################
                    
                    cat_dif = get_categorical_dif(cat_df, cat_metric_list, prev['categorical'])
                    if (cat_dif.values != 0).any():
                        print('*'*10)
                        print('Changes in categorical features!')
                        print(cat_dif)
                        print('*'*10)
              
                print_bool = True
                
                if print_bool:
                    print(f'Inpected {cur_line}')
                    print('-------------------------------------------------------')
                    print() 

                # save the output for next round comparison
                prev['numerical'] = numerical_df.copy()
                prev['categorical'] = cat_df.copy()

            elif str(eval(f"type({cur_line.split('=')[0].strip()})")).startswith("<class 'sklearn"):
                pass
            else:
                pass

        except:
            ######################################################################################
            # numerical features & metrices
            # counts, missing values, Median and MAD, range/scaling
            ######################################################################################
            for numeric_feature in numerical_col:

                numerical_df = cal_numerical(eval(target_df), numeric_feature, numerical_df)

            ######################################################################################
            # categorical features & metrices
            # missing values, number of classes, counts for each group, percentage for each group
            ######################################################################################
            for cat_feature in cat_col:

                cat_df = cal_categorical(eval(target_df), cat_feature, cat_df)

            ######################################################################################
            # Comparison occurs here! 
            ######################################################################################
            if len(prev) != 0:
                numerical_dif = numerical_df - prev['numerical']
                if (numerical_dif.values != 0).any():
                    # print(f'Metrics: {mat} changed in {col} with value {dif}')
                    print('*'*10)
                    print('Changes in numerical features!')
                    print(numerical_dif)
                    print('*'*10)
                    print()

            ################################## 
            # ⬆️ numerical
            # ⬇️ categorical                
            ##################################

                cat_dif = get_categorical_dif(cat_df, cat_metric_list, prev['categorical'])
                if (cat_dif.values != 0).any():
                    print('*'*10)
                    print('Changes in categorical features!')
                    print(cat_dif)
                    print('*'*10)

            print_bool = True

            if print_bool:
                print(f'Inpected {cur_line}')
                print('-------------------------------------------------------')
                print() 

            # save the output for next round comparison
            prev['numerical'] = numerical_df.copy()
            prev['categorical'] = cat_df.copy()            
            

    nested_graph = pipeline_to_dataflow_graph(eval(f'{outputs[0]}'))

    print()
    print('####################### Start Sklearn Pipeline #######################')
    print()
        
    for item in nested_graph:
        ######################################################################################
        # numerical features & metrices
        # counts, missing values, Median and MAD, range/scaling
        ######################################################################################
        if item.name in numerical_col: 
            numeric_feature = item.name
            
            eval(target_df)[item.name] = item.operation.fit_transform(eval(target_df)[item.name].values.reshape(-1,1))
            print(f"Operations {str(item.operation).split('(')[0]} on {item.name}")
            
            ##############################
            # Metrices Calculation
            ##############################
            numerical_df = cal_numerical(eval(target_df), numeric_feature, numerical_df)
            
            ##############################
            # Comparison
            ##############################
            numerical_dif = numerical_df - prev['numerical']
            
            if (numerical_dif.loc[numeric_feature,:].values != 0).any():
                # print(f'Metrics: {mat} changed in {col} with value {dif}')
                print('*'*10)
                print('Changes in numerical features!')
                print(numerical_dif.loc[numeric_feature,:])
                print('*'*10)
                print()
                
        ######################################################################################
        # categorical features & metrices
        # missing values, number of classes, counts for each group, percentage for each group
        ######################################################################################               
        if item.name in cat_col:
            cat_feature = item.name
            ##############################
            try:
                eval(target_df)[item.name] = item.operation.fit_transform(eval(target_df)[item.name].values.reshape(-1,1)).toarray()
            except:
                eval(target_df)[item.name] = item.operation.fit_transform(eval(target_df)[item.name].values.reshape(-1,1))
            print(f"Operations {str(item.operation).split('(')[0]} on {item.name}")
            
            ##############################
            # Metrices Calculation
            ##############################            
            cat_df = cal_categorical(eval(target_df), cat_feature, cat_df)
            
            ##############################
            # Comparison
            ##############################            
            cat_dif = get_categorical_dif(cat_df, cat_metric_list, prev['categorical'])
            if (cat_dif.loc[cat_feature,:].values != 0).any():
                print('*'*10)
                print('Changes in categorical features!')
                print(cat_dif.loc[cat_feature,:])
                print('*'*10)
                print()
                
        prev['numerical'] = numerical_df.copy()
        prev['categorical'] = cat_df.copy()     

In [47]:
describe_ver(german_pipeline_normal, ['personal_status_and_sex'], ['age'])


####################### Start Pandas Opeation #######################



KeyError: 'age'

## DAG Part

In [49]:
def find_pd_lines(pipeline_func):
    pipeline_func = inspect.getsource(pipeline_func)
    pd_lines = []
    input_args , executable_list, _ = func_aggregation(pipeline_func)
    for line in input_args:
        exec(line)
    for cur_line in executable_list:
        exec(cur_line)
        try: 
            if 'inplace' in cur_line:
                pd_lines.append(cur_line)
            elif str(eval(f"type({cur_line.split('=')[0].strip()})")).startswith("<class 'pandas"):
                pd_lines.append(cur_line)
        except:
            pass
    return pd_lines

def pd_to_dataflow_graph(pipeline_func, parent_vertices=[]):
    executable_list = find_pd_lines(pipeline_func)
    graph = []
    previous = []
    
    for line in executable_list:
        if 'inplace' in line and '#' not in line:
            df_name = line.split('.')[0]
            func_name = line.split('.')[1].split('(')[0].strip()
            col_effect = line.split('[')[1].split(']')[0].strip()
            if len(previous) > 1:
                for node in previous:
                    if node.name == df_name:
                        vertex = DataFlowVertex([node], df_name+'_drop', func_name+' '+col_effect, col_effect)
                        previous.append(vertex)
                        previous.remove(node)     
            else:
                vertex = DataFlowVertex(previous, df_name+'_drop', func_name+' '+col_effect, col_effect)
                previous = [vertex]
        else:
            var_name = line.split('=')[0].strip()

            # match ".func_name(...)"
            pd_func = re.search('\.\s*([_a-z]*)\s*\(',line)  
            if pd_func:
                func_name = pd_func.group(1)
                params = re.search('\(([^\)]*)\)',line)  #"(...)"

                if params:
                    params = params.group(1).strip()

                    if func_name == 'read_csv': #df = pd.read_csv(path)
                        vertex = DataFlowVertex(parent_vertices,var_name, func_name, params)
                        previous.append(vertex)

                    elif func_name in ['join','merge','concat']:
                        if func_name == 'concat': #df_new = pd.concat([df1,df2],keys=[])
                            df_names = [name.strip() for name in params.strip('[]').split(',')]

                        else: # df_new = df1.join/merge(df2,on='...',how='...')
                            df_names = [re.split("[.=]",line)[1].strip(), params.split(',')[0].strip()]      
                        parent_vertices = search_vertex_by_names(df_names, graph) #search in graph by df_names
                        vertex = DataFlowVertex(previous, var_name, func_name, params) #TODO vertex name?
                        previous = [vertex]
                    elif 'lambda' in params:
                        cols = var_name.split('[')[1].split(']')[0].strip()
                        vertex = DataFlowVertex(previous, func_name+' '+cols, func_name, params)
                        previous = [vertex]
                    elif '[' in var_name:
                        cols = var_name.split('[')[1].split(']')[0].strip()
                        vertex = DataFlowVertex(previous, func_name+' '+cols+' '+params, func_name, params)
                        previous = [vertex]
                    else:
                        vertex = DataFlowVertex(previous, func_name+' '+params, func_name, params)
                        previous = [vertex]


            # filter operation: "df[[cols]]", "df[condition]","df.loc[]","df.iloc[]"
            else:
                is_filter = re.search('\(([^\)]*)\)',line) #"[...]"
                if is_filter:
                    filter_cond = is_filter.group(1)
                    vertex = DataFlowVertex(previous, 'select '+filter_cond, 'filter', filter_cond)
                    previous = [vertex]

        graph.append(vertex)
            
    return graph, previous


def sklearn_to_dataflow_graph(pipeline, parent_vertices=[]):
    
    graph = pipeline_to_dataflow_graph_full(pipeline, name_prefix='', parent_vertices=[])
    for node in graph:
        if node.parent_vertices==[]:
            node.parent_vertices = parent_vertices
    return graph

def visualize(nested_graph, save_file_path ='./pipeline.gv'):
    dot = Digraph(comment='preprocessing_pipeline')

    for node in nested_graph:
        dot.node(node.name,label = node.name+',\nop='+node.operation)
        parents = node.parent_vertices 
        
        for parent in parents:
            dot.edge(parent.name, node.name)
    dot.render(save_file_path, view=True)
    return dot

################# PUT EVERYTHING TOGETHER #################
def dag_plot(func_pipe):
    pd_graph, parent_vertices = pd_to_dataflow_graph(func_pipe)
    pipeline = eval(func_pipe.__name__+'()')
    sklearn_graph = sklearn_to_dataflow_graph(pipeline, parent_vertices)
    pd_graph.extend(sklearn_graph)
    visualize(pd_graph)

### Combine together

In [22]:
def func_tracer(pipeline, cat_col, numerical_col):
    dag_plot(pipeline)
    describe_ver(pipeline, cat_col, numerical_col)

In [51]:
# adult_pipeline
# cat_col = ['race', 'occupation', 'education'], numerical_col = ['age', 'hours-per-week']
func_tracer(adult_pipeline_easy, cat_col = ['race', 'occupation', 'education'], numerical_col = ['age', 'hours-per-week'])


####################### Start Pandas Opeation #######################

Inpected raw_data = pd.read_csv(f_path, na_values='?')
-------------------------------------------------------

**********
Changes in numerical features!
                count  missing_count  median     mad  range
age              -8.0            0.0     0.0 -0.7413  -19.0
hours-per-week   -8.0            0.0     0.0 -1.4826    0.0
**********

**********
Changes in categorical features!
            missing_count  num_class  \
race                  0.0        0.0   
occupation           -6.0        0.0   
education             0.0        0.0   

                                                  class_count  \
race        {'White': -6, 'Black': -1, 'Amer-Indian-Eskimo...   
occupation  {'Exec-managerial': 0, 'Adm-clerical': 0, 'Cra...   
education   {'HS-grad': -1, 'Bachelors': 0, 'Some-college'...   

                                                class_percent  
race        {'White': 0.007000000000000006, 'Black':

In [53]:
# loan_pipeline
# cat_col = ['Gender', 'Education'], numerical_col = []
func_tracer(loan_pipeline, cat_col = ['Gender', 'Education'], numerical_col = [])


####################### Start Pandas Opeation #######################

Inpected data = pd.read_csv(f_path)
-------------------------------------------------------

**********
Changes in categorical features!
           missing_count  num_class                         class_count  \
Gender               0.0        0.0            {'Male': 0, 'Female': 0}   
Education            0.0        0.0  {'Graduate': 0, 'Not Graduate': 0}   

                                    class_percent  
Gender               {'Male': 0.0, 'Female': 0.0}  
Education  {'Graduate': 0.0, 'Not Graduate': 0.0}  
**********
Inpected data = data.drop('Loan_ID', axis=1)
-------------------------------------------------------


####################### Start Sklearn Pipeline #######################

Operations SimpleImputer on Gender
**********
Changes in categorical features!
missing_count                                                  -13
num_class                                                        1
class_coun

In [52]:
#compass_pipeline
# cat_col = ['race'], numerical_col = ['age']
func_tracer(compass_pipeline, cat_col = ['race'], numerical_col = [])


####################### Start Pandas Opeation #######################

Inpected df = pd.read_csv(f1_path)
-------------------------------------------------------



KeyError: 'race'

In [54]:
# german_pipeline
# cat_col = ['personal_status_and_sex'], numerical_col = ['age']
func_tracer(german_pipeline_easy, cat_col = ['personal_status_and_sex'], numerical_col = ['age'])


####################### Start Pandas Opeation #######################

Inpected data = pd.read_csv(f_path)
-------------------------------------------------------

**********
Changes in categorical features!
                         missing_count  num_class  \
personal_status_and_sex            0.0        0.0   

                                                      class_count  \
personal_status_and_sex  {'A93': 0, 'A92': 0, 'A94': 0, 'A91': 0}   

                                                            class_percent  
personal_status_and_sex  {'A93': 0.0, 'A92': 0.0, 'A94': 0.0, 'A91': 0.0}  
**********
Inpected data = data[['status_of_existing_account', 'duration_in_month', 'credit_his', 'purpose', 'credit_amt', 'saving_account', 'preset_emp', 'installment_rate', 'personal_status_and_sex', 'guarantors', 'present_residence','property', 'age','label']]
-------------------------------------------------------

**********
Changes in numerical features!
     count  missing_count  med