# **Loading Data**

In [1]:
import numpy as np
import pandas as pd

# Read the three competition CSVs in a fixed order (train, test, sample
# submission) and bind each resulting DataFrame to its own name.
train_data, test_data, submission = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/kaggle/input/digit-recognizer/train.csv',
        '/kaggle/input/digit-recognizer/test.csv',
        '/kaggle/input/digit-recognizer/sample_submission.csv',
    )
)
# Last expression of the cell: display (rows, columns) of the training frame.
train_data.shape

(42000, 785)

In [2]:
# Target: the 'label' column. Features: every remaining column of the
# training frame. The test frame is used as-is for the test features.
y = train_data['label']
X = train_data.drop(columns=['label'])
X_test = test_data

# **CNN Using PyTorch**

In [3]:
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split

# Build the model-ready NumPy arrays directly from the loaded DataFrames
# instead of re-binding X / y / X_test onto themselves. The previous
# `X = X.values...` form only worked once per kernel: on a second run X is
# already an ndarray (no `.values`), so the cell raised AttributeError.
# Feature values are cast to float32 and divided by 255; labels are cast to int64.
X = train_data.drop(columns=['label']).to_numpy(dtype=np.float32) / 255.0
y = train_data['label'].to_numpy(dtype=np.int64)
X_test = test_data.to_numpy(dtype=np.float32) / 255.0

In [4]:
# --- config (lowercase as requested) ---
# training hyper-parameters
batch_size, epochs = 128, 12
lr = 1e-3
# seed value reused by the train/val split and the RNG seeding cell
rnd = 42
# run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [5]:
# --- helpers: reshape to image ---
def to_image(np_arr):
    """Reshape an array into a batch of single-channel 28x28 images.

    Args:
        np_arr: array whose total size is a multiple of 28 * 28
            (e.g. rows of 784 flattened values).

    Returns:
        The same data reshaped to ``(N, 1, 28, 28)``; ``N`` is inferred.
    """
    image_shape = (-1, 1, 28, 28)
    return np_arr.reshape(image_shape)

# --- dataset ---
class ImgDataset(Dataset):
    """In-memory dataset of image tensors with optional integer labels.

    Args:
        X: NumPy array accepted by ``to_image`` (reshaped to ``(N, 1, 28, 28)``
            and stored as a float tensor).
        y: optional NumPy array of labels, stored as a long tensor; ``None``
            for unlabeled data.
        train: flag stored on the instance; this class does not read it.

    Indexing yields ``(image, label)`` when labels were given, otherwise
    just ``image``.
    """

    def __init__(self, X, y=None, train=True):
        images = to_image(X)
        self.X = torch.from_numpy(images).float()
        if y is None:
            self.y = None
        else:
            self.y = torch.from_numpy(y).long()
        self.train = train
        # no augmentation requested: identity placeholder, not applied in __getitem__
        self.transform = lambda x: x

    def __len__(self):
        # number of samples == size of the leading (batch) dimension
        return self.X.shape[0]

    def __getitem__(self, idx):
        sample = self.X[idx]
        if self.y is not None:
            return sample, self.y[idx]
        return sample

# Hold out 10% of the labelled samples for validation. The explicitly seeded
# generator makes the split reproducible independently of the global RNG state.
full_ds = ImgDataset(X, y)
val_size = int(0.1 * len(full_ds))
train_size = len(full_ds) - val_size
train_ds, val_ds = random_split(full_ds, [train_size, val_size],
                                generator=torch.Generator().manual_seed(rnd))
# NOTE: train_ds.dataset and val_ds.dataset are the *same* object (full_ds),
# so a per-split `train` flag cannot be set through them: assigning True via
# train_ds and then False via val_ds just left the shared flag at False for
# both splits. ImgDataset never reads the flag, so it is left at its
# constructor default here instead of pretending the splits differ.

# Worker / pinned-memory settings shared by every loader.
loader_kwargs = dict(num_workers=2, pin_memory=True)

# Only the training loader shuffles; validation and test keep dataset order
# (test order must line up with the submission rows).
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, **loader_kwargs)
test_ds = ImgDataset(X_test, y=None)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, **loader_kwargs)

