In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

### 1. Load data

In [2]:
# Read the bar-passage dataset from disk
admissions_df = pd.read_csv('../datasets/data/bar_pass_prediction.csv', index_col=False)

# Text summaries, in order: dimensions, column labels, missing values per column
for summary in (admissions_df.shape, admissions_df.columns, admissions_df.isna().sum()):
    print(summary)

# Rich preview of the first rows
display(admissions_df.head())

(22407, 39)
Index(['decile1b', 'decile3', 'ID', 'decile1', 'sex', 'race', 'cluster',
       'lsat', 'ugpa', 'zfygpa', 'DOB_yr', 'grad', 'zgpa', 'bar1', 'bar1_yr',
       'bar2', 'bar2_yr', 'fulltime', 'fam_inc', 'age', 'gender', 'parttime',
       'male', 'race1', 'race2', 'Dropout', 'other', 'asian', 'black', 'hisp',
       'pass_bar', 'bar', 'bar_passed', 'tier', 'index6040', 'indxgrp',
       'indxgrp2', 'dnn_bar_pass_prediction', 'gpa'],
      dtype='object')
decile1b                   1604
decile3                    1604
ID                            0
decile1                    1092
sex                           5
race                         16
cluster                      96
lsat                          0
ugpa                          0
zfygpa                      984
DOB_yr                       50
grad                          3
zgpa                       1289
bar1                          0
bar1_yr                      39
bar2                          0
bar2_yr             

Unnamed: 0,decile1b,decile3,ID,decile1,sex,race,cluster,lsat,ugpa,zfygpa,...,hisp,pass_bar,bar,bar_passed,tier,index6040,indxgrp,indxgrp2,dnn_bar_pass_prediction,gpa
0,10.0,10.0,2,10.0,1.0,7.0,1.0,44.0,3.5,1.33,...,0,1,a Passed 1st time,True,4.0,886.842082,g 700+,i 820+,0.979804,3.5
1,5.0,4.0,3,5.0,1.0,7.0,2.0,29.0,3.5,-0.11,...,0,1,a Passed 1st time,True,2.0,649.999987,f 640-700,f 640-700,0.979804,3.5
2,3.0,2.0,36,3.0,2.0,7.0,3.0,36.0,3.5,-0.64,...,0,1,a Passed 1st time,True,3.0,760.526298,g 700+,h 760-820,0.979804,3.5
3,7.0,4.0,52,7.0,2.0,7.0,3.0,39.0,3.5,0.34,...,0,1,a Passed 1st time,True,3.0,807.894717,g 700+,h 760-820,0.979804,3.5
4,9.0,8.0,55,9.0,2.0,7.0,4.0,48.0,3.5,1.02,...,0,1,a Passed 1st time,True,5.0,949.999974,g 700+,i 820+,0.979804,3.5


### 2. Data cleaning

