##load library and def

In [1]:
#Load library
import os

import pickle

import warnings

from joblib import Parallel, delayed
from typing import Union, List, Optional,Tuple, Dict, Callable,Any

import pandas as pd
pd.options.display.float_format = "{:,.2f}".format

import math
from scipy import stats
from scipy.special import kl_div
from scipy.spatial import distance

import numpy as np

import copy

import time

from warnings import filterwarnings
filterwarnings('ignore') ##filter out LR warning

import itertools
from collections import Counter

from sklearn.datasets import make_classification
from sklearn.metrics import roc_auc_score,average_precision_score,accuracy_score,precision_score,pairwise_distances,recall_score,silhouette_score,f1_score,jaccard_score,matthews_corrcoef,pairwise

from sklearn.model_selection import KFold,StratifiedKFold
from sklearn.preprocessing import StandardScaler,LabelEncoder,OneHotEncoder,MinMaxScaler

from sklearn.neighbors import NearestNeighbors,KNeighborsClassifier,KDTree,DistanceMetric,BallTree,KNeighborsRegressor
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier,IsolationForest,ExtraTreesClassifier,ExtraTreesRegressor
from sklearn.linear_model import Lasso, Ridge, ElasticNet, LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.decomposition import PCA,KernelPCA
from sklearn.manifold import TSNE

from sklearn.metrics import pairwise_distances

from sklearn.mixture import GaussianMixture
from sklearn.inspection import permutation_importance
from sklearn.svm import SVC,SVR

from sklearn.cluster import KMeans

import lightgbm as lgb

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


##Utils


In [2]:
def adjusted_classes(y_scores, t):
    """
    This function adjusts class predictions based on the prediction threshold (t).
    Will only work for binary classification problems.
    """
    return [1 if y >= t else 0 for y in y_scores]

def data_preprocess(reald,faked,cat_cols,onehot=False):

  real=reald.copy().reset_index(drop=True)
  fake=faked.copy().reset_index(drop=True)

  ss=MinMaxScaler()
  ohe = OneHotEncoder(sparse=False,handle_unknown="ignore")

  col_names=real.columns.to_list()
  n_col=[s for s in col_names if s not in cat_cols]

  real_scaled=pd.DataFrame(ss.fit_transform(real[n_col]), index=None,columns=n_col)
  fake_scaled=pd.DataFrame(ss.transform(fake[n_col]), index=None,columns=n_col)

  if onehot:

    b_cols=[]
    for i in real.columns:
      if real[i].nunique()==2:
        b_cols.append(i)

    cat_cols=[i for i in cat_cols if i not in b_cols]

    #One-hot-encode the categorical columns.
    #Unfortunately outputs an array instead of dataframe.
    ohe_real = ohe.fit_transform(real[cat_cols])
    ohe_fake = ohe.transform(fake[cat_cols])
    #Convert it to df
    real_encoded = pd.DataFrame(ohe_real, index=None,columns=ohe.get_feature_names_out())
    fake_encoded = pd.DataFrame(ohe_fake, index=None,columns=ohe.get_feature_names_out())

    real_processed=pd.concat([real_scaled, real_encoded,real[b_cols]], axis=1)
    fake_processed=pd.concat([fake_scaled, fake_encoded,fake[b_cols]], axis=1)
  else:
    real_processed=pd.concat([real_scaled, real[cat_cols]], axis=1)[col_names]
    fake_processed=pd.concat([fake_scaled, fake[cat_cols]], axis=1)[col_names]

  return real_processed,fake_processed

  # reals,fakes=data_preprocess(real,fake,c_col)
# reale,fakee=data_preprocess(real,fake,c_col,onehot=True)

##Synthetic Data Evaluation - Utility

###Univariate

####Descriptive Stats

In [3]:
def summary_stats(real,fake,c_col,n_col):
  n_stats=real[n_col].describe().T-fake[n_col].describe().T
  c_stats=real[c_col].astype('object').describe().T-fake[c_col].astype('object').describe().T
  return n_stats,c_stats

####Univariate test statistics

In [4]:
##compare continous columns between real and fake
def kolmogorov_smirnov_test(col_name, real_col, fake_col):
    statistic, p_value = stats.ks_2samp(real_col, fake_col)
    equality = 'identical' if p_value > 0.01 else 'different'
    return {'col_name': col_name, 'ks_statistic': statistic, 'ks_p-value': p_value, 'equal distribution': equality}

def mood_test(col_name, real_col, fake_col):
    statistic, p_value,m,t = stats.median_test(real_col, fake_col,ties="below",nan_policy="omit")
    equality = 'identical' if p_value > 0.01 else 'different'
    return {'col_name': col_name, 'md_statistic': statistic, 'md_p-value': p_value, 'equal median': equality}

def mannwhitneyu(col_name, real_col, fake_col):
    statistic, p_value = stats.mannwhitneyu(real_col, fake_col)
    equality = 'identical' if p_value > 0.01 else 'different'
    return {'col_name': col_name, 'mw_statistic': statistic, 'mw_p-value': p_value, 'equal mean': equality}

def levene_test(col_name, real_col, fake_col):
    statistic, p_value = stats.levene(real_col, fake_col)
    equality = 'identical' if p_value > 0.01 else 'different'
    return {'col_name': col_name, 'le_statistic': statistic, 'le_p-value': p_value, 'equal variance': equality}


def num_statistics_df(real: pd.DataFrame, fake: pd.DataFrame, stats_func: Callable, numerical_columns=None) -> List[Dict[str, Any]]:
  assert real.columns.tolist() == fake.columns.tolist(), f'Colums are not identical between `real` and `fake`. '

  real_iter = real[numerical_columns].items()
  fake_iter = fake[numerical_columns].items()
  distances = Parallel(n_jobs=-1)(delayed(stats_func) (colname, real_col, fake_col) for (colname, real_col), (_, fake_col) in zip(real_iter, fake_iter))
  distances_df = pd.DataFrame(distances).set_index('col_name')
  distances_df.loc['mean']  = distances_df.mean(numeric_only=True)
  return distances_df

