In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import pandas as pd
import os

In [2]:
# mnist loader :)
transform = transforms.Compose([transforms.ToTensor()])
data = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
train_size = int(0.5 * len(data))
val_size = len(data) - train_size
train_data, val_data = random_split(data, [train_size, val_size])
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)

# save validation samples and labels :D
os.makedirs("MNIST", exist_ok=True)
val_samples = [(img.numpy().squeeze(), label) for img, label in val_data]
val_df = pd.DataFrame(val_samples, columns=["image", "label"])
val_df.to_csv("MNIST/held_out.csv", index=False)

# vanilla autoencoder model :D
class Autoencoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 8)  # bottleneck layer
        )
        self.decoder = nn.Sequential(
            nn.Linear(8, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28 * 28),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x.view(x.size(0), -1)  # flatten
        x = self.encoder(x)
        x = self.decoder(x)
        x = x.view(x.size(0), 1, 28, 28)  # reshape to image
        return x
    
# early stopping helper!
class EarlyStopping:
    def __init__(self, patience=5):
        self.patience = patience
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None or val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

In [3]:
# training and validation loop :)
model = Autoencoder().cuda()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# early stopping setup
early_stopping = EarlyStopping(patience=5)

os.makedirs("MNIST", exist_ok=True)  # ensure directory exists

for epoch in range(50):
    # training
    model.train()
    train_loss = 0.0
    for imgs, _ in train_loader:
        imgs = imgs.to('cuda')
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, imgs)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)

    # validation
    model.eval()
    val_loss = 0.0
    embeddings = []
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs = imgs.to('cuda')  # move to GPU
            outputs = model(imgs)
            loss = criterion(outputs, imgs)
            val_loss += loss.item()

            # save embeddings
            encoded_imgs = model.encoder(imgs).cpu().numpy()  # no manual flattening
            for emb, label in zip(encoded_imgs, labels.numpy()):
                embeddings.append((label, *emb))

    val_loss /= len(val_loader)

    # save embeddings to file
    embeddings_df = pd.DataFrame(embeddings)
    embeddings_df.to_csv(f"MNIST/AE_epoch{epoch + 1}.csv", index=False, header=False)

    print(f"epoch {epoch + 1}, train loss: {train_loss:.4f}, val loss: {val_loss:.4f}")

    # early stopping check
    early_stopping(val_loss)
    if early_stopping.early_stop:
        print("early stopping triggered! :)")
        break

print("training done! 🎉")


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1792x28 and 784x128)