### Necessary Imports

In [1]:
import kagglehub
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from typing import Optional, Callable
from torch import Tensor, nn, optim
from torch.nn import functional as F
from torchinfo import summary
from tqdm.auto import tqdm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

### Dataset Download

In [2]:
# Download the MNIST-in-CSV dataset through kagglehub and report where the
# files ended up on disk.
dataset_handle = "oddrationale/mnist-in-csv"
path = kagglehub.dataset_download(dataset_handle)
print("Path to dataset files:", path)

Path to dataset files: /home/abdelnour/.cache/kagglehub/datasets/oddrationale/mnist-in-csv/versions/2


In [3]:
# List the files inside the downloaded dataset directory (IPython line magic).
%ls $path

mnist_test.csv  mnist_train.csv


In [4]:
# Absolute paths of the train / test CSV files inside the dataset directory.
train_path, test_path = (
    os.path.join(path, csv_name)
    for csv_name in ("mnist_train.csv", "mnist_test.csv")
)

### Custom Dataset

In [5]:
class DataFrameDataset(Dataset):
    """Map-style dataset backed by a CSV file loaded into a pandas DataFrame.

    Every item is one row of the CSV, split into its feature values (all
    columns except ``target``) and the value stored in the ``target`` column.

    The loaded frame stays available as the public ``data`` attribute.
    """

    def __init__(self,
        path : str,
        target : str,
        target_transform : Optional[Callable] = None,
        transform : Optional[Callable] = None
    ) -> None:
        """Read the CSV at ``path`` and remember how to split/transform rows.

        Args:
            path: Location of the CSV file, passed to ``pd.read_csv``.
            target: Name of the column returned as ``'y'``.
            target_transform: Optional callable applied to the target value.
            transform: Optional callable applied to the feature values.

        Raises:
            KeyError: If ``target`` is not a column of the CSV.
        """
        super().__init__()

        self.path = path
        self.data = pd.read_csv(path)

        # Fail fast: without this check a wrong column name only surfaces on
        # the first __getitem__ call, typically deep inside a DataLoader.
        if target not in self.data.columns:
            raise KeyError(
                f"target column {target!r} not found in {path!r}"
            )

        self.target = target
        self.target_transform = target_transform
        self.transform = transform

    def __len__(self) -> int:
        """Return the number of rows in the underlying DataFrame."""
        return len(self.data)

    def __getitem__(self, idx : int) -> dict:
        """Return row ``idx`` as ``{'x': features, 'y': target}``.

        ``x`` is the row with the target column dropped (a NumPy array before
        any ``transform``); ``y`` is the raw target value before any
        ``target_transform``.
        """
        row = self.data.iloc[idx]

        x = row.drop(self.target).values
        y = row[self.target]

        # Compare against None explicitly so that a callable object that
        # happens to be falsy (e.g. defines __len__ == 0) is still applied.
        if self.transform is not None:
            x = self.transform(x)

        if self.target_transform is not None:
            y = self.target_transform(y)

        return {
            'x': x,
            'y': y
        }

### Preprocessing

In [6]:
def process(x : Tensor) -> Tensor:
    """Turn one flat 784-value row into a padded, rescaled image tensor.

    Args:
        x: 784 pixel values, either as a ``Tensor`` or as array-like data
            accepted by ``torch.tensor`` (e.g. the NumPy array produced by
            ``Series.values``).

    Returns:
        A new float32 tensor of shape ``(1, 32, 32)``: the 28x28 image with
        2 zero pixels of padding on every side, divided by 255.
    """
    if isinstance(x, Tensor):
        # torch.tensor(tensor) emits a copy-construct UserWarning; this is
        # the equivalent warning-free way to get a detached float32 copy.
        x = x.detach().clone().to(torch.float32)
    else:
        x = torch.tensor(x, dtype=torch.float32)

    # reshape (rather than view) also accepts non-contiguous input.
    x = x.reshape(1, 28, 28)
    x = F.pad(x, (2, 2, 2, 2), value=0)  # 28x28 -> 32x32
    x = x.div(255.0)

    return x