def get_frequencies(real, synthetic,percent=True):
    """Get percentual frequencies for each possible real categorical value.
    Given two iterators containing categorical data, this transforms it into
    observed/expected frequencies which can be used for statistical tests. It
    adds a regularization term to handle cases where the synthetic data contains
    values that don't exist in the real data.
    Args:
        real (list):
            A list of hashable objects.
        synthetic (list):
            A list of hashable objects.
    Yields:
        tuble[list, list]:
            The observed and expected frequencies (as a percent).
    """
    f_obs, f_exp = [], []
    real, synthetic = Counter(real), Counter(synthetic)
    for value in synthetic:
        if value not in real:
            warnings.warn(f'Unexpected value {value} in synthetic data.')
            real[value] += 1e-6  # Regularization to prevent NaN.

    if percent:
      for value in real:
          f_obs.append(synthetic[value] / sum(synthetic.values()))  # noqa: PD011
          f_exp.append(real[value] / sum(real.values()))  # noqa: PD011
    else:
      for value in real:
          f_obs.append(synthetic[value])  # noqa: PD011
          f_exp.append(real[value])  # noqa: PD011

    return f_obs, f_exp

##for categorical variables
def chisquare_test(col_name, real_col, fake_col):
  f_obs, f_exp = get_frequencies(real_col, fake_col)
  statistic, p_value = stats.chisquare(f_obs, f_exp)
  equality = 'identical' if p_value > 0.05 else 'different'
  return {'col_name': col_name, 'ct_statistic': statistic, 'ct_p-value': p_value, 'equal frequency': equality}

def cat_statistics_df(real: pd.DataFrame, fake: pd.DataFrame, stats_func: Callable, categorical_columns: List) -> pd.DataFrame:
    assert real.columns.tolist() == fake.columns.tolist(), f'Colums are not identical between `real` and `fake`. '
    real_iter = real[categorical_columns].items()
    fake_iter = fake[categorical_columns].items()
    distances = Parallel(n_jobs=-1)(delayed(stats_func) (colname, real_col, fake_col) for (colname, real_col), (_, fake_col) in zip(real_iter, fake_iter))
    distances_df = pd.DataFrame(distances).set_index('col_name')
    distances_df.loc['mean']  = distances_df.mean(numeric_only=True)
    return distances_df

####Univariate distance

In [5]:
def get_frequency(X_gt: pd.DataFrame, X_synth: pd.DataFrame, n_histogram_bins: int = 10) -> dict:
    """Get percentual frequencies for each possible real categorical value.

    Returns:
        The observed and expected frequencies (as a percent).
    """
    res = {}
    for col in X_gt.columns:
        local_bins = min(n_histogram_bins, len(X_gt[col].unique()))

        if len(X_gt[col].unique()) < n_histogram_bins:  # categorical
            gt = (X_gt[col].value_counts() / len(X_gt)).to_dict()
            synth = (X_synth[col].value_counts() / len(X_synth)).to_dict()
        else:
            gt_vals, bins = np.histogram(X_gt[col], bins=local_bins)
            synth_vals, _ = np.histogram(X_synth[col], bins=bins)
            gt = {k: v / (sum(gt_vals) + 1e-8) for k, v in zip(bins, gt_vals)}
            synth = {k: v / (sum(synth_vals) + 1e-8) for k, v in zip(bins, synth_vals)}

        for val in gt:
            if val not in synth or synth[val] == 0:
                synth[val] = 1e-11
        for val in synth:
            if val not in gt or gt[val] == 0:
                gt[val] = 1e-11

        if gt.keys() != synth.keys():
            raise ValueError(f"Invalid features. {gt.keys()}. syn = {synth.keys()}")
        res[col] = (list(gt.values()), list(synth.values()))

    return res


def kl_divergence(colname,x,y):
    relative_entropy=np.sum(kl_div(x,y))
    return {'col_name': colname, 'kl_divergence':relative_entropy}

def js_divergence(colname,x,y):
    js=distance.jensenshannon(x,y)
    return {'col_name': colname, 'js_distance': js}

def num_divergence_df(real: pd.DataFrame, fake: pd.DataFrame,stats_func: Callable, numerical_columns=None):

    freqs=get_frequency(real,fake)
    res = {}
    for col in real.columns:
        real_freq, fake_freq = freqs[col]
        res[col]=stats_func(col,real_freq,fake_freq)
    distances_df=pd.DataFrame(res).T.set_index('col_name')
    distances_df.loc['mean']=distances_df.mean()
    return distances_df

def em_distance(colname,x,y):
    em=stats.wasserstein_distance(x,y)
    return {'col_name': colname, 'em_distance': em}

def num_distance_df(real: pd.DataFrame, fake: pd.DataFrame, stats_func: Callable, numerical_columns=None) -> List[Dict[str, Any]]:
    assert real.columns.tolist() == fake.columns.tolist(), f'Colums are not identical between `real` and `fake`. '
    if numerical_columns is None:
      numerical_columns=real.columns.tolist()
    real_iter = real[numerical_columns].items()
    fake_iter = fake[numerical_columns].items()
    distances = Parallel(n_jobs=-1)(delayed(stats_func) (colname, real_col.values, fake_col.values) for (colname, real_col), (_, fake_col) in zip(real_iter, fake_iter))
    distances_df = pd.DataFrame(distances).set_index('col_name')
    distances_df.loc['mean']=distances_df.mean()
    return distances_df

###Bivariate

In [6]:
#####Nom to Nom (Cat to Cat) stats############
def conditional_entropy(x,y):
    # entropy of x given y
    y_counter = Counter(y)
    xy_counter = Counter(list(zip(x,y)))
    total_occurrences = sum(y_counter.values())
    entropy = 0
    for xy in xy_counter.keys():
        p_xy = xy_counter[xy] / total_occurrences
        p_y = y_counter[xy[1]] / total_occurrences
        entropy += p_xy * math.log(p_y/p_xy)
    return entropy

