In [None]:
!pip install skorch



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset, DataLoader, random_split
from skorch import NeuralNetClassifier
from skorch.dataset import Dataset
from sklearn.model_selection import GridSearchCV

In [None]:
## Necessary Parameters

# Mini-batch size used by the MNIST DataLoaders created in the following cells.
batch_size = 512
# Number of epochs run by the denoising-autoencoder training loop.
epochs = 12
# NOTE(review): no optimizer visible in this notebook reads this value; they
# hard-code lr=1e-3 instead — confirm which learning rate is intended.
learning_rate = 1e-4

In [None]:
# Each MNIST sample is converted to a tensor by ToTensor before it is returned.
transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# Train / test splits, downloaded into ~/torch_datasets when not already present.
train_dataset = torchvision.datasets.MNIST(
    root="~/torch_datasets", train=True, download=True, transform=transform
)
test_dataset = torchvision.datasets.MNIST(
    root="~/torch_datasets", train=False, download=True, transform=transform
)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=batch_size)

In [None]:
def noise_input(images, NOISE_RATIO):
    """Blend ``images`` with uniform random noise.

    Args:
        images: tensor of image values; any shape.
        NOISE_RATIO: weight of the noise term. ``0`` returns the images
            unchanged, ``1`` returns pure noise.

    Returns:
        ``images * (1 - NOISE_RATIO) + noise * NOISE_RATIO`` where ``noise`` is
        drawn from ``torch.rand`` with the same shape as ``images``.
    """
    # Create the noise on the same device as the images so the function also
    # works for tensors that do not live on the CPU.
    noise = torch.rand(images.shape, device=images.device)
    return images * (1 - NOISE_RATIO) + noise * NOISE_RATIO

In [None]:
class noisyMNISTDataset(Dataset):
    """Wrap an (image, label) dataset and add a freshly noised copy of each image.

    Every item is a triple ``(noisy_image, label, clean_image)`` in which both
    images have been flattened to 1-D with ``view(-1)``. The noise is drawn anew
    by ``noise_input`` on every access, so the same index yields different noisy
    images across calls.
    """

    def __init__(self, dataset, NOISE_RATIO):
        # Noise weight forwarded to noise_input for every sample.
        self.NOISE_RATIO = NOISE_RATIO
        # Underlying dataset; indexing it must yield (image, label).
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        clean, target = self.dataset[index]
        corrupted = noise_input(clean, NOISE_RATIO=self.NOISE_RATIO)
        return corrupted.view(-1), target, clean.view(-1)

# Noisy view of the MNIST training split (25% noise weight).
noisy_train_dataset = noisyMNISTDataset(dataset=train_dataset, NOISE_RATIO=0.25)

# Rebinds train_loader: from here on it yields (noisy, label, clean) batches.
train_loader = DataLoader(
    dataset=noisy_train_dataset, shuffle=True, batch_size=batch_size
)

In [None]:
class denoisingAutoEncoder(nn.Module):
  """Fully connected autoencoder: input_dim -> 1/2 -> 1/4 -> 1/2 -> input_dim.

  The encoder halves the width twice (ReLU + dropout 0.2 after each linear
  layer); the decoder mirrors it and ends in a ReLU, so reconstructions are
  non-negative.
  """

  def __init__(self, input_dim):
    super().__init__()

    # Hidden widths are truncated to integers, as nn.Linear requires.
    half_dim = int(input_dim * 0.5)
    quarter_dim = int(input_dim * 0.25)

    self.encoder = nn.Sequential(
        nn.Linear(input_dim, half_dim),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(half_dim, quarter_dim),
        nn.ReLU(),
        nn.Dropout(0.2),
    )

    self.decoder = nn.Sequential(
        nn.Linear(quarter_dim, half_dim),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(half_dim, int(input_dim)),
        nn.ReLU(),
    )

  def forward(self, x):
    """Encode ``x`` to the bottleneck and decode it back to ``input_dim`` values."""
    return self.decoder(self.encoder(x))

# Autoencoder over flattened images of 784 values, trained to minimise the
# mean squared reconstruction error with Adam (lr hard-coded to 1e-3).
denoisingModel = denoisingAutoEncoder(input_dim=784)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(params=denoisingModel.parameters(), lr=1e-3)

