Import libraries and load the MNIST dataset

In [36]:
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
import cv2
from torch.utils.data import DataLoader
from torch import nn
import torch
import torch.nn.functional as F
from scipy import ndimage


# Load MNIST through the Keras dataset helper. In the visible part of this
# notebook TensorFlow is used only here, as the data source.
mnist = tf.keras.datasets.mnist
# Unpack into (images, labels) pairs for the train and test splits.
(x_train, y_train), (x_test, y_test) = mnist.load_data()


Randomly place each digit inside a larger blank image and add a new label for the digit's center

In [74]:
def build_data(img, value, new_size):
    """Paste a digit image at a random position on a blank square canvas.

    Parameters
    ----------
    img : 2-D array
        Pixel intensities on the 0-255 scale; divided by 255 before pasting.
        The image may be non-square.
    value : int-like
        Class label of the digit; stored as ``int(value)``.
    new_size : int
        Side length of the square output canvas. Must be at least as large as
        both image dimensions.

    Returns
    -------
    raw_img : ndarray of shape (1, new_size, new_size)
        Zero canvas with the rescaled image pasted in.
    label : list
        ``[int(value), center]`` where ``center`` is a float32 array
        ``[x_center, y_center]``. ``x`` indexes the first (row) axis of the
        canvas and ``y`` the second (column) axis.

    Raises
    ------
    ValueError
        If the image does not fit inside the canvas.
    """
    height, width = img.shape[0], img.shape[1]
    if height > new_size or width > new_size:
        raise ValueError(
            f"image of shape {img.shape} does not fit in a {new_size}x{new_size} canvas"
        )

    raw_img = np.zeros((new_size, new_size))
    img = img / 255

    # randint's upper bound is exclusive, so "+ 1" makes the last valid offset
    # reachable and also allows a canvas exactly the size of the image.
    x_min = np.random.randint(new_size - height + 1)
    y_min = np.random.randint(new_size - width + 1)
    # Use each axis' own extent so non-square images are placed correctly.
    x_max, y_max = x_min + height, y_min + width

    x_center = x_min + (x_max - x_min) / 2
    y_center = y_min + (y_max - y_min) / 2

    raw_img[x_min:x_max, y_min:y_max] = img
    # Add a leading channel axis.
    raw_img = np.reshape(raw_img, (1, new_size, new_size))
    label = [int(value), np.array([x_center, y_center]).astype('float32')]

    return raw_img, label

In [None]:
# Build 90x90 canvases from the first 40,000 MNIST training images.
train_data = [build_data(img, value, 90) for img, value in zip(x_train[:40000], y_train[:40000])]
# NOTE: the evaluation split is a held-out slice (indices 40000-44999) of the
# MNIST *training* arrays; x_test / y_test are not used in this cell.
test_data = [build_data(img, value, 90) for img, value in zip(x_train[40000:45000], y_train[40000:45000])]


Save the images from the new training data to disk (optional)

In [63]:
# count = 0
# for i in train_data:
#     cv2.imwrite(f'path_to_download{count}.jpg', i[0][0])
#     count +=1

In [75]:
# Wrap the (image, [digit, center]) samples in shuffled mini-batches of 64.
train_dataloader = DataLoader(train_data, batch_size=64, shuffle=True)
# The evaluation loader is shuffled as well, so the first batch differs on every iteration.
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)