def theils_u(x,y):
    """
    also referred to as the Uncertainty Coefficient, is based on the conditional entropy between x and y —
    given the value of x, how many possible states does y have, and how often do they occur. Just like Cramer’s V, the output value is on the range of [0,1]
    asymmetric, meaning U(x,y)≠U(y,x) (while V(x,y)=V(y,x), where V is Cramer’s V)
    """
    s_xy = conditional_entropy(x,y)
    x_counter = Counter(x)
    total_occurrences = sum(x_counter.values())
    p_x = list(map(lambda n: n/total_occurrences, x_counter.values()))
    s_x = stats.entropy(p_x)
    if s_x == 0:
        return 1
    else:
        return (s_x - s_xy) / s_x

def cramers_v(x, y):
    """
    Calculates Cramer's V statistic for categorical-categorical association.
    This is a symmetric coefficient: V(x,y) = V(y,x)
    Parameters:
    -----------
    x : list / NumPy ndarray / Pandas Series  A sequence of categorical measurements
    y : list / NumPy ndarray / Pandas Series  A sequence of categorical measurements
    bias_correction : Boolean, default = True   Use bias correction from Bergsma and Wicher,Journal of the Korean Statistical Society 42 (2013): 323-328.
    Returns:
    --------
    float in the range of [0,1]
    """
    confusion_matrix = pd.crosstab(x, y)
    chi2 = stats.chi2_contingency(confusion_matrix)[0]
    n = confusion_matrix.sum().sum()
    phi2 = chi2 / n
    r, k = confusion_matrix.shape
    v = np.sqrt(phi2 / min(k - 1, r - 1))

    if -0.0001 <= v < 0.0001 or  1. - 0.0001 < v <= 1. + 0.0001:
        rounded_v = 0. if v < 0 else 1.
        # warnings.warn(f'Rounded V = {v} to {rounded_v}. This is probably due to floating point precision issues.',   RuntimeWarning)
        return rounded_v
    else:
        return v


#####Nom to Num stats############
def correlation_ratio(categories,measurements):
    """
    Calculates the Correlation Ratio (sometimes marked by the greek letter Eta)
    for categorical-continuous association. Answers the question - given a continuous value of a measurement, is it
    possible to know which category is it associated with?Value is in the range [0,1], where 0 means a category cannot be determined
    by a continuous measurement, and 1 means a category can be determined with absolute certainty.
    Wikipedia: https://en.wikipedia.org/wiki/Correlation_ratio
    Parameters:
    -----------
    categories : list / NumPy ndarray / Pandas Series
        A sequence of categorical measurements
    measurements : list / NumPy ndarray / Pandas Series
        A sequence of continuous measurements
    Returns:
    --------
    float in the range of [0,1]
    """
    fcat, _ = pd.factorize(categories)
    cat_num = np.max(fcat) + 1
    y_avg_array = np.zeros(cat_num)
    n_array = np.zeros(cat_num)
    for i in range(0, cat_num):
        cat_measures = measurements[np.argwhere(fcat == i).flatten()]
        n_array[i] = len(cat_measures)
        y_avg_array[i] = np.average(cat_measures)
    y_total_avg = np.sum(np.multiply(y_avg_array, n_array)) / np.sum(n_array)
    numerator = np.sum(
        np.multiply(n_array, np.power(np.subtract(y_avg_array, y_total_avg),
                                      2)))
    denominator = np.sum(np.power(np.subtract(measurements, y_total_avg), 2))
    if numerator == 0:
        return 0.
    else:
        eta = np.sqrt(numerator / denominator)
        if 1. < eta <= 1.+0.0000:
            warnings.warn(f'Rounded eta = {eta} to 1. This is probably due to floating point precision issues.',
                          RuntimeWarning)
            return 1.
        else:
            return eta

def column_associations(real: pd.DataFrame, c_col : List, theil_u=False) -> pd.DataFrame:

  corr=pd.DataFrame(index=real.columns,columns=real.columns)

  b_col=[]
  m_col=[]
  for i in c_col:
    unique_values = pd.unique(real[i])
    if len(unique_values) == 2:
      b_col.append(i)
    else:
      m_col.append(i)

  for i,ac in enumerate(corr):
    for j, bc in enumerate(corr):
      if i > j:
        continue

      if ac in c_col and bc in c_col:
        if ac in b_col and bc in b_col:
          c=matthews_corrcoef(real[ac].values,real[bc].values)
        else:
          if theil_u:
            c= theils_u(real[ac].values,real[bc].values)
          else:
            c=cramers_v(real[ac].values,real[bc].values)
      else:
        if ac in b_col or bc in b_col:
          c,_=stats.pointbiserialr(real[ac].values,real[bc].values)
        else:
          if ac in c_col or bc in c_col:
            c=correlation_ratio(real[ac].values,real[bc].values)
          else:
            c, _ = stats.pearsonr(real[ac].sort_values(), real[bc].sort_values())
      corr.loc[ac,bc]=corr.loc[bc,ac]=c
  return corr

def bivariate_test(real,fake,c_col):
  real_corr=column_associations(real,c_col,True)
  fake_corr=column_associations(fake,c_col,True)
  statistics,p=stats.ks_2samp(real_corr.to_numpy().flatten(),fake_corr.to_numpy().flatten())

  return round(p,4),real_corr,fake_corr

###Multivariate

####PCA correlation

In [7]:
def pca_correlation(real, fake):
    """
    Calculate the relation between PCA explained variance values. Due to some very large numbers, in recent implementation the MAPE(log) is used instead of
    regressions like Pearson's r.

    """
    real=real.copy()
    fake=fake.copy()
    n_components=real.shape[1]#//4
    pca_r = PCA(n_components=n_components)
    pca_f = PCA(n_components=n_components)

    pca_r.fit(real)
    pca_f.fit(fake)

    results = pd.DataFrame({'real': pca_r.explained_variance_, 'fake': pca_f.explained_variance_})
    print(f'\n######################################PCA Correlation####################################')
    print(f'\nTop {n_components} PCA components:')
    print(results.to_string())

    statistics,p=stats.ks_2samp(pca_r.explained_variance_, pca_f.explained_variance_)

    return p