In [None]:
def training_loop(dataloader, model, loss_fn, optimizer):
  """Train ``model`` for one epoch as a denoising autoencoder.

  Args:
    dataloader: iterable yielding ``(noisy, label, clean)`` batches. Only
      iteration is required; the batches are counted while iterating, so the
      object needs neither ``__len__`` nor a ``.dataset`` attribute.
    model: module mapping the noisy batch to a reconstruction.
    loss_fn: callable ``loss_fn(reconstruction, clean)`` returning a scalar
      tensor.
    optimizer: optimizer that updates ``model``'s parameters.

  Returns:
    The mean of the per-batch losses as a float, or ``nan`` when the
    dataloader yields no batches.
  """
  model.train()

  total_loss = 0.0
  num_batches = 0
  for noisy, _label, clean in dataloader:  # labels are not used for reconstruction
    reconstruction = model(noisy)
    loss = loss_fn(reconstruction, clean)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    total_loss += loss.item()
    num_batches += 1

  # An empty loader has no meaningful average; report nan instead of dividing by zero.
  avg_loss = total_loss / num_batches if num_batches else float("nan")
  print(f"Avg Loss over epoch: {avg_loss:>.7f}")

  return avg_loss

# Average reconstruction loss of every autoencoder epoch, in order.
train_losses = []

for t in range(epochs):
    train_losses.append(
        training_loop(
            dataloader=train_loader,
            model=denoisingModel,
            loss_fn=loss_fn,
            optimizer=optimizer,
        )
    )
print("Done!")

Avg Loss over epoch: 0.0492789
Avg Loss over epoch: 0.0289018
Avg Loss over epoch: 0.0245378
Avg Loss over epoch: 0.0223791
Avg Loss over epoch: 0.0210635
Avg Loss over epoch: 0.0202625
Avg Loss over epoch: 0.0196256
Avg Loss over epoch: 0.0191718
Avg Loss over epoch: 0.0187599
Avg Loss over epoch: 0.0184734
Avg Loss over epoch: 0.0182114
Avg Loss over epoch: 0.0179731
Done!


In [None]:
# Noisy view of the MNIST test split, using the same 25% noise weight as training.
noisy_test_dataset = noisyMNISTDataset(dataset=test_dataset, NOISE_RATIO=0.25)

# Rebinds test_loader: it now yields (noisy, label, clean) batches in dataset order.
test_loader = DataLoader(
    dataset=noisy_test_dataset, shuffle=False, batch_size=batch_size
)

def extract_features(model, dataloader):
    """Encode every batch of ``dataloader`` with ``model.encoder``.

    The model is switched to eval mode and gradients are disabled while
    encoding. The dataloader must yield ``(noisy, label, clean)`` triples; only
    the first two elements are used.

    Returns:
        A ``(features, labels)`` pair: the encoder outputs and the labels of all
        batches concatenated along the first dimension, in iteration order.
    """
    model.eval()

    with torch.no_grad():
        encoded_batches = [
            (model.encoder(noisy), target) for noisy, target, _clean in dataloader
        ]

    all_features = [encoded for encoded, _ in encoded_batches]
    all_labels = [target for _, target in encoded_batches]
    return torch.cat(all_features), torch.cat(all_labels)

# Encode both splits with the trained autoencoder. train_loader shuffles, so the
# training rows come out in shuffled order but stay aligned with their labels.
train_features, train_labels = extract_features(
    model=denoisingModel, dataloader=train_loader
)
test_features, test_labels = extract_features(
    model=denoisingModel, dataloader=test_loader
)

# Sanity check on the shapes: [60000, latent_dim] features and [60000] labels expected.
print("Train feature shape:", train_features.shape)
print("Train label shape:", train_labels.shape)

Train feature shape: torch.Size([60000, 196])
Train label shape: torch.Size([60000])


### Training MLP

In [None]:
class MLP(nn.Module):
  """Two-layer perceptron that scores encoder features against the 10 classes.

  The network returns raw class scores (logits), not probabilities. It is
  trained with ``nn.CrossEntropyLoss``, which applies log-softmax itself; a
  trailing ``nn.Softmax`` would normalise twice, squashing the loss into a
  narrow range and weakening the gradients. Taking the arg-max of the logits
  gives the same predicted class as the arg-max of the softmax probabilities.

  Args:
    input_dim: number of input features per sample.
    output_l1_dim: width of the hidden layer.
    dropout_ratio: dropout probability applied after the hidden ReLU.
  """

  def __init__(self, input_dim=196, output_l1_dim=98, dropout_ratio=0.2):
    super().__init__()
    self.classifier = nn.Sequential(
        nn.Linear(input_dim, output_l1_dim),
        nn.ReLU(),
        nn.Dropout(dropout_ratio),
        nn.Linear(output_l1_dim, 10),
    )

  def forward(self, x):
    """Return the logits of shape ``(batch, 10)`` for a batch of feature rows."""
    return self.classifier(x)

