<a href="https://colab.research.google.com/github/MahmoudAJ2000/Two-Digit-MNIST-Recognizer/blob/main/Two_Digit_MNIST_Recognizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)

Mounted at /content/gdrive/


In [2]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
import numpy as np


In [3]:
from collections import OrderedDict
class _DenseLayer(nn.Sequential):
    def __init__(self, num_input_features, growth_rate, bn_size, drop_rate):
        super(_DenseLayer, self).__init__()
        self.add_module('norm1', nn.BatchNorm2d(num_input_features)),
        self.add_module('relu1', nn.ReLU(inplace=True)),
        self.add_module('conv1', nn.Conv2d(num_input_features, bn_size *
                        growth_rate, kernel_size=1, stride=1, bias=False)),
        self.add_module('norm2', nn.BatchNorm2d(bn_size * growth_rate)),
        self.add_module('relu2', nn.ReLU(inplace=True)),
        self.add_module('conv2', nn.Conv2d(bn_size * growth_rate, growth_rate,
                        kernel_size=3, stride=1, padding=1, bias=False)),
        self.drop_rate = drop_rate

    def forward(self, x):
        new_features = super(_DenseLayer, self).forward(x)
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return torch.cat([x, new_features], 1)


class _DenseBlock(nn.Sequential):
    def __init__(self, num_layers, num_input_features, bn_size, growth_rate, drop_rate):
        super(_DenseBlock, self).__init__()
        for i in range(num_layers):
            layer = _DenseLayer(num_input_features + i * growth_rate, growth_rate, bn_size, drop_rate)
            self.add_module('denselayer%d' % (i + 1), layer)


class _Transition(nn.Sequential):
    def __init__(self, num_input_features, num_output_features):
        super(_Transition, self).__init__()
        self.add_module('norm', nn.BatchNorm2d(num_input_features))
        self.add_module('relu', nn.ReLU(inplace=True))
        self.add_module('conv', nn.Conv2d(num_input_features, num_output_features,
                                          kernel_size=1, stride=1, bias=False))
        self.add_module('pool', nn.AvgPool2d(kernel_size=2, stride=2))


class DenseNet(nn.Module):
    r"""Densenet-BC model class, based on
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    Args:
        growth_rate (int) - how many filters to add each layer (`k` in paper)
        block_config (list of 4 ints) - how many layers in each pooling block
        num_init_features (int) - the number of filters to learn in the first convolution layer
        bn_size (int) - multiplicative factor for number of bottle neck layers
          (i.e. bn_size * k features in the bottleneck layer)
        drop_rate (float) - dropout rate after each dense layer
        num_classes (int) - number of classification classes
    """
    def __init__(self, growth_rate=32, block_config=(6, 12, 24, 16),
                 num_init_features=64, bn_size=4, drop_rate=0, num_classes=55):

        super(DenseNet, self).__init__()

        # First convolution
        self.features = nn.Sequential(OrderedDict([
            ('conv0', nn.Conv2d(1, num_init_features, kernel_size=7, stride=2, padding=3, bias=False)),
            ('norm0', nn.BatchNorm2d(num_init_features)),
            ('relu0', nn.ReLU(inplace=True)),
            ('pool0', nn.MaxPool2d(kernel_size=3, stride=2,padding=1)),
        ]))

        # Each denseblock
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            block = _DenseBlock(num_layers=num_layers, num_input_features=num_features,
                                bn_size=bn_size, growth_rate=growth_rate, drop_rate=drop_rate)
            self.features.add_module('denseblock%d' % (i + 1), block)
            num_features = num_features + num_layers * growth_rate
            if i != len(block_config) - 1:
                trans = _Transition(num_input_features=num_features, num_output_features=num_features // 2)
                self.features.add_module('transition%d' % (i + 1), trans)
                num_features = num_features // 2

        # Final batch norm
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))

        # Linear layer
        self.classifier = nn.Linear(2208, num_classes)
        self.classifier2 = nn.Linear(2208, 37)
        self.classifier3 = nn.Linear(2208, 37)
        self.classifier4 = nn.Linear(2208, 37)
        self.classifier5 = nn.Linear(2208, 37)

    def forward(self, x):
        features = self.features(x)
        out = F.relu(features, inplace=True)
        out = F.avg_pool2d(out, kernel_size=2).view(features.size(0), -1)
        out1 = self.classifier(out)
        out2 = self.classifier2(out)
        out3 = self.classifier3(out)
        out4 = self.classifier4(out)
        out5 = self.classifier5(out)
        return out1,out2,out3,out4,out5

