In [1]:
import numpy

In [2]:
import sklearn.metrics
from sklearn.linear_model import LogisticRegression as LR

In [3]:
# the following implementation only work with binary labels
class SelfLearningModel():
    # pass in evaluator for labeled data as basemodel
    def __init__(self, basemodel, max_iter = 200, prob_threshold = 0.8):
        self.model = basemodel
        self.max_iter = max_iter
        self.prob_threshold = prob_threshold 
        
    # unlabeled data is marked as -1 in y    
    def fit(self, X, y):
        unlabeledX = X[y==-1, :]
        labeledX = X[y!=-1, :]
        labeledy = y[y!=-1]
        
        self.model.fit(labeledX, labeledy)
        
        # to get an estimate of proba if basemodel does not have #predict_proba (with Platt scaling)
        if not getattr(self.model, "predict_proba", None):
            self.plattlr = LR()
            preds = self.model.predict(labeledX)
            self.plattlr.fit( preds.reshape( -1, 1 ), labeledy ) # essentially drawing a boundary between the labelledy labels to estimate the proba
            
        unlabeledy = self.model.predict(unlabeledX)
        unlabeledprob = self.predict_proba(unlabeledX)
        
        unlabeledy_old = []
        #re-train, labeling unlabeled instances with model predictions, until convergence
        i = 0
        while (len(unlabeledy_old) == 0 or numpy.any(unlabeledy!=unlabeledy_old)) and i < self.max_iter:
            unlabeledy_old = numpy.copy(unlabeledy)
            # only works if there are only 2 labels
            uidx = numpy.where((unlabeledprob[:, 0] > self.prob_threshold) | (unlabeledprob[:, 1] > self.prob_threshold))[0]
            
            self.model.fit(numpy.vstack((labeledX, unlabeledX[uidx, :])), numpy.hstack((labeledy, unlabeledy_old[uidx])))
            unlabeledy = self.model.predict(unlabeledX) # does unlabeledX auto update?
            unlabeledprob = self.predict_proba(unlabeledX)
            i += 1
            
        return self
    
    def predict_proba(self, X):
        if getattr(self.model, "predict_proba", None):
            return self.model.predict_proba(X)
        else:
            preds = self.model.predict(X)
            return self.plattlr.predict_proba(preds.reshape( -1, 1 ))
        
    def score(self, X, y, sample_weight=None):
        return sklearn.metrics.accuracy_score(y, self.model.predict(X), sample_weight=sample_weight)

In [38]:
import pandas as pd
ds = pd.read_csv('heart_disease_weka_dataset.csv')

In [39]:
ytrue = ds['num'].values # actual labels
X = ds.drop(['num'], axis=1).values

In [40]:
import numpy as np
import random
# create unlabelled data - set all as unlabelled first
ys = np.array([-1]*len(ytrue)) # -1 denotes unlabeled point