# Classifier on top of the encoder features, with default layer sizes.
classifierModel = MLP()

# Rebinds loss_fn and optimizer, which previously belonged to the autoencoder.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=classifierModel.parameters(), lr=1e-3)

In [None]:
def train_mlp(model, dataloader, loss_fn, optimizer):
    """Train ``model`` for one epoch on ``(features, labels)`` batches.

    Prints and returns ``(avg_loss, accuracy)``: the mean of the per-batch
    losses and the fraction of samples whose arg-max prediction matched the
    label. Predictions are taken from the forward pass made before each
    optimizer step.
    """
    model.train()

    loss_sum, n_correct, n_seen = 0, 0, 0
    for features, targets in dataloader:
        scores = model(features)
        batch_loss = loss_fn(scores, targets)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        n_correct += (scores.argmax(dim=1) == targets).sum().item()
        n_seen += targets.size(0)

    accuracy = n_correct / n_seen
    avg_loss = loss_sum / len(dataloader)
    print(f"Train Loss: {avg_loss:.4f}, Train Accuracy: {accuracy:.4f}")
    return avg_loss, accuracy

In [None]:
def evaluate_mlp(model, dataloader, loss_fn):
    """Evaluate ``model`` on ``(features, labels)`` batches without gradients.

    The model is put in eval mode. Prints and returns ``(avg_loss, accuracy)``:
    the mean of the per-batch losses and the fraction of samples whose arg-max
    prediction matched the label.
    """
    model.eval()

    loss_sum, n_correct, n_seen = 0, 0, 0
    with torch.no_grad():
        for features, targets in dataloader:
            scores = model(features)
            loss_sum += loss_fn(scores, targets).item()
            n_correct += (scores.argmax(dim=1) == targets).sum().item()
            n_seen += targets.size(0)

    accuracy = n_correct / n_seen
    avg_loss = loss_sum / len(dataloader)
    print(f"Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.4f}")
    return avg_loss, accuracy

In [None]:
# NOTE(review): this cell rebinds batch_size, train_dataset and test_dataset,
# which earlier cells use for the MNIST image datasets and their loaders.
# Re-running those earlier cells after this one will pick up these new values.
batch_size = 64

# Pair the encoder features with their labels.
train_dataset = torch.utils.data.TensorDataset(train_features, train_labels)
test_dataset = torch.utils.data.TensorDataset(test_features, test_labels)

# Loaders for the classifier: shuffle the training split only.
train_loader_classification = DataLoader(
    dataset=train_dataset, shuffle=True, batch_size=batch_size
)
test_loader_classification = DataLoader(
    dataset=test_dataset, shuffle=False, batch_size=batch_size
)

In [None]:
# Rebinds epochs (the autoencoder cell used a different value) and resets the
# per-epoch histories for the classifier.
epochs = 10
train_losses, train_accuracies = [], []
test_losses, test_accuracies = [], []

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    train_loss, train_acc = train_mlp(
        model=classifierModel,
        dataloader=train_loader_classification,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    test_loss, test_acc = evaluate_mlp(
        model=classifierModel,
        dataloader=test_loader_classification,
        loss_fn=loss_fn,
    )

    # Record this epoch's metrics.
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)

Epoch 1/10
Train Loss: 1.6698, Train Accuracy: 0.8445
Test Loss: 1.5488, Test Accuracy: 0.9276
Epoch 2/10
Train Loss: 1.5543, Train Accuracy: 0.9192
Test Loss: 1.5304, Test Accuracy: 0.9395
Epoch 3/10
Train Loss: 1.5365, Train Accuracy: 0.9338
Test Loss: 1.5198, Test Accuracy: 0.9477
Epoch 4/10
Train Loss: 1.5261, Train Accuracy: 0.9433
Test Loss: 1.5139, Test Accuracy: 0.9515
Epoch 5/10
Train Loss: 1.5183, Train Accuracy: 0.9495
Test Loss: 1.5091, Test Accuracy: 0.9571
Epoch 6/10
Train Loss: 1.5139, Train Accuracy: 0.9531
Test Loss: 1.5035, Test Accuracy: 0.9627
Epoch 7/10
Train Loss: 1.5092, Train Accuracy: 0.9575
Test Loss: 1.5024, Test Accuracy: 0.9625
Epoch 8/10
Train Loss: 1.5067, Train Accuracy: 0.9597
Test Loss: 1.4979, Test Accuracy: 0.9667
Epoch 9/10
Train Loss: 1.5030, Train Accuracy: 0.9627
Test Loss: 1.4979, Test Accuracy: 0.9661
Epoch 10/10
Train Loss: 1.5008, Train Accuracy: 0.9647
Test Loss: 1.4965, Test Accuracy: 0.9681


