# [HW] MNIST

https://www.kaggle.com/competitions/mnist-sai

In [None]:
# Basic module
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm # progress bar

# PyTorch
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms

In [None]:
# Show the installed PyTorch and torchvision versions (rendered as the cell's output)
torch.__version__, torchvision.__version__

In [None]:
# Notebook-wide constants.
BATCH_SIZE = 128  # batch size handed to every DataLoader in this notebook

# Data geometry. These three are not referenced by the cells visible in this
# notebook -- presumably they are meant for the model definition.
CHANNEL = 1
IMG_SIZE = 28
NUM_CLASS = 10

#### Prepare Dataset

In [None]:
# download from google drive
!pip install --upgrade gdown
!gdown --id '1Pb9lxPjXBEq4O8KMzdemqehRtp_jr-Wy' --output mnist.npz

In [None]:
class MNISTDataset(torch.utils.data.Dataset):
    """Labelled image dataset that transforms each image when it is accessed.

    Args:
        data: Indexable collection of images; ``data[idx]`` is passed to the
            transform. (Assumes array-like images that ``ToTensor`` accepts
            when the default transform is used -- TODO confirm.)
        label: Indexable collection of integer class labels aligned with
            ``data``.
        transform: Optional callable applied to every image. When omitted,
            the default pipeline is ``ToTensor`` followed by
            ``Normalize((0.5,), (0.5,))``.
    """

    def __init__(self, data, label, transform=None):
        self.x_data = data
        self.y_label = label
        if transform is None:
            # Use the public ``transforms.Normalize`` name instead of reaching
            # through the ``transforms.transforms`` submodule.
            transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.5,), (0.5,)),
            ])
        self.transform = transform

    def __len__(self):
        """Return the number of samples (length of the image collection)."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)``; the label is a ``torch.long`` tensor."""
        img = self.transform(self.x_data[idx])
        label = torch.tensor(self.y_label[idx], dtype=torch.long)
        return img, label

In [None]:
# Read the training images/labels and the test images from the archive
# (no test labels are read).
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects from
# a downloaded file -- confirm the archive actually needs it before keeping it.
with np.load('mnist.npz', allow_pickle=True) as archive:
    x_train = archive['x_train']
    y_train = archive['y_train']
    x_test = archive['x_test']

# Wrap the complete labelled set in a Dataset.
all_dataset = MNISTDataset(x_train, y_train)

In [None]:
# Number of samples in the full labelled dataset (rendered as the cell's output)
len(all_dataset)

In [None]:
# split dataset
from sklearn.model_selection import train_test_split

# Split sample *indices* instead of the Dataset object itself.
# train_test_split is documented for lists, arrays, sparse matrices and
# dataframes; given a torch Dataset it indexes every sample one by one and
# returns plain lists of already-transformed (img, label) tuples.
# Subset keeps the split lazy: the transform runs only when a sample is fetched.
# The seed is unchanged, so the split stays reproducible.
train_idx, val_idx = train_test_split(np.arange(len(all_dataset)),
                                      test_size=0.2,
                                      random_state=5566)

train_ds = torch.utils.data.Subset(all_dataset, train_idx.tolist())
val_ds = torch.utils.data.Subset(all_dataset, val_idx.tolist())

len(train_ds), len(val_ds)

In [None]:
# Inspect the first training sample: print the Python types, then the shapes,
# of the image and its label.
x, y = train_ds[0]
print(*(type(item) for item in (x, y)))
print(*(item.shape for item in (x, y)))

In [None]:
# Batch iterators over the two splits: the training loader reshuffles,
# the validation loader keeps the stored order.
train_loader = torch.utils.data.DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_loader = torch.utils.data.DataLoader(val_ds, batch_size=BATCH_SIZE)

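#### Build Model

Define `model` (an `nn.Module`) in this section: the training cells below call `model.to(device)`, `model.parameters()` and `model(x)`, but no model is defined yet.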

#### Training

In [None]:
# Pick the GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# `model` must already exist here; it is not defined in the cells shown above.
model = model.to(device)

In [None]:
# Cross-entropy loss and plain SGD over every model parameter.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)  # lr: learning rate

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training pass over ``dataloader`` and update ``model`` in place.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        dataloader: Iterable of ``(x, y)`` batches; ``len(dataloader.dataset)``
            gives the sample count and ``len(dataloader)`` the batch count.
        model: Model being optimised; switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss tensor.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the update.

    Returns:
        Tuple ``(mean of the per-batch losses, correct predictions / samples)``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)

    model.train()
    running_loss = 0
    n_correct = 0
    for inputs, targets in tqdm(dataloader, leave=False):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss
        logits = model(inputs)
        batch_loss = loss_fn(logits, targets)

        # Backward pass, then parameter update
        batch_loss.backward()
        optimizer.step()

        # Bookkeeping; argmax over dim 1 assumes (N, Class) outputs
        running_loss += batch_loss.item()
        hits = logits.argmax(dim=1) == targets
        n_correct += hits.sum().item()

    mean_loss = running_loss / n_batches
    accuracy = n_correct / n_samples
    return mean_loss, accuracy


def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` without computing gradients.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        dataloader: Iterable of ``(x, y)`` batches; ``len(dataloader.dataset)``
            gives the sample count and ``len(dataloader)`` the batch count.
        model: Model to evaluate; switched to evaluation mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss tensor.

    Returns:
        Tuple ``(mean of the per-batch losses, correct predictions / samples)``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)

    model.eval()
    running_loss = 0
    n_correct = 0

    # Gradients are not needed for evaluation
    with torch.no_grad():
        for inputs, targets in tqdm(dataloader, leave=False):
            inputs = inputs.to(device)
            targets = targets.to(device)

            logits = model(inputs)
            running_loss += loss_fn(logits, targets).item()
            n_correct += (logits.argmax(1) == targets).sum().item()

    return running_loss / n_batches, n_correct / n_samples

In [None]:
EPOCHS = 10

# Per-epoch history: one list per metric, appended to after every epoch.
logs = {metric: [] for metric in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}

for epoch in tqdm(range(EPOCHS)):
    # One pass over the training split, then one over the validation split
    train_loss, train_acc = train(train_loader, model, loss_fn, optimizer)
    val_loss, val_acc = test(val_loader, model, loss_fn)

    print(f'EPOCH: {epoch} \
    train_loss: {train_loss:.4f}, train_acc: {train_acc:.3f} \
    val_loss: {val_loss:.4f}, val_acc: {val_acc:.3f} ')

    # Store this epoch's numbers for the plots further down
    epoch_metrics = (
        ('train_loss', train_loss),
        ('train_acc', train_acc),
        ('val_loss', val_loss),
        ('val_acc', val_acc),
    )
    for metric, value in epoch_metrics:
        logs[metric].append(value)

#### Logs

In [None]:
# Loss curves: training vs. validation, one point per epoch.
for series in ('train_loss', 'val_loss'):
    plt.plot(logs[series])
plt.title('loss')
plt.legend(['train_loss', 'val_loss'])
plt.show()

In [None]:
# Accuracy curves: training vs. validation, one point per epoch.
for series in ('train_acc', 'val_acc'):
    plt.plot(logs[series])
plt.title('Acc')
plt.legend(['train_acc', 'val_acc'])
plt.show()

#### Generate file for Kaggle

https://www.kaggle.com/competitions/mnist-sai

In [None]:
class MNISTTestDataset(torch.utils.data.Dataset):
    """Unlabelled image dataset that transforms each image when it is accessed.

    Args:
        data: Indexable collection of images; ``data[idx]`` is passed to the
            transform. (Assumes array-like images that ``ToTensor`` accepts
            when the default transform is used -- TODO confirm.)
        transform: Optional callable applied to every image. When omitted,
            the default pipeline is ``ToTensor`` followed by
            ``Normalize((0.5,), (0.5,))``.
    """

    def __init__(self, data, transform=None):
        self.x_data = data
        if transform is None:
            # Use the public ``transforms.Normalize`` name instead of reaching
            # through the ``transforms.transforms`` submodule.
            transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.5,), (0.5,)),
            ])
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return the transformed image stored at ``idx``."""
        return self.transform(self.x_data[idx])

# Test images served in their stored order (no shuffling requested).
test_ds = MNISTTestDataset(data=x_test)
test_loader = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE)

In [None]:
# Run the model over every test batch and keep the raw outputs.
model.to(device)
model.eval()

batch_outputs = []
with torch.no_grad():
    for x in test_loader:
        batch_outputs.append(model(x.to(device)))

# Join the batches, take the highest-scoring class per row, and move the
# result to a NumPy array on the CPU.
all_outputs = torch.cat(batch_outputs)
y_pred = all_outputs.argmax(1).cpu().numpy()
y_pred.shape

In [None]:
import pandas as pd

# Submission table: one row per test image. `Id` is the row position as a
# string, `Category` the predicted class for that row.
df = pd.DataFrame({
    'Id': [str(i) for i in range(len(x_test))],
    'Category': y_pred,
})
# `index` is documented as a bool, so pass False explicitly instead of None
# to leave the row index out of the file.
df.to_csv('result.csv', index=False)
df