<a href="https://colab.research.google.com/github/msc-acse/acse-8-miniproject-sigmoid/blob/zmr/KMNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Mount Google Drive so the files stored there are readable from this Colab runtime.
from google.colab import drive
drive.mount("/content/gdrive/")
# List the project folder to confirm the dataset files are present.
!ls "/content/gdrive/My Drive/Sigmoid"

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).
kmnist_classmap.csv  kmnist-test-imgs.npy   kmnist-train-labels.npy
KMNIST.ipynb	     kmnist-train-imgs.npy


In [0]:
%pylab inline
import pandas as pd

Populating the interactive namespace from numpy and matplotlib


In [0]:
# Load the KMNIST image/label arrays and the class map from the mounted Drive folder.
path = "/content/gdrive/My Drive/Sigmoid/"
train_data, test_data, train_labels = (
    np.load(path + npy_name)
    for npy_name in (
        "kmnist-train-imgs.npy",
        "kmnist-test-imgs.npy",
        "kmnist-train-labels.npy",
    )
)
classmap = pd.read_csv(path + "kmnist_classmap.csv")

In [0]:
# Preview the first rows of the class map (label index, Unicode codepoint, character).
classmap.head()

Unnamed: 0,index,codepoint,char
0,0,U+304A,お
1,1,U+304D,き
2,2,U+3059,す
3,3,U+3064,つ
4,4,U+306A,な


In [0]:
# Basic EDA: report the dimensions of each loaded array.

print("Train Data:", np.shape(train_data))
print("Test Data:", np.shape(test_data))
print("Train Labels:", np.shape(train_labels))

Train Data: (60000, 28, 28)
Test Data: (10000, 28, 28)
Train Labels: (60000,)


