<a href="https://colab.research.google.com/github/iBalag/games-with-nn/blob/master/digit_recognizer_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Mount Google Drive into the Colab runtime at /gdrive; the data-loading cells
# below read their CSV files from paths under this mount point.
from google.colab import drive
drive.mount('/gdrive')

In [0]:
import pandas as pd

# Load the training table from the mounted Drive folder. Later cells use its
# "label" column as the target and reshape the remaining columns into
# 1x28x28 inputs.
data = pd.read_csv("/gdrive/My Drive/data/digit-recognizer/train.csv")

In [0]:
# Work on a copy so the frame loaded from disk is left untouched.
data_copy = data.copy()

# Target vector: the "label" column of the raw frame.
y = data["label"]

# Feature matrix: every column except "label", divided by 255 (normalization).
X = data_copy.drop(columns=["label"]) / 255

In [0]:
from sklearn.model_selection import train_test_split

# Split into train / validation subsets.
# No test_size/train_size is passed, so scikit-learn's default hold-out
# fraction applies; random_state=1 fixes the shuffle so the split is repeatable.
X_train, X_val, y_train, y_val = train_test_split(X, y, random_state=1)

In [0]:
import torch
import torch.nn as nn
import torch.optim as optim

# Features: each row is reshaped to a single-channel 28x28 image, stored as float32.
X_train_tensor, X_val_tensor = (
    torch.tensor(features.values.reshape(-1, 1, 28, 28), dtype=torch.float32)
    for features in (X_train, X_val)
)

# Targets: int64 class indices.
y_train_tensor, y_val_tensor = (
    torch.tensor(targets.values, dtype=torch.int64)
    for targets in (y_train, y_val)
)

In [0]:
from torch.utils.data import Dataset, DataLoader

class DigitDataset(Dataset):
    """
    Map-style dataset that pairs inputs with their labels.

    Indexing returns the tuple (x[index], y[index]); the reported length is
    the size of the first dimension of `x`.
    """

    def __init__(self, x, y):
        # Length is taken from the inputs only; `y` is not checked against it.
        self.len = x.shape[0]
        self.x_data, self.y_data = x, y

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        sample = self.x_data[index]
        target = self.y_data[index]
        return sample, target

In [0]:
def train_model_cnn(model, train_loader, val_loader, loss, optimizer, scheduler, num_epochs):
    """
    Trains `model` for `num_epochs` epochs and records per-epoch metrics.

    Every epoch makes one pass over `train_loader` (forward, backward and an
    optimizer step per batch), then advances `scheduler` once and evaluates
    the model on `val_loader` through `compute_accuracy_cnn`. Batches are moved
    to the notebook-level `device` before the forward pass.

    Arguments:
    model - torch module mapping a batch of inputs to per-class scores
    train_loader - iterable yielding (inputs, labels) training batches
    val_loader - iterable yielding (inputs, labels) validation batches
    loss - callable returning a scalar loss tensor for (scores, labels)
    optimizer - torch optimizer bound to the model's parameters
    scheduler - learning-rate scheduler wrapping `optimizer`
    num_epochs - number of passes over `train_loader`

    Returns: (loss_history, train_history, val_history) - three lists with one
    entry per epoch: mean batch loss, training accuracy, validation accuracy

    Raises: ValueError if `train_loader` yields no batches
    """
    loss_history = []
    train_history = []
    val_history = []
    for epoch in range(num_epochs):
        # Read the rate straight from the optimizer: it is the value the
        # upcoming updates will actually use.
        current_lr = [group['lr'] for group in optimizer.param_groups]
        print('Epoch:', epoch, 'LR:', current_lr)

        model.train() # Enter train mode

        loss_accum = 0.0
        correct_samples = 0
        total_samples = 0
        num_batches = 0
        for x, y in train_loader:
            x_gpu = x.to(device)
            y_gpu = y.to(device)
            prediction = model(x_gpu)
            loss_value = loss(prediction, y_gpu)
            optimizer.zero_grad()
            loss_value.backward()
            optimizer.step()

            _, indices = torch.max(prediction, 1)
            correct_samples += (indices == y_gpu).sum().item()
            total_samples += y.shape[0]

            # .item() detaches the value, so the batch's autograd graph is
            # released instead of being kept alive by the running sum.
            loss_accum += loss_value.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader produced no batches")

        # Advance the schedule only after this epoch's optimizer steps;
        # stepping first would skip the initial learning rate.
        scheduler.step()

        # Average over the number of batches (not the last batch index).
        ave_loss = loss_accum / num_batches
        train_accuracy = float(correct_samples) / total_samples
        val_accuracy = compute_accuracy_cnn(model, val_loader)

        loss_history.append(float(ave_loss))
        train_history.append(train_accuracy)
        val_history.append(val_accuracy)

        print("Average loss: %f, Train accuracy: %f, Val accuracy: %f" % (ave_loss, train_accuracy, val_accuracy))

    return loss_history, train_history, val_history
        
