[File colab link](https://colab.research.google.com/drive/1bXbzJgUoGeIM_7EMgnh6p-UsvAkRY8Tj#scrollTo=31EWlxEpDA_b)

In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

import traceback

In [None]:
DEBUG=True
batch_size=128
shuffle_flag=False if DEBUG else False
train_data=torchvision.datasets.MNIST("./data",train=True,download=True,transform=torchvision.transforms.ToTensor())
test_data=torchvision.datasets.MNIST("./data",train=False,download=True,transform=torchvision.transforms.ToTensor())
train_loader=DataLoader(train_data,batch_size=batch_size,shuffle=shuffle_flag)
test_loader=DataLoader(test_data,batch_size=batch_size,shuffle=shuffle_flag)

In [None]:
plt.imshow(train_data.data[0].detach().numpy(),cmap="gray")
plt.title("digit {}".format(train_data.targets[0].detach().numpy()))

## Create network

In [43]:
class ConvNet(nn.Module):
  def __init__(self,in_feature_2d,out_feature):
    self.in_feature_2d=in_feature_2d
    super(ConvNet,self).__init__()
    self.layer_2d=[]
    self. _set_network(in_feature_2d,out_feature) # Internal protected method

  def _set_network(self,in_feature_2d,out_feature):
    self.conv1=nn.Conv2d(in_channels=1,out_channels=16,kernel_size=(5,5),stride=1,padding="same")
    self.layer_2d.append(self.conv1)

    self.act1=nn.ReLU()
    self.layer_2d.append(self.act1)

    self.pooling1=nn.MaxPool2d(kernel_size=(2,2),stride=2)
    self.layer_2d.append(self.pooling1)

    self.conv2=nn.Conv2d(in_channels=16,out_channels=32,kernel_size=(3,3),padding="same")
    self.layer_2d.append(self.conv2)

    self.act2=nn.ReLU()
    self.layer_2d.append(self.act2)

    self.pooling2=nn.MaxPool2d(kernel_size=(2,2),stride=2)
    self.layer_2d.append(self.pooling2)

    self.conv3=nn.Conv2d(in_channels=32,out_channels=32,kernel_size=(3,3),padding="same")
    self.layer_2d.append(self.conv3)

    self.act3=nn.ReLU()
    self.layer_2d.append(self.act3)

    self.pooling3=nn.MaxPool2d(kernel_size=(7,7)) # global max pooling with pooling size of the input
    self.layer_2d.append(self.pooling3)

    self.featureFC=nn.Linear(in_features=32,out_features=2,bias=True)
    self.outputFC=nn.Linear(in_features=2,out_features=out_feature,bias=True)
  
  def forward(self,x):
    feature=self.feature_extraction(x)
    return self.outputFC(feature)
  
  def feature_extraction(self,x):
    input=x
    for layer in self.layer_2d:
      output=layer(input)
      input=output
    output=self.featureFC(output.squeeze())
    return output
  
  def predict_prob(self,x):
    output=self.forward(x)
    return output.softmax(dim=1) # apply softmax to each row, each batch
  
  def predict(self,x):
    prob=self.predict_prob(x)
    return prob.argmax(dim=1)

In [44]:
input_size_2d=np.array(train_data.data[0].shape)

In [45]:
model=ConvNet(input_size_2d,10)

In [46]:
lr=0.1
EPOCHES=100

In [47]:
optimizer=optim.SGD(model.parameters(),lr=lr)
loss_func=nn.CrossEntropyLoss()

In [48]:
def train( model, train_loader, test_loader, optimizer, loss_func, EPOCHES, device=torch.device("cpu")):
    model.to(device)
    model.train()
    train_loss_recorder = []
    lr_recorder = []
    for epoch in range(EPOCHES):
        avg_loss = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()  # clear all grad to avoid cumulation of grad
            output = model(data)
            loss_val = loss_func(output, target)
            avg_loss += loss_val.item()
            loss_val.backward()

            # if batch_idx % 100 == 0:
            #     print(
            #         "Train Epoch:{}/{} [{}/{} ({:.0f}%)] \t Loss: {:.6f}\r".format(
            #             epoch + 1,
            #             EPOCHES,
            #             batch_idx * len(data),
            #             len(train_loader.dataset),
            #             100 * (batch_idx / len(train_loader)),
            #             loss_val.item(),
            #         ),
            #         end="",
            #     )

        optimizer.step()

        avg_loss = avg_loss / (len(train_loader))
        train_loss_recorder.append([epoch, avg_loss])
        lr_recorder.append([epoch, optimizer.param_groups[0]["lr"]])
        if epoch % 20 ==0:
            print(
                "Train Epoch:{}/{} \t Average Loss: {:.6f}\r".format(
                    epoch+1, EPOCHES, avg_loss
                )
            )
            torch.save(model.state_dict(), "model_epoch_{}.pth".format(epoch))
            torch.save(optimizer.state_dict(), "optimizer_epoch_{}.pth".format(epoch))
    return {"loss": {"train_loss": train_loss_recorder}, "lr": lr_recorder}



In [49]:
recorded_message=train(model,train_loader,test_loader,optimizer,loss_func,EPOCHES)

Train Epoch:1/2 	 Average Loss: 2.418369


In [50]:
recorded_message["loss"]

{'train_loss': [[0, 2.4183692723703283], [1, 2.4150800755791573]]}

In [51]:
class TestTools():
    def __init__(self) -> None:
        pass
def accuracy(model,dataset):
    model.eval()
    with torch.no_grad():
        result=model.predict(dataset.data.reshape(-1,1,28,28).float())
    torch.sum(result.argmax(dim=1)==train_data.targets).item()/len(train_data)

In [52]:
with torch.no_grad():
  result=model.predict_prob(train_data.data.reshape(-1,1,28,28).float())

0.10441666666666667