####Maximum Mean Discrepancy (MMD)

In [8]:
def mmd_kernel(X,Y,kernel='rbf',gamma=1,degree=2,coef0=0):
  """MMD using linear/rbf/polynomial kernel (i.e., k(x,y) = (gamma <X, Y> + coef0)^degree)
  Arguments:
      X {[n_sample1, dim]} -- [X matrix]
      Y {[n_sample2, dim]} -- [Y matrix]
  Keyword Arguments:
      gamma {int} -- [gamma] (default: {1})
      degree {int} -- [degree] (default: {2})
      coef0 {int} -- [constant item] (default: {0})
  Returns:
      [scalar] -- [MMD value]
    """
  if kernel == 'linear':
    delta = X.mean(0) - Y.mean(0)
    score = delta.dot(delta.T)

  elif kernel == 'rbf':
    XX = pairwise.rbf_kernel(X, X, gamma)
    YY = pairwise.rbf_kernel(Y, Y, gamma)
    XY = pairwise.rbf_kernel(X, Y, gamma);
    score=XX.mean() + YY.mean() - 2 * XY.mean()


  elif kernel == 'polynomial':
    XX = pairwise.polynomial_kernel(X, X, degree, gamma, coef0)
    YY = pairwise.polynomial_kernel(Y, Y, degree, gamma, coef0)
    XY = pairwise.polynomial_kernel(X, Y, degree, gamma, coef0)
    score = XX.mean() + YY.mean() - 2 * XY.mean()
  else:
    raise ValueError(f"Unsupported kernel {kernel}")

  return score

####ML Utility

In [None]:
def ml_evaluation(real, fake, df_test, target_col: str, target_type: str = 'class' ,data_strategy_flag=False, verbose: bool =True):

    df_test=df_test.copy()

    real=real.copy()
    fake=fake.copy()

    real_x,real_y = real.drop([target_col], axis=1),real[target_col]
    fake_x,fake_y = fake.drop([target_col], axis=1),fake[target_col]
    X_test,y_test = df_test.drop([target_col], axis=1),df_test[target_col]

    if target_type == 'regr':

      estimators = [
          KNeighborsRegressor(),
          SVR(kernel = 'rbf'),
          lgb.LGBMRegressor(n_estimators=100,random_state=1),
          RandomForestRegressor(n_estimators=100, random_state=1),
          Lasso(random_state=1),
          Ridge(alpha=1.0, random_state=1),
          ElasticNet(random_state=1)
      ]

      estimators_names = ['KNN','SVC','LGBM','RF','LS','RD','EN']

    elif target_type == 'class':
      estimators = [
          KNeighborsClassifier(),
          lgb.LGBMClassifier(n_estimators=100,random_state=1),
          RandomForestClassifier(n_estimators=100, random_state=1),
          LogisticRegression(multi_class='auto', solver='lbfgs', max_iter=500, random_state=1),
          DecisionTreeClassifier(random_state=1),
          MLPClassifier([50, 50], solver='adam', activation='relu', learning_rate='adaptive', random_state=1)
      ]

      estimators_names = ['KNN','LGBM','RF','LR','DT','MLP']
    else:
        raise ValueError(f'target_type must be \'regr\' or \'class\'')

    zipped_estimators= zip(estimators_names,estimators)

    R=[]

    if target_type == 'class':
        for est_name,est in zipped_estimators:
          start = time.time()
          print(est_name)

          if(len(np.unique(fake_y)))==1:
            R.append(['data synthesis',est_name,0,0,0,0,0,0])
          else:
            model= est
            model.fit(fake_x,fake_y)
            y_test_probas = model.predict_proba(X_test)[:,1]
            y_test_predicted = model.predict(X_test)
            acc = accuracy_score(y_test, y_test_predicted)
            pre = precision_score(y_test, y_test_predicted)
            rec = recall_score(y_test, y_test_predicted)
            f1=f1_score(y_test, y_test_predicted)
            aucpr = average_precision_score(y_test, y_test_probas)
            aucroc = roc_auc_score(y_test, y_test_probas)

            R.append(['data synthesis',est_name,acc,pre,rec,f1,aucpr,aucroc])


          if data_strategy_flag:
            real_fake=pd.concat([real,fake])
            real_fake_pos=pd.concat([real,fake[fake[target_col]==1]])
            real_fake_x,real_fake_y=real_fake.drop([target_col], axis=1),real_fake[target_col]
            real_fake_pos_x,real_fake_pos_y=real_fake_pos.drop([target_col], axis=1),real_fake_pos[target_col]
            model= est
            model.fit(real_fake_pos_x,real_fake_pos_y)
            y_test_probas = model.predict_proba(X_test)[:,1]
            y_test_predicted = model.predict(X_test)
            acc = accuracy_score(y_test, y_test_predicted)
            pre = precision_score(y_test, y_test_predicted)
            rec = recall_score(y_test, y_test_predicted)
            f1=f1_score(y_test, y_test_predicted)
            aucpr = average_precision_score(y_test, y_test_probas)
            aucroc = roc_auc_score(y_test, y_test_probas)
            t = round(time.time() - start,2)
            R.append(['data balancing',est_name,acc,pre,rec,f1,aucpr,aucroc])

            model= est
            model.fit(real_fake_x,real_fake_y)
            y_test_probas = model.predict_proba(X_test)[:,1]
            y_test_predicted = model.predict(X_test)
            acc = accuracy_score(y_test, y_test_predicted)
            pre = precision_score(y_test, y_test_predicted)
            rec = recall_score(y_test, y_test_predicted)
            f1=f1_score(y_test, y_test_predicted)
            aucpr = average_precision_score(y_test, y_test_probas)
            aucroc = roc_auc_score(y_test, y_test_probas)

            R.append(['data augmentation',est_name,acc,pre,rec,f1,aucpr,aucroc])

          t = round(time.time() - start,2)
          print(f'{est_name} takes {t} seconds to run')
    dfResults = pd.DataFrame(data=R, columns=['data_strategy','clf_name','acc','pre','rec','f1','aucpr','aucroc']).sort_values(['data_strategy'])

    return dfResults

