In [1]:
# SSL : Ladder Network

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader,Subset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Ladder Network

In [2]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# create loader : 100 examples across all the classes

num_labels = 100
labels = np.array(train_dataset.targets)
labeled_inx = []

100%|██████████| 9.91M/9.91M [00:00<00:00, 12.8MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 336kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 3.17MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 7.10MB/s]


In [3]:
# -------------------------
# Step 1: Data Preparation
# -------------------------

# Transform: Normalize MNIST images
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load full MNIST dataset
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)

# Create labeled subset (e.g., 100 examples evenly distributed across classes)
num_labels = 100
labels = np.array(train_dataset.targets)
labeled_idx = []

for i in range(10):
    idx = np.where(labels == i)[0][:num_labels // 10]
    labeled_idx.extend(idx)

unlabeled_idx = list(set(range(len(train_dataset))) - set(labeled_idx))

labeled_dataset = Subset(train_dataset, labeled_idx)
unlabeled_dataset = Subset(train_dataset, unlabeled_idx)

labeled_loader = DataLoader(labeled_dataset, batch_size=64, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

In [4]:
# add noise

class GaussianNoise(nn.Module):
    def __init__(self, stddev):
        super().__init__()
        self.stddev = stddev
    def forward(self, x):
        if self.training:
            noise = torch.randn_like(x) * self.stddev
            return x + noise
        else:
            return x

class Encoder(nn.Module):
    def __init__(self, noise_std):
        super().__init__()
        self.noise = GaussianNoise(noise_std)
        self.fc1 = nn.Linear(784, 1000)
        self.fc2 = nn.Linear(1000, 500)
        self.fc3 = nn.Linear(500, 250)
        self.fc4 = nn.Linear(250, 250)
        self.fc5 = nn.Linear(250, 10)

    def forward(self, x):
        z = []
        x = x.view(-1, 784)
        x = self.noise(x)
        z1 = self.fc1(x)
        z.append(z1)
        z2 = self.fc2(F.relu(z1))
        z.append(z2)
        z3 = self.fc3(F.relu(z2))
        z.append(z3)
        z4 = self.fc4(F.relu(z3))
        z.append(z4)
        z5 = self.fc5(F.relu(z4))
        z.append(z5)
        return z

class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 250)    # reconstruct z4 (250)
        self.fc2 = nn.Linear(250, 250)   # reconstruct z3 (250)
        self.fc3 = nn.Linear(250, 500)   # reconstruct z2 (500)
        self.fc4 = nn.Linear(500, 1000)  # reconstruct z1 (1000)

    def forward(self, z_corr):
        d1 = self.fc1(z_corr[-1])              # input: z5 (10)
        d2 = self.fc2(F.relu(d1))              # matches z3
        d3 = self.fc3(F.relu(d2))              # matches z2
        d4 = self.fc4(F.relu(d3))              # matches z1
        return [d4, d3, d2, d1]  # decoder outputs for z1 to z4

In [5]:
# -------------------------
# Step 3: Training Functions
# -------------------------

def supervised_loss(output, target):
    return F.cross_entropy(output, target)

def reconstruction_loss(z_clean, z_recon):
    loss = 0
    # Match z1 to z4 with d4 to d1
    for zc, zr in zip(z_clean[:4], z_recon):  # z_clean[:4] = z1 to z4
        loss += F.mse_loss(zr, zc.detach())
    return loss

def train_epoch(encoder, decoder, optimizer, labeled_loader, unlabeled_loader, alpha):
    encoder.train()
    decoder.train()

    for (x_l, y_l), (x_u, _) in zip(labeled_loader, unlabeled_loader):
        x_l, y_l = x_l.to(device), y_l.to(device)
        x_u = x_u.to(device)

        z_corr_l = encoder(x_l)
        z_clean_l = encoder(x_l)
        z_corr_u = encoder(x_u)
        z_clean_u = encoder(x_u)

        output = z_corr_l[-1]
        loss_sup = supervised_loss(output, y_l)

        recon_l = decoder(z_corr_l)
        recon_u = decoder(z_corr_u)

        loss_unsup = reconstruction_loss(z_clean_l, recon_l) + reconstruction_loss(z_clean_u, recon_u)

        loss = loss_sup + alpha * loss_unsup

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def evaluate(encoder, loader):
    encoder.eval()
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            output = encoder(x)[-1]
            pred = output.argmax(dim=1)
            correct += (pred == y).sum().item()
    return correct / len(loader.dataset)


In [6]:
# -------------------------
# Step 4: Training Loop
# -------------------------

encoder = Encoder(noise_std=0.3).to(device)
decoder = Decoder().to(device)
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-3)