In [7]:
# Wrap both CSV files in datasets that yield preprocessed image tensors as
# 'x' and the 'label' column as 'y'.
dtrain, dtest = (
    DataFrameDataset(path=csv_path, target='label', transform=process)
    for csv_path in (train_path, test_path)
)

In [8]:
# Sanity check: shape of the first preprocessed image and its label.
sample = dtrain[0]
first_image, first_label = sample['x'], sample['y']
print(first_image.shape, first_label)

torch.Size([1, 32, 32]) 5


### Model

In [9]:
class CNNBlock(nn.Module):
    """Conv(3x3) -> ReLU -> Conv(3x3) -> ReLU -> MaxPool(2x2, stride 2).

    Both convolutions use padding 1, so only the pooling layer changes the
    spatial size of the feature map.
    """

    def __init__(self, in_channels : int, out_channels : int) -> None:
        """Create the layers mapping ``in_channels`` to ``out_channels``."""
        super().__init__()

        # Shared settings of the two convolutions.
        conv_kwargs = dict(kernel_size=3, padding=1)

        # Attribute names double as state_dict keys, so they are kept as-is.
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.relu2 = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x : Tensor) -> Tensor:
        """Apply both conv/ReLU stages, then the max-pool."""
        hidden = self.relu1(self.conv1(x))
        hidden = self.relu2(self.conv2(hidden))
        return self.pool(hidden)

In [10]:
class Model(nn.Module):
    """Five stacked ``CNNBlock``s followed by a 128 -> 128 linear layer.

    For a 1x32x32 input the feature map evolves as
    1x32x32 -> 8x16x16 -> 16x8x8 -> 32x4x4 -> 64x2x2 -> 128x1x1,
    which is then flattened to 128 values and passed through ``fc``.
    """

    def __init__(self) -> None:
        """Build the convolutional stack and the final linear layer."""
        super().__init__()

        # Channel widths along the stack: 1, 8, 16, 32, 64, 128.
        widths = [1] + [2 ** exponent for exponent in range(3, 8)]

        self.features = nn.ModuleList([
            CNNBlock(c_in, c_out)
            for c_in, c_out in zip(widths[:-1], widths[1:])
        ])

        self.flatten = nn.Flatten()

        self.fc = nn.Linear(128, 128)

    def forward(self, x : Tensor) -> Tensor:
        """Run ``x`` through every block, flatten, and apply ``fc``."""
        out = x
        for block in self.features:
            out = block(out)

        return self.fc(self.flatten(out))

In [11]:
# Layer-by-layer summary for a single 1x32x32 input (batch of one).
summary_input_size = (1, 1, 32, 32)
summary(Model(), input_size=summary_input_size)

Layer (type:depth-idx)                   Output Shape              Param #
Model                                    [1, 128]                  --
├─ModuleList: 1-1                        --                        --
│    └─CNNBlock: 2-1                     [1, 8, 16, 16]            --
│    │    └─Conv2d: 3-1                  [1, 8, 32, 32]            80
│    │    └─ReLU: 3-2                    [1, 8, 32, 32]            --
│    │    └─Conv2d: 3-3                  [1, 8, 32, 32]            584
│    │    └─ReLU: 3-4                    [1, 8, 32, 32]            --
│    │    └─MaxPool2d: 3-5               [1, 8, 16, 16]            --
│    └─CNNBlock: 2-2                     [1, 16, 8, 8]             --
│    │    └─Conv2d: 3-6                  [1, 16, 16, 16]           1,168
│    │    └─ReLU: 3-7                    [1, 16, 16, 16]           --
│    │    └─Conv2d: 3-8                  [1, 16, 16, 16]           2,320
│    │    └─ReLU: 3-9                    [1, 16, 16, 16]           --
│    │  

### Training