####Feature Importance

In [9]:
def feature_importance_comparison(X, Y, estimator,target_col,method_names=None):
    num_methods = len(X)

    importances = []
    stds = None

    fi_dict={}
    for i in range(num_methods):
        estimator=estimator.fit(X[i], Y[i])
        fi_dict[method_names[i]]=estimator.feature_importances_
        importances.append(estimator.feature_importances_)

        if method_names is not None and i>0:
            print(f'\nCorrelation of importances : {method_names[i]}')
            print(np.corrcoef(importances[0],importances[i])[0,1])

    if method_names is not None:
        bar_comparison(importances,stds, labels=method_names, tick_names = X[0].columns, save_name = 'all_feat_importance',target_col=target_col)

    return importances

def bar_comparison(vectors, std=None, labels=None, tick_names=None, save_name = None, target_col=None,max_length = 5):

    num_bars = len(vectors)
    vector = vectors[0]
    indices = np.argsort(vector)[::-1]
    indices = indices[:max_length]
    fig, ax = plt.subplots(figsize=(12,4))
    tot_bar_width = 0.7
    width = tot_bar_width/num_bars
    x = np.arange(len(indices))

    if tick_names is None:
        tick_names = range(len(vector))

    if labels is None:
        labels = ['Original', 'Synthetic', 'Transfer']

    for i, vec in enumerate(vectors):
        xbar = x - tot_bar_width/2 + (i+1/2)*width
        if std is not None:
            ax.bar(xbar, vec[indices],  yerr=std[i][indices], width=width, label=labels[i])
        else:
            ax.bar(xbar, vec[indices], width=width, label=labels[i])

    ax.set_ylim(bottom=0)
    fig.tight_layout()
    ticks = np.array(tick_names, dtype='object')
    ticks = ticks[indices]
    plt.xticks(x, ticks,fontsize=12)
    plt.legend()
    plt.xlim([-1, len(indices)])
    fig.savefig('feature_imp.pdf')
    plt.show()


def feature_importance(real, fake_dfs, target_col: str, c_col: str,target_type: str = 'class' ,verbose: bool =True) -> float:

    real_copy=real.copy()

    real,_=data_preprocess(real_copy,real_copy,c_col,onehot=True)
    display(real)
    real_x = real.drop([target_col], axis=1)
    real_y = real[target_col]

    X, Y = [real_x], [real_y]

    mothed_names=['Identity']
    for n,fake in fake_dfs.items():
      _,fake=data_preprocess(real_copy,fake,c_col,onehot=True)
      fake_x = fake.drop([target_col], axis=1)
      fake_y = fake[target_col]

      X.append(fake_x)
      Y.append(fake_y)
      mothed_names.append(n)

    if target_type == 'regr':
        estimator=lgb.LGBMRegressor(n_estimators=100,random_state=1, importance_type='gain')
    elif target_type == 'class':
        estimator=lgb.LGBMClassifier(n_estimators=100,random_state=1, importance_type='gain')
    else:
        raise ValueError(f'target_type must be \'regr\' or \'class\'')

    method_names=['Identity', 'Simple simulation', 'Distance-based models', 'Statistical models', 'DGMs']
    feat_imp = feature_importance_comparison(X,Y,estimator=estimator,target_col=target_col,method_names=['Identity', 'Simple simulation', 'Distance-based models', 'Statistical models', 'DGMs'])#mothed_names)

    return feat_imp

###Visual

####Univariate Plot

In [None]:
def univariate_plot(real,fake,n_col=None,filename=''):
    real=real.copy()
    fake=fake.copy()

    if n_col is None:
      n_col=real.columns.tolist()

    real['dataset']='real'
    fake['dataset']='fake'
    df_rf=pd.concat([real,fake]).reset_index(drop=True)

    n_var=len(n_col)

    fig,axs= plt.subplots(nrows=n_var,ncols=2,figsize=(16,5*n_var))

    idx=0
    for i in n_col:
      sns.histplot(x=i,hue='dataset',data=df_rf,alpha=0.5,ax=axs[idx][0])
      ax2 = axs[idx][0].twinx()
      sns.kdeplot(x=i,hue='dataset',data=df_rf,alpha=1,ax=ax2).set(title="Kernel Density Function")
      sns.histplot(x=i,hue='dataset',data=df_rf,alpha=0.5,bins=len(df_rf), stat="density",element="step"
      , fill=False, cumulative=True, common_norm=False,ax=axs[idx][1]).set(title="Cumulative Distribution Function")
      axs[idx][1].set(ylabel=None)
      idx+=1

    if filename!='':
      fig.savefig(filename)

    plt.show()

def univariate_single_plot(real,fake,var_name,figsize=(16,5),filename=''):
    real['dataset']='real'
    fake['dataset']='fake'
    df_rf=pd.concat([real,fake]).reset_index(drop=True)

    _,axes= plt.subplots(1,2,figsize=figsize)
    axes=plt.subplot(1,2,1)
    sns.histplot(x=var_name,hue='dataset',data=df_rf,alpha=0.5)
    axes.twinx()
    sns.kdeplot(x=var_name,hue='dataset',data=df_rf,alpha=1).set(title="Kernel Density Function",ylabel=None)
    axes=plt.subplot(1,2,2)
    sns.histplot(x=var_name,hue='dataset',data=df_rf,alpha=0.5,bins=len(df_rf), stat="density",element="step", fill=False, cumulative=True, common_norm=False).set(title="Cumulative Distribution Function")

    if filename!='':
      fig.savefig(filename)

    return plt

