# Method 1: Without training on the counting dataset

In [4]:
from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from IPython.core.debugger import set_trace
import numpy as np
from matplotlib import pyplot

# we need google drive access to upload the datasets
#from google.colab import drive
#drive.mount('/content/gdrive')

We take the network from lab 3, which is used for MNIST classification, and reuse it for our counting problem.

# > MNIST classification


---
We will train a convolutional network to classify digits. The architecture should be similar to:
  - conv layer: 20 filters, kernel size: 5x5, stride:1
  - relu
  - max pool: kernel size: 2x2, stride:2
  - conv layer: 50 filters, kernel size: 5x5, stride:1
  - relu
  - max pool: kernel size: 2x2, stride:2
  - fully connected: 500 neurons
  - relu
  - fully connected: 10 neurons
  - log softmax

In [6]:
# Training settings
class Args():
    """Container for the hyper-parameters read by the cells of this notebook."""

    def __init__(self):
        # mini-batch sizes for the training and the evaluation loaders
        self.batch_size = 64
        self.test_batch_size = 64
        # optimisation schedule
        self.epochs = 10
        self.lr = 0.01
        self.momentum = 0.9
        # seed handed to torch.manual_seed below
        self.seed = 1
        # a log line is printed every `log_interval` batches
        self.log_interval = 10000 // self.batch_size
        self.cuda = False


args = Args()

# seed the torch RNG before any model or loader is created
torch.manual_seed(args.seed)

# run on the GPU whenever one is visible, otherwise fall back to the CPU
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

# extra DataLoader options; only populated when CUDA is used
kwargs = {}
if use_cuda:
    kwargs = {'num_workers': 1, 'pin_memory': True}

In [7]:
# layer widths of the classifier (also read by other cells of the notebook)
no_filters1 = 20
no_filter2 = 50
no_neurons1 = 500


class CNN(nn.Module):
    """Two conv/pool stages followed by two fully connected layers.

    The first linear layer expects a 4 x 4 x no_filter2 feature map, which is
    what a 1 x 28 x 28 input produces (28 -> 24 -> 12 -> 8 -> 4).
    forward() returns per-class log-probabilities of shape (batch, 10).
    """

    def __init__(self):
        # all layers are instantiated once, here
        super().__init__()
        self.conv1 = nn.Conv2d(1, no_filters1, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=no_filters1, out_channels=no_filter2,
                               kernel_size=5, stride=1)
        self.fc1 = nn.Linear(4 * 4 * no_filter2, no_neurons1)
        self.fc2 = nn.Linear(no_neurons1, 10)

    def forward(self, x):
        # forward only applies the layers built in __init__
        features = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2, stride=2)
        features = F.max_pool2d(F.relu(self.conv2(features)), kernel_size=2, stride=2)
        # flatten every sample to a vector before the linear layers
        flat = features.view(-1, 4 * 4 * no_filter2)
        hidden = F.relu(self.fc1(flat))
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)

Load MNIST dataset containing drawn digits.

In [None]:
# Download the MNIST archive from the mirror and unpack it into the current
# directory (presumably so that datasets.MNIST('./', ...) finds the files
# locally -- verify against the loader cell).
!wget www.di.ens.fr/~lelarge/MNIST.tar.gz
!tar -zxvf MNIST.tar.gz

In [None]:
# Loaders that iterate over MNIST; images are only converted to tensors.
mnist_transform = transforms.Compose([transforms.ToTensor()])

mnist_train = datasets.MNIST('./', train=True, download=True,
                             transform=mnist_transform)
train_loader = torch.utils.data.DataLoader(
    mnist_train,
    batch_size=args.batch_size,
    shuffle=True,
    drop_last=True,
    **kwargs)

mnist_test = datasets.MNIST('./', train=False, transform=mnist_transform)
test_loader = torch.utils.data.DataLoader(
    mnist_test,
    batch_size=args.test_batch_size,
    shuffle=False,
    drop_last=True,
    **kwargs)

