In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.optim import SGD, Adam

import torchvision.models as models

from sklearn.model_selection import train_test_split

import pytorch_lightning as pl
from pytorch_lightning import Trainer

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Read both tables from CSV files located relative to the current working directory.
# The two reads are independent of each other.
train = pd.read_csv("train.csv")
test = pd.read_csv("test.csv")

In [None]:
# Split the training table into the target vector (the "label" column) and the
# feature matrix (every other column), then view each row as a 1x28x28 image.
y = train["label"].to_numpy()
X = train.drop(columns="label").to_numpy().reshape(-1, 1, 28, 28)

# Hold out 10% of the rows for validation; the fixed seed keeps the split reproducible.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.1, random_state=42
)

# Convert to tensors: float inputs for the conv layers, long targets for cross-entropy.
X_train, X_val = (torch.tensor(arr, dtype=torch.float) for arr in (X_train, X_val))
y_train, y_val = (torch.tensor(arr, dtype=torch.long) for arr in (y_train, y_val))

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)

In [None]:
# Every column of the test table is a feature: reshape each row to 1x28x28 and
# convert to a float tensor, mirroring the training-set preparation.
X_test = torch.tensor(
    test.to_numpy().reshape(-1, 1, 28, 28),
    dtype=torch.float,
)

# Single-tensor dataset: each item is a 1-tuple holding one image.
test_dataset = TensorDataset(X_test)

In [None]:
# Training hyperparameters.
# NOTE(review): in the visible notebook only `learning_rate` is read (by
# MnistModel.configure_optimizers). `epoch` and `batch_size` are not referenced:
# the data loaders use a batch size of 40 and the Trainer is given
# max_nb_epochs=25. Confirm which values are intended.
learning_rate = 1e-4
batch_size = 20
epoch = 10

