# Import

In [1]:
import os
import math
import time
import struct
import numpy as np
import matplotlib.pyplot as pyplot
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import PolynomialFeatures
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin
# from mlxtend.data import loadlocal_mnist

# Binary Classifier

In [2]:
class BinaryClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, batch_size=16, max_iter=100, learning_rate=0.01, random_state=1, C=100):
        self.batch_size = batch_size
        self.max_iter = max_iter
        self.learning_rate = learning_rate
        self.random_state = random_state
        self.C = C
        self.rgen = np.random.RandomState(self.random_state)
        
    def fit(self, X, y):
        # Exception Handling
        if self.C < 0:
            raise ValueError("The C value of %r must be positive" % self.C)
        if ((self.learning_rate < 0) or (self.learning_rate > 1)):
            raise ValueError("The learning_rate value of %r is invalid." % self.learning_rate,
                             "Set the learning_rate value between 0.0 and 1.0.")        
            
        # Set the number of batches : total number of data / batch size
        n_batches = math.ceil(len(X) / self.batch_size)
        # Process the total number of data is not divided into batch size
        n_rest = X.shape[0] - (n_batches-1) * self.batch_size
        
        # Initialize weight and bias
        self.w_ = self.rgen.normal(loc=0.0, scale=0.01, size=X.shape[1])
        self.b_ = 0.
        
        for epoch in range(self.max_iter):
            # Shuffle the data on every epoch
            X, y = self.shuffle(X, y)
            for j in range(n_batches - 1):
                self.update(X, y, self.batch_size, j)
            self.update(X, y, n_rest, j)
            
        return self
    
    def predict(self, X):
        return np.where(self.hypothesis(X) >= 1, 1, -1)
    
    def hypothesis(self, X):
        return np.dot(X, self.w_) + self.b_
    
    def shuffle(self, X, y):
        shuffle_index = np.arange(X.shape[0])
        np.random.shuffle(shuffle_index)
        return X[shuffle_index], y[shuffle_index]

    def update(self, X, y, minibatch_size, n_batch):
        X_mini = X[n_batch*minibatch_size : (n_batch+1)*minibatch_size]
        y_mini = y[n_batch*minibatch_size : (n_batch+1)*minibatch_size]
        w_sum = np.zeros(X.shape[1])
        b_sum = 0

        for i in range(minibatch_size):
            if (y_mini[i] * self.hypothesis(X_mini[i]) < 1):
                w_sum -= y_mini[i]*X_mini[i]
                b_sum -= y_mini[i]
        self.w_ -= self.learning_rate * ((1 / minibatch_size) * w_sum + (1/self.C)*self.w_)
        self.b_ -= self.learning_rate * ((1 / minibatch_size) * b_sum)

    # def gradientFullBatchUpdate(self, X, y, batch_size, n_batch):
    #     X_batch = X[n_batch*minibatch_size : (n_batch+1)*minibatch_size]
    #     y_batch = y[n_batch*minibatch_size : (n_batch+1)*minibatch_size]
    #     grad_w = np.zeros(X.shape[1])
    #     grad_b = 0
    #     mask = np.less_equal(np.multiply(y_batch, np.dot(X_batch, w))+b, 1)
        
    #     Xy = np.multiply(X_batch.T, y_batch)
    #     masked_Xy = np.multiply(Xy, mask)
    #     grad_w = np.sum(-masked_Xy, 1) / batch_size + self.w_/self.C

    #     masked_y = np.multiply(y_batch, mask)
    #     grad_b = np.sum(-masked_y, 0) / batch_size

    #     return grad_w, grad_b

# Multiclass Classifier