epochs = 30
alpha = 0.5

for epoch in range(1, epochs + 1):
    train_epoch(encoder, decoder, optimizer, labeled_loader, unlabeled_loader, alpha)
    test_acc = evaluate(encoder, test_loader)
    labeled_acc = evaluate(encoder, labeled_loader)
    unlabeled_acc = evaluate(encoder, DataLoader(unlabeled_dataset, batch_size=256))
    print(f"Epoch {epoch:02d} | Test Acc: {test_acc:.4f} | Labeled Acc: {labeled_acc:.4f} | Unlabeled Pseudo Acc: {unlabeled_acc:.4f}")

Epoch 01 | Test Acc: 0.5021 | Labeled Acc: 0.6900 | Unlabeled Pseudo Acc: 0.5004
Epoch 02 | Test Acc: 0.5071 | Labeled Acc: 0.7600 | Unlabeled Pseudo Acc: 0.5126
Epoch 03 | Test Acc: 0.5318 | Labeled Acc: 0.7500 | Unlabeled Pseudo Acc: 0.5344
Epoch 04 | Test Acc: 0.4756 | Labeled Acc: 0.7500 | Unlabeled Pseudo Acc: 0.4818
Epoch 05 | Test Acc: 0.3654 | Labeled Acc: 0.6900 | Unlabeled Pseudo Acc: 0.3757
Epoch 06 | Test Acc: 0.6824 | Labeled Acc: 0.9100 | Unlabeled Pseudo Acc: 0.6873
Epoch 07 | Test Acc: 0.5987 | Labeled Acc: 0.9600 | Unlabeled Pseudo Acc: 0.6081
Epoch 08 | Test Acc: 0.5500 | Labeled Acc: 0.9400 | Unlabeled Pseudo Acc: 0.5630
Epoch 09 | Test Acc: 0.6050 | Labeled Acc: 0.9600 | Unlabeled Pseudo Acc: 0.6134
Epoch 10 | Test Acc: 0.6733 | Labeled Acc: 0.9700 | Unlabeled Pseudo Acc: 0.6777
Epoch 11 | Test Acc: 0.6852 | Labeled Acc: 0.9900 | Unlabeled Pseudo Acc: 0.6895
Epoch 12 | Test Acc: 0.6938 | Labeled Acc: 0.9900 | Unlabeled Pseudo Acc: 0.6941
Epoch 13 | Test Acc: 0.7092 

# PIE Models

In [7]:
# PI - Network

# -------------------------
# Step 2: Define the Model
# -------------------------

class PiCNN(nn.Module):
    """Simple CNN for Π Model"""
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))        # Conv layer 1
        x = F.relu(self.conv2(x))        # Conv layer 2
        x = F.max_pool2d(x, 2)           # Max pooling
        x = torch.flatten(x, 1)          # Flatten to [batch, features]
        x = self.dropout(x)              # Apply dropout
        x = F.relu(self.fc1(x))          # Fully connected
        x = self.fc2(x)                  # Output logits
        return x


In [8]:
# -------------------------
# Step 3: Training Utilities
# -------------------------

def add_noise(x, noise_std=0.15):
    """Adds Gaussian noise to an input tensor (augmentation)"""
    return x + torch.randn_like(x) * noise_std

def train_pi_model(model, optimizer, labeled_loader, unlabeled_loader, alpha):
    model.train()
    for (x_l, y_l), (x_u, _) in zip(labeled_loader, unlabeled_loader):
        x_l, y_l = x_l.to(device), y_l.to(device)
        x_u = x_u.to(device)

        # Supervised loss on labeled data (with dropout)
        logits_l = model(x_l)
        loss_sup = F.cross_entropy(logits_l, y_l)

        # Consistency loss on unlabeled data
        # Pass same input twice with different dropout masks and augmentations
        x_u1 = add_noise(x_u)
        x_u2 = add_noise(x_u)

        logits_u1 = model(x_u1)
        logits_u2 = model(x_u2)

        probs_u1 = F.softmax(logits_u1, dim=1)
        probs_u2 = F.softmax(logits_u2, dim=1)

        loss_unsup = F.mse_loss(probs_u1, probs_u2)

        # Total loss
        loss = loss_sup + alpha * loss_unsup

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def evaluate(model, loader):
    model.eval()
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            output = model(x)
            pred = output.argmax(dim=1)
            correct += (pred == y).sum().item()
    return correct / len(loader.dataset)