In [12]:
def cos_sim(x : Tensor) -> Tensor:
    """Return the pairwise cosine-similarity matrix of a batch.

    Args:
        x: Tensor whose first dimension indexes the samples; all remaining
            dimensions are flattened into one feature vector per sample.

    Returns:
        A square tensor with one row and one column per sample, where entry
        ``(i, j)`` is the dot product of the L2-normalised samples ``i``
        and ``j``.
    """
    # reshape instead of view: view raises on non-contiguous input (such as
    # a transposed batch), reshape copies only when it has to.
    x = x.reshape(x.size(0), -1)
    x = F.normalize(x, p=2, dim=1)
    x = x @ x.T
    return x

In [13]:
def loss_fn(embeddings : Tensor, inputs : Tensor) -> Tensor:
    """Mean squared gap between two pairwise cosine-similarity matrices.

    The similarity matrix of ``embeddings`` is compared element-wise with
    that of ``inputs`` (both computed by ``cos_sim``); the result is the
    mean of the squared differences.
    """
    similarity_gap = cos_sim(embeddings) - cos_sim(inputs)
    return similarity_gap.pow(2).mean()

In [14]:
def train(
    lr : float,
    batch_size : int,
    epochs : int,
    dtrain : DataFrameDataset,
    dtest : DataFrameDataset,
    device : torch.device
) -> Model:
    """Train a fresh ``Model`` with Adam and return it.

    Every epoch runs a shuffled pass over ``dtrain`` with weight updates,
    then an unshuffled, gradient-free pass over ``dtest``. After each pass
    the sample-weighted mean of ``loss_fn(model(x), x)`` is printed.

    Args:
        lr: Learning rate handed to ``optim.Adam``.
        batch_size: Batch size of both data loaders.
        epochs: Number of train + test passes.
        dtrain: Dataset used for optimisation.
        dtest: Dataset used for evaluation only.
        device: Device the model and every batch are moved to.

    Returns:
        The trained model, still on ``device``.
    """
    net = Model().to(device)
    adam = optim.Adam(net.parameters(), lr=lr)

    # Insertion order matters: the train pass runs before the test pass.
    loaders = {
        'train': DataLoader(dtrain, batch_size=batch_size, shuffle=True),
        'test': DataLoader(dtest, batch_size=batch_size, shuffle=False)
    }

    for _epoch in range(epochs):

        for phase, loader in loaders.items():

            is_training = phase == "train"
            net.train(is_training)

            running_loss = 0.0

            for batch in tqdm(loader, desc=phase):

                inputs = batch['x'].to(device)

                with torch.set_grad_enabled(is_training):

                    loss = loss_fn(net(inputs), inputs)

                    if is_training:
                        adam.zero_grad()
                        loss.backward()
                        adam.step()

                # Weight by batch size so the short last batch is not
                # over-represented in the epoch mean.
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / len(loader.dataset)

            print(f"{phase} Loss: {epoch_loss:.4f}")

    return net

In [15]:
# Train on the GPU when one is available, otherwise on the CPU.
train_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_hparams = dict(lr=0.001, batch_size=64, epochs=10)

classifier = train(
    dtrain=dtrain,
    dtest=dtest,
    device=train_device,
    **train_hparams
)

train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0493


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0101


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0075


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0064


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0049


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0046


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0040


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0039


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0034


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0035


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0029


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0028


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0026


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0025


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0023


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0022


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0021


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0020


train:   0%|          | 0/938 [00:00<?, ?it/s]

train Loss: 0.0020


test:   0%|          | 0/157 [00:00<?, ?it/s]

test Loss: 0.0020


### Inference

In [16]:
def inference(
    model : Model,
    data : DataFrameDataset,
    batch_size : int,
    device : torch.device
) -> Tensor:
    """Compute the model output for every item of ``data``, in dataset order.

    The model is switched to eval mode and each batch is moved to ``device``
    before the gradient-free forward pass. The model itself is not moved, so
    it has to live on ``device`` already.

    Returns:
        All batch outputs concatenated along the first dimension.
    """
    model.eval()

    outputs = []
    batches = DataLoader(data, batch_size=batch_size, shuffle=False)

    for batch in tqdm(batches, desc="Inference"):

        inputs = batch['x'].to(device)

        with torch.no_grad():
            outputs.append(model(inputs))

    return torch.cat(outputs, dim=0)