In [4]:


device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)
def classify_and_detect(images):
    """

    :param np.ndarray images: N x 4096 array containing N 64x64 images flattened into vectors
    :return: np.ndarray, np.ndarray
    """

    N = images.shape[0]

    model = DenseNet(num_init_features=96, growth_rate=48, block_config=(6, 12, 36, 24)).to(device)    
    checkpoint = torch.load('/content/gdrive/My Drive/Colab Notebooks/checkpoints/ckptBestTestC+l_10step.pt')
    model.load_state_dict(checkpoint)
    #model = train_model().to(device)
    pred_class,pred_bboxes = evaluate(images, model)

    return pred_class, pred_bboxes

config = {
    'valid_num':5000,
    'train_num':55000,
    'batch_size': 125 , 
    'num_classes': 55, 
    'epochs': 1000,
    'lr':0.1,
    'step_size':10,
    'weight_decay':0.0005,
    'momentum':0.9,
    'gamma':0.1
}


def load_training_data():
  prefix = "train"
  directory = "/content/gdrive/My Drive/Colab Notebooks/MNISTDD_train_valid/"
  train_images = np.load(directory+ prefix + "_X.npy")
  train_labels = np.load(directory+ prefix + "_Y.npy")
  train_bboxes = np.load(directory+ prefix + "_bboxes.npy")
  prefix = "valid"
  valid_images = np.load(directory+ prefix + "_X.npy")
  valid_labels = np.load(directory+ prefix + "_Y.npy")
  valid_bboxes = np.load(directory+ prefix + "_bboxes.npy")
  return train_images, train_labels, train_bboxes, valid_images,valid_labels,valid_bboxes




def train_model():
    print("Training...")

    model = DenseNet(num_init_features=96, growth_rate=48, block_config=(6, 12, 36, 24)).to(device)
    train_images, train_labels, train_bboxes, valid_images,valid_labels,valid_bboxes = load_training_data()
    criterion = nn.BCEWithLogitsLoss()
    params = model.parameters()
    optimizer = torch.optim.SGD(model.parameters(),config['lr'],
                                momentum=config['momentum'],
                                nesterov=True,
                                weight_decay=config['weight_decay'])
    # checkpoint = torch.load('/content/gdrive/My Drive/Colab Notebooks/checkpoints/ckptBestTestC+l_3.pt')
    # model.load_state_dict(checkpoint)
    # vars = torch.load('/content/gdrive/My Drive/Colab Notebooks/checkpoints/ckptBestDenseTrainC+l_6.pt')
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,step_size=config['step_size'], gamma=config['gamma'])# In your for loop
    # losses_epochs = [vars['loss']]
    # accs_epochs = [vars['acc']]
    # iou_epochs = [vars['iou']]
    losses_epochs = []
    accs_epochs = []
    iou_epochs = []
    for epoch in range(config['epochs']):
      model.train()
      losses = []
      index_start = 0
      for batch_idx in range(config['train_num']//config['batch_size']):
        optimizer.zero_grad()
        index_end = min(index_start+config['batch_size'],config['train_num'])
        train_images_tensor = torch.tensor(train_images[index_start:index_end],dtype = torch.float32).to(device)
        train_labels_double_digits = convert_to_labels(train_labels,config['train_num'])
        train_labels_l1_one_hot = np.eye(config['num_classes'])[train_labels_double_digits]
        train_labels_l2_one_hot = np.eye(37)[train_bboxes[index_start:index_end][:,0][:,0]]
        train_labels_l3_one_hot = np.eye(37)[train_bboxes[index_start:index_end][:,0][:,1]]
        train_labels_l4_one_hot = np.eye(37)[train_bboxes[index_start:index_end][:,1][:,0]]
        train_labels_l5_one_hot = np.eye(37)[train_bboxes[index_start:index_end][:,1][:,1]]
        train_labels_tensor_l1 = torch.tensor(train_labels_l1_one_hot[index_start:index_end],dtype = torch.float32).to(device)
        train_labels_tensor_l2 = torch.tensor(train_labels_l2_one_hot,dtype = torch.float32).to(device)
        train_labels_tensor_l3 = torch.tensor(train_labels_l3_one_hot,dtype = torch.float32).to(device)
        train_labels_tensor_l4 = torch.tensor(train_labels_l4_one_hot,dtype = torch.float32).to(device)
        train_labels_tensor_l5 = torch.tensor(train_labels_l5_one_hot,dtype = torch.float32).to(device)
        index_start = index_end
        train_images_tensor=train_images_tensor.view(-1,1,64,64)
        l1,l2,l3,l4,l5 = model(train_images_tensor)
        loss1 = criterion(l1, train_labels_tensor_l1)
        loss2 = criterion(l2, train_labels_tensor_l2)
        loss3 = criterion(l3, train_labels_tensor_l3)
        loss4 = criterion(l4, train_labels_tensor_l4)
        loss5 = criterion(l5, train_labels_tensor_l5)
        loss = loss1+loss2+loss3+loss4+loss5
        loss.backward()
        optimizer.step()  # update the weights
        losses.append(loss.item())
        if batch_idx%5 ==0:
          print("Train Epoch: {} batch {} done, loss {}".format(epoch,batch_idx,loss.item()))
      avg_loss = sum(losses)/len(losses)
      losses_epochs.append(avg_loss)
      acc,iou = validate(valid_images,valid_labels,valid_bboxes,model)
      accs_epochs.append(acc)
      iou_epochs.append(round(iou,4))
      print('Validation: Train Epoch: {}\tLoss: {:.6f}\t Accuracy: {}\tiou: {}'.format(epoch, avg_loss,acc,round(iou,4)))
      if max(accs_epochs) == acc and max(iou_epochs) == round(iou,4):
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss':loss,
            'acc':acc,
            'iou':iou
            }, '/content/gdrive/My Drive/Colab Notebooks/checkpoints/ckptBestDenseTrainC+l_10step.pt')
        torch.save(model.state_dict(), '/content/gdrive/My Drive/Colab Notebooks/checkpoints/ckptBestTestC+l_10step.pt')
        torch.save(model.to(torch.device(device)), '/content/gdrive/My Drive/Colab Notebooks/models/modelDenseBestC+l_10step.pt')
        print("Checkpoint saved")
      lr_scheduler.step()  # learnig rate annealing

    return model