# pull a single training batch to look at
first_train_batch_imgs, first_train_batch_labels = next(iter(train_loader))

# show the first five digits of that batch next to each other
f, axarr = pyplot.subplots(1, 5)
for i, axis in enumerate(axarr):
    axis.imshow(first_train_batch_imgs[i, 0])
print(f'Labels of the shown images: {first_train_batch_labels[:5]}')

Train Convolutional network for MNIST classification.

In [None]:
# define two functions, one for training the model and one for testing it

def train(args, model, device, train_loader, optimizer, epoch):
    """Run one optimisation pass over `train_loader`.

    Every batch is moved to `device`, the NLL loss of the model output is
    back-propagated and `optimizer` takes one step. A progress line is
    printed every `args.log_interval` batches.

    Returns the mean of the per-batch losses as a numpy scalar.
    """
    model.train()
    batch_losses = []
    num_samples = len(train_loader.dataset)
    num_batches = len(train_loader)
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        # gradients accumulate, so reset them before each backward pass
        optimizer.zero_grad()

        # FORWARD pass and average LOSS of the current batch
        loss = F.nll_loss(model(data), target)
        batch_losses.append(loss.detach().cpu().numpy())

        # BACKPROPAGATE, then let the optimiser update the weights
        loss.backward()
        optimizer.step()

        if batch_idx % args.log_interval != 0:
            continue
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * len(data), num_samples,
            100. * batch_idx / num_batches, loss.item()))
    return np.array(batch_losses).mean()

def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        num_iter = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            # obtain the prediction by a forward pass
            output = model(data)
            # calculate the loss for the current batch and add it across the entire dataset
            test_loss += F.nll_loss(output, target) # sum up batch loss
            # compute the accuracy of the predictions across the entire dataset
            # get the most probable prediction
            pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).float().mean().item()
            num_iter += 1
    test_loss /= num_iter
    test_accuracy = 100. * correct / num_iter
    # print the Accuracy for the entire dataset
    print('\nTest set: Average loss: {:.4f}, Accuracy: ({:.0f}%)\n'.format(
        test_loss,
        test_accuracy))
    return test_loss, test_accuracy

Create an optimizer and call the training / testing functions.

In [None]:
def plot_loss(loss, label, color='blue'):
    """Draw `loss` as a labelled curve on the current pyplot figure and show the legend."""
    curve_style = {'label': label, 'color': color}
    pyplot.plot(loss, **curve_style)
    pyplot.legend()

# per-epoch history of the metrics
losses_train, losses_test, accuracy_test = [], [], []

# the model lives on the GPU when one is available
model = CNN().to(device)
# Stochastic Gradient Descent optimiser over all model parameters
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

# alternate one training pass and one evaluation pass per epoch
for epoch in range(1, args.epochs + 1):
    train_loss = train(args, model, device, train_loader, optimizer, epoch)
    test_loss, test_accuracy = test(args, model, device, test_loader)
    for history, value in ((losses_train, train_loss),
                           (losses_test, test_loss),
                           (accuracy_test, test_accuracy)):
        history.append(value)

# figure 1: train/test loss, figure 2: test accuracy
pyplot.figure(1)
plot_loss(losses_train, label='train_loss', color='red')
plot_loss(losses_test, label='test_loss')
pyplot.figure(2)
plot_loss(accuracy_test, label='test_accuracy')

# keep the weights of the trained classifier on disk
torch.save(model.state_dict(), "mnist_cnn.pt")


# > MNIST counting


---
Load the localisation dataset.
Our goal is to count the digits in every image.

In [None]:
import pickle

