<a href="https://colab.research.google.com/github/Ogunfool/Multi-class-logistic-regression-with-Python./blob/main/MultiClass_LogisticRegression_fromScratch_Modified.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import Necessary Libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.special import softmax
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score, KFold, GridSearchCV
from scipy.special import expit, logit
from matplotlib.animation import FuncAnimation
from keras.datasets import mnist

Before we begin manipulating our data, let's create a checkpoint.

1.   Checkpoints are places throughout our code where we store a copy of our dataset to avoid loosing a lot of progress incase we overwrite some variables mistakenly.
2.   It is an extremely reliable practice when we need to clean, preprocess and process many parts of a dataset.



In [5]:
# npz is a combination of two or more npy files
def checkpoints_z(filename, data_0, data_1):
  np.savez(filename, data_0, data_1)
  checkpoint_variable = np.load(filename + '.npz') #Load so that we always have an on-hand version of the checkpoint
  return(checkpoint_variable)

In [2]:
# I didn't process my data with the header - so a simple npy file works for me
def checkpoints(filename, checkpoint_data):
  np.save(filename, checkpoint_data)
  checkpoint_variable = np.load(filename + '.npy') #Load so that we always have an on-hand version of the checkpoint
  return(checkpoint_variable)

Load datasets (Keras and Sklearn datasets) and preprocess.


1.  Add bias feature set
2.  Train, Val , Test split (Set shuffle to true)
1.  Feature scaling using data standardization (Substract mean, then divide by standard deviation), features will have a mean of 0 and standard deviation of 1. Normalization is dividing a vector by its lenght and it transforms the data to a range between 0 and 1.
2.  One-hot encoding for categorical data y_train.





Keras MNIST Dataset Preprocessing.

In [3]:
#loading
(train_X, train_y), (test_X, test_y) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [7]:
# Log test and train set
checkpoint_train_set = checkpoints_z('train_set', train_X, train_y)
checkpoint_test_set = checkpoints_z('test_set', test_X, test_y)

In [8]:
def reshape_m(X):
  return X.reshape(X.shape[0], np.prod(X.shape[1:]))

In [9]:
X_train = reshape_m(train_X)
X_test = reshape_m(test_X)

In [10]:
X_train.shape, X_test.shape

((60000, 784), (10000, 784))

In [11]:
# Add Bias feature column to dataset
X_train_bias = np.ones((60000,1))
X_test_bias = np.ones((10000,1))

In [12]:
X_train_bias.shape, X_test_bias.shape

((60000, 1), (10000, 1))

In [13]:
X_train_bias.shape

(60000, 1)

In [14]:
# Concatenate with original dataset
X_train = np.concatenate((X_train_bias, X_train), axis =1)
X_test = np.concatenate((X_test_bias, X_test), axis =1)

In [15]:
X_train.shape, X_test.shape

((60000, 785), (10000, 785))

In [16]:
# Since X,y data are already seperated - Let's split Train-Val-Test
# Set shuffle to true to shuffle before split
X_train, X_val, y_train, y_val = train_test_split(X_train, train_y, test_size = 0.1, random_state = 42, shuffle=True, stratify=train_y)

In [17]:
y_test = test_y

In [18]:
X_train.shape, X_test.shape, X_val.shape

((54000, 785), (10000, 785), (6000, 785))

In [19]:
y_train.shape, y_test.shape, y_val.shape

((54000,), (10000,), (6000,))

In [20]:
# Let's normalize data - StandardScaler
# Fit t0 training set and use mean and std to transform X_val and X_test
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [21]:
# Now let's use one-hot encoding for categarical data - y
y_df = pd.DataFrame(y_train)
y_df.head()
y_train_df = pd.get_dummies(y_df[0])
y_train = y_train_df.to_numpy()

In [22]:
y_train.shape

(54000, 10)

Scikit learn IRIS dataset preprocessing.
Note: The sklearn dataset is already in an array format in a dictionary, so you can easily trun it into a pandas data or numpy array by calling the np.array() or pd.Dataframe() functions on it.

In [23]:
# Sklearn dataset
iris_data = load_iris()
iris_data.data.shape

(150, 4)

In [None]:
# Loading data in different ways
# Pandas
df = pd.DataFrame(data=iris_data.data, 
                  columns=iris_data.feature_names)