class MnistModel(pl.LightningModule):
    """Small CNN classifier for 1x28x28 images with 10 output classes.

    Architecture: two (5x5 conv -> ReLU -> 2x2 max-pool) stages followed by
    two fully connected layers. The module also defines the training,
    validation and test hooks and the data loaders used by the Lightning
    ``Trainer``; the loaders read the module-level ``train_dataset``,
    ``val_dataset`` and ``test_dataset`` objects.
    """

    def __init__(self):
        super(MnistModel, self).__init__()
        # 1x28x28 -> 8x14x14: padding=2 keeps 28x28 through the 5x5 conv, the pool halves it.
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # 8x14x14 -> 16x7x7
        self.layer2 = nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # NOTE(review): this dropout layer is registered but never applied in
        # forward(). It is kept so the module's attributes stay unchanged;
        # decide whether it should be used or removed.
        self.drop_out = nn.Dropout()
        # 16 channels * 7 * 7 = 784 flattened features.
        self.fc1 = nn.Linear(784, 10)
        self.fc2 = nn.Linear(10, 10)

    @staticmethod
    def _accuracy(logits, y):
        """Return the fraction of rows in ``logits`` whose argmax equals ``y``."""
        return (torch.argmax(logits, dim=1) == y).float().mean()

    def forward(self, X):
        """Compute class logits.

        Args:
            X: float tensor of shape (batch, 1, 28, 28).

        Returns:
            Tensor of shape (batch, 10) with unnormalised class scores.
        """
        X = self.layer1(X)
        X = self.layer2(X)
        # Flatten everything except the batch dimension.
        X = X.reshape(X.size(0), -1)
        X = torch.relu(self.fc1(X))
        X = self.fc2(X)
        return X

    def training_step(self, batch, batch_nb):
        """Run one training batch and report its cross-entropy loss and accuracy.

        Args:
            batch: ``(X, y)`` pair of inputs and integer class targets.
            batch_nb: index of the batch (unused).

        Returns:
            Dict with the ``loss`` to optimise plus ``progress_bar`` and ``log``
            entries containing ``train_loss`` and ``train_acc``.
        """
        X, y = batch
        predict = self.forward(X)
        loss = F.cross_entropy(predict, y)
        # Fraction of correct predictions in the batch (not a raw count).
        acc = self._accuracy(predict, y)
        metrics = {'train_loss': loss, 'train_acc': acc}
        return {'loss': loss,
                'progress_bar': metrics,
                'log': dict(metrics)}

    def validation_step(self, batch, batch_nb):
        """Compute the cross-entropy loss and accuracy of one validation batch.

        Args:
            batch: ``(X, y)`` pair of inputs and integer class targets.
            batch_nb: index of the batch (unused).

        Returns:
            Dict with ``val_loss`` and ``val_acc`` for this batch.
        """
        X, y = batch
        predict = self.forward(X)
        return {'val_loss': F.cross_entropy(predict, y),
                'val_acc': self._accuracy(predict, y)}

    def validation_end(self, outputs):
        """Average the per-batch validation metrics.

        Args:
            outputs: list of dicts returned by ``validation_step``.

        Returns:
            Dict with ``progress_bar`` and ``log`` entries containing the mean
            ``val_loss`` and ``val_acc``. Each batch is weighted equally.
        """
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        avg_acc = torch.stack([x['val_acc'] for x in outputs]).mean()
        metrics = {'val_loss': avg_loss, 'val_acc': avg_acc}
        return {'progress_bar': metrics,
                'log': dict(metrics)}

    def test_step(self, batch, batch_nb):
        """Predict the class index for every image in an unlabeled test batch.

        Args:
            batch: one-element sequence holding the input tensor (the test
                dataset has no targets).
            batch_nb: index of the batch (unused).

        Returns:
            Dict with ``predict``: tensor of predicted class indices.
        """
        (X,) = batch
        predict = torch.argmax(self.forward(X), dim=1)
        return {'predict': predict}

    def test_end(self, outputs):
        """Collect all test predictions and write them to ``submission.csv``.

        The file has two columns: ``ImageId`` (1-based row number) and
        ``Label`` (predicted class index).

        Args:
            outputs: list of dicts returned by ``test_step``.

        Returns:
            An empty dict.
        """
        predict = []
        for x in outputs:
            # Move to host memory first: .numpy()/.tolist() style conversion
            # of a CUDA tensor via .numpy() would raise.
            predict.extend(x['predict'].cpu().tolist())
        results = pd.Series(predict, name="Label", dtype="int64")
        # Derive the ids from the actual number of predictions so the two
        # columns always line up.
        image_ids = pd.Series(range(1, len(predict) + 1), name="ImageId", dtype="int64")
        submission = pd.concat([image_ids, results], axis=1)
        submission.to_csv("submission.csv", index=False)
        return {}

    def configure_optimizers(self):
        """Return an Adam optimiser over all model parameters.

        The learning rate is read from the module-level ``learning_rate``.
        """
        # The model has no ``resnet`` submodule; optimise this module's own parameters.
        return torch.optim.Adam(self.parameters(), lr=learning_rate)

    # NOTE(review): the loaders below use a fixed batch size of 40; the
    # module-level ``batch_size`` variable is not read here.

    @pl.data_loader
    def train_dataloader(self):
        """Return the shuffled training loader."""
        return DataLoader(train_dataset, batch_size=40, shuffle=True)

    @pl.data_loader
    def val_dataloader(self):
        """Return the validation loader (fixed order)."""
        return DataLoader(val_dataset, batch_size=40, shuffle=False)

    @pl.data_loader
    def test_dataloader(self):
        """Return the test loader; order is fixed so predictions match row order."""
        return DataLoader(test_dataset, batch_size=40, shuffle=False)


In [None]:
# Build the network and train it. Early stopping is disabled and the
# epoch cap passed to the Trainer is 25.
model = MnistModel()
trainer = Trainer(
    max_nb_epochs=25,
    early_stop_callback=False,
)
trainer.fit(model)

In [None]:
# Run the test loop on the trained model; MnistModel.test_end writes the
# collected predictions to "submission.csv".
trainer.test(model)