In [3]:
class MulticlassClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, batch_size=16, max_iter=100, learning_rate=0.01, random_state=1, C=100):
        self.batch_size = batch_size
        self.max_iter = max_iter
        self.learning_rate = learning_rate
        self.random_state = random_state
        self.C = C
        
    def fit(self, X, y):
        self.labels = np.unique(y) # 0 ~ 9
        self.outputs_ = []
        for label in range(len(self.labels)):
            y_binary = np.where(y == label, 1, -1)
            b_c = BinaryClassifier(self.batch_size, self.max_iter, 
                                   self.learning_rate, self.random_state, self.C)
            b_c.fit(X, y_binary)
            self.outputs_.append(b_c)
        return self
        
    def predict(self, X):
        prediction = []
        for o in self.outputs_:
            prediction.append(o.hypothesis(X))
        return self.labels[np.argmax(prediction, axis=0)]

# MNIST Read Function

In [4]:
def read(images, labels):
    with open(labels, 'rb') as lbpath:
        magic, n = struct.unpack('>II', lbpath.read(8))
        labels = np.fromfile(lbpath, dtype=np.uint8)

    with open(images, 'rb') as imgpath:
        magic, num, rows, cols = struct.unpack(">IIII", imgpath.read(16))
        images = np.fromfile(imgpath, dtype=np.uint8).reshape(len(labels), 784)

    return images, labels

def read_no_label(images):
    with open(images, 'rb') as imgpath:
        magic, num, rows, cols = struct.unpack(">IIII", imgpath.read(16))
        images = np.fromfile(imgpath, dtype=np.uint8).reshape(60000, 784)
    return images

In [5]:
X, y = read(os.getcwd() + '/data/newtrain-images-idx3-ubyte', 
            os.getcwd() + '/data/newtrain-labels-idx1-ubyte')
# X_test_no_label = read_no_label(os.getcwd()+'/data/testall-images-idx3-ubyte')

In [6]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

# Run & Evaluation

In [7]:
MC=MulticlassClassifier(C=1000, learning_rate=0.01, batch_size=256)

In [0]:
start = time.time()
MC.fit(X_train, y_train)

y_pred = MC.predict(X_test)

score = accuracy_score(y_test, y_pred)
print("time :", time.time() - start)
print(score)

time : 54.07146644592285
0.8306666666666667


In [8]:
poly = PolynomialFeatures(degree=2, interaction_only=False, include_bias=False, order='F')

In [9]:
scaler = StandardScaler(copy=True, with_mean=True, with_std=True)
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

X_train_scaled = X_train_scaled.reshape(-1, 28*28)
pca = PCA(n_components=0.95)
X_train_scaled_pca = pca.fit_transform(X_train_scaled) 
X_test_scaled_pca = pca.transform(X_test_scaled)

In [0]:
X_train_scaled_pca_poly = poly.fit_transform(X_train_scaled_pca)
X_test_scaled_pca_poly = poly.transform(X_test_scaled_pca)

In [10]:
print(X_train_scaled_pca.shape)
print(X_train_scaled_pca_poly.shape)

(56000, 340)


NameError: name 'X_train_scaled_pca_poly' is not defined

In [0]:
start = time.time()
MC.fit(X_train_scaled_pca_poly, y_train)

y_pred = MC.predict(X_test_scaled_pca_poly)

score = accuracy_score(y_test, y_pred)
print("time :", time.time() - start)
print(score)

In [14]:
start = time.time()

param_grid = [{
    'C' : [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000],
    'learning_rate' : [0.1, 0.01, 0.001, 0.0001],
    'batch_size' : [4, 8, 16, 32, 64, 128, 256, 512, 1024]
}]

grid_search = GridSearchCV(MulticlassClassifier(),
                           param_grid=param_grid,
                           cv=2, verbose=1,
                           scoring='accuracy',
                           n_jobs=-1)

grid_search.fit(X_train_scaled_pca_poly, y_train)
print('time: ', time.time() - start)
print(grid_search.best_params_)
print(grid_search.best_score_)

Fitting 2 folds for each of 324 candidates, totalling 648 fits


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.


KeyboardInterrupt: ignored