In [14]:
class NeuralNetwork_OL_v2(nn.Module):
    """Small CNN with two output heads: digit logits and a 2-value center.

    Two conv -> max-pool -> ReLU stages feed a 256-unit hidden layer. The
    hidden layer is shared by ``linear`` (10 class logits) and ``linear_all``
    (2 center coordinates). The hard-coded flattened size ``16*25*25``
    corresponds to a 1x90x90 input given the padding and pooling used here.

    ``linear_x`` and ``linear_y`` are registered but not used in ``forward``;
    they still appear in the module's parameters and state dict.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor.
        self.conv0 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=(2, 2))
        self.pool0 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv1 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=(3, 3))
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Shared hidden representation.
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(16 * 25 * 25, 256),
            nn.ReLU(),
        )

        # Output heads (linear_x / linear_y are not used by forward).
        self.linear = nn.Linear(256, 10)
        self.linear_x = nn.Linear(256, 1)
        self.linear_y = nn.Linear(256, 1)
        self.linear_all = nn.Linear(256, 2)

    def forward(self, x):
        """Return ``(logits, centr)`` for a batch of single-channel images."""
        features = F.relu(self.pool0(self.conv0(x)))
        features = F.relu(self.pool1(self.conv1(features)))
        hidden = self.linear_relu_stack(self.flatten(features))
        return self.linear(hidden), self.linear_all(hidden)

# Instantiate the network; the later training, evaluation and plotting cells all use this global `model`.
model = NeuralNetwork_OL_v2()



NeuralNetwork_OL_v2(
  (conv0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
  (pool0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(3, 3))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=10000, out_features=256, bias=True)
    (1): ReLU()
  )
  (linear): Linear(in_features=256, out_features=10, bias=True)
  (linear_x): Linear(in_features=256, out_features=1, bias=True)
  (linear_y): Linear(in_features=256, out_features=1, bias=True)
  (linear_all): Linear(in_features=256, out_features=2, bias=True)
)


Build two loss functions: one for the digit class and one for the center coordinates

In [49]:
# Classification loss for the digit head.
loss_fn = nn.CrossEntropyLoss()
# Regression loss for the predicted center coordinates.
loss_mse = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Weights of the combined objective used in train()/test():
#   total = alpha * digit_loss + beta * center_loss
alpha = 100
beta = 1

In [50]:
def train(dataloader, model, loss_fn, loss_mse, optimizer, alpha, beta):
    """Run one training epoch over ``dataloader``.

    Each batch is expected to unpack as ``(X, y)`` with ``y[0]`` the digit
    targets and ``y[1]`` the center targets. The optimised objective is
    ``alpha * loss_fn(digit) + beta * loss_mse(center)``.

    Parameters
    ----------
    dataloader : iterable of batches exposing a ``.dataset`` with ``len()``.
    model : module returning ``(digit_logits, center_pred)``.
    loss_fn : criterion applied to the digit logits.
    loss_mse : criterion applied to the center predictions.
    optimizer : optimizer bound to ``model``'s parameters.
    alpha, beta : weights of the digit and center loss terms.

    Returns
    -------
    (loss_dig_list, loss_center_list) : tuple of lists of float
        Digit and center loss values logged every 100th batch.
    """
    model.train()
    # Use the loader that was passed in, not a notebook-level global.
    size = len(dataloader.dataset)

    loss_dig_list = []
    loss_center_list = []

    for batch, (X, y) in enumerate(dataloader):
        y0, y1 = y[0], y[1]

        y0_pred, y1_pred = model(X.float())

        # Compute each term once and reuse it for the objective and the log.
        loss_dig = loss_fn(y0_pred, y0)
        loss_center = loss_mse(y1_pred, y1.float())
        loss = alpha * loss_dig + beta * loss_center

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            current = batch * len(X)
            loss_value = loss.item()
            loss_dig_value = loss_dig.item()
            loss_center_value = loss_center.item()

            loss_dig_list.append(loss_dig_value)
            loss_center_list.append(loss_center_value)
            print(f"MAIN loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")
            print(f"Digit prediction loss: {loss_dig_value:>7f}  [{current:>5d}/{size:>5d}]")
            print(f"Coordinate prediction loss: {loss_center_value:>7f}  [{current:>5d}/{size:>5d}]")
            print("-----------")

    return loss_dig_list, loss_center_list
            
            

In [51]:

def test(dataloader, model, loss_fn, loss_mse, alpha=alpha, beta=beta):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and average losses.

    Each batch is expected to unpack as ``(X, y)`` with ``y[0]`` the digit
    targets and ``y[1]`` the center targets. Accuracy covers the digit head only.

    Parameters
    ----------
    dataloader : iterable of batches exposing a ``.dataset`` with ``len()``.
    model : module returning ``(digit_logits, center_pred)``.
    loss_fn : criterion applied to the digit logits.
    loss_mse : criterion applied to the center predictions.
    alpha, beta : weights used for the combined loss
        (defaults are the module-level values at definition time).

    Returns
    -------
    dict with keys ``"accuracy"``, ``"digit_loss"``, ``"center_loss"`` and
    ``"total_loss"`` (per-sample averages; accuracy is a fraction in [0, 1]).
    """
    size = len(dataloader.dataset)
    model.eval()
    test_loss_y0, test_loss_y1, correct = 0, 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            y0, y1 = y[0], y[1]
            y0_pred, y1_pred = model(X.float())

            # Assumes mean-reduced criteria (the default for nn.CrossEntropyLoss
            # and nn.MSELoss): each call yields a per-batch mean, so weight it
            # by the batch length before dividing by the dataset size below.
            batch_len = len(X)
            test_loss_y0 += loss_fn(y0_pred, y0).item() * batch_len
            test_loss_y1 += loss_mse(y1_pred, y1.float()).item() * batch_len

            correct += (y0_pred.argmax(1) == y0).type(torch.float).sum().item() # only for digit predictions

    # average the loss and accuracy among all records used in the dataset
    test_loss_y0 /= size
    test_loss_y1 /= size
    correct /= size
    test_loss = alpha * test_loss_y0 + beta * test_loss_y1
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg digit loss: {test_loss_y0:>8f}, Avg coordinate loss: {test_loss_y1:>8f} \n")

    return {
        "accuracy": correct,
        "digit_loss": test_loss_y0,
        "center_loss": test_loss_y1,
        "total_loss": test_loss,
    }

In [78]:
# Number of full passes over the training data.
epochs = 10
for t in range(epochs):

    print(f"Epoch {t+1}\n-------------------------------")
    # One optimisation pass over the training loader, then an evaluation pass on the held-out loader.
    train(train_dataloader, model, loss_fn, loss_mse, optimizer, alpha=alpha, beta=beta)
    test(test_dataloader, model, loss_fn, loss_mse, alpha=alpha, beta=beta)
    