## Hyperparameter Tuning

In [None]:
import numpy as np
from skorch import NeuralNetClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score

class MLP(nn.Module):
    """Two-layer perceptron that scores encoder features against the 10 classes.

    The network returns raw class scores (logits), not probabilities. It is
    wrapped by skorch with ``criterion=nn.CrossEntropyLoss``, and that loss
    applies log-softmax itself; a trailing ``nn.Softmax`` would normalise twice,
    squashing the loss into a narrow range and weakening the gradients.
    Arg-max predictions are unaffected by the missing softmax.

    Args:
        input_dim: number of input features per sample.
        output_l1_dim: width of the hidden layer.
        dropout_ratio: dropout probability applied after the hidden ReLU.
    """

    def __init__(self, input_dim=196, output_l1_dim=98, dropout_ratio=0.2):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, output_l1_dim),
            nn.ReLU(),
            nn.Dropout(dropout_ratio),
            nn.Linear(output_l1_dim, 10),
        )

    def forward(self, x):
        """Return the logits of shape ``(batch, 10)`` for a batch of feature rows."""
        return self.classifier(x)

# skorch wrapper that lets scikit-learn drive the MLP; the values set here are
# the defaults that the grid below overrides per candidate.
net = NeuralNetClassifier(
    MLP,
    criterion=nn.CrossEntropyLoss,
    optimizer=optim.Adam,
    lr=0.001,
    max_epochs=10,
    batch_size=64,
    module__input_dim=196,
    device='cuda' if torch.cuda.is_available() else 'cpu',
)

# 3 x 3 x 3 x 3 = 81 candidates; with cv=3 that is 243 fits.
param_grid = {
    'module__output_l1_dim': [64, 128, 196],
    'module__dropout_ratio': [0.2, 0.4, 0.6],
    'lr': [1e-3, 1e-4, 5e-4],
    'max_epochs': [10, 20, 30],
}

grid_search = GridSearchCV(
    estimator=net,
    param_grid=param_grid,
    scoring='accuracy',
    cv=3,
    n_jobs=-1,
    verbose=2,
)

# Not referenced by the rest of this cell; the search below takes numpy arrays.
train_data = torch.utils.data.TensorDataset(train_features, train_labels)
test_data = torch.utils.data.TensorDataset(test_features, test_labels)

# Cross-validated search over the encoder features of the training split.
grid_search.fit(train_features.numpy(), train_labels.numpy())

print("Best Hyperparameters:", grid_search.best_params_)
print("Best Cross-Validation Accuracy:", grid_search.best_score_)

# Score the refit best estimator on the held-out test features.
best_model = grid_search.best_estimator_
test_preds = best_model.predict(test_features.numpy())
test_accuracy = accuracy_score(test_labels.numpy(), test_preds)
print("Test Accuracy with Best Hyperparameters:", test_accuracy)

Fitting 3 folds for each of 81 candidates, totalling 243 fits
  epoch    train_loss    valid_acc    valid_loss     dur
-------  ------------  -----------  ------------  ------
      1        [36m1.6722[0m       [32m0.9141[0m        [35m1.5603[0m  2.1313
      2        [36m1.5460[0m       [32m0.9298[0m        [35m1.5387[0m  1.9811
      3        [36m1.5295[0m       [32m0.9407[0m        [35m1.5270[0m  1.9977
      4        [36m1.5195[0m       [32m0.9497[0m        [35m1.5182[0m  1.9564
      5        [36m1.5122[0m       [32m0.9558[0m        [35m1.5120[0m  2.0165
      6        [36m1.5073[0m       [32m0.9586[0m        [35m1.5080[0m  2.9806
      7        [36m1.5035[0m       [32m0.9597[0m        [35m1.5061[0m  2.6102
      8        [36m1.5004[0m       [32m0.9611[0m        [35m1.5028[0m  2.0243
      9        [36m1.4974[0m       [32m0.9623[0m        [35m1.5014[0m  1.9977
     10        [36m1.4950[0m       [32m0.9641[0m        [35m1.4