df.head()
# You can convert the pandas dataframe to numpy array
arr = df.to_numpy()

In [None]:
# Loading directly
data_load = load_iris()
X = data_load.data
y = data_load.target
list(data_load.target_names)

In [None]:
# Since X,y data are already seperated - Let's split Train-Val-Test
# Set shuffle to true to shuffle before split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state = 2, shuffle=True, stratify=y)

In [None]:
# Let's normalize data - StandardScaler
# Fit t0 training set and use mean and std to transform X_train and X_test
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# Now let's use one-hot encoding for categarical data - y
y_df = pd.DataFrame(y_train)
y_df.head()
y_train_df = pd.get_dummies(y_df[0])
y_train = y_train_df.to_numpy()

MultiVariantLogisticRegression Class

In [24]:
class MultivaraiantLogisticRegression:
  def __init__ (self, X, y, ALPHA, LAMBDA, n_iter):
    self.X = X
    self.y = y
    self.ALPHA = ALPHA
    self.LAMBDA = LAMBDA
    self.n_iter = n_iter
    # self.m = X.shape[0]

    # Initialize w
    self.w = np.zeros(X.shape[1])

  def predict(self, X):
    return X @ self.w

  def sigmoid(self,z):
    return expit(z)

  def hypothesis(self,X):
    return self.sigmoid(self.predict(X))

  def cost_function(self,X,y):
    h = self.hypothesis(X)
    unreg_term = (1/X.shape[0]) * np.sum(-y * np.log(h, where=h>0) - (1-y) * np.log(1-h, where=(1-h) > 0))
    reg_term = (self.LAMBDA/(2*X.shape[0])) * (sum(self.w*self.w))
    return unreg_term + reg_term

  def grad(self,X,y):
    unreg_term = (1/X.shape[0]) * (X.transpose() @ (self.hypothesis(X) - y))
    reg_term = self.LAMBDA/X.shape[0] * self.w[1:]
    reg_term_array = np.concatenate((np.zeros(1),reg_term))
    return unreg_term + reg_term_array

  def update(self,X,y):
    self.w = self.w - (self.ALPHA * self.grad(X,y))

  def accuracy(self, pred_y, y):
    # prediction_prob = self.hypothesis()
    # decision = prediction_prob.round()
    return (sum(pred_y == y))/y.shape[0], (sum(pred_y != y))/y.shape[0]

  def prediction(self, W, X):
    return self.sigmoid(np.matmul(W,X.transpose()))


In [25]:
# Define some helper functions 
# Convergence Check as a function
def convergence_check(cost_train_per_iteration):
  changes_list = []
  k = len(cost_train_per_iteration)
  old_v = 0
  for f in range(k-1,k-6,-1):
    # print(f)
    cur_v = cost_train_per_iteration[f]
    new_change = abs(cur_v - old_v)
    old_v = cur_v
    changes_list.append(new_change)
  if max(changes_list[1:]) <= SMALL_ENOUGH:
    answer = True
  else:
    answer = False
  return answer

def early_stop(cost_train_per_iteration, cost_val_per_iteration):
  # if last 5 training_cost is 
  train_val_cost_diff = np.array(cost_train_per_iteration[-3:]) - np.array(cost_val_per_iteration[-3:])
  if (train_val_cost_diff[0] < train_val_cost_diff[1]) & (train_val_cost_diff[1] < train_val_cost_diff[2]):
    print('Early stopping at iteration:', i)
    answer = True
  else:
    answer = False
  return answer

# Let's create a plot function
def animate(cost_train_per_iteration, cost_val_per_iteration):
  plt.cla()
  plt.plot(cost_train_per_iteration, label='Training cost')
  plt.plot(cost_val_per_iteration, label='Validation cost')
  plt.legend(loc='upper left')
  plt.tight_layout()

TRAINING ALL CLASSIFIERS. Note a few things

1.   Use cost function as descent function (used to track convergence and for model diagnosis by plotting descent function to know if model is learning) but be careful of the math log error.
2.  First ensure the model is learning (i.e first tune learning rate (alpha) hyperparameter) - Once we are sure that algorithm(s) are correct, and learning is happening, then other issues like inderfitting then overfitting can be treated.
3. Is it learning? Let's plot descent function every 50 iterations




