In [1]:
import numpy as np,os
from collections import defaultdict
from sklearn.preprocessing import StandardScaler
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.datasets import load_breast_cancer, make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model._logistic import _logistic_regression_path, _logistic_loss, _intercept_dot
from sklearn.model_selection import cross_val_score, RepeatedStratifiedKFold, GridSearchCV, StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.utils.extmath import log_logistic
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from libs import corrupt_label, load_mat, noisy_evaluate

In [2]:
def log_loss(wp,X,target,C,PN,NP):
    c = wp[-1]
    w = wp[:-1]
    z = np.dot(X,w) + c
    yz = target * z
    nyz = -target * z
    ls = -log_logistic(yz)
    nls = -log_logistic(nyz)
    idx = target==1
    loss = ls.copy()
    loss[idx] = (1-NP)*ls[idx] - PN*nls[idx]
    loss[~idx] = (1-PN)*ls[~idx] - NP*nls[~idx]
    loss = loss/(1-PN-NP) + .5 * C * np.dot(w, w)
    return loss.sum()

In [3]:
class Logit(LogisticRegression,BaseEstimator,ClassifierMixin):
    def __init__(self,PN=.2, NP=.2, robust=True,C=1.0,max_iter=100):
        super().__init__(C=C,max_iter=max_iter)
        self.PN = PN
        self.NP = NP
        self.robust= robust
    
    def fit(self,X,y):
        w0 = np.zeros(X.shape[1]+1)
        target = y.copy()
        target[target==0] = -1
        if self.robust:
            self.r_ = minimize(log_loss,w0,method="L-BFGS-B",args=(X, target, self.C,self.PN,self.NP),
                               options={"maxiter": self.max_iter})
        else:
            self.r_ = minimize(_logistic_loss,w0,method="L-BFGS-B",args=(X, target, self.C),options={"maxiter": self.max_iter})

    def predict(self,X):
        c = self.r_.x[-1]
        w = self.r_.x[:-1]
        z = np.dot(X,w) + c
        y = z.reshape(-1)
        y[y<=0] = 0
        y[y>0] = 1
        return y

In [8]:
def create_noise(y,PN,NP):
    yn = y.copy()
    idp = np.argwhere(y==1).ravel()
    idp = np.random.choice(idp,size=int(len(idp)*PN),replace=False)
    yn[idp] = 0
    idp = np.argwhere(y==0).ravel()
    idp = np.random.choice(idp,size=int(len(idp)*NP),replace=False)
    yn[idp] = 1
    return yn

In [10]:
datasets = ['heart','german','breast_cancer','diabetis','thyroid','image','banana']
X,y = load_mat('german') 
cm = np.array([
    [.8,.2],
    [.2,.8]
])
X.shape,np.unique(y,return_counts=True)

((1000, 20), (array([0, 1], dtype=int32), array([700, 300])))

In [11]:
params = {
    'PN':np.linspace(.1,.45,8),
    'NP':np.linspace(.1,.45,8)
}
lr = Logit(max_iter=100,C=1.0)
clf = GridSearchCV(lr,params,cv=StratifiedShuffleSplit(n_splits=3))

In [12]:
clf.fit(X,create_noise(y,PN=.2,NP=.2))
clf.best_params_

{'NP': 0.1, 'PN': 0.1}

In [13]:
%%time
scores = []
for _ in range(3):
    yn = create_noise(y,PN=.2,NP=.2)
    CV = StratifiedShuffleSplit(n_splits=3,test_size=.33)
    r = noisy_evaluate(clf,X,yn,y,CV,[accuracy_score])
    scores.append(list(r.values())[0]*100)
    print(list(r.values())[0]*100)
print(f"{sum(scores)/len(scores):.3f}")

70.0
69.8989898989899
70.1010101010101
70.000
Wall time: 9min 43s
