In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)
%matplotlib inline

In [None]:
# Load the raw CSVs: the training file carries a 'label' column, the test file is pixels only.
train_data = pd.read_csv('train.csv')
test_data = pd.read_csv('test.csv')

# Features are every column except the first one; targets come from the 'label' column.
first_column = train_data.columns[0]
X_data = train_data.drop(columns=[first_column]).values.astype('float32')
y_data = train_data['label'].values
X_test = test_data.values.astype('float32')

# Hold out 10% of the labelled rows for validation (fixed seed for a repeatable split).
X_train, X_val, y_train, y_val = train_test_split(
    X_data, y_data, test_size=0.1, random_state=1
)

# Per-split pixel statistics, divided by 255 so they are expressed on a 0-1 pixel scale.
train_mean = X_train.mean()/255
train_std = X_train.std()/255

val_mean = X_val.mean()/255
val_std = X_val.std()/255

test_mean = X_test.mean()/255
test_std = X_test.std()/255

# Turn the flat pixel rows into 28x28 images.
X_train = X_train.reshape(-1, 28, 28)
X_val = X_val.reshape(-1, 28, 28)
X_test = X_test.reshape(-1, 28, 28)

# Quick shape report of the resulting arrays.
for split_shape in (X_train.shape, y_train.shape, X_val.shape, X_test.shape):
    print(split_shape)

In [None]:
# Show training samples 6, 7 and 8 in the bottom row (cells 7-9) of a 3x3 subplot grid.
for sample_idx in (6, 7, 8):
    plt.subplot(3, 3, sample_idx + 1)
    sample_image = X_train[sample_idx].squeeze()
    plt.imshow(sample_image, cmap=plt.get_cmap('gray'))
    plt.title(y_train[sample_idx])

In [None]:
class MNISTDataset(Dataset):
    """Dataset over an indexable collection of digit images with optional labels.

    Args:
        X: indexable collection of images; each item is reshaped to 28x28 on access.
        y: indexable collection of labels aligned with ``X``, or ``None`` for
            unlabelled data.
        transform: optional callable applied to the 28x28 ``uint8`` image.
    """

    def __init__(self, X, y, transform=None):
        self.X, self.y = X, y
        self.transform = transform

    def __len__(self):
        """Number of images held by the dataset."""
        return len(self.X)

    def __getitem__(self, n):
        """Return the ``n``-th image, paired with its label when labels exist."""
        pixels = self.X[n].reshape((28, 28))
        image = pixels.astype(np.uint8)

        if self.transform:
            image = self.transform(image)

        # Unlabelled datasets yield the bare image instead of a tuple.
        if self.y is not None:
            return (image, self.y[n])
        return image

In [None]:
batch_size = 32
img_size = 28
classes = range(10)

# ToTensor() rescales uint8 pixels to [0, 1], which is why the statistics passed to
# Normalize are the /255 versions computed earlier in the notebook.
train_transform = transforms.Compose(
                    [
                    transforms.ToPILImage(),
                    # NOTE(review): the dataset yields 28x28 images and no padding is
                    # given, so a 28x28 crop leaves the image unchanged. Add `padding=`
                    # here if shift augmentation is actually wanted.
                    transforms.RandomCrop(img_size),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[train_mean], std=[train_std]),
                    ])

# Validation and test inputs must go through exactly the same normalization the
# model was trained with, so they use the *training* statistics instead of
# statistics re-estimated on each split.
val_transform = transforms.Compose(
                    [
                    transforms.ToPILImage(),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[train_mean], std=[train_std]),
                    ])

test_transform = transforms.Compose(
                    [
                    transforms.ToPILImage(),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[train_mean], std=[train_std]),
                    ])

train_dataset = MNISTDataset(X_train, y_train, transform=train_transform)
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size, shuffle=True)

val_dataset = MNISTDataset(X_val, y_val, transform=val_transform)
val_loader = DataLoader(dataset=val_dataset,
                        batch_size=batch_size, shuffle=False)