In [27]:
# Main Loop - But training with different y(s) and classifier objects
# Instantiate the MultivaraiantLogisticRegression class object
# Create classifier class objects (Track current classifier)
SMALL_ENOUGH = 1e-3
prediction_prob_per_classifier = []

classifiers = [MultivaraiantLogisticRegression(X_train_scaled, y_train[:,ys], 0.1, 0, 5000) for ys in range(y_train.shape[1])]
for index, classifier in enumerate(classifiers):
  # Create necessary lists 
  cost_train_per_iteration = []
  cost_val_per_iteration = []
  # Track no of iterations
  i = 0

  # Let's define stopping criteria(s)
  not_converged = True
  while not_converged:
    # Track cost on train and validation set per epoch
    # Plot learning curves
    CF_train = classifier.cost_function(classifier.X, classifier.y)
    CF_val = classifier.cost_function(X_val_scaled, y_val)

    # Append lists
    cost_train_per_iteration.append(CF_train)
    cost_val_per_iteration.append(CF_val)

    # Update weight(s)
    classifier.update(classifier.X, classifier.y)

    # print(i)
    # Check for convergence or perform early stopping
    # Convergence check
    if (i > 10):
      if (convergence_check(cost_train_per_iteration)) == True:
        print('Classifier', index,'converged at iteration:', i)
        not_converged = False
        break
      

    # # Early Stopping
    # if (i > 10):
    #   if (early_stop(cost_train_per_iteration, cost_val_per_iteration)) == True:
    #     print('Early stop at:', i)
    #     not_converged = False
    #     break
      

    # Increment i
    i+=1

  # Let's track misclassification error and/ accuracy as well : EVALUATION METRIC
  prediction_prob_per_classifier.append(classifier.hypothesis(X_val_scaled))


# Choose the most confident class
classifier_array = np.array(prediction_prob_per_classifier)
classifier_array = classifier_array.transpose()
target_class = np.argmax(classifier_array, axis=1)

# Check accuracy and misclassification error
classifier.accuracy(target_class, y_val)

Classifier 0 converged at iteration: 77
Classifier 1 converged at iteration: 94
Classifier 2 converged at iteration: 1030
Classifier 3 converged at iteration: 71
Classifier 4 converged at iteration: 146
Classifier 5 converged at iteration: 75
Classifier 6 converged at iteration: 46
Classifier 7 converged at iteration: 1152
Classifier 8 converged at iteration: 117
Classifier 9 converged at iteration: 125


(0.8131666666666667, 0.18683333333333332)

Tuning alpha

In [None]:
# Main Loop - But training with different y(s) and classifier objects
# Instantiate the MultivaraiantLogisticRegression class object
# Create classifier class objects (Track current classifier)
SMALL_ENOUGH = 1e-3
prediction_prob_per_classifier = []
accuracies_list = []
alpha_list = [0.001, 0.003, 0.01, 0.03, 0.1, 0.3]

for alpha in alpha_list:
  classifiers = [MultivaraiantLogisticRegression(X_train_scaled, y_train[:,ys], alpha, 0, 5000) for ys in range(y_train.shape[1])]
  for index, classifier in enumerate(classifiers):
    # Create necessary lists 
    cost_train_per_iteration = []
    cost_val_per_iteration = []
    # Track no of iterations
    i = 0

    # Let's define stopping criteria(s)
    not_converged = True
    while not_converged:
      # Track cost on train and validation set per epoch
      # Plot learning curves
      CF_train = classifier.cost_function(classifier.X, classifier.y)
      CF_val = classifier.cost_function(X_val_scaled, y_val)

      # Append lists
      cost_train_per_iteration.append(CF_train)
      cost_val_per_iteration.append(CF_val)

      # if (i+1)%10 == 0:
      #   # animate(cost_train_per_iteration, cost_val_per_iteration)
      #   ani = FuncAnimation(plt.gcf(), animate(cost_train_per_iteration,cost_val_per_iteration), interval=1000)
      #   plt.tight_layout()
      #   plt.show()

      # if (i+1)%50 == 0:
      #   plt.close()
      #   plt.plot(cost_train_per_iteration, label='Training cost')
      #   plt.legend(loc='upper left')
      #   plt.show()
      #   print('classfier:',index)

      # Update weight(s)
      classifier.update(classifier.X, classifier.y)

      # print(i)
      # Check for convergence or perform early stopping
      # Convergence check
      if (i > 10):
        if (convergence_check(cost_train_per_iteration)) == True:
          print('Classifier', index,'converged at iteration:', i)
          not_converged = False
          break

      # Increment i
      i+=1

    # Let's track misclassification error and/ accuracy as well : EVALUATION METRIC
    prediction_prob_per_classifier.append(classifier.hypothesis(X_val_scaled))


  # Choose the most confident class
  classifier_array = np.array(prediction_prob_per_classifier)
  classifier_array = classifier_array.transpose()
  target_class = np.argmax(classifier_array, axis=1)

  # Check accuracy and misclassification error
  cla = classifier.accuracy(target_class, y_val)

  accuracies_list.append(cla)