In [9]:
# -------------------------
# Step 4: Train the Model
# -------------------------

model = PiCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

epochs = 20
alpha = 20.0  # weight for consistency loss

for epoch in range(1, epochs + 1):
    train_pi_model(model, optimizer, labeled_loader, unlabeled_loader, alpha)
    test_acc = evaluate(model, test_loader)
    print(f"Epoch {epoch:02d} | Test Accuracy: {test_acc:.4f}")

Epoch 01 | Test Accuracy: 0.2972
Epoch 02 | Test Accuracy: 0.5309
Epoch 03 | Test Accuracy: 0.6740
Epoch 04 | Test Accuracy: 0.6927
Epoch 05 | Test Accuracy: 0.7014
Epoch 06 | Test Accuracy: 0.6991
Epoch 07 | Test Accuracy: 0.6940
Epoch 08 | Test Accuracy: 0.6979
Epoch 09 | Test Accuracy: 0.7095
Epoch 10 | Test Accuracy: 0.7174
Epoch 11 | Test Accuracy: 0.7184
Epoch 12 | Test Accuracy: 0.7299
Epoch 13 | Test Accuracy: 0.7232
Epoch 14 | Test Accuracy: 0.6992
Epoch 15 | Test Accuracy: 0.7063
Epoch 16 | Test Accuracy: 0.7198
Epoch 17 | Test Accuracy: 0.7080
Epoch 18 | Test Accuracy: 0.6979
Epoch 19 | Test Accuracy: 0.7023
Epoch 20 | Test Accuracy: 0.7178


# Self Training

In [10]:
# self training

from torch.utils.data import DataLoader, Subset, ConcatDataset, Dataset

# Step 2: CNN for self-training
class BaseCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(32 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 32 * 7 * 7)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Step 3: Supervised training
def train(model, loader, optimizer):
    model.train()
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        loss = F.cross_entropy(model(x), y)
        loss.backward()
        optimizer.step()

# Step 4: Generate pseudo-labels for unlabeled data
def generate_pseudo_labels(model, loader, threshold=0.8):
    model.eval()
    pseudo_x, pseudo_y = [], []
    with torch.no_grad():
        for x, _ in loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            conf, pred = torch.max(probs, 1)
            mask = conf > threshold
            if mask.any():
                pseudo_x.append(x[mask])
                pseudo_y.append(pred[mask])
    if pseudo_x:
        return torch.utils.data.TensorDataset(torch.cat(pseudo_x), torch.cat(pseudo_y))
    return None

def evaluate(model, loader):
    model.eval()
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            pred = model(x).argmax(1)
            correct += (pred == y).sum().item()
    return correct / len(loader.dataset)

# Step 5: Self-Training Loop
model = BaseCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(1, 11):
    train(model, labeled_loader, optimizer)
    pseudo_dataset = generate_pseudo_labels(model, unlabeled_loader, threshold=0.95)
    if pseudo_dataset:
        print(f"Epoch {epoch}: Adding {len(pseudo_dataset)} pseudo-labeled samples.")
        labeled_set = ConcatDataset([labeled_dataset, pseudo_dataset])
        labeled_loader = DataLoader(labeled_dataset, batch_size=64, shuffle=True)
    acc = evaluate(model, test_loader)
    print(f"[Self-Training] Epoch {epoch} - Test Accuracy: {acc:.4f}")

[Self-Training] Epoch 1 - Test Accuracy: 0.2255
[Self-Training] Epoch 2 - Test Accuracy: 0.3789
[Self-Training] Epoch 3 - Test Accuracy: 0.5263
[Self-Training] Epoch 4 - Test Accuracy: 0.6021
[Self-Training] Epoch 5 - Test Accuracy: 0.6258
[Self-Training] Epoch 6 - Test Accuracy: 0.6473
Epoch 7: Adding 1 pseudo-labeled samples.
[Self-Training] Epoch 7 - Test Accuracy: 0.6713
Epoch 8: Adding 215 pseudo-labeled samples.
[Self-Training] Epoch 8 - Test Accuracy: 0.6696
Epoch 9: Adding 1348 pseudo-labeled samples.
[Self-Training] Epoch 9 - Test Accuracy: 0.6701
Epoch 10: Adding 4459 pseudo-labeled samples.
[Self-Training] Epoch 10 - Test Accuracy: 0.6713


# Co-Trainig