def compute_accuracy_cnn(model, loader):
    """
    Computes accuracy on the dataset wrapped in a loader

    Switches `model` to evaluation mode (and leaves it there), moves every
    batch to the notebook-level `device`, and counts how often the arg-max
    class of the model output equals the label. Gradients are not tracked.

    Arguments:
    model - torch module mapping a batch of inputs to per-class scores
    loader - iterable yielding (inputs, labels) batches

    Returns: accuracy as a float value between 0 and 1
             (0.0 when the loader yields no samples)
    """
    model.eval() # Evaluation mode
    correct = 0
    total = 0
    # No backward pass happens here, so skip building autograd graphs.
    with torch.no_grad():
        # Iterate the loader that was passed in, not a notebook-level global.
        for images, labels in loader:
            images_gpu = images.to(device)
            labels_gpu = labels.to(device)

            outputs = model(images_gpu)

            _, predicted = torch.max(outputs, 1)
            total += labels_gpu.size(0)
            correct += (predicted == labels_gpu).sum().item()

    if total == 0:
        return 0.0
    return correct / total

In [0]:
batch_size = 32

# Training batches are reshuffled every epoch.
train_dataset = DigitDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset=train_dataset,
                         batch_size=batch_size,
                         shuffle=True)

# Accuracy does not depend on sample order, so validation batches are read in
# dataset order instead of being reshuffled on every evaluation pass.
val_dataset = DigitDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(dataset=val_dataset,
                         batch_size=batch_size,
                         shuffle=False)

# Let's make sure GPU is available! Fail here with a clear message rather than
# later, when the first tensor or module is moved to the device.
if not torch.cuda.is_available():
    raise RuntimeError("CUDA GPU is not available; enable a GPU runtime before running this notebook")
device = torch.device("cuda:0")

In [0]:
class Flattener(nn.Module):
    """Collapses every dimension after the first into a single dimension."""

    def forward(self, x):
        # Keep the leading (batch) dimension; -1 lets view() infer the rest.
        leading = x.size(0)
        return x.view(leading, -1)

In [0]:
from torch.optim.lr_scheduler import StepLR

# Two conv + pool stages followed by a small fully connected classifier.
# With the 1x28x28 inputs built earlier, each 3x3 convolution (padding=1)
# keeps the spatial size and each 2x2 max-pool halves it: 28 -> 14 -> 7,
# which is where the 64*7*7 input width of the first linear layer comes from.
nn_model_cnn = nn.Sequential(
            nn.Conv2d(1, 64, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            Flattener(),
            nn.Linear(64*7*7, 100),
            nn.BatchNorm1d(100),
            nn.ReLU(),
            nn.Linear(100, 10),
         )

# Place the model on the same `device` the training loop moves batches to,
# instead of the legacy typed-tensor cast that targets the current CUDA device.
nn_model_cnn.to(device)

loss = nn.CrossEntropyLoss().to(device)
# NOTE(review): weight_decay=1e-1 is a strong L2 penalty for plain SGD - confirm it is intended.
optimizer = optim.SGD(nn_model_cnn.parameters(), lr=1e-2, weight_decay=1e-1)
# Halve the learning rate every 2 scheduler steps.
sched = StepLR(optimizer, step_size=2, gamma=0.5)

In [0]:
# Train for 10 epochs; each returned list holds one value per epoch
# (average loss, train accuracy, validation accuracy).
loss_history, train_history, val_history = train_model_cnn(nn_model_cnn, train_loader, val_loader, loss, optimizer, sched, 10)

Apply the trained model to the test dataset

In [0]:
class TestDataSet(Dataset):
    """
    Map-style dataset for unlabelled inputs.

    Indexing returns (index, x[index]) so that each prediction can be traced
    back to the position of its sample; the reported length is the size of
    the first dimension of `x`.
    """

    def __init__(self, x):
        self.len = x.shape[0]
        self.x_data = x

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        sample = self.x_data[index]
        return index, sample

In [0]:
# Load the test table and apply the same 1/255 scaling used for the training features.
test_data = pd.read_csv("/gdrive/My Drive/data/digit-recognizer/test.csv")
test_data = test_data / 255

test_data_tensor = torch.tensor(test_data.values.reshape(-1, 1, 28, 28), dtype=torch.float32)

test_dataset = TestDataSet(test_data_tensor)
# Every sample carries its own index, so shuffling adds nothing for inference.
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=False)

# Put the model in evaluation mode explicitly (it contains BatchNorm) instead
# of relying on whatever mode the last training/validation call left behind.
nn_model_cnn.eval()

predictions = {}
# Inference only: no gradients are needed, so do not build autograd graphs.
with torch.no_grad():
    for row_ids, x in test_loader:

        x_gpu = x.to(device)
        prediction = nn_model_cnn(x_gpu)

        _, indices = torch.max(prediction, 1)

        # Shift the 0-based dataset indices to the 1-based ImageId values
        # written to the submission file.
        image_ids = (row_ids + 1).tolist()
        predictions.update(zip(image_ids, indices.tolist()))

# sort by keys
predictions = sorted(predictions.items())

df = pd.DataFrame(predictions, columns=["ImageId", "Label"])
df.to_csv("/content/sample_submission.csv", index=False)