print("Done!")

In [16]:
# torch.save(model.state_dict(), '/home/george/new_model')


Predict the digit and center for a random sample from the test data and plot the result

In [84]:
# Take one (shuffled) batch from the evaluation loader and pick a random sample from it.
(X, y) = next(iter(test_dataloader))
# Index within the real batch length rather than a hard-coded 64, so a short batch cannot cause an IndexError.
indx = np.random.randint(len(X))
# The center label is stored as [first-axis (row) center, second-axis (column) center].
# On the image plot rows run along matplotlib's y axis, hence the swapped names.
y_center_actual, x_center_actual = int(y[1][indx][0]), int(y[1][indx][1])

# Inference only: switch to eval mode and skip building the autograd graph.
model.eval()
with torch.no_grad():
    digit_pred, center_pred = model(X.float())

predicted_digit = np.argmax(digit_pred[indx].cpu().detach().numpy())

# Same ordering as the label: index 0 is the row center, index 1 the column center.
predicted_x = int(center_pred[indx][0])
predicted_y = int(center_pred[indx][1])

plt.imshow(np.reshape(X[indx].cpu().numpy(), (90,90)), cmap="gray")
# plot the actual center in green
plt.plot(x_center_actual, y_center_actual, "og", markersize=10)
# plot the predicted center in yellow
plt.plot(predicted_y, predicted_x, "oy", markersize=10)
plt.show()

print("Image shape: " + str(list(X[indx].cpu().numpy().shape)))
print("Digit: " + str(int(y[0][indx])))
print("True Center (in green): ({},{})".format(y_center_actual, x_center_actual))
print("-------------------------------")
print("Predicted Digit: "+str(predicted_digit))
print("Predicted Center (in yellow): ({},{})".format(str(predicted_x), str(predicted_y)))


In [72]:
# model.load_state_dict(torch.load('path_to_upload/new_model2'))
# model.eval()

NeuralNetwork_OL_v2(
  (conv0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
  (pool0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(3, 3))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=10000, out_features=256, bias=True)
    (1): ReLU()
  )
  (linear): Linear(in_features=256, out_features=10, bias=True)
  (linear_x): Linear(in_features=256, out_features=1, bias=True)
  (linear_y): Linear(in_features=256, out_features=1, bias=True)
  (linear_all): Linear(in_features=256, out_features=2, bias=True)
)

In [37]:
def rotate(img, degree):
    """Return ``img`` rotated by ``degree`` degrees, keeping the input's shape.

    Delegates to ``scipy.ndimage.rotate`` with ``reshape=False`` so the output
    array has the same shape as the input.
    """
    return ndimage.rotate(img, angle=degree, reshape=False)
    

In [43]:
def noise(img):
    """Return ``img`` plus zero-mean Gaussian noise with standard deviation 0.1.

    One noise value is drawn per element of ``img`` from NumPy's global
    random state; the input array itself is not modified.
    """
    gaussian = np.random.normal(loc=0, scale=.1, size=img.shape)
    return img + gaussian
    

In [None]:
def rotate_and_check_model(x_train, y_train):
    """Evaluate the model on randomly rotated versions of the given digits.

    Each image is rotated by an integer angle drawn from ``[0, 35)`` degrees,
    placed on a 90x90 canvas with ``build_data``, batched (64, shuffled) and
    passed to ``check_model``.
    """
    rotated_samples = []
    for digit_img, digit_label in zip(x_train, y_train):
        angle = np.random.randint(35)
        rotated_samples.append(build_data(rotate(digit_img, angle), digit_label, 90))

    loader = DataLoader(rotated_samples, batch_size=64, shuffle=True)
    check_model(loader)

In [None]:
def noise_and_check_model(x_train, y_train):
    """Evaluate the model on digits corrupted with Gaussian noise.

    Noise is added on the [0, 1] intensity scale (see ``noise``). ``build_data``
    divides its input by 255 itself, so the noisy image is scaled back to the
    0-255 range before being handed over; otherwise the digit would be
    normalised twice and end up almost invisible next to the background.

    Parameters
    ----------
    x_train : iterable of 2-D images on the 0-255 scale.
    y_train : iterable of matching digit labels.
    """
    noise_test_data = [
        # noise(img / 255) * 255 -> build_data's own "/ 255" yields noise(img / 255)
        build_data(noise(img / 255) * 255, value, 90)
        for img, value in zip(x_train, y_train)
    ]
    noise_dataloader = DataLoader(noise_test_data, batch_size=64, shuffle=True)
    check_model(noise_dataloader)

In [None]:
def check_model(dataloader):
    """Run ``test`` on ``dataloader`` with the notebook-level model, losses and loss weights."""
    test(
        dataloader,
        model,
        loss_fn,
        loss_mse,
        alpha=alpha,
        beta=beta,
    )