def get_large_dataset(path, max_batch_idx=100, shuffle=False,first_k=5000):
  """Load a pickled image/coordinate dataset and wrap it in a DataLoader.

  Args:
    path: pickle file holding a dict with the keys 'images' and 'coords'.
    max_batch_idx: not used by this function; kept so existing calls work.
    shuffle: forwarded to the DataLoader.
    first_k: only the first `first_k` samples are kept.

  Returns:
    A DataLoader over (images, coords) pairs with batches of
    `args.batch_size`; an incomplete last batch is dropped. Both tensors are
    moved to the global `device` up front.
  """
  # NOTE(review): pickle.load can execute arbitrary code -- only open files
  # from a trusted source.
  with open(path,'rb') as handle:
    data = pickle.load(handle)
  # add a channel axis and select only first_k elements in the dataset
  np_dataset_large  = np.expand_dims(data['images'],1)[:first_k]
  np_dataset_coords = data['coords'].astype(np.float32)[:first_k]

  # show a couple of examples from the dataset (samples 10..14); the range is
  # clipped so that a dataset with fewer than 15 samples does not raise
  print(f'np_dataset_large shape: {np_dataset_large.shape}')
  for idx in range(10, min(15, len(np_dataset_large))):
    # the preview assumes 100x100 images
    example = np_dataset_large[idx].reshape((100, 100))
    pyplot.figure()
    pyplot.imshow(example, cmap="gray")

  # create loader from the numpy tensors
  dataset_large = torch.tensor(np_dataset_large).to(device)
  dataset_coords = torch.tensor(np_dataset_coords).to(device)

  large_dataset = torch.utils.data.TensorDataset(dataset_large, dataset_coords)
  large_data_loader = torch.utils.data.DataLoader(large_dataset,
       batch_size=args.batch_size, shuffle=shuffle, drop_last=True)
  return large_data_loader

# Locations of the pickled train / test sets.
# TODO: change these paths to the place where the pickles are stored in your drive
path_train = '/content/gdrive/MyDrive/Copy of data_train.pickle'
path_test = '/content/gdrive/MyDrive/Copy of data_test.pickle'

# the training loader is shuffled and restricted to the first 1000 samples
large_data_loader_train = get_large_dataset(
    path_train, max_batch_idx=50, shuffle=True, first_k=1000)
# the test loader keeps the default ordering and sample limit
large_data_loader_test = get_large_dataset(path_test, max_batch_idx=50)

We will convert the network into a fully convolutional network.

In [None]:
# design the fully convolutional network   
# the first two conv layers should be the same as the original classification conv layers
# the last two conv layers should be transformed from the last two fully connected layers in the original network
class CNN_fully_conv(nn.Module):
    """Fully convolutional counterpart of the CNN classifier.

    conv1 / conv2 have the same configuration as in CNN. fully_conv1 is a
    4x4 convolution standing in for CNN.fc1 and fully_conv2 is a 1x1
    convolution standing in for CNN.fc2, so the network accepts inputs larger
    than 28x28 and returns one map of log-probabilities per output channel.

    Args:
        num_outputs: number of output maps. It defaults to 10, the number of
            outputs of CNN.fc2, because load_state_dict() rejects weights
            whose shape differs from the layer they are loaded into.
    """
    def __init__(self, num_outputs=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, no_filters1, 5, 1)
        self.conv2 = nn.Conv2d(no_filters1, no_filter2, 5, 1)
        # kernel 4x4 over no_filter2 channels == fc1 applied to a 4x4 window
        self.fully_conv1  = nn.Conv2d(no_filter2,no_neurons1, 4)
        # kernel 1x1 == fc2 applied independently at every spatial location
        self.fully_conv2 = nn.Conv2d(no_neurons1, num_outputs, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.fully_conv1(x))
        x = self.fully_conv2(x)
        # normalise over the channel axis, separately for every location
        return F.log_softmax(x, dim=1)

We will run this network on the larger 100x100 images and get 10 maps representing the probability that a digit is found at that location. Out of the 5 feature maps (one for every possible count), we will select the one with the strongest activation.

Save the old classification model, and create a new fully convolutional model from the old parameters.

In [None]:
# data should be in [0,1]
def preprocess(data):
  """Cast `data` to float and divide it by 255."""
  as_float = data.float()
  return as_float / 255.0
# list the parameters of the trained classifier together with their shapes
print("Model's state_dict:")
for param_name, param_value in model.state_dict().items():
    print(param_name, "\t", param_value.size())