In [11]:
# Co - Training
class SplitMNIST(Dataset):
    def __init__(self, dataset, side='left'):
        self.dataset = dataset
        self.side = side

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        x, y = self.dataset[idx]
        if self.side == 'left':
            return x[:, :, :14], y
        else:
            return x[:, :, 14:], y

# Step 3: Define Half-CNN Model
class HalfCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 16, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(16 * 7 * 3, 64)  # Actually matches input from 28x14 images
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))     # → [B, 16, 14, 7]
        x = self.pool(F.relu(self.conv2(x)))     # → [B, 16, 7, 3]
        x = x.view(x.size(0), -1)                # Flatten safely
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Step 4: Training and Evaluation Functions
def train(model, loader, optimizer):
    model.train()
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        output = model(x)
        loss = F.cross_entropy(output, y)
        loss.backward()
        optimizer.step()

def evaluate(model, loader):
    model.eval()
    correct = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            pred = model(x).argmax(1)
            correct += (pred == y).sum().item()
    return correct / len(loader.dataset)

def generate_pseudo_labels(model, loader, threshold=0.5):
    model.eval()
    pseudo_x, pseudo_y = [], []
    with torch.no_grad():
        for x, _ in loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            conf, pred = torch.max(probs, 1)
            mask = conf > threshold
            if mask.any():
                pseudo_x.append(x[mask].cpu())
                pseudo_y.append(pred[mask].cpu())
    if pseudo_x:
        return torch.utils.data.TensorDataset(torch.cat(pseudo_x), torch.cat(pseudo_y))
    return None

# Step 5: Initialize Models and Loaders
model1 = HalfCNN().to(device)
model2 = HalfCNN().to(device)
opt1 = optim.Adam(model1.parameters(), lr=1e-3)
opt2 = optim.Adam(model2.parameters(), lr=1e-3)

view1_loader = DataLoader(SplitMNIST(labeled_dataset, 'left'), batch_size=64, shuffle=True)
view2_loader = DataLoader(SplitMNIST(labeled_dataset, 'right'), batch_size=64, shuffle=True)
unlabeled1 = DataLoader(SplitMNIST(unlabeled_dataset, 'left'), batch_size=256)
unlabeled2 = DataLoader(SplitMNIST(unlabeled_dataset, 'right'), batch_size=256)

# Step 6: Co-Training Loop
for epoch in range(1, 11):
    print(f"\nEpoch {epoch}")

    # Train both models on current labeled data
    train(model1, view1_loader, opt1)
    train(model2, view2_loader, opt2)

    # Generate pseudo-labels
    p1 = generate_pseudo_labels(model1, unlabeled1)
    p2 = generate_pseudo_labels(model2, unlabeled2)

    if p1 and p2:
        print(f"  Adding pseudo-labels: View1 ← {len(p2)} from model2, View2 ← {len(p1)} from model1")
        pseudo_view1 = SplitMNIST(p1, side='right')   # model1 gets help from model2
        pseudo_view2 = SplitMNIST(p2, side='left')    # model2 gets help from model1

        view1_loader = DataLoader(ConcatDataset([SplitMNIST(labeled_set, 'left'), pseudo_view2]), batch_size=64, shuffle=True)
        view2_loader = DataLoader(ConcatDataset([SplitMNIST(labeled_set, 'right'), pseudo_view1]), batch_size=64, shuffle=True)

    acc1 = evaluate(model1, unlabeled1)
    acc2 = evaluate(model2, unlabeled2)
    print(f"  View1 Accuracy: {acc1:.4f}, View2 Accuracy: {acc2:.4f}")


Epoch 1
  View1 Accuracy: 0.2056, View2 Accuracy: 0.1029

Epoch 2
  View1 Accuracy: 0.2397, View2 Accuracy: 0.1191

Epoch 3
  View1 Accuracy: 0.3042, View2 Accuracy: 0.1711

Epoch 4
  View1 Accuracy: 0.3495, View2 Accuracy: 0.2230

Epoch 5
  View1 Accuracy: 0.4269, View2 Accuracy: 0.2686

Epoch 6
  View1 Accuracy: 0.4618, View2 Accuracy: 0.3609

Epoch 7
  View1 Accuracy: 0.5020, View2 Accuracy: 0.4410

Epoch 8
  View1 Accuracy: 0.5256, View2 Accuracy: 0.4662

Epoch 9
  View1 Accuracy: 0.5412, View2 Accuracy: 0.4808

Epoch 10
  View1 Accuracy: 0.5532, View2 Accuracy: 0.4937