# The test loader must NOT shuffle: predictions are later written to the submission
# file by position, so batch order has to follow the row order of the test data.
test_dataset = MNISTDataset(X_test, None, transform=test_transform)
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size, shuffle=False)

In [None]:
# Pull a single batch from the training loader and display its first sample.
batch_iterator = iter(train_loader)
imgs, lbls = next(batch_iterator)

first_label = lbls[0]
print(classes[first_label])

first_image = imgs[0].data.reshape((28,28))
plt.imshow(first_image, cmap="gray")

In [None]:
class SqueezeExiteBlock(nn.Module):
    """Channel-wise gating block (squeeze-and-excitation style).

    The input is globally average-pooled to one value per channel, passed through
    a two-layer bottleneck (``fc1`` -> ReLU -> ``fc2`` -> sigmoid) and the resulting
    per-channel gate is multiplied back onto the input.

    Args:
        filters: number of channels of the tensors this block will receive.
    """

    def __init__(self, filters: int) -> None:
        super().__init__()
        # Bottleneck width is filters/32, but never less than one unit so that
        # small channel counts still produce a usable gate.
        hidden = max(filters // 32, 1)
        self.fc1 = nn.Linear(filters, hidden)
        self.fc2 = nn.Linear(hidden, filters)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Scale every channel of ``x`` (N, C, H, W) by its learned gate in (0, 1)."""
        # Take batch and channel sizes from the input itself rather than assuming
        # a fixed channel count.
        batch, channels = x.shape[:2]

        gate = F.adaptive_avg_pool2d(x, 1).view(batch, channels)
        gate = F.relu(self.fc1(gate), inplace=True)
        gate = torch.sigmoid(self.fc2(gate)).view(batch, channels, 1, 1)

        return x.mul(gate)


class BasicConvBlock(nn.Module):
    """Three 3x3 conv + ReLU layers followed by batch norm and channel gating.

    All convolutions use ``padding=1``, so the spatial size is preserved.

    Args:
        h: height of the incoming feature map.
        w: width of the incoming feature map.
        inp: number of input channels.
        filters: number of output channels of every convolution (default 128).
    """

    def __init__(self, h: int, w: int, inp: int , filters: int = 128) -> None:
        super().__init__()
        self.h, self.w = h, w
        self.inp = inp

        self.conv = nn.Sequential(
            nn.Conv2d(inp, filters, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(filters, filters, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(filters, filters, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

        # Both layers must match the conv output width, i.e. `filters`.
        self.bn = nn.BatchNorm2d(filters)
        self.se = SqueezeExiteBlock(filters)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply conv stack -> batch norm -> channel gating.

        The input is first reshaped to (N, inp, h, w), so it may arrive either
        already in that layout or flattened.
        """
        # Height comes before width in the (N, C, H, W) layout Conv2d expects.
        x = x.reshape(-1, self.inp, self.h, self.w)

        return self.se(self.bn(self.conv(x)))


class MNIST(nn.Module):
    """Digit classifier built from three ``BasicConvBlock`` stages.

    Two stages run at 28x28, a 2x2 average pool halves the resolution, and a third
    stage runs at 14x14. The final features are pooled globally (average and max);
    each pooled vector goes through dropout and the shared linear head, and the two
    resulting logit vectors are summed.
    """

    def __init__(self) -> None:
        super().__init__()
        # Feature extractor stages.
        self.block1 = BasicConvBlock(h=28, w=28, inp=1)
        self.block2 = BasicConvBlock(h=28, w=28, inp=128)
        self.block3 = BasicConvBlock(h=14, w=14, inp=128)

        # Pooling layers.
        self.avg = nn.AvgPool2d(kernel_size=2)
        self.global_avg = nn.AdaptiveAvgPool2d((1,1))
        self.global_max = nn.AdaptiveMaxPool2d((1,1))

        # Classification head: 128 pooled features -> 10 class scores.
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(128, 10, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one row of 10 class scores per input image."""
        features = self.block2(self.block1(x))
        features = self.block3(self.avg(features))

        pooled_avg = self.global_avg(features).view((-1,128))
        pooled_max = self.global_max(features).view((-1,128))

        # The same linear head scores both pooled descriptors; each gets its own
        # dropout pass.
        scores_avg = self.fc(self.dropout(pooled_avg))
        scores_max = self.fc(self.dropout(pooled_max))

        return scores_avg + scores_max


In [None]:
#check how many preds match labels

def get_num_correct(preds, labels):
    """Count the rows of ``preds`` whose highest-scoring column equals the label.

    Args:
        preds: 2-D tensor of per-class scores, one row per sample.
        labels: 1-D tensor of class indices, one per sample.

    Returns:
        The number of matching rows as a Python number.
    """
    predicted_classes = torch.argmax(preds, dim=1)
    matches = torch.eq(predicted_classes, labels)
    return matches.sum().item()

In [None]:
#TRAIN

model = MNIST()
model.to(device)

lr = 0.001
epochs = 20

loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Learning rate is multiplied by 0.1 after epochs 13 and 16.
exp_lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[13,16], gamma=0.1)

for epoch in range(epochs):

    # ---- training pass ----
    model.train()

    for images, labels in train_loader:

        images_cuda, labels_cuda = images.to(device), labels.to(device)

        optimizer.zero_grad()

        preds = model(images_cuda)
        loss = loss_function(preds, labels_cuda)

        loss.backward()
        optimizer.step()

    # The scheduler is stepped once per epoch, after the optimizer updates.
    exp_lr_scheduler.step()

    # ---- validation pass ----
    model.eval()

    val_loss = 0.0
    val_correct = 0
    val_seen = 0

    with torch.no_grad():
        for images, labels in val_loader:

            images_cuda, labels_cuda = images.to(device), labels.to(device)

            preds = model(images_cuda)
            loss = loss_function(preds, labels_cuda)

            # CrossEntropyLoss returns the batch mean, so weight it by the real
            # batch size (the last batch can be smaller than `batch_size`) and
            # accumulate over all batches instead of keeping only the last one.
            n_batch = labels_cuda.size(0)
            val_loss += loss.item() * n_batch
            val_correct += get_num_correct(preds, labels_cuda)
            val_seen += n_batch

    # Report the mean per-sample loss over the whole validation set.
    val_loss = val_loss / max(val_seen, 1)

    print(" Val Loss: ", val_loss)
    print(" Val Acc: ", (val_correct/len(X_val))*100)

In [None]:
# Build a fresh network and restore its parameters from the checkpoint on disk.
# NOTE(review): no cell visible in this notebook writes 'mnist_weights.pth', so the
# file is presumably produced elsewhere; confirm it exists before running this.
model = MNIST()
model.to(device)

checkpoint_state = torch.load('mnist_weights.pth', map_location=torch.device(device))
model.load_state_dict(checkpoint_state)

# Switch to inference mode (affects dropout and batch norm).
model.eval()

In [None]:
# Predict a class index for every test image, in loader order.
model.eval()  # make sure dropout / batch norm are in inference mode

batch_predictions = []

# Gradients are not needed for inference; disabling them avoids building an
# autograd graph for every batch.
with torch.no_grad():
    for images in test_loader:

        images_cuda = images.to(device)
        predictions = model(images_cuda)
        batch_predictions.append(predictions.argmax(dim=1))

# Concatenate once at the end instead of re-allocating a growing tensor per batch.
if batch_predictions:
    pred = torch.cat(batch_predictions, dim=0)
else:
    pred = torch.LongTensor().to(device)

In [None]:
# Fill the sample submission's 'Label' column with the predictions and save it.
submission = pd.read_csv('sample_submission.csv')

predicted_labels = pred.cpu().numpy()
submission['Label'] = predicted_labels

submission.to_csv('predictions.csv', index=False)