<a href="https://colab.research.google.com/github/arjunsshah/C2ST/blob/main/C2ST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Classifier Based Two Sample Testing
In two-sample testing, we want to determine if two selected samples come from the same or different distributions. C2ST aims to use neural network classifiers to help determine this. 

## Process
Two samples, $S_P = \{x_1,...,x_n\}$ and $S_Q = \{y_1, ..., y_m\}$

1) Construct our dataset

$D = \{(x_i,0)\}^n_{i=1} \cup \{(y_i,0)\}^n_{i=1} = \{z_i,l_i\}^{2n}_{i=1}$

2) Split dataset

Shuffle into train-test split

3) Train binary classifier

4) Return a classification accuracy on $D_{te}$:

$\hat t = \frac{1}{n_{te}} \Sigma_{(z_i, l_i) \in D_{te}} I[I(f(z_i) > \frac{1}{2}) = l_i]$


5) Accept/Reject $H_0$: A significance threshold is written up below where if our c2st statistics is less than it, we accept the null hypothesis, otherwise we reject.


## How to use this notebook?
I will outline the steps needed to understand and use this notebook to perform classifier two-sample testing on any datasets

Below I have outlined a c2st function that takes in both samples, X and Y, along with other parameters that can be edited. 

For example, here is a classifier two-sample test between a student t-distribution and gaussian distribution:



```
student = scale(np.random.standard_t(20, size = 1000)).reshape(-10,10) # X
gaussian = scale(np.random.normal(0,1,1000)).reshape(-10,10) # Y

c2st(student, gaussian)
```
The last line will return the c2st score for the two samples. Currently, the default parameter for the machine learning model used as our classifier is a RandomForestClassifier but can be changed as a parameter:



```
c2st(student, gaussian, clf=MLPClassifier(activation='tanh', hidden_layer_sizes=(10,10,10), max_iter=600))
```
Here, you can edit the number of hidden layers, activation functions, etc for our classifier. Here is a [link](https://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPClassifier.html) to the sckit-learn MLPClassifer model for specifics.

### Accepting/Rejecting $H_0$ and Power

I have created two functions, significance_threshold and power, in order to determine when we accept/reject our null hypothesis. Both of these formulas come straight from Lopez-Paz's paper in the appendix. Here is an example of using the significance_threshold function:


```
significance_threshold(0.05, 100)
```
The first parameter is the significance level and the second parameter is the size of our test set when we do the train test split. 

The power function is similar in nature except it has an additional parameter representing the distance of our c2st score from 0.5 (the null hypothesis). Here is an example of how this can be calculated using our student vs gaussian example:


```
c2stScore = c2st(student, gaussian)
epsilon = c2stScore - 0.5
power(0.05, 1000, epsilon)
```













In [None]:
from typing import Union, Tuple
import warnings

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold, cross_val_score
from __future__ import annotations
import numpy as np
from functools import partial
from numpy.random import default_rng
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold, cross_val_score
from sklearn.neural_network import MLPClassifier
from sklearn import __version__ as sklversion
import time
import statsmodels.api as sm
import statsmodels.formula.api as smf
import scipy
import math
from sklearn.preprocessing import scale

FIXEDSEED = 1309


NDIM = 10
max_nsamples = 8096
sample_sizes = [ 2**it for it in range(7,12)]
sample_sizes.append(max_nsamples)
RNG = default_rng(FIXEDSEED)
print(sample_sizes)

  import pandas.util.testing as tm


[128, 256, 512, 1024, 2048, 8096]


In [None]:
def c2st(
    X: np.ndarray,
    Y: np.ndarray,
    scoring: str = "balanced_accuracy",
    z_score: bool = True,
    noise_scale: float = None,
    verbosity: int = 0,
    clf=RandomForestClassifier(random_state=1),
    cv=KFold(n_splits=10, shuffle=True, random_state=1),
    return_scores: bool = False,
    nan_drop: bool = False,
) -> Union[float, Tuple[float, np.ndarray]]:

  """
   Args:
        X: Samples from one distribution, shape (n_samples_X, n_features)
        Y: Samples from another distribution, shape (n_samples_Y, n_features)
        scoring: a classifier scoring metric, anything that
            sklearn.model_selection.cross_val_score(scoring=...) accepts
        z_score: Z-scoring using X
        noise_scale: If passed, will add Gaussian noise with std noise_scale to
            samples of X and of Y
        verbosity: control the verbosity of
            sklearn.model_selection.cross_val_score
        clf: a scikit-learn classifier class instance
        cv: cross-validation class instance with sklearn API, e.g.
            sklearn.model_selection.KFold
        return_scores: Return 1d array of CV scores in addition to their mean
        nan_drop: Filter NaNs from CV scores and at least return the mean of
            the values left in scores.
  """
    
    # if z_score:
    #     X_mean = np.mean(X, axis=0)
    #     X_std = np.std(X, axis=0)
    #     X = (X - X_mean) / X_std
    #     Y = (Y - X_mean) / X_std

  # if noise_scale is not None:
  #       X += noise_scale * np.random.randn(*X.shape)
  #       Y += noise_scale * np.random.randn(*Y.shape)

    # prepare data
  data = np.concatenate((X, Y))
    # labels
  target = np.concatenate((np.zeros((X.shape[0],)), np.ones((Y.shape[0],))))

  scores = cross_val_score(
        clf, data, target, cv=cv, scoring=scoring, verbose=verbosity
  )

  if nan_drop:
      isnan = np.isnan(scores)
      if isnan.any():
          scores = scores[~isnan]
      if len(scores) == 0:
          warnings.warn("Only NaN scores, return NaN")
          if return_scores:
              return np.nan, np.array([np.nan] * len(isnan))
          else:
              return np.nan
  mean_scores = scores.mean()
  return mean_scores, scores
  # if return_scores:
  #     return scores
  # else:
  #     return mean_scores, scores

In [None]:
mu1, sigma1 = 0, 0.1 # mean and standard deviation
mu2, sigma2 = 0, 0.3
s1 = np.random.normal(mu1, sigma1, 100).reshape(-1,1)
s2 = np.random.normal(mu2, sigma2, 100).reshape(-1,1)
c2st(s1,s2)

(0.6672896547896549,
 array([0.49494949, 0.77083333, 0.45604396, 0.70707071, 0.73076923,
        0.57142857, 0.77083333, 0.8989899 , 0.59340659, 0.67857143]))

In [None]:
c2st(s1,s2)

0.9661999200900804

In [None]:
def significance_threshold(
    # significance threshold
    # we accept the null hypothesis if we are under this threshold, reject when
    # above this threshold
    a: float,
    nte: int,

  ) -> float:
  """
  Args: 
    a: significance level
    nte: size of test set in D

  """

  z = 0.5 + (scipy.stats.norm.ppf(1-a)) / math.sqrt(4 * nte)
  return z

In [None]:
def power(
    # returns the probability of making a type II error
    a: float,
    nte: int,
    epsilon: float # epsilon is the different from the acheived c2st score and 0.5
  ) -> float:
  """
  Args:
    a: significance level
    nte: size of test set in D
    epsilon: difference between c2st score and 0.5
  """

  p = scipy.stats.norm.cdf( ( (scipy.stats.norm.ppf(1-a) / 2) - epsilon * math.sqrt(nte) ) / math.sqrt((1/4) - epsilon**2) )
  return p 

In [None]:
significance_threshold(0.05, 100)

0.5822426813475736

In [None]:
power(0.05, 100, 0.08)

0.518121309052312

In [None]:
# test to repeatedly accept/reject the null hypothesis
num_accepted = 0
num_rejected = 0
thresh = significance_threshold(0.05, 1000)
for i in range(100):
  mu1, sigma1 = 0, 0.1 # mean and standard deviation
  mu2, sigma2 = 0, 0.1
  s1 = np.random.normal(mu1, sigma1, 1000).reshape(-10,10)
  s2 = np.random.normal(mu2, sigma2, 1000).reshape(-10,10)
  c2stScore = c2st(s1, s2)
  if c2stScore < thresh:
    num_accepted += 1
  else:
    num_rejected += 1
print(num_rejected / 100)

  # This is added back by InteractiveShellApp.init_path()


ValueError: ignored

In [None]:
# student t distribution vs gaussian
student = scale(np.random.standard_t(20, size = 1000)).reshape(-10,10)
gaussian = scale(np.random.normal(0,1,1000)).reshape(-10,10)

c2stScore = c2st(student, gaussian)
epsilon = c2stScore - 0.5
c2stScore

0.5017250267418527

In [None]:
# student t distribution vs gaussian power
type2Error = []
dOfF = list(range(1,20))
for i in dOfF:
  student = scale(np.random.standard_t(i, size = 1000)).reshape(-10,10)
  gaussian = scale(np.random.normal(0,1,1000)).reshape(-10,10)
  c2stScore = c2st(student, gaussian)
  epsilon = abs(c2stScore - 0.5)
  type2Error.append(power(0.05, 1000, epsilon))



  


In [None]:
c2st(X=s1,Y=s2, clf=MLPClassifier(activation='tanh', hidden_layer_sizes=(10,10,10), max_iter=600))

0.7981073450113698