In [None]:
%matplotlib inline

import pystan
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

In [None]:
sns.set(style='white', font_scale=1.2)

## Data

In [None]:
iris = load_iris()

X_train, X_test, y_train, y_test = train_test_split(iris['data'], iris['target'], test_size=0.3)

scaler = StandardScaler()
X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=iris['feature_names'])
X_test = pd.DataFrame(scaler.transform(X_test), columns=iris['feature_names'])

print(f'Train sample dim: {X_train.shape}')
print(f'Train class proportions: {iris["target_names"][0]}: {(y_train == 0).mean():.2f}, {iris["target_names"][1]}: {(y_train == 1).mean():.2f}, {iris["target_names"][2]}: {(y_train == 2).mean():.2f}')
print(f'Test samples: {len(y_test)}')
print(f'Train class proportions: {iris["target_names"][0]}: {(y_test == 0).mean():.2f}, {iris["target_names"][1]}: {(y_test == 1).mean():.2f}, {iris["target_names"][2]}: {(y_test == 2).mean():.2f}')

## Standard logistic regression

In [None]:
logreg = LogisticRegression(C=10)
logreg.fit(X_train, y_train)
ypred_train = logreg.predict(X_train)
ypred_test = logreg.predict(X_test)

print(f'intercept = {logreg.intercept_}')
print(f'beta = {logreg.coef_}')

print()
print('Train')
print(classification_report(y_train, ypred_train, target_names=iris['target_names']))

print()
print('Test')
print(classification_report(y_test, ypred_test, target_names=iris['target_names']))

## Robust logistic regression

In [None]:
class StanClassifierWrapper():
    def __init__(self, stan_model, prior_sigma=1, transition_prior=1, restarts=1):
        self.sm = stan_model
        self.intercept_ = None
        self.coef_ = None
        self.gamma_ = None
        self.prior_sigma = prior_sigma
        self.transition_prior = transition_prior
        self.restarts = restarts

    def fit(self, X, y):
        K = len(np.unique(y))
        data = {
            'N': X.shape[0],
            'D': X.shape[1],
            'K': K,
            'X': X,
            'y': y + 1,
            'prior_sigma': self.prior_sigma,
            'transition_prior': self.transition_prior
        }
        
        best_par = None
        best_lp = None
        for _ in range(max(self.restarts, 1)):
            res = sm.optimizing(data=data, init=self.param_initializer(K, X.shape[1]), as_vector=False)
            if best_lp is None or res['value'] > best_lp:
                best_par = res['par']
                best_lp = res['value']
            
        self.intercept_ = best_par['alpha']
        self.coef_ = best_par['beta'].T
        if 'gamma' in best_par:
            self.gamma_ = best_par['gamma']

    def predict(self, X):
        return np.asarray((self.intercept_ + X.dot(self.coef_.T)).idxmax(axis=1))
    
    def param_initializer(self, num_classes, num_features):
        def wrapped():
            ginit = 0.01/(num_classes-1)*np.ones((num_classes, num_classes))
            np.fill_diagonal(ginit, 0.99)
            return {
                'alpha': np.random.uniform(-2, 2, num_classes),
                'beta': np.random.uniform(-2, 2, (num_features, num_classes)),
                'gamma': ginit
            }
        
        return wrapped

In [None]:
robust_code = """
data {
    int<lower=0> N;  // number of observations
    int<lower=1> D;  // number of predictors
    int<lower=2> K;  // number of classes
    matrix[N, D] X;
    int<lower=1,upper=K> y[N];  // observed (noisy) class labels
    real prior_sigma;
    real transition_prior;
}
parameters {
    vector[K] alpha;
    matrix[D, K] beta;
    simplex[K] gamma[K];
}
model {
    matrix[N, K] x_beta = rep_matrix(to_row_vector(alpha), N) + X * beta;
    
    alpha ~ normal(0, prior_sigma);
    to_vector(beta) ~ normal(0, prior_sigma);
    for (i in 1:K) {
        gamma[i] ~ dirichlet(rep_vector(transition_prior, K));
    }

    for (n in 1:N) {
        vector[K] u = rep_vector(0.0, K);
        vector[K] w = softmax(x_beta[n]');
        
        for (j in 1:K) {
            u += w[j] * gamma[j];
        }
                
        y[n] ~ categorical(u);
    }
}
"""

sm = pystan.StanModel(model_code=robust_code)

In [None]:
clf = StanClassifierWrapper(sm, prior_sigma=10, transition_prior=2, restarts=6)
clf.fit(X_train, y_train)

ypred_train = clf.predict(X_train)
ypred_test = clf.predict(X_test)

print(f'intercept = {clf.intercept_}')
print(f'beta = {clf.coef_}')
print(f'gamma = {clf.gamma_}')

print()
print('Train')
print(classification_report(y_train, ypred_train, target_names=iris['target_names']))

print()
print('Test')
print(classification_report(y_test, ypred_test, target_names=iris['target_names']))

## Evaluation

In [None]:
def draw_transition_prob(size):
    U = np.zeros([size, size])
    for i in range(size):
        ind = [x for x in range(size) if x != i]
        U[i, ind] = np.random.dirichlet(0.5*np.ones(size - 1))
    return U

def draw_noisy_labels(y_true, noise_prob, transition_prob):
    K = transition_prob.shape[0]
    N = len(y_true)

    y_noisy = np.array(y_true)
    for i in np.random.choice(N, size=int(noise_prob*N), replace=False):
        y_noisy[i] = np.random.choice(range(K), p=transition_prob[y_true[i], :])

    return y_noisy

def eval_test_error(clf, X_train, y_train, X_test, y_test):
    clf.fit(X_train, y_train)
    ypred = clf.predict(X_test)
    return 1 - accuracy_score(y_test, ypred)

In [None]:
n_repeats = 40
noise_proportions = np.array([0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6])

errors = []
for i, flip_prob in enumerate(noise_proportions):
    scores = []
    for _ in range(n_repeats):
        T = draw_transition_prob(3)
        y_noisy = draw_noisy_labels(y_train, flip_prob, T)
        noise_level = 1.0 - (y_noisy == y_train).mean()

        robust = StanClassifierWrapper(sm, prior_sigma=10, transition_prior=2, restarts=6)
        r_err = eval_test_error(robust, X_train, y_noisy, X_test, y_test)
        errors.append({
            'Model': 'Robust regression',
            'Noise level (%)': 100*noise_level,
            'Test error (%)': 100*r_err
        })
        
        logreg = LogisticRegression(C=10)
        r_err = eval_test_error(logreg, X_train, y_noisy, X_test, y_test)
        errors.append({
            'Model': 'Logistic regression',
            'Noise level (%)': 100*noise_level,
            'Test error (%)': 100*r_err
        })

errors = pd.DataFrame(errors)

In [None]:
sns.relplot(x='Noise level (%)', y='Test error (%)', hue='Model', kind='line', data=errors, height=6);