In [1]:
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.metrics import Accuracy

import numpy as np
import pandas as pd
import argparse
import os

In [2]:
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/digit-recognizer/sample_submission.csv
/kaggle/input/digit-recognizer/train.csv
/kaggle/input/digit-recognizer/test.csv


In [3]:
class MNISTDataModule(pl.LightningDataModule):
    
    def __init__(self, root_dir):
        super(MNISTDataModule, self).__init__()
        
        self.root_dir = root_dir
        
        self.n_workers = 4
        self.batch_size = 128
        self.train_size = 38000
        self.val_size = 4000
        self.test_size = 28000
        self.image_shape = (1, 28, 28)
        self.n_classes = 10
        
    def setup(self, stage=None):
        if stage == 'fit':
            train_data = pd.read_csv(self.root_dir + '/train.csv', dtype=np.float32)
            train_data, train_labels = train_data.drop('label', axis=1), train_data['label']
        
            train_data = train_data.values.reshape(train_data.shape[0], *self.image_shape)
            train_data = torch.from_numpy(train_data)
            train_labels = torch.tensor(train_labels, dtype=torch.long)
            self.train_dataset, self.val_dataset = random_split(
                TensorDataset(train_data, train_labels), [self.train_size, self.val_size]
            )
        if stage == 'test':
            test_data = pd.read_csv(self.root_dir + '/test.csv', dtype=np.float32)
            self.test_id = test_data.index + 1
            test_data = test_data.values.reshape(test_data.shape[0], *self.image_shape)
            test_data = torch.from_numpy(test_data)
            self.test_dataset = TensorDataset(test_data)
        
    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size,
                          num_workers=self.n_workers, shuffle=True)
    
    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.val_size,
                          num_workers=self.n_workers)
    
    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.test_size//10,
                          num_workers=self.n_workers)

In [4]:
class ResBlock(nn.Module):
    
    def __init__(self, input_size, output_size, stride, downsample):
        super(ResBlock, self).__init__()
        self.conv1 = nn.Conv2d(input_size, output_size, bias=False,
                               kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(output_size)
        self.conv2 = nn.Conv2d(output_size, output_size, bias=False,
                               kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(output_size)
        self.downsample = downsample
    
    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(identity)
        
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        
        out += identity
        out = F.relu(out)
        return out

In [5]:
class ResNet(nn.Module):
    
    def __init__(self, input_size, output_size, hidden_sizes):
        super(ResNet, self).__init__()
        
        self.conv1 = nn.Conv2d(input_size, hidden_sizes[0], bias=False,
                               kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(hidden_sizes[0])
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        self.res1 = self.make_block(hidden_sizes[0], hidden_sizes[1], 1)
        self.res2 = self.make_block(hidden_sizes[1], hidden_sizes[2], 2)
        self.res3 = self.make_block(hidden_sizes[2], hidden_sizes[3], 2)
        self.res4 = self.make_block(hidden_sizes[3], hidden_sizes[4], 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(hidden_sizes[4], output_size)
        
    def make_block(self, input_size, output_size, stride):
        downsample = None if stride == 1 else nn.Sequential(
            nn.Conv2d(input_size, output_size, bias=False,
                      kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(output_size)
        )
        return nn.Sequential(
            ResBlock(input_size, output_size, stride, downsample),
            ResBlock(output_size, output_size, 1, None)
        )
        
    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.maxpool(out)

        out = self.res1(out)
        out = self.res2(out)
        out = self.res3(out)
        out = self.res4(out)

        out = self.avgpool(out)
        out = out.flatten(1)
        out = self.fc(out)
        return out

In [6]:
class MNISTClassifier(pl.LightningModule):
    
    def __init__(self, hparams, input_size, output_size):
        super(MNISTClassifier, self).__init__()
        self.hparams = hparams
        
        self.resnet = ResNet(input_size, output_size, self.hparams.hidden_sizes)
        self.train_acc = Accuracy()
        self.val_acc = Accuracy()
        
    def forward(self, x):
        return self.resnet(x)
        
    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        self.train_acc(y_hat, y)
        loss = F.cross_entropy(y_hat, y)
        self.log('loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        self.val_acc(y_hat, y)
        loss = F.cross_entropy(y_hat, y)
        self.log('val_loss', loss)
        return loss
    
    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=self.hparams.learn_rate)

In [7]:
MNIST_dm = MNISTDataModule('/kaggle/input/digit-recognizer')

args = argparse.Namespace(
    learn_rate=3e-4,
    hidden_sizes=[64, 64, 128, 256, 512]
)
model = MNISTClassifier(args, MNIST_dm.image_shape[0], MNIST_dm.n_classes)

trainer = pl.Trainer(
    gpus=1,
    max_epochs=32,
    checkpoint_callback=False,
    callbacks=[EarlyStopping('val_loss', patience=3)],
    progress_bar_refresh_rate=20
)

In [8]:
trainer.fit(model, MNIST_dm)
print(f"Training accuracy is {model.train_acc.compute()}")
print(f"Validation accuracy is {model.val_acc.compute()}")

Validation sanity check: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Validating: |          | 0/? [00:00<?, ?it/s]

Training accuracy is 0.9874289631843567
Validation accuracy is 0.9004999995231628


In [9]:
MNIST_dm.setup(stage='test')

predictions = []
for batch_idx, batch in enumerate(MNIST_dm.test_dataloader()):
    predictions.extend(model(batch[0]).argmax(1).detach().numpy())

submission = pd.DataFrame({
    'ImageId': MNIST_dm.test_id,
    'Label': predictions
})
submission.to_csv('submission.csv', index=False)