A subset of the [Law School Admission Bar*](https://www.kaggle.com/datasets/danofer/law-school-admissions-bar-passage) dataset is used as a demo. Synthetic data will be generated for the following columns: 

- sex: student gender, i.e. 1 (male), 2 (female)
- race1: race, i.e. asian, black, hispanic, white, other
- ugpa: The student's undergraduate GPA, continuous variable
- bar: Ground truth label indicating whether or not the student passed the bar, i.e. passed 1st time, passed 2nd time, failed, non-graduated

The CART method will be used to generate the synthetic data and to evaluate the distribution and correlation differences between the real and synthetic data.

*The original paper can be found [here](https://files.eric.ed.gov/fulltext/ED469370.pdf).

In [48]:
# Columns kept for the synthetic-data demo
selected_columns = ['sex', 'race1', 'ugpa', 'bar']
admissions_sub = admissions_df[selected_columns]

In [49]:
# Keep only the rows in which every selected column is present
real_data = admissions_sub.dropna(axis=0, how='any')

# Sanity check: remaining shape and per-column count of missing values
print(real_data.shape)
print(real_data.isna().sum())

(22387, 4)
sex      0
race1    0
ugpa     0
bar      0
dtype: int64


In [50]:
# Inspect the cleaned frame; the recorded output below is truncated to the first and last rows
real_data

Unnamed: 0,sex,race1,ugpa,bar
0,1.0,white,3.5,a Passed 1st time
1,1.0,white,3.5,a Passed 1st time
2,2.0,white,3.5,a Passed 1st time
3,2.0,white,3.5,a Passed 1st time
4,2.0,white,3.5,a Passed 1st time
...,...,...,...,...
22402,2.0,black,1.8,c Failed
22403,2.0,black,1.8,c Failed
22404,2.0,black,1.8,a Passed 1st time
22405,2.0,white,1.5,a Passed 1st time


### 3. CART model

In [12]:
import numpy as np
import pandas as pd

# classes
from synthpop.validator import Validator
from synthpop.processor import Processor
# global variables
from synthpop import NUM_COLS_DTYPES
from synthpop.processor import NAN_KEY
from synthpop.method import CART_METHOD, METHODS_MAP, NA_METHODS


class Synthpop:
    """Sequential synthesiser: fits one method per column and generates synthetic rows.

    Columns are handled in the order given by ``visit_sequence``; each column is
    modelled from the columns flagged in its row of ``predictor_matrix`` (built by
    the validator). All constructor arguments are checked and normalised by
    ``Validator``.

    Parameters (accepted types as enforced by ``Validator.attributes_types``)
    ----------
    method : None, str or list
        Synthesising method(s). ``None`` falls back to ``default_method``.
    visit_sequence : None, numpy.ndarray or list
        Order in which the columns are visited (column indices or column names).
    proper : bool
        Forwarded to every column method.
    cont_na : None or dict
        Per-column configuration for numerical columns, forwarded to the processor.
    smoothing : bool, str or dict
        ``False``, ``'density'`` or a per-column dict.
    default_method : str
        Method family used when ``method`` is ``None``.
    numtocat : None or list
        Numerical columns to be treated as categorical.
    catgroups : None, int or dict
        Number of groups for the ``numtocat`` columns.
    seed : None or int
        Passed to every column method as ``random_state``.
    """

    def __init__(self,
                 method=None,
                 visit_sequence=None,
                 # predictor_matrix=None,
                 proper=False,
                 cont_na=None,
                 smoothing=False,
                 default_method=CART_METHOD,
                 numtocat=None,
                 catgroups=None,
                 seed=None):
        # initialise the validator and processor
        self.validator = Validator(self)
        self.processor = Processor(self)

        # initialise arguments
        self.method = method
        self.visit_sequence = visit_sequence
        # always built by the validator during fit; not user-configurable yet
        self.predictor_matrix = None
        self.proper = proper
        self.cont_na = cont_na
        self.smoothing = smoothing
        self.default_method = default_method
        self.numtocat = numtocat
        self.catgroups = catgroups
        self.seed = seed

        # check init
        self.validator.check_init()

    def fit(self, df, dtypes=None):
        """Preprocess ``df`` and fit one method per visited column.

        Parameters
        ----------
        df : pandas.DataFrame
            Data to learn from.
        dtypes : mapping of column name -> dtype name
            Required. Dtype inference is not implemented (see TODO below).

        Raises
        ------
        TypeError
            If ``dtypes`` is ``None``.
        """
        # TODO check df and check/EXTRACT dtypes
        # - all column names of df are unique
        # - all columns data of df are consistent
        # - all dtypes of df are correct ('int', 'float', 'datetime', 'category', 'bool'; no object)
        # - can map dtypes (if given) correctly to df
        # should create map col: dtype (self.df_dtypes)

        # Every later step subscripts the dtypes mapping, so a missing mapping would
        # otherwise surface as an opaque "'NoneType' object is not subscriptable".
        if dtypes is None:
            raise TypeError("`dtypes` is required: pass a mapping of column name -> dtype "
                            "('int', 'float', 'datetime', 'category' or 'bool'); "
                            "automatic dtype extraction is not implemented")

        self.df_columns = df.columns.tolist()
        self.n_df_rows, self.n_df_columns = np.shape(df)
        self.df_dtypes = dtypes

        # check processor
        self.validator.check_processor()
        # preprocess
        processed_df = self.processor.preprocess(df, self.df_dtypes)
        self.processed_df_columns = processed_df.columns.tolist()
        self.n_processed_df_columns = len(self.processed_df_columns)

        # check fit
        self.validator.check_fit()
        # fit
        self._fit(processed_df)

    def _fit(self, df):
        """Fit and store one method per column of the preprocessed frame, in visit order."""
        self.saved_methods = {}

        # train
        self.predictor_matrix_columns = self.predictor_matrix.columns.to_numpy()
        for col, visit_step in self.visit_sequence.sort_values().items():
            print('train_{}'.format(col))

            # initialise the method
            col_method = METHODS_MAP[self.method[col]](dtype=self.df_dtypes[col],
                                                       smoothing=self.smoothing[col],
                                                       proper=self.proper,
                                                       random_state=self.seed)
            # fit the method on the columns flagged with 1 in this column's predictor row
            col_predictors = self.predictor_matrix_columns[self.predictor_matrix.loc[col].to_numpy() == 1]
            col_method.fit(X_df=df[col_predictors], y_df=df[col])
            # save the method
            self.saved_methods[col] = col_method

    def generate(self, k=None):
        """Generate ``k`` synthetic rows (``None`` means as many rows as the fitted frame).

        Returns the postprocessed synthetic frame.
        """
        self.k = k

        # check generate
        self.validator.check_generate()
        # generate
        synth_df = self._generate()
        # postprocess
        processed_synth_df = self.processor.postprocess(synth_df)

        return processed_synth_df

    def _generate(self):
        """Build the raw synthetic frame column by column using the saved methods."""
        # start from an all-zero frame so predictor columns exist before they are filled
        synth_df = pd.DataFrame(data=np.zeros([self.k, len(self.visit_sequence)]), columns=self.visit_sequence.index)

        for col, visit_step in self.visit_sequence.sort_values().items():
            print('generate_{}'.format(col))

            # reload the method
            col_method = self.saved_methods[col]
            # predict with the method
            col_predictors = self.predictor_matrix_columns[self.predictor_matrix.loc[col].to_numpy() == 1]
            synth_df[col] = col_method.predict(synth_df[col_predictors])

            # change all missing values to 0:
            # rows whose NaN-indicator column is non-zero get 0 in the value column
            if col in self.processor.processing_dict[NAN_KEY] and self.df_dtypes[col] in NUM_COLS_DTYPES and self.method[col] in NA_METHODS:
                nan_indices = synth_df[self.processor.processing_dict[NAN_KEY][col]['col_nan_name']] != 0
                synth_df.loc[nan_indices, col] = 0

            # map dtype to original dtype (only exception is a column full of NaNs)
            if synth_df[col].notna().any():
                synth_df[col] = synth_df[col].astype(self.df_dtypes[col])

        return synth_df

In [13]:
import numpy as np
import pandas as pd

# global variables
from synthpop import NUM_COLS_DTYPES
from synthpop.method import EMPTY_METHOD, SAMPLE_METHOD
from synthpop.method import DEFAULT_METHODS_MAP, INIT_METHODS_MAP, CONT_TO_CAT_METHODS_MAP
from synthpop.method import ALL_METHODS, INIT_METHODS, DEFAULT_METHODS, NA_METHODS
from synthpop.processor import NAN_KEY


# Names of the validation stages; each Validator.check_* method passes one of them as `step`
INIT_STEP = 'init'
PROCESSOR_STEP = 'processor'
FIT_STEP = 'fit'
GENERATE_STEP = 'generate'

# Type of None, used in the allowed-type tuples of optional attributes
NONE_TYPE = type(None)

# The only string value accepted by the smoothing validator
DENSITY = 'density'


class Validator:
    """Checks and normalises the configuration stored on a synthesiser object.

    The validator holds a reference to the synthesiser (``spop``) and reads and
    rewrites its attributes in place. Validation happens in four stages, each
    triggered by one ``check_*`` method: init, processor, fit and generate.
    Every failed check raises ``AssertionError``.
    """

    def __init__(self, spop):
        self.spop = spop
        # allowed Python types per synthesiser attribute, used by check_valid_type
        self.attributes_types = {'method': (NONE_TYPE, str, list),
                                 'visit_sequence': (NONE_TYPE, np.ndarray, list),
                                 # 'predictor_matrix': (NONE_TYPE,),
                                 'proper': (bool,),
                                 'cont_na': (NONE_TYPE, dict),
                                 'smoothing': (bool, str, dict),
                                 'default_method': (str,),
                                 'numtocat': (NONE_TYPE, list),
                                 'catgroups': (NONE_TYPE, int, dict),
                                 'seed': (NONE_TYPE, int),
                                 'k': (NONE_TYPE, int)}

    def check_init(self):
        """Validate the raw constructor arguments of the synthesiser."""
        step = INIT_STEP

        self.default_method_validator(step=step)
        self.method_validator(step=step)
        self.visit_sequence_validator(step=step)
        # self.predictor_matrix_validator(step=step)
        self.proper_validator(step=step)
        self.cont_na_validator(step=step)
        self.smoothing_validator(step=step)
        self.numtocat_validator(step=step)
        self.catgroups_validator(step=step)
        self.seed_validator(step=step)

    def check_processor(self):
        """Resolve the arguments against the data frame columns before preprocessing.

        The order matters: the method validator needs the resolved visit sequence,
        and the cont_na validator needs the resolved methods.
        """
        step = PROCESSOR_STEP

        self.visit_sequence_validator(step=step)
        self.method_validator(step=step)
        self.predictor_matrix_validator(step=step)
        self.smoothing_validator(step=step)

        self.cont_na_validator(step=step)
        self.numtocat_validator(step=step)
        self.catgroups_validator(step=step)

    def check_fit(self):
        """Extend the resolved arguments with the columns added by preprocessing."""
        step = FIT_STEP

        self.method_validator(step=step)
        self.visit_sequence_validator(step=step)
        self.predictor_matrix_validator(step=step)
        self.smoothing_validator(step=step)

    def check_generate(self):
        """Validate the arguments of a generate call."""
        step = GENERATE_STEP

        self.k_validator(step=step)

    def check_valid_type(self, attribute_name, return_type=False):
        """Assert that ``spop.<attribute_name>`` has one of its allowed types.

        When ``return_type`` is true the attribute *value* is returned (despite the
        parameter name), otherwise ``None``.
        """
        attribute_value = getattr(self.spop, attribute_name)
        expected_types = self.attributes_types[attribute_name]
        assert isinstance(attribute_value, expected_types)

        if return_type:
            return attribute_value

    def method_validator(self, step=None):
        """Validate ``method`` (init), resolve it per column (processor), extend it (fit)."""
        if step == INIT_STEP:
            # validate method type is allowed
            method = self.check_valid_type('method', return_type=True)

            if isinstance(method, str):
                # if method type is str
                # validate method is in allowed init methods
                assert self.spop.method in INIT_METHODS

            elif isinstance(method, list):
                # if method type is list
                # validate all methods are allowed
                assert all(m in ALL_METHODS for m in self.spop.method)

        if step == PROCESSOR_STEP:
            first_visited_col = self.spop.visit_sequence.index[self.spop.visit_sequence == 0].values[0]

            if self.spop.method is None:
                # if method is not specified
                # for each column set method to default method according to its dtype (method for first visited column is sample_method)
                self.spop.method = [DEFAULT_METHODS_MAP[self.spop.default_method][self.spop.df_dtypes[col]] if col != first_visited_col else SAMPLE_METHOD
                                    for col in self.spop.df_columns]

            elif isinstance(self.spop.method, str):
                # if method type is str
                # for each column set method to the corresponding allowed method according to its dtype (method for first visited column is sample_method)
                self.spop.method = [INIT_METHODS_MAP[self.spop.method][self.spop.df_dtypes[col]] if col != first_visited_col else SAMPLE_METHOD
                                    for col in self.spop.df_columns]

            else:
                # validate method for first visited column with non empty method is sample_method
                for col, visit_order in self.spop.visit_sequence.sort_values().items():
                    col_method = self.spop.method[self.spop.df_columns.index(col)]
                    if col_method != EMPTY_METHOD:
                        assert col_method == SAMPLE_METHOD
                        break
                # assert all(self.spop.method[i] == SAMPLE_METHOD for i, col in enumerate(self.spop.df_columns) if col == first_visited_col)

            # validate all columns have specified methods
            assert len(self.spop.method) == self.spop.n_df_columns
            self.spop.method = pd.Series(self.spop.method, index=self.spop.df_columns)

        if step == FIT_STEP:
            for col in self.spop.method.index:
                if col in self.spop.numtocat:
                    self.spop.method[col] = CONT_TO_CAT_METHODS_MAP[self.spop.method[col]]

                elif col in self.spop.processor.processing_dict[NAN_KEY] and self.spop.df_dtypes[col] in NUM_COLS_DTYPES and self.spop.method[col] in NA_METHODS:
                    # TODO put in a function
                    # insert an entry for the processed column located at this column's position,
                    # giving it the categorical counterpart of the column's method
                    nan_col_index = self.spop.method.index.get_loc(col)
                    index_list = self.spop.method.index.tolist()
                    index_list.insert(nan_col_index, self.spop.processed_df_columns[nan_col_index])
                    self.spop.method = self.spop.method.reindex(index_list, fill_value=CONT_TO_CAT_METHODS_MAP[self.spop.method[col]])

    def visit_sequence_validator(self, step=None):
        """Validate ``visit_sequence`` (init), resolve it to a Series (processor), extend it (fit)."""
        if step == INIT_STEP:
            # validate visit_sequence type is allowed
            visit_sequence = self.check_valid_type('visit_sequence', return_type=True)

            if isinstance(visit_sequence, np.ndarray):
                # if visit_sequence type is numpy array
                # transform visit_sequence into a list of native Python scalars
                visit_sequence = [col.item() for col in visit_sequence]
                self.spop.visit_sequence = visit_sequence

            # The converted array must go through the same checks as a list; the old code
            # compared the `list` class itself with isinstance, which skipped them.
            if isinstance(visit_sequence, list):
                # if visit_sequence type is list
                # validate all visits are unique
                assert len(set(visit_sequence)) == len(visit_sequence)
                # validate all visits are either type int or type str
                assert all(isinstance(col, int) for col in visit_sequence) or all(isinstance(col, str) for col in visit_sequence)

        if step == PROCESSOR_STEP:
            if self.spop.visit_sequence is None:
                # if visit_sequence is not specified
                # visit all columns in a row
                self.spop.visit_sequence = [col.item() for col in np.arange(self.spop.n_df_columns)]

            if isinstance(self.spop.visit_sequence[0], int):
                # if visit_sequence is list of column indices
                # validate every index in visit_sequence is a valid column index
                assert set(self.spop.visit_sequence).issubset(set(np.arange(self.spop.n_df_columns)))
                # transform visit_sequence into a list of column names
                self.spop.visit_sequence = [self.spop.df_columns[i] for i in self.spop.visit_sequence]
            else:
                # validate every column name in visit_sequence is a valid column name
                assert set(self.spop.visit_sequence).issubset(set(self.spop.df_columns))

            # Series indexed by column name (in data frame order) holding each column's visit position
            self.spop.visited_columns = [col for col in self.spop.df_columns if col in self.spop.visit_sequence]
            self.spop.visit_sequence = pd.Series([self.spop.visit_sequence.index(col) for col in self.spop.visited_columns], index=self.spop.visited_columns)

        if step == FIT_STEP:
            for col in self.spop.visit_sequence.index:
                if col in self.spop.processor.processing_dict[NAN_KEY] and self.spop.df_dtypes[col] in NUM_COLS_DTYPES and self.spop.method[col] in NA_METHODS:
                    # shift this column and all later ones by one step, then give the
                    # freed step to the processed column inserted just before it
                    visit_step = self.spop.visit_sequence[col]
                    self.spop.visit_sequence.loc[self.spop.visit_sequence >= visit_step] += 1

                    nan_col_index = self.spop.visit_sequence.index.get_loc(col)
                    index_list = self.spop.visit_sequence.index.tolist()
                    index_list.insert(nan_col_index, self.spop.processed_df_columns[nan_col_index])
                    self.spop.visit_sequence = self.spop.visit_sequence.reindex(index_list, fill_value=visit_step)

    def predictor_matrix_validator(self, step=None):
        """Build the predictor matrix (processor) and extend it with processed columns (fit)."""
        # if step == INIT_STEP:
        #     # validate predictor_matrix type is allowed
        #     self.check_valid_type('predictor_matrix')

        if step == PROCESSOR_STEP:
            # build predictor_matrix so all previously visited columns are used for the prediction of the currently visited
            self.spop.predictor_matrix = np.zeros([len(self.spop.visit_sequence), len(self.spop.visit_sequence)], dtype=int)
            self.spop.predictor_matrix = pd.DataFrame(self.spop.predictor_matrix, index=self.spop.visit_sequence.index, columns=self.spop.visit_sequence.index)
            visited_columns = []
            for col, _ in self.spop.visit_sequence.sort_values().items():
                self.spop.predictor_matrix.loc[col, visited_columns] = 1
                visited_columns.append(col)

        if step == FIT_STEP:
            for col in self.spop.predictor_matrix:
                if col in self.spop.processor.processing_dict[NAN_KEY] and self.spop.df_dtypes[col] in NUM_COLS_DTYPES and self.spop.method[col] in NA_METHODS:
                    # new matrix column: a copy of this column's predictor-usage column
                    nan_col_index = self.spop.predictor_matrix.columns.get_loc(col)
                    self.spop.predictor_matrix.insert(nan_col_index, self.spop.processed_df_columns[nan_col_index], self.spop.predictor_matrix[col])

                    # new matrix row: the same predictors as this column
                    index_list = self.spop.predictor_matrix.index.tolist()
                    index_list.insert(nan_col_index, self.spop.processed_df_columns[nan_col_index])
                    self.spop.predictor_matrix = self.spop.predictor_matrix.reindex(index_list, fill_value=0)
                    self.spop.predictor_matrix.loc[self.spop.processed_df_columns[nan_col_index]] = self.spop.predictor_matrix.loc[col]

                    # the column itself is additionally predicted from the inserted column
                    self.spop.predictor_matrix.loc[col, self.spop.processed_df_columns[nan_col_index]] = 1

    def proper_validator(self, step=None):
        """Validate the type of ``proper`` (init only)."""
        if step == INIT_STEP:
            # validate proper type is allowed
            self.check_valid_type('proper')

    def cont_na_validator(self, step=None):
        """Validate ``cont_na`` (init) and restrict it to valid numerical columns (processor)."""
        if step == INIT_STEP:
            # validate cont_na type is allowed
            self.check_valid_type('cont_na')

        if step == PROCESSOR_STEP:
            if self.spop.cont_na is None:
                self.spop.cont_na = {}
            else:
                # validate columns in cont_na are valid columns
                assert all(col in self.spop.df_columns for col in self.spop.cont_na)
                # assert all(col in self.spop.visited_columns for col in self.spop.cont_na)
                # validate the type of columns in cont_na are valid types
                assert all(self.spop.df_dtypes[col] in NUM_COLS_DTYPES for col in self.spop.cont_na)
                # keep only the columns whose method is one of NA_METHODS
                self.spop.cont_na = {col: col_cont_na for col, col_cont_na in self.spop.cont_na.items() if self.spop.method[col] in NA_METHODS}

    def smoothing_validator(self, step=None):
        """Validate ``smoothing`` (init), resolve it to a per-column bool dict (processor), extend it (fit)."""
        if step == INIT_STEP:
            # validate smoothing type is allowed
            self.check_valid_type('smoothing')

        if step == PROCESSOR_STEP:
            if self.spop.smoothing is False:
                self.spop.smoothing = {col: False for col in self.spop.df_columns}
            elif isinstance(self.spop.smoothing, str):
                # if smoothing type is str
                # validate smoothing is 'density'
                assert self.spop.smoothing == DENSITY
                self.spop.smoothing = {col: self.spop.df_dtypes[col] in NUM_COLS_DTYPES for col in self.spop.df_columns}
            else:
                # NOTE(review): smoothing=True passes the init type check (bool) but has no
                # `.items()`, so it fails here with AttributeError - confirm intended handling
                # validate smoothing is 'density' for some/all numerical columns and False for all other columns
                assert all((smoothing_method == DENSITY and self.spop.df_dtypes[col] in NUM_COLS_DTYPES) or smoothing_method is False
                           for col, smoothing_method in self.spop.smoothing.items())
                self.spop.smoothing = {col: (self.spop.smoothing.get(col, False) == DENSITY and self.spop.df_dtypes[col] in NUM_COLS_DTYPES) for col in self.spop.df_columns}

        if step == FIT_STEP:
            for col in self.spop.processed_df_columns:
                if col in self.spop.numtocat:
                    self.spop.smoothing[col] = False
                elif col in self.spop.processor.processing_dict[NAN_KEY] and self.spop.df_dtypes[col] in NUM_COLS_DTYPES:
                    self.spop.smoothing[self.spop.processor.processing_dict[NAN_KEY][col]['col_nan_name']] = False

    def default_method_validator(self, step=None):
        """Validate ``default_method`` type and value (init only)."""
        if step == INIT_STEP:
            # validate default_method type is allowed
            self.check_valid_type('default_method')

            # validate default_method is in allowed default methods
            assert self.spop.default_method in DEFAULT_METHODS

    def numtocat_validator(self, step=None):
        """Validate ``numtocat`` (init) and check its columns against the data frame (processor)."""
        if step == INIT_STEP:
            # validate numtocat type is allowed
            self.check_valid_type('numtocat')

        if step == PROCESSOR_STEP:
            if self.spop.numtocat is None:
                self.spop.numtocat = []
            else:
                # validate all columns in numtocat are valid columns
                assert all(col in self.spop.df_columns for col in self.spop.numtocat)
                # assert all(col in self.spop.visited_columns for col in self.spop.numtocat)
                # validate all columns in numtocat are numerical columns
                assert all(self.spop.df_dtypes[col] in NUM_COLS_DTYPES for col in self.spop.numtocat)

    def catgroups_validator(self, step=None):
        """Validate ``catgroups`` (init) and resolve it to a per-column dict (processor)."""
        if step == INIT_STEP:
            # validate catgroups type is allowed
            catgroups = self.check_valid_type('catgroups', return_type=True)

            if isinstance(catgroups, int):
                # if catgroups type is int
                # validate catgroups is more than 1
                assert self.spop.catgroups > 1

            elif isinstance(catgroups, dict):
                # if catgroups type is dict
                # validate the keys in catgroups are the same as the columns in numtocat
                # (numtocat may still be None at init; treat it as "no columns" so the
                # mismatch is reported as a validation failure instead of a TypeError)
                assert set(self.spop.catgroups.keys()) == set(self.spop.numtocat or [])
                # validate all values in catgroups are type int and more than 1
                assert all((isinstance(col_groups, int) and col_groups > 1) for col_groups in self.spop.catgroups.values())

        if step == PROCESSOR_STEP:
            if self.spop.catgroups is None:
                # default: 5 groups for every numtocat column
                self.spop.catgroups = {col: 5 for col in self.spop.numtocat}
            elif isinstance(self.spop.catgroups, int):
                self.spop.catgroups = {col: self.spop.catgroups for col in self.spop.numtocat}

    def seed_validator(self, step=None):
        """Validate the type of ``seed`` (init only)."""
        if step == INIT_STEP:
            # validate seed type is allowed
            self.check_valid_type('seed')

    def k_validator(self, step=None):
        """Validate ``k`` and default it to the number of rows seen in fit (generate only)."""
        if step == GENERATE_STEP:
            # validate k type is allowed
            self.check_valid_type('k')

            if self.spop.k is None:
                self.spop.k = self.spop.n_df_rows


In [14]:
# Ensure the data types are correctly set
my_data_frame = real_data.copy()
my_data_frame['sex'] = my_data_frame['sex'].astype('category')
my_data_frame['race'] = my_data_frame['race'].astype('category')
my_data_frame['gpa'] = my_data_frame['gpa'].astype('float')

# Define the data types for each column
dtypes = {
	'sex': 'category',
	'race': 'category',
	'gpa': 'float'
}

# Initialize Synthpop object with method 'cart'
spop = Synthpop(method='cart')

# Fit the Synthpop model
# spop.fit(my_data_frame, dtypes=dtypes)

In [None]:
# Train one model per column, in visit order, on the typed frame
spop.fit(df=my_data_frame, dtypes=dtypes)

train_sex
train_race
train_gpa


In [16]:
# Keep the generated sample in a named variable so later cells can compare it with
# the real data without relying on Out[...] / `_`
synthetic_data = spop.generate(k=100)
synthetic_data

generate_sex
generate_race
generate_gpa


Unnamed: 0,sex,race,gpa
0,1.0,7.0,3.6
1,2.0,7.0,3.7
2,2.0,7.0,3.5
3,1.0,7.0,3.0
4,2.0,7.0,3.1
...,...,...,...
95,2.0,7.0,3.4
96,1.0,3.0,3.3
97,1.0,7.0,3.3
98,1.0,7.0,3.4