In [17]:
# Embed the whole training set with the trained network.
train_inference_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_embeddings = inference(
    model=classifier,
    data=dtrain,
    batch_size=64,
    device=train_inference_device
)

Inference:   0%|          | 0/938 [00:00<?, ?it/s]

In [18]:
# Embed the whole test set with the trained network.
test_inference_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_embeddings = inference(
    model=classifier,
    data=dtest,
    batch_size=64,
    device=test_inference_device
)

Inference:   0%|          | 0/157 [00:00<?, ?it/s]

In [19]:
# Standardise the embedding dimensions, then classify with 5-nearest-neighbours.
knn_steps = [
    ('scaler', StandardScaler()),
    ('knn', KNeighborsClassifier(n_neighbors=5)),
]
knn = Pipeline(steps=knn_steps)

In [20]:
# Fit the KNN pipeline on the training embeddings and their digit labels.
knn_train_features = train_embeddings.cpu().numpy()
knn_train_labels = dtrain.data['label']
knn.fit(knn_train_features, knn_train_labels)

In [21]:
# Predict a digit for every test embedding.
knn_test_features = test_embeddings.cpu().numpy()
y_test_hat = knn.predict(knn_test_features)

In [22]:
# Share of test digits the KNN pipeline classified correctly.
y_test_true = dtest.data['label']
acc = accuracy_score(y_true=y_test_true, y_pred=y_test_hat)
print(f"Accuracy: {100 * acc:.2f}%")

Accuracy: 96.18%


### Save the embeddings

In [23]:
# Persist only the learned parameters (state_dict), not the whole module.
trained_weights = classifier.state_dict()
torch.save(trained_weights, "mnist_classifier.pth")

In [24]:
# Export the test embeddings in a random order. Row i of the embeddings CSV
# holds the embedding of original test row p[i]; the 'label' column stores
# that original row index (the permutation), not the digit class.
p = torch.randperm(len(test_embeddings)).tolist()

test_embedding_columns = [f"x_{i}" for i in range(test_embeddings.size(1))]
shuffled_test_embeddings = test_embeddings[p,:].cpu().numpy()

test_embeddings_df = pd.DataFrame(
    data=shuffled_test_embeddings,
    columns=test_embedding_columns
)
test_labels = pd.Series(p, name='label')

test_embeddings_df.to_csv("test_embeddings.csv", index=False)
test_labels.to_csv("test_labels.csv", index=False)

In [25]:
# Export the raw test DataFrame (label + pixel columns) without the index.
test_frame = dtest.data
test_frame.to_csv("test_data.csv", index=False)

In [26]:
# Export a random sample of N raw training rows.
N = 128
shuffled_train_rows = torch.randperm(len(dtrain.data)).tolist()
indices = shuffled_train_rows[:N]

train_subset_frame = dtrain.data.iloc[indices]
train_subset_frame.to_csv("train_data.csv", index=False)

In [27]:
# Select the embeddings of the sampled training rows under a NEW name.
# Rebinding `train_embeddings` itself would make this cell non-idempotent:
# a second run would index the already-reduced tensor with row numbers of
# the full training set, and re-running the KNN fit would no longer see one
# embedding per training label.
train_embeddings_subset = train_embeddings[indices]

# Export the subset in a random order. Row i of the embeddings CSV holds
# subset row p[i]; the 'label' column stores that position (the
# permutation), not the digit class.
p = torch.randperm(len(train_embeddings_subset)).tolist()
train_embeddings_df = pd.DataFrame(
    data=train_embeddings_subset[p,:].cpu().numpy(),
    columns=[f"x_{i}" for i in range(train_embeddings_subset.size(1))]
)
train_labels = pd.Series(p, name='label')

train_embeddings_df.to_csv("train_embeddings.csv", index=False)
train_labels.to_csv("train_labels.csv", index=False)