def validate(valid_images,valid_labels,valid_bboxes,model):
    print("Validating...")
    model.eval()
    l1_outputs = []
    l2_outputs = []
    l3_outputs = []
    l4_outputs = []
    l5_outputs = []
    with torch.no_grad():
      index_start = 0
      for batch_idx in range(valid_images.shape[0]//config['batch_size']):
        index_end = min(index_start+config['batch_size'],valid_images.shape[0])
        images_tensor = torch.tensor(valid_images[index_start:index_end],dtype = torch.float32).to(device)
        images_tensor = images_tensor.view(-1,1,64,64)
        l1,l2,l3,l4,l5 = model(images_tensor)
        l1_outputs.append(l1)
        l2_outputs.append(l2)
        l3_outputs.append(l3)
        l4_outputs.append(l4)
        l5_outputs.append(l5)
        index_start = index_end
    l1_output = torch.cat(l1_outputs, dim=0)
    l2_output = torch.cat(l2_outputs, dim=0)
    l3_output = torch.cat(l3_outputs, dim=0)
    l4_output = torch.cat(l4_outputs, dim=0)
    l5_output = torch.cat(l5_outputs, dim=0)
    l1_np = l1_output.cpu().detach().numpy()
    l2_np = l2_output.cpu().detach().numpy()
    l3_np = l3_output.cpu().detach().numpy()
    l4_np = l4_output.cpu().detach().numpy()
    l5_np = l5_output.cpu().detach().numpy()
    classes = convert_from_labels(l1_np,valid_images.shape[0])
    l2_og = convert_one_hot_to_original_format(l2_np,valid_images.shape[0])
    l3_og = convert_one_hot_to_original_format(l3_np,valid_images.shape[0])
    l4_og = convert_one_hot_to_original_format(l4_np,valid_images.shape[0])
    l5_og = convert_one_hot_to_original_format(l5_np,valid_images.shape[0])
    boxes = convert_boxes_to_original_format(l2_og,l3_og,l4_og,l5_og,valid_images.shape[0])
    acc = compute_classification_acc(classes,valid_labels)
    iou = compute_iou(boxes,valid_bboxes)
    return acc,iou


def evaluate(val_images, model):
  print("Testing...")
  model.eval()
  l1_outputs = []
  l2_outputs = []
  l3_outputs = []
  l4_outputs = []
  l5_outputs = []
  with torch.no_grad():
    index_start = 0
    for batch_idx in range(val_images.shape[0]//config['batch_size']):
      index_end = min(index_start+config['batch_size'],val_images.shape[0])
      images_tensor = torch.tensor(val_images[index_start:index_end],dtype = torch.float32).to(device)
      l1,l2,l3,l4,l5 = model(images_tensor.view(-1,1,64,64))
      l1_outputs.append(l1)
      l2_outputs.append(l2)
      l3_outputs.append(l3)
      l4_outputs.append(l4)
      l5_outputs.append(l5)
      index_start = index_end
  l1_output = torch.cat(l1_outputs, dim=0)
  l2_output = torch.cat(l2_outputs, dim=0)
  l3_output = torch.cat(l3_outputs, dim=0)
  l4_output = torch.cat(l4_outputs, dim=0)
  l5_output = torch.cat(l5_outputs, dim=0)
  l1_np = l1_output.cpu().detach().numpy()
  l2_np = l2_output.cpu().detach().numpy()
  l3_np = l3_output.cpu().detach().numpy()
  l4_np = l4_output.cpu().detach().numpy()
  l5_np = l5_output.cpu().detach().numpy()
  classes = convert_from_labels(l1_np,val_images.shape[0])
  l2_og = convert_one_hot_to_original_format(l2_np,val_images.shape[0])
  l3_og = convert_one_hot_to_original_format(l3_np,val_images.shape[0])
  l4_og = convert_one_hot_to_original_format(l4_np,val_images.shape[0])
  l5_og = convert_one_hot_to_original_format(l5_np,val_images.shape[0])
  boxes = convert_boxes_to_original_format(l2_og,l3_og,l4_og,l5_og,val_images.shape[0])
  return classes,boxes



           



def convert_to_labels(Y, size):
    labels = np.zeros(size, dtype = np.int8)
    for i in range(size):
      start_index = 0
      cur_value = 10
      for j in range(Y[i][0]):
        start_index += cur_value
        cur_value =  cur_value - 1
      labels[i] = start_index + Y[i][1] - Y[i][0]
    return labels

def convert_from_labels(labels, size):
    Y = np.zeros((size, 2), dtype=np.int8)
    # all possible 2-digit combinations
    inx = [[0,0], [0,1], [0,2], [0,3], [0,4], [0,5], [0,6], [0,7], [0,8], [0,9],
           [1,1], [1,2], [1,3], [1,4], [1,5], [1,6], [1,7], [1,8], [1,9],
           [2,2], [2,3], [2,4], [2,5], [2,6], [2,7], [2,8], [2,9],
           [3,3], [3,4], [3,5], [3,6], [3,7], [3,8], [3,9],
           [4,4], [4,5], [4,6], [4,7], [4,8], [4,9],
           [5,5], [5,6], [5,7], [5,8], [5,9],
           [6,6], [6,7], [6,8], [6,9],
           [7,7], [7,8], [7,9],
           [8,8], [8,8],
           [9,9]]
    for i in range(size):
      Y[i] = inx[np.where(labels[i] == np.amax(labels[i]))[0][0]]
    return Y

def convert_one_hot_to_original_format(l,size):
  temp = np.zeros(size)
  j =0
  for i in l:
    temp[j] = np.where(i == np.amax(i))[0][0]
    j+=1
  return temp


def convert_boxes_to_original_format(l2,l3,l4,l5,size):
    pred_bboxes = np.zeros((size,2,4))
    pred_bboxes[:,0][:,0] = l2
    pred_bboxes[:,0][:,1] = l3
    pred_bboxes[:,1][:,0] = l4
    pred_bboxes[:,1][:,1] = l5
    pred_bboxes[:,0][:,2] = l2 + 28
    pred_bboxes[:,0][:,3] = l3 + 28
    pred_bboxes[:,1][:,2] = l4 + 28
    pred_bboxes[:,1][:,3] = l5 + 28
    return np.array(pred_bboxes)

def compute_classification_acc(pred, gt):
    assert pred.shape == gt.shape
    return (pred == gt).astype(int).sum() / gt.size


def compute_iou(b_pred, b_gt):
    """

    :param b_pred: predicted bounding boxes, shape=(n,2,4)
    :param b_gt: ground truth bounding boxes, shape=(n,2,4)
    :return:
    """

    n = np.shape(b_gt)[0]
    L_pred = np.zeros((64, 64))
    L_gt = np.zeros((64, 64))
    iou = 0.0
    for i in range(n):
        for b in range(2):
            rr, cc = polygon([b_pred[i, b, 0], b_pred[i, b, 0], b_pred[i, b, 2], b_pred[i, b, 2]],
                             [b_pred[i, b, 1], b_pred[i, b, 3], b_pred[i, b, 3], b_pred[i, b, 1]], [64, 64])
            L_pred[rr, cc] = 1

            rr, cc = polygon([b_gt[i, b, 0], b_gt[i, b, 0], b_gt[i, b, 2], b_gt[i, b, 2]],
                             [b_gt[i, b, 1], b_gt[i, b, 3], b_gt[i, b, 3], b_gt[i, b, 1]], [64, 64])
            L_gt[rr, cc] = 1

            iou += (1.0 / (2 * n)) * (np.sum((L_pred + L_gt) == 2) / np.sum((L_pred + L_gt) >= 1))

            L_pred[:, :] = 0
            L_gt[:, :] = 0

    return iou

  


    

cuda


In [5]:
import time
import numpy as np
from skimage.draw import polygon

def compute_classification_acc(pred, gt):
    assert pred.shape == gt.shape
    return (pred == gt).astype(int).sum() / gt.size


def compute_iou(b_pred, b_gt):
    """

    :param b_pred: predicted bounding boxes, shape=(n,2,4)
    :param b_gt: ground truth bounding boxes, shape=(n,2,4)
    :return:
    """

    n = np.shape(b_gt)[0]
    L_pred = np.zeros((64, 64))
    L_gt = np.zeros((64, 64))
    iou = 0.0
    for i in range(n):
        for b in range(2):
            rr, cc = polygon([b_pred[i, b, 0], b_pred[i, b, 0], b_pred[i, b, 2], b_pred[i, b, 2]],
                             [b_pred[i, b, 1], b_pred[i, b, 3], b_pred[i, b, 3], b_pred[i, b, 1]], [64, 64])
            L_pred[rr, cc] = 1

            rr, cc = polygon([b_gt[i, b, 0], b_gt[i, b, 0], b_gt[i, b, 2], b_gt[i, b, 2]],
                             [b_gt[i, b, 1], b_gt[i, b, 3], b_gt[i, b, 3], b_gt[i, b, 1]], [64, 64])
            L_gt[rr, cc] = 1

            iou += (1.0 / (2 * n)) * (np.sum((L_pred + L_gt) == 2) / np.sum((L_pred + L_gt) >= 1))

            L_pred[:, :] = 0
            L_gt[:, :] = 0

    return iou


def main():
    # prefix = "test"
    prefix = "valid"
    directory = "/content/gdrive/My Drive/Colab Notebooks/MNISTDD_train_valid/"
    images = np.load("/content/gdrive/My Drive/Colab Notebooks/MNISTDD_train_valid/"+prefix + "_X.npy")
    start_t = time.time()
    pred_class, pred_bboxes = classify_and_detect(images)
    end_t = time.time()
    gt_class = np.load("/content/gdrive/My Drive/Colab Notebooks/MNISTDD_train_valid/"+prefix + "_Y.npy")
    gt_bboxes = np.load("/content/gdrive/My Drive/Colab Notebooks/MNISTDD_train_valid/"+prefix + "_bboxes.npy")
    acc = compute_classification_acc(pred_class, gt_class)
    iou = compute_iou(pred_bboxes, gt_bboxes)

    time_taken = end_t - start_t

    print(f"Classification Acc: {acc}")
    print(f"Detection IOU: {iou}")
    print(f"Test time: {time_taken}")


if __name__ == '__main__':
    main()


Testing...
Classification Acc: 0.9834
Detection IOU: 0.9599756141954755
Test time: 25.027446031570435