Let's make predictions another way: Less computationally expensive and faster. Gather the weights from each classifier after training and use matrix multiplication.

In [28]:
weights = np.zeros((y_train.shape[1],X_train.shape[1]))
for index, classifier in enumerate (classifiers):
  weights[index] = classifier.w

pred_train = classifier.prediction(weights, X_train_scaled)
# # Make predictions (Train and Val set)
trainset_predictions = np.argmax(classifier.prediction(weights, X_train_scaled), axis = 0)
valset_predictions = np.argmax(classifier.prediction(weights, X_val_scaled), axis = 0)

# Check accuracy
print(classifier.accuracy(trainset_predictions, (y_df.to_numpy()).squeeze()))
print(classifier.accuracy(valset_predictions, y_val))

(0.8204074074074074, 0.17959259259259258)
(0.8131666666666667, 0.18683333333333332)


Hyperparameter tuning - Grid search vs Interval/random search


1.   Grid search



In [None]:
from math import inf
# GRID SEARCH
# Create a dictionary of hyperparameters, loop through dico, train and choose best
params_dict = {}
alpha_list = [0.001, 0.003, 0.01, 0.03, 0.1, 0.3]
gamma_list = [0.001, 0.003, 0.01, 0.03, 0.1, 0.3]
best_so_far_accuracy = -inf

params = []
# List of tuple
for i in alpha_list:
  for j in gamma_list:
    params.append((i,j))


# Dictionary 
for key, val in enumerate(params):
  params_dict[key] = val

for ind in range(len(params_dict)):
  SMALL_ENOUGH = 1e-3
  weights = np.zeros((y_train.shape[1],X_train.shape[1]))

  classifiers = [MultivaraiantLogisticRegression(X_train_scaled, y_train[:,ys], params_dict[ind][0], params_dict[ind][1], 5000) for ys in range(y_train.shape[1])]
  for index, classifier in enumerate(classifiers):
    # Create necessary lists 
    cost_train_per_iteration = []
    cost_val_per_iteration = []
    # Track no of iterations
    i = 0

    # Let's define stopping criteria(s)
    not_converged = True
    while not_converged:
      # Track cost on train and validation set per epoch
      # Plot learning curves
      CF_train = classifier.cost_function(classifier.X, classifier.y)
      CF_val = classifier.cost_function(X_val_scaled, y_val)

      # Append lists
      cost_train_per_iteration.append(CF_train)
      cost_val_per_iteration.append(CF_val)

      # Update weight(s)
      classifier.update(classifier.X, classifier.y)

      # Convergence check
      if (i > 10):
        if (convergence_check(cost_train_per_iteration)) == True:
          print('Classifier', index,'converged at iteration:', i)
          not_converged = False
          break

      # Increment i
      i+=1
      # if i > 1:
      #   not_converged = False
      #   break

  for inx, classifier in enumerate (classifiers):
    weights[inx] = classifier.w

  # Make predictions on val set
  valset_predictions = np.argmax(classifier.prediction(weights, X_val_scaled), axis = 0)
  # Accuracy of current model
  accuracy = classifier.accuracy(valset_predictions, y_val)

  if accuracy[0] > best_so_far_accuracy:
    best_so_far_accuracy = accuracy[0]
    best_hyperparameter = params_dict[ind]

In [None]:
best_hyperparameter

(0.003, 0.001)

In [None]:
best_so_far_accuracy

0.8311666666666667

Conclusion: With this OOP-python-based multi-variate mlogistic regression model, we got an accuracy of 83% on the MNIST validation dataset.