In [1]:
import pandas as pd 
import numpy as np
from sklearn.metrics import accuracy_score

# Fixed seed so the row shuffle below is repeatable; it also seeds NumPy's
# global RNG, which later cells draw from via np.random.permutation/shuffle.
SEED = 42
np.random.seed(SEED)

# Size of the labelled training set, split evenly across the ten digit classes.
num_train_samples = 100
samples_per_class = num_train_samples // 10

# Shuffle the training rows once so that taking the "first k rows" of each
# class later on is a random draw rather than file order.
train_df = pd.read_csv('data/mnist_train.csv').sample(frac=1, random_state=SEED)
y = train_df['label']
x = train_df.drop(columns=['label'])

# Held-out test set: same layout, label column separated from the features.
test_df = pd.read_csv('data/mnist_test.csv')
y_test = test_df['label']
x_test = test_df.drop(columns=['label'])

In [2]:
# Split every digit class into a small labelled set (its first
# `samples_per_class` rows) and an unlabelled pool (all remaining rows).
# Classes are stacked in the order 0..9, exactly as a per-class append would.
feature_matrix = x.values
label_vector = y.values

train_x_parts, train_y_parts = [], []
pool_x_parts, pool_y_parts = [], []

for digit in range(10):
    # Filter once per class instead of re-filtering for every slice.
    class_mask = label_vector == digit
    class_features = feature_matrix[class_mask]
    class_labels = label_vector[class_mask]

    train_x_parts.append(class_features[:samples_per_class])
    train_y_parts.append(class_labels[:samples_per_class])

    pool_x_parts.append(class_features[samples_per_class:])
    pool_y_parts.append(class_labels[samples_per_class:])

# One concatenate per output avoids recopying the large pool on every iteration.
x_train = np.concatenate(train_x_parts, axis=0)
y_train = np.concatenate(train_y_parts, axis=0)

x_unlabeled = np.concatenate(pool_x_parts, axis=0)
y_unlabeled = np.concatenate(pool_y_parts, axis=0)

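Shuffle the labelled and unlabelled sets: the split above stacks the rows class by class, so both arrays are ordered by digit.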

In [3]:
# Draw one random ordering per split and apply it to features and labels
# together, so each row keeps its own label.
p = np.random.permutation(len(x_train))
x_train = x_train[p]
y_train = y_train[p]

p = np.random.permutation(len(x_unlabeled))
x_unlabeled = x_unlabeled[p]
y_unlabeled = y_unlabeled[p]

## Feature Engineering

In [4]:
from sklearn.preprocessing import Normalizer
# Fit the Normalizer on the labelled training rows, then push every split
# (train, test, unlabelled pool) through the same fitted transformer.
scaler = Normalizer()
scaler.fit(x_train)
x_train, x_test, x_unlabeled = (
    scaler.transform(split) for split in (x_train, x_test, x_unlabeled)
)



In [5]:
from sklearn.decomposition import PCA

# Learn the principal components from the labelled rows only, then project
# the test set and the unlabelled pool onto the same components.
n_pca_components = 50
pca = PCA(n_components=n_pca_components)
x_train_pca = pca.fit_transform(x_train)
x_test_pca, x_unlabeled_pca = (
    pca.transform(split) for split in (x_test, x_unlabeled)
)



In [6]:
from sklearn.preprocessing import PolynomialFeatures

# Expand the PCA features with PolynomialFeatures (library defaults), fitted
# on the labelled rows and applied unchanged to the other two splits.
poly = PolynomialFeatures()
poly.fit(x_train_pca)
x_train_poly, x_test_poly, x_unlabeled_poly = map(
    poly.transform, (x_train_pca, x_test_pca, x_unlabeled_pca)
)



## Effect of Increasing Data

## Baseline Estimates

In [7]:
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score


# Supervised baseline: train on the labelled rows only. The fit is repeated
# several times and the test accuracies are averaged before reporting.
n_baseline_runs = 10

test_acc = []
for _ in range(n_baseline_runs):
    log_reg = SGDClassifier(loss='log', n_jobs=-1, alpha=1e-5).fit(x_train_poly, y_train)
    y_test_pred = log_reg.predict(x_test_poly)
    test_acc.append(accuracy_score(y_test_pred, y_test))

mean_test_acc = np.mean(test_acc)
print('Test Accuracy: {:.2f}%'.format(mean_test_acc * 100))

Test Accuracy: 73.01%


## Semi-Supervised Training

In [8]:
# Concept similar to : https://www.analyticsvidhya.com/blog/2017/09/pseudo-labelling-semi-supervised-learning-technique/

class pseudo_labeling():
    """
    Iterative pseudo-labelling wrapper around a probabilistic classifier.

    The unlabelled pool is consumed in fixed-size batches. In every round the
    wrapped model is refitted on the current training set, asked for class
    probabilities on the next batch, and the batch rows that pass the
    threshold rule are appended to the training set together with their
    predicted labels. Test accuracy is printed after every round.

    Parameters
    ----------
    model : estimator exposing ``fit``, ``predict`` and ``predict_proba``
        (and ``decision_function`` if that method is going to be called).
    unlabelled_data : 2-D array of unlabelled feature rows.
    x_test_poly, y_test : test features / labels, used only for the
        per-round accuracy report.
    sample_rate : fraction of the unlabelled pool consumed per round.
    upper_threshold, lower_threshold : a row is pseudo-labelled when its
        highest class probability is above ``upper_threshold`` or below
        ``lower_threshold``.
    verbose : show a tqdm progress bar over the rounds.
    """

    def __init__(self, model, unlabelled_data, x_test_poly, y_test,
                 sample_rate=0.01, upper_threshold=0.6, lower_threshold=0.4, verbose=False):

        self.x_test = x_test_poly
        self.y_test = y_test

        self.sample_rate = sample_rate
        self.model = model
        self.unlabelled_data = unlabelled_data
        self.verbose = verbose
        self.upper_threshold = upper_threshold
        self.lower_threshold = lower_threshold

        # Indices of the rows that have not been handed out yet.
        self.unlabelled_indices = list(range(unlabelled_data.shape[0]))

        # Rows consumed per round. Clamped to at least 1: a small pool or a
        # small sample_rate would otherwise truncate to 0 and make `fit`
        # divide by zero.
        self.sample_size = max(1, int(unlabelled_data.shape[0] * self.sample_rate))

        # Shuffle once so that slicing from the front is a random sample.
        np.random.shuffle(self.unlabelled_indices)

    def __pop_rows(self):
        """
        Return the next `sample_size` unlabelled indices and remove them from
        the pool (sampling without replacement).
        """
        chosen_rows = self.unlabelled_indices[:self.sample_size]
        self.unlabelled_indices = self.unlabelled_indices[self.sample_size:]
        return chosen_rows

    def fit(self, X, y):
        """
        Perform pseudo labelling.

        X: train features
        y: train targets

        Prints the test accuracy after every round. Returns None.
        """
        num_iters = len(self.unlabelled_indices) // self.sample_size

        if self.verbose:
            # Imported here so the class does not depend on a `tqdm` global
            # that happens to be imported in some other notebook cell.
            from tqdm.notebook import tqdm
            rounds = tqdm(range(num_iters))
        else:
            rounds = range(num_iters)

        for _ in rounds:

            # Next batch of unlabelled rows.
            chosen_rows = self.__pop_rows()

            # Refit on everything gathered so far.
            # NOTE(review): because the fit happens at the top of the round,
            # the pseudo-labels added in the final round are never trained on.
            self.model.fit(X, np.asarray(y).ravel())

            chosen_unlabelled_rows = self.unlabelled_data[chosen_rows, :]
            pseudo_labels_prob = self.model.predict_proba(chosen_unlabelled_rows)

            # `predict_proba` gives one probability per class for each row;
            # take the largest one and apply the threshold rule to it.
            # NOTE(review): this keeps rows whose top probability is *below*
            # lower_threshold as well as those above upper_threshold, i.e. it
            # also pseudo-labels low-confidence rows - confirm this is intended
            # for the multi-class case.
            label_probability = np.max(pseudo_labels_prob, axis=1)
            selected = np.where(
                (label_probability < self.lower_threshold)
                | (label_probability > self.upper_threshold)
            )[0]

            # argmax yields a column index; translate it to the real class
            # label through `classes_` when the model exposes it, so labels
            # that are not exactly 0..k-1 are still assigned correctly.
            pseudo_labels = np.argmax(pseudo_labels_prob[selected], axis=1)
            classes = getattr(self.model, 'classes_', None)
            if classes is not None:
                pseudo_labels = np.asarray(classes)[pseudo_labels]
            chosen_unlabelled_rows = chosen_unlabelled_rows[selected]

            # Append the pseudo-labelled rows to the training set.
            X = np.vstack((chosen_unlabelled_rows, X))
            y = np.vstack((pseudo_labels.reshape(-1, 1), np.array(y).reshape(-1, 1)))

            # Shuffle features and labels with the same ordering.
            indices = list(range(X.shape[0]))
            np.random.shuffle(indices)

            X = X[indices]
            y = y[indices]

            # Evaluate through this instance, not through whatever global
            # variable the notebook happens to store the wrapper in.
            y_test_pred = self.predict(self.x_test)
            print('Test Accuracy: {:.2f}%'.format(accuracy_score(y_test_pred, self.y_test)*100))

    def predict(self, X):
        """Delegate to the wrapped model's `predict`."""
        return self.model.predict(X)

    def predict_proba(self, X):
        """Delegate to the wrapped model's `predict_proba`."""
        return self.model.predict_proba(X)

    def decision_function(self, X):
        """Delegate to the wrapped model's `decision_function`."""
        return self.model.decision_function(X)

    

In [9]:
from sklearn.linear_model import SGDClassifier 
from tqdm.notebook import tqdm
from xgboost import XGBClassifier

# XGBoost settings. Only the booster and objective are set explicitly; the
# remaining hyper-parameters (eval_metric, eta, gamma, max_depth,
# min_child_weight, max_delta_step, subsample, colsample_bytree, verbosity,
# seed, base_score) are left at their defaults.
param = {
    'booster': 'gbtree',
    'objective': 'binary:logistic',
}

# NOTE(review): `xgb` is built here but is not passed to the pseudo-labeller
# below - confirm whether it is still needed.
xgb = XGBClassifier(use_label_encoder=False, **param)

# Same logistic-regression configuration as the supervised baseline.
log_reg = SGDClassifier(loss='log', n_jobs=-1, alpha=1e-5)

# The test split is handed over for evaluation purposes only.
pseudo_labeller = pseudo_labeling(
    model=log_reg,
    unlabelled_data=x_unlabeled_poly,
    x_test_poly=x_test_poly,
    y_test=y_test,
    sample_rate=0.04,
    verbose=True,
)

In [10]:
# Run the pseudo-labelling rounds starting from the labelled training set;
# the test accuracy is printed after every round.
pseudo_labeller.fit(x_train_poly, y_train)

  0%|          | 0/25 [00:00<?, ?it/s]

Test Accuracy: 76.80%
Test Accuracy: 77.86%
Test Accuracy: 78.58%
Test Accuracy: 78.94%
Test Accuracy: 79.07%
Test Accuracy: 79.17%
Test Accuracy: 79.99%
Test Accuracy: 79.54%
Test Accuracy: 80.53%
Test Accuracy: 80.38%
Test Accuracy: 80.28%
Test Accuracy: 80.59%
Test Accuracy: 80.88%
Test Accuracy: 80.93%
Test Accuracy: 81.05%
Test Accuracy: 81.16%
Test Accuracy: 81.22%
Test Accuracy: 81.02%
Test Accuracy: 81.15%
Test Accuracy: 81.26%
Test Accuracy: 81.17%
Test Accuracy: 81.43%
Test Accuracy: 81.29%
Test Accuracy: 81.47%
Test Accuracy: 81.52%


In [11]:
# Score the model left inside the pseudo-labeller on the held-out test set.
y_test_pred = pseudo_labeller.predict(x_test_poly)
final_test_acc = accuracy_score(y_test_pred, y_test)
print('Test Accuracy: {:.2f}%'.format(final_test_acc * 100))

Test Accuracy: 81.52%
