In [4]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda
import random
from torchsummary import summary
import torch.nn.functional as F
from scipy import stats

def rand_img_nr(max_val: int)->int:
    """Draw a random image count from a truncated normal distribution.

    The count is sampled from a normal distribution with mean 10 and standard
    deviation 2, truncated to the interval [5, max_val], and rounded to the
    nearest integer.

    Args:
        max_val: Largest count that may be returned (inclusive). Must be >= 1.

    Returns:
        An int in [5, max_val]; when max_val <= 5 the interval is degenerate
        and max_val itself is returned.

    Raises:
        ValueError: If max_val is smaller than 1.
    """
    mean_val = 10
    std_dev = 2.0
    min_val = 5

    if max_val < 1:
        raise ValueError(f"max_val must be at least 1, got {max_val}")
    if max_val <= min_val:
        # No room for a distribution between min_val and max_val: the cap wins.
        return int(max_val)

    # scipy's truncnorm takes its clip points in units of standard deviations
    # measured from `loc`, i.e. (bound - loc) / scale, not bound / scale.
    lower = (min_val - mean_val) / std_dev
    upper = (max_val - mean_val) / std_dev
    r = stats.truncnorm.rvs(lower, upper, loc=mean_val, scale=std_dev, size=1)
    return int(round(r[0]))

class MNIST_multiple_digits(datasets.MNIST):
    """MNIST variant whose samples are strips of several randomly chosen digits.

    Each sample concatenates a random number of base MNIST images along
    dimension 2 and zero-pads the result to the width of 25 images.
    The label is 1 when at least one of the chosen digits is a 7, else -1.
    """

    def __getitem__(self, idx):
        """Build one random multi-digit sample.

        Args:
            idx: Ignored; the underlying digits are drawn at random, so the
                same index yields a different sample on every call.

        Returns:
            (feature, label): the padded image strip and 1 / -1 as described
            on the class.
        """
        max_digits_nr = 25
        img_w = 28

        base = super()
        last_index = base.__len__() - 1
        digit_count = rand_img_nr(max_digits_nr)

        tiles = []
        contains_seven = False
        for _ in range(digit_count):
            digit_img, digit_label = base.__getitem__(random.randint(0, last_index))
            tiles.append(digit_img)
            if digit_label == 7:
                contains_seven = True

        # Assumes (C, H, W) tensors (as ToTensor produces), so dim=2 is the width axis.
        strip = torch.cat(tiles, dim=2)
        # Right-pad with zeros so every sample has the same width.
        right_pad = (max_digits_nr - digit_count) * img_w
        strip = F.pad(strip, (0, right_pad), "constant", 0)
        return strip, (1 if contains_seven else -1)

# Build the train and test splits with identical settings; only the `train`
# flag differs. Each split gets its own ToTensor() transform instance.
training_data, test_data = (
    MNIST_multiple_digits(
        root="data",
        train=is_train,
        download=True,
        transform=ToTensor(),
    )
    for is_train in (True, False)
)

def collate_fn(batch):
    """Collate (feature, label) pairs into a batch of tensors.

    Labels equal to -1 (the dataset's "no match" marker) are mapped to class 0;
    every other label is passed through unchanged.

    Args:
        batch: Iterable of (feature_tensor, label) pairs.

    Returns:
        (features, labels): the features stacked along a new leading dimension
        and the labels as a 1-D tensor.
    """
    features, raw_labels = zip(*batch)
    class_ids = [label if label != -1 else 0 for label in raw_labels]
    stacked = torch.stack(features)
    return stacked, torch.tensor(class_ids)
        
# Both loaders share the same batching settings and the custom collate_fn.
train_dataloader, test_dataloader = (
    DataLoader(split, batch_size=128, shuffle=True, collate_fn=collate_fn, num_workers=8)
    for split in (training_data, test_data)
)

# Prefer the GPU when one is available.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
print(f'Using {device} device')