# univariate_single_plot(real,fake,"age",(14,3))
# univariate_plot(real,fake,n_col,filename='adult_univariate.pdf')

####Bivariate plot

In [None]:
def bivariate_plots(rc,fc, figsize= (10,15),filename=''):
  fig, ((ax, ax2)) = plt.subplots(2, 1, figsize=figsize,constrained_layout = True)

  rc_mask = np.tril(np.ones_like(rc, dtype=bool))
  fc_mask = np.triu(np.ones_like(fc, dtype=bool))
  sns.heatmap(rc.fillna(value=np.nan),mask=rc_mask,cmap='Greens', linewidths=0.5, yticklabels=True,ax=ax,annot=True,cbar=False,fmt='.2f',annot_kws={"fontsize":6})
  sns.heatmap(fc.fillna(value=np.nan),mask=fc_mask,cmap='Blues', linewidths=0.5, yticklabels=True,ax=ax,annot=True,cbar=False,fmt='.2f',annot_kws={"fontsize":6})
  # Rotate the tick labels and set their alignment.
  plt.setp(ax.get_xticklabels(), rotation=45,ha="right",rotation_mode="anchor")
  # ax.set_title("Correlations between real (green) Vs synthetic (blue) and its variance", fontsize=12)

  corr_diff=abs(rc-fc)
  sns.heatmap(corr_diff.fillna(value=np.nan),mask=fc_mask,cmap='GnBu', xticklabels=False,vmin=0,vmax=1,linewidths=0.5, ax=ax2,annot=True,fmt='.2f',annot_kws={"fontsize":6})
  # Rotate the tick labels and set their alignment.
  # plt.setp(ax2.get_xticklabels(), rotation=45, ha="right",rotation_mode="anchor")

  # ax2.set_title("Difference of bivariate correlation between real and synthetic", fontsize=12)

  if filename!='':
    fig.savefig(filename)
    # fig.savefig(filename,tight_layout=True)

# def bivariate_plots(rc,fc, figsize= (10,18),filename=''):
#   fig, ((ax, ax2)) = plt.subplots(2, 1, figsize=figsize,constrained_layout = True)

#   rc_mask = np.tril(np.ones_like(rc, dtype=bool))
#   fc_mask = np.triu(np.ones_like(fc, dtype=bool))
#   sns.heatmap(rc.fillna(value=np.nan),mask=rc_mask,cmap='Greens', linewidths=0.5, yticklabels=False,ax=ax,annot=True,cbar=False,fmt='.2f',annot_kws={"fontsize":6})
#   sns.heatmap(fc.fillna(value=np.nan),mask=fc_mask,cmap='Blues', linewidths=0.5, yticklabels=False,ax=ax,annot=True,cbar=False,fmt='.2f',annot_kws={"fontsize":6})
#   # Rotate the tick labels and set their alignment.
#   plt.setp(ax.get_xticklabels(), rotation=45, ha="right",rotation_mode="anchor")
#   ax.set_title("Bivariate correlation - real (green) Vs synthetic (blue)", fontsize=12)

#   corr_diff=abs(rc-fc)
#   sns.heatmap(corr_diff.fillna(value=np.nan),mask=fc_mask,cmap='GnBu', vmin=0,vmax=1,linewidths=0.5, ax=ax2,annot=True,fmt='.2f',annot_kws={"fontsize":6})
#   # Rotate the tick labels and set their alignment.
#   plt.setp(ax2.get_xticklabels(), rotation=45, ha="right",rotation_mode="anchor")
#   ax2.set_title("Difference of bivariate correlation between real and synthetic", fontsize=12)

#   if filename!='':
#     fig.savefig(filename)
#     # fig.savefig(filename,tight_layout=True)

In [None]:
def bivariate_plot(rc,fc, figsize= (14,10),cbar_flag=True,filename=''):
  fig = plt.subplots(figsize=figsize)

  rc_mask = np.tril(np.ones_like(rc, dtype=bool))
  fc_mask = np.triu(np.ones_like(fc, dtype=bool))

  corr_diff=round(abs(rc-fc),2)
  hp=sns.heatmap(corr_diff.fillna(value=np.nan),mask=fc_mask,cmap='GnBu', vmin=0,vmax=1,linewidths=0.5,annot=True,fmt='.2f',annot_kws={"fontsize":6},cbar=cbar_flag)

  # hp.set(title="Difference of bivariate correlations between real and synthetic")
  # # Rotate the tick labels and set their alignment.
  plt.setp(hp.get_xticklabels(), rotation=45, ha="right",rotation_mode="anchor")

  if filename!='':
    fig.savefig(filename)
    # fig.savefig(filename,tight_layout=True)

# p,rc,fc=bivariate_test(reals,fakes,c_col)
# bivariate_plots(rc,fc,filename='adult_smote.pdf')

####Multi-Variate plot

In [10]:
##2 D plot with dimensionality reduction

def table_plot(reals,fakes,dimentionality_reduction='PCA',filename=''):


  if dimentionality_reduction=='PCA':
    model=KernelPCA(n_components=2,kernel="rbf")#, whiten=True)
  elif dimentionality_reduction=='TSNE': ##takes too long for big dataset
    model=TSNE(n_components = 2, perplexity=10,random_state = 1)

  realt=model.fit_transform(reals)
  faket=model.fit_transform(fakes)

  #fit the model to our data and extract the results
  #create a dataframe from the dataset
  real_pca = pd.DataFrame(data = realt ,columns = ["Component 1","Component 2"])
  fake_pca = pd.DataFrame(data = faket ,columns = ["Component 1","Component 2"])

  real_pca['dataset']='real'
  fake_pca['dataset']='fake'

  #plot the resulting data from two dimensions
  g = sns.jointplot(data = pd.concat([real_pca,fake_pca]),
                  x = "Component 1",
                  y = "Component 2",
                    palette=["#2171B5","#6BAED6"],
                    # kind='kde',
                    # fill=True,
                    joint_kws={'alpha': 0.8},
                    hue="dataset")
  g.fig.set_size_inches((5, 5))
  if filename!='':
    g.savefig(filename+'.pdf')