In [6]:
# --- custom conv and maxpool implemented via unfold ---
class MyConv2d(nn.Module):
    """2-D convolution implemented as ``F.unfold`` followed by a matrix multiply.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels (filters).
        kernel_size: int or ``(kh, kw)`` pair.
        stride: int or ``(sh, sw)`` pair. Default 1.
        padding: int or ``(ph, pw)`` pair of zero padding applied by
            ``F.unfold`` on both sides of each spatial axis. Default 0.
        bias: when True a learnable per-output-channel bias (initialised to
            zero) is added; otherwise ``self.bias`` is registered as ``None``.

    The weight has shape ``(out_channels, in_channels, kh, kw)`` and is drawn
    from a normal distribution with std ``sqrt(2 / (in_channels * kh * kw))``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True):
        super().__init__()
        kh, kw = self._pair(kernel_size)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = (kh, kw)
        # Stored exactly as passed (int or pair); normalised in forward().
        self.stride = stride
        self.padding = padding
        weight_shape = (out_channels, in_channels, kh, kw)
        self.weight = nn.Parameter(torch.randn(weight_shape) * math.sqrt(2.0 / (in_channels * kh * kw)))
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_channels))
        else:
            self.register_parameter('bias', None)

    @staticmethod
    def _pair(value):
        """Return ``value`` as a 2-tuple: ints are duplicated, pairs are unpacked."""
        if isinstance(value, int):
            return value, value
        first, second = value
        return first, second

    def forward(self, x):
        """Convolve ``x`` of shape ``(B, C, H, W)``; returns ``(B, out_channels, H_out, W_out)``."""
        B, _, H, W = x.shape
        kh, kw = self.kernel_size
        # Per-axis stride/padding so that tuple arguments work in the
        # output-size arithmetic below, not only inside F.unfold.
        sh, sw = self._pair(self.stride)
        ph, pw = self._pair(self.padding)
        # (B, K, L): K = C*kh*kw values per patch, L = number of patch positions
        patches = F.unfold(x, kernel_size=(kh, kw), padding=(ph, pw), stride=(sh, sw))
        w = self.weight.view(self.out_channels, -1)  # (out, K)
        out = torch.matmul(w.unsqueeze(0).expand(B, -1, -1), patches)  # (B, out, L)
        if self.bias is not None:
            out = out + self.bias.view(1, -1, 1)
        H_out = (H + 2 * ph - kh) // sh + 1
        W_out = (W + 2 * pw - kw) // sw + 1
        return out.view(B, self.out_channels, H_out, W_out)

class MyMaxPool2d(nn.Module):
    """2-D max pooling implemented with ``F.unfold``.

    Args:
        kernel_size: int or ``(kh, kw)`` pair.
        stride: int or ``(sh, sw)`` pair; defaults to ``kernel_size``.
        padding: int or ``(ph, pw)`` pair. Padded cells are filled with
            ``-inf`` so they can never win the max (plain ``F.unfold``
            padding is zero-filled, which is wrong for negative inputs).
    """

    def __init__(self, kernel_size, stride=None, padding=0):
        super().__init__()
        kh, kw = self._pair(kernel_size)
        self.kernel_size = (kh, kw)
        # Stored as passed; may be an int or a pair (normalised in forward()).
        self.stride = stride if (stride is not None) else kernel_size
        self.padding = padding

    @staticmethod
    def _pair(value):
        """Return ``value`` as a 2-tuple: ints are duplicated, pairs are unpacked."""
        if isinstance(value, int):
            return value, value
        first, second = value
        return first, second

    def forward(self, x):
        """Pool ``x`` of shape ``(B, C, H, W)``; returns ``(B, C, H_out, W_out)``."""
        B, C, H, W = x.shape
        kh, kw = self.kernel_size
        # Per-axis values: a tuple kernel_size with the default stride used to
        # break the integer division below.
        sh, sw = self._pair(self.stride)
        ph, pw = self._pair(self.padding)
        if ph or pw:
            # F.pad order for the last two dims is (left, right, top, bottom).
            x = F.pad(x, (pw, pw, ph, ph), value=float('-inf'))
        patches = F.unfold(x, kernel_size=(kh, kw), stride=(sh, sw))  # (B, C*kh*kw, L)
        patches = patches.view(B, C, kh * kw, -1)
        pooled, _ = patches.max(dim=2)  # (B, C, L)
        H_out = (H + 2 * ph - kh) // sh + 1
        W_out = (W + 2 * pw - kw) // sw + 1
        return pooled.view(B, C, H_out, W_out)

In [7]:
# --- model using custom layers ---
class CustomCNN(nn.Module):
    """Small CNN built from the custom conv / max-pool layers.

    ``features`` stacks five (3x3 conv, padding 1, no bias) -> BatchNorm ->
    ReLU units with channel widths 1->32->32->64->64->128, a 2x2 max-pool
    after the 2nd, 4th and 5th unit, and a final adaptive average pool to 1x1.
    ``classifier`` flattens to 128 features, then Linear(128, 128, no bias) ->
    BatchNorm1d -> ReLU -> Dropout(0.3) -> Linear(128, 10).

    ``forward`` returns the raw outputs of the last linear layer, shape (B, 10).
    """

    def __init__(self):
        super().__init__()

        def conv_unit(c_in, c_out):
            # conv bias is disabled because the BatchNorm that follows has its own shift
            return [
                MyConv2d(c_in, c_out, 3, padding=1, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        # Layers are created left to right, in the same order as they run.
        feature_layers = [
            *conv_unit(1, 32),
            *conv_unit(32, 32),
            MyMaxPool2d(2),

            *conv_unit(32, 64),
            *conv_unit(64, 64),
            MyMaxPool2d(2),

            *conv_unit(64, 128),
            MyMaxPool2d(2),
            nn.AdaptiveAvgPool2d(1),
        ]
        self.features = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Flatten(),
            nn.Linear(128, 128, bias=False),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(128, 10),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        return self.classifier(self.features(x))

In [8]:
# --- seed & model setup ---
# Seed Python's `random`, NumPy and PyTorch (plus every CUDA device when
# available) with the same value before the model weights are created.
for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    seed_fn(rnd)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(rnd)

# Model on the selected device, cross-entropy loss, Adam with weight decay,
# and a scheduler that halves the LR after 3 epochs without improvement of
# the metric passed to scheduler.step(), never going below 1e-6.
model = CustomCNN()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, factor=0.5, patience=3, min_lr=1e-6
)

In [9]:
# --- training loop (no function wrapper) ---
# Each epoch: one optimisation pass over train_loader, one no-grad pass over
# val_loader, step the LR scheduler on the mean validation loss, print the
# metrics, and keep a CPU copy of the weights whenever validation loss improves.
best_val, best_state = float('inf'), None
for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    tr_loss, tr_correct, tr_total = 0.0, 0, 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        # clip the total gradient norm to 5.0 before the parameter update
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()
        n_batch = xb.size(0)
        # criterion returns the batch mean, so weight it by the batch size
        tr_loss += loss.item() * n_batch
        preds = logits.argmax(dim=1)
        tr_correct += (preds == yb).sum().item()
        tr_total += n_batch

    # ---- validation pass ----
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            n_batch = xb.size(0)
            val_loss += loss.item() * n_batch
            preds = logits.argmax(dim=1)
            val_correct += (preds == yb).sum().item()
            val_total += n_batch

    # per-sample means over the whole epoch
    tr_loss = tr_loss / tr_total
    val_loss = val_loss / val_total
    scheduler.step(val_loss)
    print(f"Epoch {epoch:02d}  tr_loss={tr_loss:.4f} tr_acc={tr_correct/tr_total:.4f}  val_loss={val_loss:.4f} val_acc={val_correct/val_total:.4f}")

    # checkpoint only on an improvement of more than 1e-6
    if not (val_loss < best_val - 1e-6):
        continue
    best_val = val_loss
    best_state = {name: tensor.cpu().clone() for name, tensor in model.state_dict().items()}

Epoch 00  tr_loss=0.2529 tr_acc=0.9468  val_loss=0.0993 val_acc=0.9750
Epoch 01  tr_loss=0.0464 tr_acc=0.9882  val_loss=0.0713 val_acc=0.9781
Epoch 02  tr_loss=0.0296 tr_acc=0.9923  val_loss=0.0558 val_acc=0.9826
Epoch 03  tr_loss=0.0225 tr_acc=0.9936  val_loss=0.0693 val_acc=0.9817
Epoch 04  tr_loss=0.0180 tr_acc=0.9947  val_loss=0.0414 val_acc=0.9881
Epoch 05  tr_loss=0.0179 tr_acc=0.9947  val_loss=0.0911 val_acc=0.9736
Epoch 06  tr_loss=0.0140 tr_acc=0.9961  val_loss=0.0672 val_acc=0.9793
Epoch 07  tr_loss=0.0121 tr_acc=0.9963  val_loss=0.0575 val_acc=0.9838
Epoch 08  tr_loss=0.0112 tr_acc=0.9965  val_loss=0.2039 val_acc=0.9398
Epoch 09  tr_loss=0.0055 tr_acc=0.9987  val_loss=0.0325 val_acc=0.9912
Epoch 10  tr_loss=0.0026 tr_acc=0.9996  val_loss=0.0332 val_acc=0.9905
Epoch 11  tr_loss=0.0024 tr_acc=0.9996  val_loss=0.0330 val_acc=0.9902


In [10]:
# --- load best and predict ---
# Restore the checkpoint with the lowest validation loss. best_state stays
# None if no epoch ever improved on the initial value (e.g. every validation
# loss was NaN); in that case the current weights are kept instead of crashing.
if best_state is not None:
    model.load_state_dict(best_state)
model.to(device)
model.eval()
probs_list = []  # per-batch class-probability arrays, in test_loader order
with torch.no_grad():
    for xb in test_loader:
        xb = xb.to(device)
        logits = model(xb)
        probs = F.softmax(logits, dim=1)
        probs_list.append(probs.cpu().numpy())
# Stack the batches along the sample axis and take the most probable class per row.
probs_all = np.concatenate(probs_list, axis=0)
preds = probs_all.argmax(axis=1)

In [11]:
# Store the predicted classes in the 'Label' column of the sample-submission
# frame and write it to disk without the index column.
out_csv = 'submission_cnn_pytorch.csv'
submission['Label'] = preds
submission.to_csv(out_csv, index=False)
# Test Accuracy: 0.99225  (score recorded by the author; presumably from the
# competition scoring — it is not computed anywhere in this notebook)
print("Saved submission_cnn_pytorch.csv")

Saved submission_cnn_pytorch.csv
