In [24]:
from google.colab import drive
drive.mount('/content/drive')

import os
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
import numpy as np

# Pytorch import
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.transforms import ToTensor, Lambda
from torch import nn

DATA_PATH = '/path/to/data/'

# Get device
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using cuda device


In [25]:
# Read the training data
df = pd.read_csv(DATA_PATH + 'train.csv')
labels = df['label']
training_images = df.drop('label', axis=1)

# Create a custom Dataset class for PyTorch
class DigitDataset(Dataset):
    def __init__(self, csv_path, transform=None, target_transform=None):
        self.transform = transform
        self.target_transform = target_transform

        self.data = pd.read_csv(csv_path)
        self.is_train_data = 'label' in self.data.columns
        self.features = self.data.drop('label', axis=1) if self.is_train_data else self.data
        self.labels = self.data['label'] if self.is_train_data else None

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        pixels = self.features.iloc[idx].to_numpy(dtype=np.uint8)
        label = int(self.labels.iloc[idx]) if self.is_train_data else None

        img = pixels.reshape(28, 28)
        img = Image.fromarray(img)

        if self.transform:
            img = self.transform(img)
        if self.target_transform:
            label = self.target_transform(label)

        return img, label if self.is_train_data else img

In [26]:
# Hyperparameter setups
learning_rate = 1e-3
epochs = 30
batch_size = 64

In [None]:
# Initialize dataset
full_training_data = DigitDataset(
    DATA_PATH + 'train.csv',
    transform=ToTensor())

train_size = int(0.8 * len(full_training_data))
test_size = len(full_training_data) - train_size
train_dataset, test_dataset = random_split(full_training_data, [train_size, test_size])

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=2)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True, num_workers=2)

# Iterate through dataset and see some samples
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

# Show some sample data points
figure = plt.figure(figsize=(10, 10))
for i in range(25):
    plt.subplot(5, 5, i+1)
    img = train_features[i].squeeze()
    label = torch.argmax(train_labels[i]).item()

    plt.imshow(img, cmap='gray')
    plt.title(label)
    plt.axis('off')

plt.show()

In [28]:
class DigitNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = DigitNet().to(device)
print(model)

DigitNet(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=10, bias=True)
  )
)


In [34]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()

    for batch, (X, y) in enumerate(dataloader):
        X = X.to(device, non_blocking=True)
        y = torch.as_tensor(y, device=device, dtype=torch.long)

        pred = model(X)
        loss = loss_fn(pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(dim=1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [35]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)

Epoch 1
-------------------------------
loss: 0.000711  [   64/33600]
loss: 0.004382  [ 6464/33600]
loss: 0.000029  [12864/33600]
loss: 0.000149  [19264/33600]
loss: 0.015674  [25664/33600]
loss: 0.000474  [32064/33600]
Test Error: 
 Accuracy: 97.5%, Avg loss: 0.174493 

Epoch 2
-------------------------------
loss: 0.000024  [   64/33600]
loss: 0.003685  [ 6464/33600]
loss: 0.189161  [12864/33600]
loss: 0.000004  [19264/33600]
loss: 0.000176  [25664/33600]
loss: 0.000036  [32064/33600]
Test Error: 
 Accuracy: 98.0%, Avg loss: 0.134618 

Epoch 3
-------------------------------
loss: 0.000001  [   64/33600]
loss: 0.000003  [ 6464/33600]
loss: 0.000001  [12864/33600]
loss: 0.003458  [19264/33600]
loss: 0.000002  [25664/33600]
loss: 0.007344  [32064/33600]
Test Error: 
 Accuracy: 97.9%, Avg loss: 0.140605 

Epoch 4
-------------------------------
loss: 0.006083  [   64/33600]
loss: 0.000007  [ 6464/33600]
loss: 0.000533  [12864/33600]
loss: 0.000101  [19264/33600]
loss: 0.091408  [25664/3

In [36]:
kaggle_dataset = DigitDataset(
    DATA_PATH + 'test.csv',
    transform=ToTensor())

kaggle_dataloader = DataLoader(kaggle_dataset, batch_size=batch_size, shuffle=False)

# Make predictions
model.eval()
predictions = []

with torch.no_grad():
    for X, y in kaggle_dataloader:
        X = X.to(device)
        outputs = model(X)
        preds = outputs.argmax(dim=1)
        predictions.extend(preds.cpu().numpy())

# Save submission
submission = pd.DataFrame({
    'ImageId': range(1, len(predictions) + 1),
    'Label': predictions
})

submission.to_csv(DATA_PATH + 'submission.csv', index=False)
print('Saved submission')

Saved submission
