In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_file_paths = (
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# torch interface
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torchvision import transforms, utils
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt

In [None]:
class cfg:
    """Locations of the Digit Recognizer competition CSV files."""

    # CSV read to build the training/validation split.
    train_csv_path = '/kaggle/input/digit-recognizer/train.csv'
    # CSV of images to predict for the submission.
    test_csv_path = '/kaggle/input/digit-recognizer/test.csv'
    # Submission template whose ``Label`` column gets filled with predictions.
    submit_csv_path = '/kaggle/input/digit-recognizer/sample_submission.csv'

In [None]:
class MinstDataset(Dataset):
    """Digit Recognizer dataset backed by a pandas DataFrame.

    Every row of the frame is one sample: a label column plus one column per
    pixel of the flattened image. The pixel columns are selected *by name*
    (everything except ``label_column``), so the label column may sit at any
    position in the frame.
    """

    def __init__(self,
                 df,
                 transform=None,
                 target_transform=None,
                 label_column='label',
                 image_shape=(28, 28)
                ):
        """
        Args:
            df (DataFrame): one row per sample; must contain ``label_column``,
                every other column is treated as a pixel value.
            transform (callable, optional): albumentations-style transform,
                called as ``transform(image=image)`` and expected to return a
                mapping with an ``"image"`` entry.
            target_transform (callable, optional): applied to the label.
            label_column (str): name of the column holding the class label.
            image_shape (tuple): shape each flattened pixel row is reshaped to.

        Raises:
            KeyError: if ``label_column`` is not a column of ``df``.
        """
        self.df = df
        self.transform = transform
        self.target_transform = target_transform
        self.label_column = label_column
        self.image_shape = tuple(image_shape)
        # Snapshot labels and pixels as NumPy arrays once, so __getitem__ does
        # cheap array indexing instead of a pandas row lookup per sample.
        # Changes made to ``self.df`` afterwards are not reflected here.
        self._labels = df[label_column].to_numpy()
        self._pixels = df.drop(columns=label_column).to_numpy()

    def __len__(self):
        """Return the number of samples."""
        return len(self._labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        Without a transform the image is a float NumPy array of shape
        ``image_shape``; with one, it is whatever the transform puts under
        ``"image"``.
        """
        # Samplers may hand over the index as a tensor.
        if torch.is_tensor(idx):
            idx = idx.tolist()

        label = self._labels[idx]
        # astype() copies, so callers and transforms never touch the cache.
        image = self._pixels[idx].astype(float).reshape(self.image_shape)

        if self.transform:
            transformed = self.transform(image=image)
            image = transformed["image"]
        if self.target_transform:
            label = self.target_transform(label)

        return image, label

In [None]:
from sklearn.model_selection import train_test_split

# Load the labelled data and hold out 20% of it for validation (fixed seed so
# the split is the same on every run).
df = pd.read_csv(cfg.train_csv_path)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)


# NOTE: ToFloat already rescales pixels to [0, 1], so Normalize must be told
# max_pixel_value=1.0. With its default of 255 the mean/std would be multiplied
# by 255 a second time and every pixel would collapse to roughly -0.42.
train_transform_pipeline = A.Compose([
    A.Resize(28, 28),  # Keep original size or choose based on your model
    A.ToFloat(max_value=255.0),  # Convert to float in [0,1] range
    A.Normalize(mean=0.1307, std=0.3081, max_pixel_value=1.0),  # MNIST-specific values
    # Only small shifts/zooms/rotations: quarter-turn rotations (RandomRotate90)
    # create digit orientations that never occur at test time and make 6 and 9
    # indistinguishable, so they are deliberately not used.
    A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=15),
    ToTensorV2(),
])
# Evaluation pipeline: identical preprocessing, no random augmentation.
test_transform_pipeline = A.Compose([
    A.Resize(28, 28),  # Keep original size or choose based on your model
    A.ToFloat(max_value=255.0),  # Convert to float in [0,1] range
    A.Normalize(mean=0.1307, std=0.3081, max_pixel_value=1.0),  # MNIST-specific values
    ToTensorV2(),
])

train_ds = MinstDataset(train_df, transform=train_transform_pipeline)
val_ds = MinstDataset(val_df, transform=test_transform_pipeline)

In [None]:
# Quick sanity check: which class labels are present in the training split.
# (Alternative check: train_ds[0][0].max() for the pixel range of one sample.)
train_ds.df["label"].unique()

In [None]:
# Both loaders share the batch size; only the training loader reshuffles.
loader_kwargs = dict(batch_size=256)
train_dl = DataLoader(train_ds, shuffle=True, **loader_kwargs)
val_dl = DataLoader(val_ds, shuffle=False, **loader_kwargs)

In [None]:
# Display image and label: pull one batch from the training loader and show
# its first sample as a sanity check of the dataset/transform wiring.
train_features, train_labels = next(iter(train_dl))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

img = train_features[0]
label = train_labels[0]
# Reorder channels-first -> channels-last with the tensor's own API and convert
# to NumPy explicitly, rather than handing a torch.Tensor to np.transpose.
image_transposed = img.permute(1, 2, 0).cpu().numpy()
if image_transposed.shape[-1] == 1:
    # Single-channel image: drop the channel axis so it is drawn as a 2-D map.
    image_transposed = image_transposed.squeeze(-1)
plt.imshow(image_transposed, cmap="gray")
plt.show()
print(f"Label: {label}")

In [None]:
import torchvision
from torchvision import models
import torchmetrics 
import torch.nn as nn
from torchinfo import summary
import torch



# Seed PyTorch's default generator and the CUDA generator with the same value.
for seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
    seed_fn(42)

# Training hyper-parameters.
epochs = 50
lr = 1e-3

# Pick the compute device: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# weights = torchvision.models.AlexNet_Weights.DEFAULT
# model = models.alexnet(weights=weights)
# # Freeze all base layers in the "features" section of the model (the feature extractor) by setting requires_grad=False
# for param in model.features.parameters():
#     param.requires_grad = False

# model.classifier[-1].out_features = 3

import torch.nn.functional as F
import torch.nn as nn

# class MnistConvNet(nn.Module):
#     def __init__(self, num_classes):
#         super().__init__()
#         self.num_classes = num_classes
#         self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
#         self.fc1 = nn.Linear(64 * 7 * 7, 128)
#         self.fc2 = nn.Linear(128, self.num_classes)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.relu = nn.ReLU()

#     def forward(self, x):
#         x = self.relu(self.conv1(x))
#         x = self.pool(x)
#         x = self.relu(self.conv2(x))
#         x = self.pool(x)
#         x = x.view(x.size(0), -1)  # Flatten while preserving batch size
#         x = self.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

# model = MnistConvNet(num_classes=10)

class MnistConvNet(nn.Module):
    """Two conv/batch-norm/ReLU/max-pool stages followed by two linear layers.

    For a 1x28x28 input the spatial size goes 28 -> 26 -> 13 -> 11 -> 5
    (5x5 convolutions with padding 1, each followed by 2x2 pooling), which is
    why the first linear layer expects 64 * 5 * 5 features.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Stage 1: 1 -> 32 channels.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        # Stage 2: 32 -> 64 channels.
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        # One pooling module shared by both stages (it has no parameters).
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head.
        self.fc1 = nn.Linear(64 * 5 * 5, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return raw class logits (no softmax) for a batch of images."""
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for conv, norm in stages:
            x = self.maxpool(F.relu(norm(conv(x))))
        # Flatten to (batch, 64 * 5 * 5) for the linear head.
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)
    


model = MnistConvNet()

# Layer-by-layer summary from torchinfo for a batch of 64 single-channel 28x28
# images. Note the keyword is "input_size", not "input_shape".
# For a smaller table, pass col_names=["input_size"] instead.
summary_columns = ["input_size", "output_size", "num_params", "trainable"]
summary(
    model=model,
    input_size=(64, 1, 28, 28),
    col_names=summary_columns,
    col_width=20,
    row_settings=["var_names"],
)

In [None]:
# Move the model to the target device before the optimizer is built from its
# parameters.
model.to(device)
train_acc_fn = torchmetrics.Accuracy('multiclass', num_classes=10).to(device)
val_acc_fn = torchmetrics.Accuracy('multiclass', num_classes=10).to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)
# Cut the learning rate by 10x once the monitored loss has not improved for
# 5 consecutive epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min', factor=0.1, patience=5)
criterion_fn = nn.CrossEntropyLoss()  # Note: This is a class, not a function

# Per-epoch history, kept for later inspection/plotting.
train_losses, val_losses, train_accs, val_accs = [], [], [], []

for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    train_loss_sum, train_sample_count = 0.0, 0
    train_acc_fn.reset()

    for x, y in train_dl:
        optimizer.zero_grad()
        x, y = x.to(device), y.to(device)
        pred_logit = model(x)
        loss = criterion_fn(pred_logit, y)
        loss.backward()
        optimizer.step()

        # criterion_fn returns the batch mean; weight it by the batch size so
        # the epoch figure is a true per-sample mean even though the last
        # batch is usually smaller than the others.
        n_samples = y.size(0)
        train_loss_sum += loss.item() * n_samples
        train_sample_count += n_samples
        train_acc_fn.update(pred_logit.softmax(dim=1), y)

    train_loss = train_loss_sum / train_sample_count
    train_acc = train_acc_fn.compute().item()
    print(f'Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # ---- validation pass (no gradients, eval-mode batch norm) ----
    model.eval()
    val_loss_sum, val_sample_count = 0.0, 0
    val_acc_fn.reset()

    with torch.inference_mode():
        for x, y in val_dl:
            x, y = x.to(device), y.to(device)
            val_logit = model(x)
            batch_val_loss = criterion_fn(val_logit, y)
            n_samples = y.size(0)
            val_loss_sum += batch_val_loss.item() * n_samples
            val_sample_count += n_samples
            val_acc_fn.update(val_logit.softmax(dim=1), y)

    val_loss = val_loss_sum / val_sample_count
    val_acc = val_acc_fn.compute().item()
    print(f'Epoch {epoch+1}/{epochs} - Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    scheduler.step(val_loss)  # LR scheduling is driven by the validation loss

print("Training completed.")

In [None]:
import numpy as np
# Read the competition test set and the submission template from the paths
# declared on cfg instead of repeating the literals here.
test_df = pd.read_csv(cfg.test_csv_path)
submit_df = pd.read_csv(cfg.submit_csv_path)

# One float 28x28 image per row, the same layout and dtype the dataset class
# hands to the transform pipeline during training/validation.
test_images = test_df.to_numpy(dtype=float).reshape(-1, 28, 28)
inference_batch_size = 256

test_pred = []
model.eval()
with torch.inference_mode():
    # Run the model on whole batches rather than one forward pass per image.
    for start in range(0, len(test_images), inference_batch_size):
        chunk = test_images[start:start + inference_batch_size]
        batch = torch.stack(
            [test_transform_pipeline(image=image)['image'] for image in chunk]
        ).to(device)
        test_pred.extend(model(batch).argmax(dim=1).tolist())

# Save predictions to submit_df
submit_df['Label'] = test_pred

# Save the submission file
submit_df.to_csv('submission.csv', index=False)
print("Predictions saved to submission.csv")

In [None]:
# Show the filled-in submission table as the cell output for a final visual check.
submit_df