# save the classification model
PATH = 'conv_net.pt'
torch.save(model.state_dict(), PATH)

# define the fully_conv model
model_fuly_conv = CNN_fully_conv()

# load classification model
loaded_state_dict = torch.load(PATH)

# loaded_state_dict holds the weights of the classification model and the
# fully convolutional model reuses exactly these parameters.
# - conv1 / conv2 have the same names in both models and are copied unchanged.
# - fc1 / fc2 become fully_conv1 / fully_conv2: the weight matrix of a
#   fully-connected layer is reshaped into convolution kernels.
#   fc1 read a 4 x 4 x no_filter2 input, so its kernels are 4 x 4 over
#   no_filter2 channels; fc2 read a plain vector (1x1 spatial dimension) of
#   size no_neurons1, so its kernels are 1 x 1 over no_neurons1 channels.
# load_state_dict below is strict: every converted tensor must have exactly
# the shape of the matching CNN_fully_conv parameter.
kernel_shapes = {
    'fully_conv1.weight': (-1, no_filter2, 4, 4),
    'fully_conv2.weight': (-1, no_neurons1, 1, 1),
}
model_dict = {}
for key, val in loaded_state_dict.items():
    key = key.replace('fc', 'fully_conv')
    print(f'key: {key}')
    if key in kernel_shapes:
        val = val.view(*kernel_shapes[key])
    model_dict[key] = val

model_fuly_conv.load_state_dict(model_dict)
model_fuly_conv = model_fuly_conv.to(device)

print(model_fuly_conv.parameters)

Evaluate this fully convolutional network.

In [None]:
# iterate over all the batches and estimate the location of the digit for each sample
for batch_idx, (large_imgs, target_coords) in enumerate(large_data_loader_test):
  print(f'large_imgs shape {large_imgs.shape}')
  large_imgs = preprocess(large_imgs)
  # inference only: do not record an autograd graph for the forward pass
  with torch.no_grad():
    out_prob_maps = model_fuly_conv(large_imgs)
  # read the sizes from the output instead of hard-coding batch size, number
  # of maps and map resolution
  num_samples, num_maps, map_h, map_w = out_prob_maps.shape
  # per sample: peak value of every map, then the index of the map whose
  # peak is the strongest
  peak_per_map = out_prob_maps.reshape(num_samples, num_maps, -1).max(dim=2)[0]
  max_ind = torch.argmax(peak_per_map, dim=1)
  # just for the first batch lets print some of the feature maps:
  if batch_idx == 0:
    for ii in range(min(5, num_samples)):
      pyplot.figure()
      pyplot.imshow(large_imgs[ii,0].cpu().detach().numpy(), cmap="gray")

      pyplot.figure()
      pyplot.imshow(out_prob_maps[ii,max_ind[ii]].cpu().detach().numpy(), cmap="jet")
      pyplot.colorbar()

  # pick the selected map of every sample, linearise it into a vector and
  # find the position of its maximum
  sample_idx = torch.arange(num_samples, device=out_prob_maps.device)
  selected_maps = out_prob_maps[sample_idx, max_ind]
  max_location = selected_maps.reshape(num_samples, -1).argmax(dim=1)
  # compute the index in the original map from the index in the vector.
  # Floor division is required: `/` on an integer tensor is true division and
  # would return a fractional row index.
  # NOTE(review): `_x` holds the row index and `_y` the column index --
  # confirm that this matches the ordering of target_coords.
  max_location_x = torch.unsqueeze(max_location // map_w, dim=1).float() / map_h
  max_location_y = torch.unsqueeze(max_location % map_w, dim=1).float() / map_w
  coords = torch.cat([max_location_x,max_location_y],dim=1)
  # mean Euclidean distance between the estimated and the ground-truth
  # location (a distance, not a squared error)
  offsets = target_coords - coords
  mean_distance = torch.mean(torch.sqrt(torch.sum(offsets * offsets, dim=1)))
  print(f'Mean Euclidean distance error: {mean_distance}')
  