In [0]:
class SupervisedLearning:
    """Train, validate and evaluate a PyTorch classifier on in-memory data.

    The instance owns the model, optimiser and loss function it is given and
    always uses *those* objects (never same-named notebook globals).

    Parameters
    ----------
    X, y : indexable data and labels. Assumed to be torch tensors with the
        sample axis first -- TODO confirm against callers.
    model : torch module; it is moved to ``device``.
    optimiser : optimiser used for the parameter updates in ``train``.
    loss_function : callable ``loss_function(outputs, targets)``. The epoch
        loss is weighted by batch size, which assumes the callable returns a
        per-batch mean -- TODO confirm.
    batch_size, test_batch_size : batch sizes for training and for
        validation/evaluation. Each must satisfy
        ``0 < size < int(0.1 * len(X))``.
    normalise, confusion_matrix : accepted for interface compatibility;
        not used by this class.
    device : device the model and every batch are moved to.
    train_transform, val_transform : forwarded to ``CustomTensorDataset``.
    seed : passed to ``set_seed`` at the start of ``train_model``.
    n_epochs : number of epochs run by ``train_model``.
    """

    def __init__(self, X, y, model, optimiser, loss_function, batch_size, test_batch_size,
                 normalise=True,
                 device="cpu",
                 confusion_matrix=True,
                 train_transform=None,
                 val_transform=None,
                 seed=42, n_epochs=30):

        self.device = device

        self.X = X
        self.y = y

        self.model = model.to(self.device)
        self.optimiser = optimiser
        self.loss_function = loss_function

        # Populated by split_data().
        self.X_train = None
        self.X_val = None

        self.y_train = None
        self.y_val = None

        self.train_transform = train_transform
        self.val_transform = val_transform

        # len(X) equals X.size()[0] for tensors and also works for arrays.
        batch_limit = int(0.1 * len(X))
        assert 0 < batch_size < batch_limit, \
            "batch_size must be positive and below 10% of the number of samples"
        self.batch_size = batch_size
        assert 0 < test_batch_size < batch_limit, \
            "test_batch_size must be positive and below 10% of the number of samples"
        self.test_batch_size = test_batch_size
        self.n_epochs = n_epochs
        self.seed = seed

        self.trained_full = False

    def split_data(self):
        """Create a stratified 80/20 train/validation split of ``X``/``y``.

        The split uses a fixed ``random_state=0`` (it does not depend on
        ``self.seed``). Results are stored in ``X_train``, ``X_val``,
        ``y_train`` and ``y_val``.
        """
        sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)

        # n_splits=1, so the generator yields exactly one (train, val) pair.
        train_index, val_index = next(sss.split(self.X, self.y))
        self.X_train, self.X_val = self.X[train_index], self.X[val_index]
        self.y_train, self.y_val = self.y[train_index], self.y[val_index]

        return None

    def train(self, train_data_loader):
        """Run one training epoch over ``train_data_loader``.

        Returns
        -------
        (loss, accuracy) : numpy float64 scalars averaged over
            ``len(train_data_loader.dataset)`` samples.
        """
        self.model.train()                     # set model to train mode

        loss_sum, n_correct = 0., 0

        for Xtr, ytr in train_data_loader:
            Xtr = Xtr.to(self.device)
            ytr = ytr.to(self.device)

            self.optimiser.zero_grad()              # reset gradients
            zn = self.model(Xtr)                    # forward pass
            loss = self.loss_function(zn, ytr)      # loss value over the batch
            loss.backward()                         # backward pass
            self.optimiser.step()                   # optimisation step

            # Weight by batch size so a smaller final batch is not over-counted.
            loss_sum += loss.item() * Xtr.size(0)

            # The largest logit is the predicted class (softmax is monotonic).
            y_pred = zn.detach().argmax(dim=1)
            n_correct += (y_pred == ytr).sum().item()

        n_samples = len(train_data_loader.dataset)
        return np.float64(loss_sum / n_samples), np.float64(n_correct / n_samples)

    def _evaluate_loader(self, data_loader, collect_predictions=False):
        """Single gradient-free pass over ``data_loader`` in evaluation mode.

        Returns ``(loss, accuracy, ys, y_preds)``. ``ys`` and ``y_preds`` are
        lists of per-batch numpy arrays, empty unless ``collect_predictions``.
        """
        self.model.eval()                     # set model to evaluation mode

        loss_sum, n_correct = 0., 0
        ys, y_preds = [], []

        with torch.no_grad():
            for Xb, yb in data_loader:
                Xb, yb = Xb.to(self.device), yb.to(self.device)

                zn = self.model(Xb)
                loss = self.loss_function(zn, yb)
                loss_sum += loss.item() * Xb.size(0)

                y_pred = zn.argmax(dim=1)
                n_correct += (y_pred == yb).sum().item()

                if collect_predictions:
                    ys.append(yb.cpu().numpy())
                    y_preds.append(y_pred.cpu().numpy())

        n_samples = len(data_loader.dataset)
        return (np.float64(loss_sum / n_samples),
                np.float64(n_correct / n_samples),
                ys, y_preds)

    def validate(self, val_data_loader):
        """Compute loss and accuracy over ``val_data_loader`` without updating the model.

        Returns
        -------
        (loss, accuracy) : numpy float64 scalars averaged over
            ``len(val_data_loader.dataset)`` samples.
        """
        validation_loss, validation_accuracy, _, _ = self._evaluate_loader(val_data_loader)
        return validation_loss, validation_accuracy

    def train_model(self, train_full=False, plot_loss=True):
        """Train the model for ``n_epochs`` epochs.

        Parameters
        ----------
        train_full : if true, train on all of ``X``/``y`` with no validation
            pass; otherwise call ``split_data()`` and validate every epoch.
        plot_loss : if true, plot the metrics live with ``PlotLosses``.

        Sets ``self.trained_full`` to reflect which mode was used.
        """
        # set seed
        set_seed(int(self.seed))

        if train_full:  # train with full data (train + validation)
            train_dataset = CustomTensorDataset(self.X, self.y, transform=self.train_transform)
            train_data_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
            val_data_loader = None
        else:
            # split data
            self.split_data()

            # create dataloaders
            train_dataset = CustomTensorDataset(self.X_train, self.y_train, transform=self.train_transform)
            train_data_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
            val_dataset = CustomTensorDataset(self.X_val, self.y_val, transform=self.val_transform)
            val_data_loader = DataLoader(val_dataset, batch_size=self.test_batch_size, shuffle=True)

        liveloss = PlotLosses() if plot_loss else None

        for _ in range(self.n_epochs):
            train_loss, train_accuracy = self.train(train_data_loader)

            if val_data_loader is None:
                # No validation set in full-data mode: the training metrics
                # are repeated under the 'val_' keys so the plot has both.
                val_loss, val_accuracy = train_loss, train_accuracy
            else:
                val_loss, val_accuracy = self.validate(val_data_loader)

            if plot_loss:
                logs = {
                    'log loss': float(train_loss),
                    'accuracy': float(train_accuracy),
                    'val_log loss': float(val_loss),
                    'val_accuracy': float(val_accuracy),
                }
                liveloss.update(logs)
                liveloss.draw()

        self.trained_full = bool(train_full)

        return None

    def evaluate_batch(self, X_test, y_test, confusion_matrix=True, test_transform=None):
        """
        This method performs the same as validate but also returns all predictions on a given dataset
        for a trained model.

        Predictions, loss and accuracy come from one pass over the data, and
        the loader is not shuffled, so ``y_preds[i]`` corresponds to
        ``X_test[i]``.

        Returns
        -------
        (y_preds, ys, test_loss, test_accuracy) : predicted labels and true
            labels as numpy arrays, followed by mean loss and accuracy.
        """
        test_dataset = CustomTensorDataset(X_test, y_test, transform=test_transform)
        test_data_loader = DataLoader(test_dataset, batch_size=self.test_batch_size, shuffle=False)

        test_loss, test_accuracy, ys, y_preds = self._evaluate_loader(
            test_data_loader, collect_predictions=True)

        y_preds, ys = np.concatenate(y_preds, 0), np.concatenate(ys, 0)

        if confusion_matrix:
            cm = ConfusionMatrix(actual_vector=ys, predict_vector=y_preds)
            print(cm)

        return y_preds, ys, test_loss, test_accuracy