In [11]:
def calc_k_l(real):

  k_values = [999]
  l_values = [999]
  for n_clusters in np.arange(5,50,10):
      if len(real) / n_clusters < 10:
          continue
      model = KMeans(n_clusters=n_clusters, init="k-means++", random_state=0).fit(real)
      counts: dict = Counter(model.labels_)
      k_values.append(np.min(list(counts.values())))

      clusters = model.predict(real)
      clusters_df = pd.Series(clusters, index=real.index)
      for cluster in range(n_clusters):
          partition = real[clusters_df == cluster]
          uniq_values = partition.drop_duplicates()
          l_values.append(len(uniq_values))


  return [int(np.min(k_values)),int(np.min(l_values))]

In [None]:
def small_counts_ratio(fake,c_col):
  fake=fake.copy()
  fake['counts'] = fake.groupby(c_col).cumcount() + 1
  return sum(fake['counts']<6)/len(fake)

In [12]:
def get_copies(real,fake,return_len: bool = True) -> Union[pd.DataFrame, int]:
        """
        Check whether any real values occur in the fake data.
        :param return_len: whether to return the length of the copied rows or not.
        :return: Dataframe containing the duplicates if return_len=False, else integer indicating the number of copied rows.
        """
        real_hashes = real.apply(lambda x: hash(tuple(x)), axis=1)
        fake_hashes = fake.apply(lambda x: hash(tuple(x)), axis=1)

        dup_idxs = fake_hashes.isin(real_hashes.values)
        dup_idxs = dup_idxs[dup_idxs == True].sort_index().index.tolist()

        copies = fake.loc[dup_idxs, :]

        if return_len:
            return len(copies)
        else:
            return copies

#Evaluation

In [None]:
def uni_test(real,fake,c_col,n_col):

  ks=num_statistics_df(real,fake,kolmogorov_smirnov_test,n_col)##equal distribution?
  mw=num_statistics_df(real,fake,mannwhitneyu,n_col) #equal mean?
  md=num_statistics_df(real,fake,mood_test,n_col)#equal median?
  le=num_statistics_df(real,fake,levene_test,n_col) #equal variance?
  cs=cat_statistics_df(real,fake,chisquare_test,c_col)#same frequency
  kl=num_divergence_df(real,fake,kl_divergence)
  js=num_divergence_df(real,fake,js_divergence)
  em=num_distance_df(real,fake,em_distance)

  result=pd.concat([ks,mw,md,le,cs,kl,js,em],axis=1)
  return result[['ks_p-value','mw_p-value','md_p-value','le_p-value', 'ct_p-value', 'kl_divergence','js_distance', 'em_distance']]

In [None]:
data_dir='/content/drive/MyDrive/Experiment/DATA/'
dfs_path=['credit-g','national-longitudinal-survey-binary','company-bankruptcy','law-school-admission-bianry','adult','diabetes']
folds=5
GMs=['Simulation.csv','SMOTE.csv','ADASYN.csv','SVMSMOTE.csv','SMOTENC.csv','SMOTETomek.csv','Synthpop.csv','Copula.csv','ctgan.csv','tvae.csv']

target_col="class"

###Statistical Similarity

In [None]:

for data_folder in dfs_path:
  print(data_folder)
  print("-"*50)
  uni_result={}
  biv_result={}
  mmd_result={}
  pca_result={}
  det_result={}
  loaded=load_data(data_dir,data_folder,GMs)

  c_col=loaded['c_col']
  n_col=loaded['n_col']
  real=loaded['real']

  for syn_name,fake in loaded['fake'].items():
    print(syn_name)
    print("-"*30)
    fake.columns=real.columns
    reals,fakes=data_preprocess(real,fake,c_col,onehot=False)
    reale,fakee=data_preprocess(real,fake,c_col,onehot=True)
    uni_result[syn_name]=uni_test(reals,fakes,c_col,n_col)

    p,rc,fc=bivariate_test(reals,fakes,c_col)
    biv_result[syn_name]=p

    pca=pca_correlation(reals,fakes)
    pca_result[syn_name]=pca

    if reals.shape[0]>30000:
      reals,fakes=reals.sample(30000),fakes.sample(30000)
    mmd=mmd_kernel(reals,fakes)
    mmd_result[syn_name]=mmd


  pickle.dump(uni_result,open(data_dir+data_folder+'/uni_result.pickle', 'wb'))
  pickle.dump(biv_result,open(data_dir+data_folder+'/biv_result.pickle', 'wb'))
  pickle.dump(mmd_result,open(data_dir+data_folder+'/mmd_result.pickle', 'wb'))
  pickle.dump(pca_result,open(data_dir+data_folder+'/pca_result.pickle', 'wb'))

In [None]:
def pri_test(real,fake,c_col):
  fake[c_col]=fake[c_col].round()
  dup=get_copies(real,fake)
  sma=small_counts_ratio(fake,c_col)
  return dup,sma

In [None]:
all_pri={}
for data_folder in dfs_path:
  print(data_folder)
  print("-"*50)
  pri_result={}
  loaded=load_data(data_dir,data_folder,GMs)

  c_col=loaded['c_col']
  n_col=loaded['n_col']
  real=loaded['real']

  for syn_name,fake in loaded['fake'].items():
    print(syn_name)
    print("-"*30)
    fake.columns=real.columns
    pri=pri_test(real,fake,c_col)
    pri_result[syn_name]=pri

  # pickle.dump(det_result,open(data_dir+data_folder+'/det_result.pickle', 'wb'))
  all_pri[data_folder]=pri_result