labeled_N = int(2)
# get N/2 points where y == 0 and N/2 points wherre y == 1
random_labeled_points = random.sample(list(np.where(ytrue == 0)[0]), labeled_N//2) + random.sample(list(np.where(ytrue == 1)[0]), labeled_N//2)

# add N points of labeled data into unlabelled data
ys[random_labeled_points] = ytrue[random_labeled_points]

In [7]:
from sklearn.linear_model import SGDClassifier

basemodel = SGDClassifier(loss='log', penalty='l1', max_iter=5, tol=-np.infty)

# (traditional) supervised training
basemodel.fit(X[random_labeled_points, :], ys[random_labeled_points])
print("supervised log.reg. score", basemodel.score(X, ytrue))

supervised log.reg. score 0.4612794612794613


In [8]:
ssmodel = SelfLearningModel(basemodel)
ssmodel.fit(X, ys)
print("self-learning log.reg. score", ssmodel.score(X, ytrue))

self-learning log.reg. score 0.5387205387205387


In [None]:
# Contrastive Pessimistic Likelihood Estimation - the following implementation only work with binary labels

import numpy
import sklearn.metrics
from sklearn.linear_model import LogisticRegression as LR
import nlopt # for optimization, i.e. minimization of cost
import scipy.stats # for t-test
import sys

class CPLELearningModel():
    def __init__(
        self, 
        basemodel, 
        pessimistic=True, 
        predict_from_probabilities = False, 
        use_sample_weighting = True, 
        max_iter=3000, 
        verbose = 1
    ):
        self.model = basemodel
        self.pessimistic = pessimistic
        self.predict_from_probabilities = predict_from_probabilities
        self.use_sample_weighting = use_sample_weighting
        self.max_iter = max_iter
        self.verbose = verbose
        
        self.it = 0 # iteration counter
        self.noimprovementsince = 0 # log likelihood hasn't improved since this number of iterations
        self.maxnoimprovementsince = 3 # threshold for iterations without improvements (convergence is assumed when this is reached)
        
        self.buffersize =  200
        # buffer for the last few discriminative likelihoods (used to check for convergence)
        self.lastdls = [0]*self.buffersize # same as np.zeros, except returns int 0 and list obj instead of array obj
        
        # best discriminative likelihood and corresponding soft labels; updated during training
        self.bestdl = numpy.infty
        self.bestlbls = []

    # random probs are assigned to the unlabelled data, which is subseq. optimized through an optimization procedure   
    # Explanation: converting softlabels(floats) into hard labels(binary), and then fit all the data (preferably with weights based on softlabels) 
    # the cost is then calculated by the accuracy of the labelled data + comparing the accuracy of the prediction of the fitted model with the assigned (random) labels
    def discriminative_likelihood(
        self, 
        model, 
        labeledData, 
        labeledy = None, 
        unlabeledData = None, 
        unlabeledWeights = None, # predicted probs of unlabeled y
        unlabeledlambda = 1
    ):
        unlabeledy = (unlabeledWeights[:, 0]<0.5)*1 # taking prob, those below 0.5 are assiged 0; above 0.5 are assigned 1 (*1 converts boolean to binary)
        uweights = numpy.copy(unlabeledWeights[:, 0])
        uweights[unlabeledy==1] = 1-uweights[unlabeledy==1] # subtract from 1 for k=1 instances to reflect confidence; confidence is larger if prediction is closer to 0, i.e. pessimistic??
        weights = numpy.hstack((numpy.ones(len(labeledy)), uweights)) # labeled y have 100% confidence; merged with that of the predicted unlabeled data
        labels = numpy.hstack((labeledy, unlabeledy))
        
        # fit model on supervised(labeled+unlabeled) data
        if self.use_sample_weighting: # 
            model.fit(numpy.vstack((labeledData, unlabeledData)), labels, sample_weight=weights) # use confidence level obtained above
        else:
            model.fit(numpy.vstack((labeledData, unlabeledData)), labels)
        
        # probability of labeled data
        P = model.predict_proba(labeledData)
        labeledDL = -sklearn.metrics.log_loss(labeledy, P) # cost, i.e. negative log-likelihood

        # probability of unlabeled data
        unlabeledP = model.predict_proba(unlabeledData)  
        eps = 1e-15
        unlabeledP = numpy.clip(unlabeledP, eps, 1 - eps) # so that wouldn't take log(0)
        # separate weights into those predicting 0 and those 1; multiplied with predictions; summed, then take the mean; i.e. negative log llikelihood with each sample multiplied by respective weights
        unlabeledDL = numpy.average((unlabeledWeights * numpy.vstack((1-unlabeledy, unlabeledy)).T * numpy.log(unlabeledP)).sum(axis=1))
        
        # calc cost
        if self.pessimistic:
            # pessimistic: minimize the difference between unlabeled and labeled discriminative likelihood (assume worst case for unknown true labels)
            dl = unlabeledlambda * unlabeledDL - labeledDL
        else: 
            # optimistic: minimize negative total discriminative likelihood (i.e. maximize likelihood) 
            dl = - unlabeledlambda * unlabeledDL - labeledDL
        
        return dl
    
    # allow breaking out of nlopt if cost function converged
    def discriminative_likelihood_objective(
        self, 
        model, 
        labeledData, 
        labeledy = None, 
        unlabeledData = None, 
        unlabeledWeights = None, 
        unlabeledlambda = 1
    ):
        dl = self.discriminative_likelihood(
            model, 
            labeledData, 
            labeledy, 
            unlabeledData, 
            unlabeledWeights, 
            unlabeledlambda
        )
        
        self.it += 1
        self.lastdls[numpy.mod(self.it, len(self.lastdls))] = dl # cost result stored in lastdls in position mod(iter) index (instead of a rolling update)
        
        if numpy.mod(self.it, self.buffersize) == 0: # or True:
            # improvement of last half to first half
            improvement = numpy.mean((self.lastdls[(len(self.lastdls)//2):])) - numpy.mean((self.lastdls[:(len(self.lastdls)//2)]))
            # ttest - test for hypothesis that the likelihoods have not changed (i.e. there has been no improvement, and we are close to convergence) 
            _, prob = scipy.stats.ttest_ind(self.lastdls[int((len(self.lastdls)/2)):], self.lastdls[:int((len(self.lastdls)/2))])
            
            # take confidence level at 90%, i.e. alpha=0.1
            # if improvement is not certain accoring to t-test...
            noimprovement = prob > 0.1 and improvement < 0
            if noimprovement:
                self.noimprovementsince += 1
                if self.noimprovementsince >= self.maxnoimprovementsince:
                    # no improvement since a while - converged; exit
                    self.noimprovementsince = 0
                    raise Exception(" converged.") # we need to raise an exception to get NLopt to stop before exceeding the iteration budget
            else:
                self.noimprovementsince = 0
            
            if self.verbose == 2:
                print(self.it, dl, numpy.mean(self.lastdls), improvement, round(prob, 3), (prob < 0.1))
            elif self.verbose:
                sys.stdout.write(('.') if not noimprovement else 'n')
                      
        if dl < self.bestdl:
            self.bestdl = dl
            self.bestlbls = numpy.copy(unlabeledWeights[:, 0])
                        
        return dl
    
    def fit(self, X, y): # -1 for unlabeled
        unlabeledX = X[y==-1, :]
        labeledX = X[y!=-1, :]
        labeledy = y[y!=-1]
        
        M = unlabeledX.shape[0]
         
        # train on labeled data
        self.model.fit(labeledX, labeledy)
        
        # to get an estimate of proba if basemodel does not have #predict_proba (with Platt scaling)
        if not getattr(self.model, "predict_proba", None):
            # Platt scaling
            self.plattlr = LR()
            preds = self.model.predict(labeledX)
            self.plattlr.fit( preds.reshape( -1, 1 ), labeledy )

        # re-train, labeling unlabeled instances pessimistically
        # use random labels initially, optimize by minimizing the cost function
        # note: grads in the arg is necessary for nlopt
        f = lambda softlabels, grads=[]: self.discriminative_likelihood_objective(
                self.model, 
                labeledX, 
                labeledy=labeledy, 
                unlabeledData=unlabeledX, 
                unlabeledWeights=numpy.vstack((softlabels, 1-softlabels)).T, 
            )
        lblinit = numpy.random.random(len(unlabeledX))

        try:
            self.it = 0
            # optimization for minimizing cost(dl)
            opt = nlopt.opt(nlopt.GN_DIRECT_L_RAND, M) # for algorithms, see https://nlopt.readthedocs.io/en/latest/NLopt_Algorithms/
            opt.set_lower_bounds(numpy.zeros(M))
            opt.set_upper_bounds(numpy.ones(M))
            opt.set_min_objective(f)
            opt.set_maxeval(self.max_iter)
            self.bestsoftlbl = opt.optimize(lblinit) # if function converged, #discriminative_likelihood_objective will raise and break out of nlopt
            print(" max_iter exceeded.")
        except Exception as e:
            print(e)
        
        self.bestsoftlbl = self.bestlbls # we rely on the self.bestlbls (in case nlopt is broken out of due to convergence)
        ll = f(self.bestsoftlbl) # the minimized cost

        # same as the middle section in #discriminative_likelihood, can be refactored
        unlabeledy = (self.bestsoftlbl<0.5)*1
        uweights = numpy.copy(self.bestsoftlbl) # large prob. for k=0 instances, small prob. for k=1 instances 
        uweights[unlabeledy==1] = 1-uweights[unlabeledy==1] # subtract from 1 for k=1 instances to reflect confidence
        weights = numpy.hstack((numpy.ones(len(labeledy)), uweights))
        labels = numpy.hstack((labeledy, unlabeledy))
        if self.use_sample_weighting:
            self.model.fit(numpy.vstack((labeledX, unlabeledX)), labels, sample_weight=weights)
        else:
            self.model.fit(numpy.vstack((labeledX, unlabeledX)), labels)
        
        if self.verbose > 1:
            print("number of non-one soft labels: ", numpy.sum(self.bestsoftlbl != 1), ", balance:", numpy.sum(self.bestsoftlbl<0.5), " / ", len(self.bestsoftlbl))
            print("current likelihood: ", ll)
                        
        return self
    
    # same as SelfLearning
    def predict_proba(self, X):
        if getattr(self.model, "predict_proba", None):
            return self.model.predict_proba(X)
        else:
            preds = self.model.predict(X)
            return self.plattlr.predict_proba(preds.reshape( -1, 1 ))
        
    def predict(self, X):
        if self.predict_from_probabilities:
            P = self.predict_proba(X)
            return (P[:, 0]<numpy.average(P[:, 0]))
        else:
            return self.model.predict(X)
    
    def score(self, X, y, sample_weight=None):
        return sklearn.metrics.accuracy_score(y, self.predict(X), sample_weight=sample_weight)

In [44]:
# create unlabelled data - set all as unlabelled first
ys = np.array([-1]*len(ytrue)) # -1 denotes unlabeled point

labeled_N = int(20)
# get N/2 points where y == 0 and N/2 points wherre y == 1
random_labeled_points = random.sample(list(np.where(ytrue == 0)[0]), labeled_N//2) + random.sample(list(np.where(ytrue == 1)[0]), labeled_N//2)

# add N points of labeled data into unlabelled data
ys[random_labeled_points] = ytrue[random_labeled_points]

In [113]:
from sklearn.linear_model import SGDClassifier

basemodel = SGDClassifier(loss='log', penalty='l1', max_iter=3000, tol=-np.infty)

# (traditional) supervised training
basemodel.fit(X[random_labeled_points, :], ys[random_labeled_points])
print("supervised log.reg. score", basemodel.score(X, ytrue))

supervised log.reg. score 0.6835016835016835


In [114]:
ssmodel = SelfLearningModel(basemodel)
ssmodel.fit(X, ys)
print("self-learning log.reg. score", ssmodel.score(X, ytrue))

self-learning log.reg. score 0.7912457912457912


In [118]:
ssmodel = CPLELearningModel(basemodel, verbose=2)
ssmodel.fit(X, ys)
print("CPLE semi-supevised log.reg. score", ssmodel.score(X, ytrue))

200 16.729070395850236 11.900967018887636 1.5601950233013433 0.053 True
400 11.00189844671482 12.27011901260958 -0.9250046350379044 0.252 False
600 16.569745915889758 11.751856281960618 0.07627891252135299 0.929 False
800 14.983910424420595 12.538101681982004 0.2467096489438827 0.759 False
1000 16.756779001060757 11.478232649010442 -0.44352213616023484 0.613 False
1200 16.749851849758127 12.213713400222879 -0.14929121353569563 0.867 False
1400 16.562818764587128 12.634333317882378 0.6790332566670365 0.406 False
1600 16.071686006148294 12.508382352362128 0.9012125291777835 0.286 False
1800 12.392096574400572 13.015612339906392 -0.620910611114855 0.396 False
2000 16.583600218495018 12.75894476155293 -0.006108754162227115 0.994 False
 converged.
number of non-one soft labels:  277 , balance: 4  /  277
current likelihood:  16.853682263830954
CPLE semi-supevised log.reg. score 0.6767676767676768


In [119]:
# CPLE does not offer improvement over supervised on this dataset over this amount of iterations!! 
# (not to mention it's doesn't give t-verified improvements most of the time)