class NeuralNetwork(nn.Module):
    """Three-stage CNN followed by a stack of linear layers with 2 outputs.

    The first linear layer expects 16 * 7 * 7 * 25 flattened features, which
    matches a single-channel 28 x 700 input (two 2x2 max-pools reduce it to
    7 x 175 with 16 channels).

    NOTE(review): there is no activation between the three Linear layers.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()

        # Stage 1: 3x3 convolution, spatial size preserved (padding 1).
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
        )
        # Stage 2: 5x5 convolution (padding 2) followed by a 2x2 max-pool.
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # Stage 3: 5x5 convolution down to 16 channels, then another 2x2 max-pool.
        self.conv3 = nn.Sequential(
            nn.Conv2d(32, 16, 5, 1, 2),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        flat_features = 16 * 7 * 7 * 25
        self.linear = nn.Sequential(
            nn.Linear(flat_features, 500),
            nn.Linear(500, 128),
            nn.Linear(128, 2),
        )

    def forward(self, x):
        """Run the convolutional stages, flatten, and return the 2 output scores."""
        for stage in (self.conv1, self.conv2, self.conv3, self.flatten):
            x = stage(x)
        return self.linear(x)

# Instantiate the network and move it to the device selected above.
# `model.to(device)` is left as the cell's last expression so the notebook
# echoes the module's layer summary as the cell output.
model = NeuralNetwork()
model.to(device)

Using cuda device


NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
  )
  (conv2): Sequential(
    (0): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(32, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (linear): Sequential(
    (0): Linear(in_features=19600, out_features=500, bias=True)
    (1): Linear(in_features=500, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=2, bias=True)
  )
)

In [5]:
def train_loop(dataloader, model, loss_fcn, optimizer):
    """Train `model` for one pass over `dataloader`.

    Puts the model in training mode, then for every batch computes the loss,
    backpropagates and takes one optimizer step. Progress is printed every
    100 batches.

    Args:
        dataloader: Iterable of (X, y) batches exposing a `.dataset` with a
            length (used only for the progress print-out).
        model: The module to train. Batches are moved to the device of its
            first parameter, so no notebook-level `device` global is needed.
        loss_fcn: Callable `(pred, y) -> scalar loss tensor`.
        optimizer: Optimizer holding the model's parameters.
    """
    size = len(dataloader.dataset)

    # Follow the model's own device rather than a global variable; a model
    # without parameters leaves the batches where they are.
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    # Ensure train-time behaviour of layers such as dropout / batch norm.
    model.train()

    for batch, (X, y) in enumerate(dataloader):
        if target_device is not None:
            X, y = X.to(target_device), y.to(target_device)
        pred = model(X)
        loss = loss_fcn(pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            # Separate name so the loss tensor is not shadowed by a float.
            loss_value, current = loss.item(), batch * len(X)
            print(f"Loss:{loss_value:>7f} [{current:>5d}/{size:>5d}]")
            
def test_loop(dataloader, model, loss_fcn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    
    with torch.no_grad():
        for X,y in dataloader:
            X,y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fcn(pred,y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


In [6]:
# Training configuration: cross-entropy loss over the two output scores,
# optimised with plain SGD at a fixed learning rate.
loss_fcn = nn.CrossEntropyLoss()
learning_rate = 1e-2
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# Each epoch is one training pass followed by one evaluation pass.
epochs = 25
for t in range(epochs):
    print(f"Epoch{t+1}/{epochs}-------------")
    train_loop(train_dataloader, model, loss_fcn, optimizer)
    test_loop(test_dataloader, model, loss_fcn)
print("Training completed!")
# NOTE(review): the whole module object is saved here (not just a state_dict),
# so loading it back needs the NeuralNetwork class definition to be available.
model_path = 'multi_digit_mnist_model_2'
torch.save(model, model_path)

Epoch1/25-------------
Loss:0.694648 [    0/60000]
Loss:0.659970 [12800/60000]
Loss:0.620484 [25600/60000]
Loss:0.603687 [38400/60000]
Loss:0.672133 [51200/60000]
Test Error: 
 Accuracy: 65.4%, Avg loss: 0.636105 

Epoch2/25-------------
Loss:0.610077 [    0/60000]
Loss:0.597068 [12800/60000]
Loss:0.558556 [25600/60000]
Loss:0.607892 [38400/60000]
Loss:0.624744 [51200/60000]
Test Error: 
 Accuracy: 72.3%, Avg loss: 0.578738 

Epoch3/25-------------
Loss:0.517665 [    0/60000]
Loss:0.521675 [12800/60000]
Loss:0.535403 [25600/60000]
Loss:0.480550 [38400/60000]
Loss:0.434167 [51200/60000]
Test Error: 
 Accuracy: 80.3%, Avg loss: 0.417612 

Epoch4/25-------------
Loss:0.397538 [    0/60000]
Loss:0.441859 [12800/60000]
Loss:0.329610 [25600/60000]
Loss:0.446622 [38400/60000]
Loss:0.395946 [51200/60000]
Test Error: 
 Accuracy: 84.9%, Avg loss: 0.332430 

Epoch5/25-------------
Loss:0.314573 [    0/60000]
Loss:0.233878 [12800/60000]
Loss:0.423550 [25600/60000]
Loss:0.200708 [38400/60000]
Loss:

In [27]:
# inference
# import model class definition if there is no data from previous cells
# NOTE: torch.load unpickles the file, which can execute arbitrary code; only
# load checkpoints you created yourself. Newer PyTorch releases may also need
# weights_only=False to load a fully pickled module - confirm for your version.
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
model_loaded = torch.load(
    model_path,
    map_location='cuda' if torch.cuda.is_available() else 'cpu',
)
model_loaded.eval()


NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
  )
  (conv2): Sequential(
    (0): Conv2d(32, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(32, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (linear): Sequential(
    (0): Linear(in_features=19600, out_features=500, bias=True)
    (1): Linear(in_features=500, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=2, bias=True)
  )
)