credit-customers
--------------------------------------------------
Simulation.csv
SMOTE.csv
SMOTENC.csv
SMOTETomek.csv
Synthpop.csv
Copula.csv
ctgan.csv
tvae.csv
SIMULATION
------------------------------
SMOTE
------------------------------
SMOTENC
------------------------------
SMOTETOMEK
------------------------------
SYNTHPOP
------------------------------
COPULA
------------------------------
CTGAN
------------------------------
TVAE
------------------------------
national-longitudinal-survey-binary
--------------------------------------------------
Simulation.csv
SMOTE.csv
SMOTENC.csv
SMOTETomek.csv
Synthpop.csv
Copula.csv
ctgan.csv
tvae.csv
SIMULATION
------------------------------
SMOTE
------------------------------
SMOTENC
------------------------------
SMOTETOMEK
------------------------------
SYNTHPOP
------------------------------
COPULA
------------------------------
CTGAN
------------------------------
TVAE
------------------------------
company-bankruptcy
--------------

In [None]:
t=[]
for dn,df in all_pri.items():
  t.append(pd.DataFrame(all_pri[dn]).T)

###ML utility

In [None]:
data_dir='/content/drive/MyDrive/Experiment/DATA/'

folds=5

for data_folder in dfs_path:
  target_col="class"
  if data_folder in ['national-longitudinal-survey-binary','law-school-admission-bianry']:
    target_col="label"
  df_result={}
  print(data_folder)
  for i in range(folds):
    print(f'Fold {i} is currently processing------------------------')
    print('\n')
    seed_folder=data_dir+data_folder+'/synthetic/seed'+str(i)
    baseline=pd.read_csv(os.path.join(seed_folder, 'baseline.csv'))
    df_test=pd.read_csv(os.path.join(seed_folder, 'df_test.csv'))
    r=ml_evaluation(baseline,baseline,df_test,target_col)
    df_result[data_folder+'.baseline'+'.seed'+str(i)]=r
    for file in GMs:
      print(file)
      fake=pd.read_csv(os.path.join(seed_folder, file))
      fake.columns=baseline.columns
      fake[target_col]=adjusted_classes(fake[target_col],0.5)
      r=ml_evaluation(baseline,fake,df_test,target_col)
      df_result[data_folder+'.'+file[:-4]+'.seed'+str(i)]=r
  pickle.dump(df_result,open(data_dir+data_folder+'/ml_result.pickle', 'wb'))

In [None]:
all_uni={}
all_biv={}
all_mmd={}
all_pca={}
all_ml={}
all_det={}
for data_folder in dfs_path:
  temp={}
  print(data_folder)
  print("-"*50)

  uni_data = pickle.load(open(data_dir+data_folder+'/uni_result.pickle', 'rb'))
  for key, df in uni_data.items():
    temp[key]=df.loc['mean']
  all_uni[data_folder]=pd.DataFrame(temp).T

  biv_data = pickle.load(open(data_dir+data_folder+'/biv_result.pickle', 'rb'))
  all_biv[data_folder]=biv_data

  mmd_data = pickle.load(open(data_dir+data_folder+'/mmd_result.pickle', 'rb'))
  all_mmd[data_folder]=mmd_data

  pca_data = pickle.load(open(data_dir+data_folder+'/pca_result.pickle', 'rb'))
  all_pca[data_folder]=pca_data

  ml_data = pickle.load(open(data_dir+data_folder+'/ml_result.pickle', 'rb'))
  all_ml[data_folder]=pd.concat(ml_data)

  det_data = pickle.load(open(data_dir+data_folder+'/det_result.pickle', 'rb'))
  all_det[data_folder]=pd.concat(det_data)


# pd.concat(all_uni).to_csv('uni.test.csv')
# pd.DataFrame(all_biv).T.mean()

credit-customers
--------------------------------------------------
national-longitudinal-survey-binary
--------------------------------------------------
company-bankruptcy
--------------------------------------------------
law-school-admission-bianry
--------------------------------------------------
diabetes
--------------------------------------------------


In [None]:
all_uni={}
all_biv={}
all_mmd={}
all_pca={}
all_ml={}
all_dis={}
for data_folder in dfs_path:
  temp={}
  print(data_folder)
  print("-"*50)

  uni_data = pickle.load(open(data_dir+data_folder+'/uni_result.pickle', 'rb'))
  for key, df in uni_data.items():
    temp[key]=df.loc['mean']
  all_uni[data_folder]=pd.DataFrame(temp).T

  biv_data = pickle.load(open(data_dir+data_folder+'/biv_result.pickle', 'rb'))
  all_biv[data_folder]=biv_data

  mmd_data = pickle.load(open(data_dir+data_folder+'/mmd_result.pickle', 'rb'))
  all_mmd[data_folder]=mmd_data

  pca_data = pickle.load(open(data_dir+data_folder+'/pca_result.pickle', 'rb'))
  all_pca[data_folder]=pca_data

  ml_data = pickle.load(open(data_dir+data_folder+'/ml_result.pickle', 'rb'))
  all_ml[data_folder]=pd.concat(ml_data)

  dis_data = pickle.load(open(data_dir+data_folder+'/dis_result.pickle', 'rb'))
  all_dis[data_folder]=pd.DataFrame(dis_data).T

# pd.concat(all_uni).to_csv('uni.test.csv')
# pd.DataFrame(all_biv).T.mean()

credit-customers
--------------------------------------------------
national-longitudinal-survey-binary
--------------------------------------------------
company-bankruptcy
--------------------------------------------------
law-school-admission-bianry
--------------------------------------------------
diabetes
--------------------------------------------------


In [None]:
pd.DataFrame(dis_data).T[['distance_mean','distance_1nn']]

Unnamed: 0,distance_mean,distance_mean.1,distance_1nn
SIMULATION,3.67,0.33,1.76
SMOTE,3.59,0.32,1.25
SMOTENC,3.56,0.35,1.27
SMOTETOMEK,3.59,0.32,1.25
SYNTHPOP,3.69,0.41,2.12
COPULA,3.87,0.44,2.22
CTGAN,3.69,0.38,1.65
TVAE,3.61,0.36,1.59
