In [1]:
# Standard scientific Python imports
import matplotlib.pyplot as plt

# Import datasets, classifiers and performance metrics
from sklearn import datasets, metrics, svm
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.linear_model import Perceptron, SGDClassifier
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import accuracy_score, log_loss
from itertools import product
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, Subset, TensorDataset

from sklearn.svm import SVC

# SmallCNN

A SGD classifer is not enough to get reliable insights on CIFAR-10 dataset so we will use a lightweight CNN. This will allow us to accurately estimate the influence of training order

In [2]:
class SmallCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 128), nn.ReLU(),
            nn.Linear(128, 10)
        )
        
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# CIFAR-10 Dataset (Vanilla Case)

## Data Loading

As Cifar-10 consist of RGB images, we can define 3 channels of size 32x32.

In [3]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),   # CIFAR-10 mean
                         (0.2023, 0.1994, 0.2010))   # CIFAR-10 std
])

cifar_train_dataset = torchvision.datasets.CIFAR10(root='../data', train=True, download=True, transform=transform)
cifar_test_dataset  = torchvision.datasets.CIFAR10(root='../data', train=False, download=True, transform=transform)

X_train = torch.stack([cifar_train_dataset[i][0] for i in range(len(cifar_train_dataset))])
y_train = torch.tensor([cifar_train_dataset[i][1] for i in range(len(cifar_train_dataset))])

X_test = torch.stack([cifar_test_dataset[i][0] for i in range(len(cifar_test_dataset))])
y_test = torch.tensor([cifar_test_dataset[i][1] for i in range(len(cifar_test_dataset))])

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=1000, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified
X_train: torch.Size([50000, 3, 32, 32]), y_train: torch.Size([50000])
X_test: torch.Size([10000, 3, 32, 32]), y_test: torch.Size([10000])


## Analysis

### (0) Base Case

Let's try as the Vanilla Base Case: Train on the entire dataset using uniform random shuffling for each epoch.

In [4]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")
model = SmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
n_epochs = 10

for epoch in range(n_epochs):
    model.train()
    total_loss = 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * images.size(0)

    avg_loss = total_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{n_epochs} - Train Loss: {avg_loss:.4f}")

model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print(f"Test Accuracy: {correct / total:.4f}")

Using device: cpu


Epoch 1/10 - Train Loss: 1.2805
Epoch 2/10 - Train Loss: 0.8993
Epoch 3/10 - Train Loss: 0.7412
Epoch 4/10 - Train Loss: 0.6213


### (1) Curriculum Learning

As curriculum learning is based on giving samples in increasing difficulty level to the model, we first need to define a difficulty function. We will base ours on difference between the distance of each point to the line that goes through all the data points.

#### Pre-analysis

In [6]:
def compute_cifar_difficulty(dataset):    
    data_loader = DataLoader(dataset, batch_size=len(dataset))
    images, labels = next(iter(data_loader))
    images = images.view(images.size(0), -1)  # Flatten images

    centroids = [images[labels == i].mean(dim=0) for i in range(10)]
    difficulty = torch.tensor([
        torch.norm(img - centroids[label]).item()
        for img, label in zip(images, labels)
    ])
    return difficulty

#### Analysis

In [None]:
device = torch.device("mps" if torch.cuda.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

difficulty = compute_cifar_difficulty(cifar_train_dataset)
difficulty = (difficulty - difficulty.min()) / (difficulty.max() - difficulty.min())
sorted_indices = np.argsort(difficulty)  # easiest to hardest

batch_size = 64
epochs_per_stage = 10
num_stages = 5

phases = np.linspace(0.1, 1.0, num_stages)
previous_n = 0


for stage, phase in enumerate(phases):
    current_n = int(phase * len(sorted_indices))
    selected_idx = sorted_indices[previous_n:current_n]
    previous_n = current_n
    
    print(f"\nStage {stage+1}/{num_stages}: Using {len(selected_idx)} new examples")

    X_stage = X_train[selected_idx]
    y_stage = y_train[selected_idx]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")


model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (2) Self-Paced Learning

In Self-Paced Learning, the model is supposed to:

• learn from easier samples first (based on current loss)

• adaptively expand its training set to include harder samples as it becomes more confident

We will use the same X_train, X_test, y_train and y_test computed in the curriculum learning phase.
To implement the SPL we will introduce a difficulty threshold to let the model choose how many samples of this difficulty it is ready to handle.

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss(reduction='none')
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 15
samples_per_epoch = len(X_train)

seen_mask = torch.zeros(len(X_train), dtype=torch.bool)

for epoch in range(num_epochs):
    model.eval()
    with torch.no_grad():
        unseen_indices = (~seen_mask).nonzero(as_tuple=True)[0]
        unseen_loader = DataLoader(TensorDataset(X_train[unseen_indices], y_train[unseen_indices]),
                                   batch_size=batch_size, shuffle=False)

        all_losses = []
        for images, labels in unseen_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            losses = criterion(outputs, labels)
            all_losses.append(losses.cpu())

        all_losses = torch.cat(all_losses)
    
    k = min(samples_per_epoch, len(unseen_indices))
    selected_in_unseen = torch.topk(-all_losses, k).indices
    selected_indices = unseen_indices[selected_in_unseen]

    seen_mask[selected_indices] = True

    print(f"Epoch {epoch+1}: selected {len(selected_indices)} new samples (total seen: {seen_mask.sum().item()}/{len(X_train)})")

    train_loader = DataLoader(TensorDataset(X_train[selected_indices], y_train[selected_indices]),
                              batch_size=batch_size, shuffle=True)

    model.train()
    total_loss, total_correct = 0, 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels).mean()
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * len(labels)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(train_loader.dataset)
    print(f"  Train Loss: {total_loss / len(train_loader.dataset):.4f}, Acc: {acc:.4f}")
    
print("Training complete.")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (3) Hard-Example Mining

Hard-Example Mining consists in feeding the model only hard examples. In our case, we will consider that a sample is difficult if its normalized difficulty is greater or equal than 0,60 (in other words the top 40%).

In [12]:
difficulty = compute_cifar_difficulty(cifar_train_dataset)
difficulty = (difficulty - difficulty.min()) / (difficulty.max() - difficulty.min())

hard_mask = difficulty >= 0.60
hard_indices = np.where(hard_mask)[0] # we do not shuffle the indices to train on increasingly difficult samples (adapted CL idea)
print(f"Selected {len(hard_indices)} hard examples out of {len(difficulty)} total")

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs = 10

X_hard = X_train[hard_indices]
y_hard = y_train[hard_indices]

train_dataset = TensorDataset(X_hard, y_hard)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(epochs):
    model.train()
    total_loss, total_correct = 0, 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(train_dataset)
    print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

Selected 2990 hard examples out of 50000 total
Epoch 1/10 - Loss: 1.9736, Acc: 0.2953
Epoch 2/10 - Loss: 1.5191, Acc: 0.4759
Epoch 3/10 - Loss: 1.2759, Acc: 0.5686
Epoch 4/10 - Loss: 1.0993, Acc: 0.6120
Epoch 5/10 - Loss: 0.9213, Acc: 0.6890
Epoch 6/10 - Loss: 0.7678, Acc: 0.7411
Epoch 7/10 - Loss: 0.5961, Acc: 0.7990
Epoch 8/10 - Loss: 0.5057, Acc: 0.8341
Epoch 9/10 - Loss: 0.3545, Acc: 0.8957
Epoch 10/10 - Loss: 0.2610, Acc: 0.9268

Final Test Accuracy: 0.3156


# FINAL TEST ACCURACY : HYPER BAS PARCE QUE 40% DES DONNÉES DIFFICILES = 3/50 % des données jsp si on réadapte du cp

### (4) Reverse Curriculum Learning

We are implementing **Reverse Curriculum Learning (RCL)** where the model starts learning from easier goals that are close to the target and gradually works backwards to more challenging starting states.

In [20]:
difficulty = compute_cifar_difficulty(cifar_train_dataset)
difficulty = (difficulty - difficulty.min()) / (difficulty.max() - difficulty.min())
difficulty = difficulty.cpu().numpy()

sorted_indices = np.argsort(difficulty)[::-1].copy()  # reverse order for hardest first

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 1
num_stages = 5

phases = np.linspace(0.1, 1.0, num_stages)

previous_n = 0

for stage, phase in enumerate(phases):
    current_n = int(phase * len(sorted_indices))
    selected_idx = sorted_indices[previous_n:current_n]
    previous_n = current_n
    
    print(f"\nStage {stage+1}/{num_stages}: Using {len(selected_idx)} new hard→easy examples")

    X_stage = X_train[selected_idx]
    y_stage = y_train[selected_idx]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")


model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")


Stage 1/5: Using 5000 new hard→easy examples
  Epoch 1/1 - Loss: 1.9906, Acc: 0.2810

Stage 2/5: Using 11250 new hard→easy examples
  Epoch 1/1 - Loss: 1.4940, Acc: 0.4584

Stage 3/5: Using 11250 new hard→easy examples


### (5) Stratified Monte-Carlo Sampling

**Stratified Monte Carlo Sampling** is a variance reduction technique where the input space is divided into distinct strata (subregions), and samples are drawn from each stratum. This ensures more uniform coverage of the space compared to standard Monte Carlo sampling, leading to more accurate and stable estimates with fewer samples.

In [None]:
difficulty = compute_cifar_difficulty(cifar_train_dataset)
difficulty = (difficulty - difficulty.min()) / (difficulty.max() - difficulty.min())

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 1
num_stages = 5
samples_per_stage = int(len(X_train) / num_stages)

num_bins = num_stages
bin_edges = np.linspace(0, 1, num_bins + 1)
bins = [[] for _ in range(num_bins)]

for idx, score in enumerate(difficulties):
    for b in range(num_bins):
        if bin_edges[b] <= score < bin_edges[b + 1] or (b == num_bins - 1 and score == 1.0):
            bins[b].append(idx)
            break

for b in bins:
    np.random.shuffle(b)

seen_indices = set()
num_seen = 0

for stage in range(num_stages):
    

    stage_indices = []

    for b in bins:
        take_n = min(samples_per_stage // num_bins, len(b))
        selected = [i for i in b if i not in seen_indices][:take_n]
        seen_indices.update(selected)
        stage_indices.extend(selected)

    np.random.shuffle(stage_indices)

    X_stage = X_train[stage_indices]
    y_stage = y_train[stage_indices]
    num_seen += len(X_stage)

    print(f"\nStage {stage+1}/{num_stages}: Sampling {len(X_stage)} data points from all difficulty strata ({num_seen}/{len(X_train)})")

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

# CIFAR-10 Dataset with Gaussian Noise

We initialize a range of increasing difficulty. 
- 0.0: no noise — easiest samples
- 0.2: very noisy — hardest samples
- 0.25+ usually makes Cifar unreadable

In [None]:
base_train = torchvision.datasets.CIFAR10(root='../data', train=True, download=True, transform=transform)
base_test  = torchvision.datasets.CIFAR10(root='../data', train=False, download=True, transform=transform)

full_data = torch.cat([torch.tensor(base_train.data), torch.tensor(base_test.data)], dim=0).float() / 255.0
full_targets = torch.cat([
    torch.tensor(base_train.targets), 
    torch.tensor(base_test.targets)
], dim=0)

full_data = full_data.permute(0, 3, 1, 2)

noise_levels = [0.0, 0.05, 0.1, 0.15, 0.2]
sigma = 0.02

augmented_data, augmented_targets, noise_labels = [], [], []

for level in noise_levels:
    noisy_data = full_data.clone()
    if level > 0:
        N, C, H, W = noisy_data.shape
        total_pixels = H * W

        num_noisy = int(level * total_pixels)

        for i in range(N):
            coords = torch.randperm(total_pixels)[:num_noisy]
            flat_image = noisy_data[i].view(C, -1)
            flat_image[:, coords] += torch.randn(C, num_noisy) * sigma
            flat_image[:, coords].clamp_(0.0, 1.0)

    augmented_data.append(noisy_data)
    augmented_targets.append(full_targets)
    noise_labels.append(torch.full_like(full_targets, fill_value=level))

augmented_data = torch.cat(augmented_data, dim=0)
augmented_targets = torch.cat(augmented_targets, dim=0)
noise_labels = torch.cat(noise_labels, dim=0)

N_train = len(base_train)
N_test = len(base_test)
samples_per_level = N_train + N_test

X_train_list, y_train_list = [], []
X_test_list, y_test_list = [], []

for i in range(len(noise_levels)):
    start = i * samples_per_level
    end = start + samples_per_level

    X_level = augmented_data[start:end]
    y_level = augmented_targets[start:end]

    X_train_list.append(X_level[:N_train])
    y_train_list.append(y_level[:N_train])
    X_test_list.append(X_level[N_train:])
    y_test_list.append(y_level[N_train:])

X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

# Create noise level labels for training set
noise_levels_train_list = []

for i, level in enumerate(noise_levels):
    noise_level_tensor = torch.full((N_train,), fill_value=i)
    noise_levels_train_list.append(noise_level_tensor)

noise_levels_train = torch.cat(noise_levels_train_list, dim=0)
print(f"noise_levels_train shape: {noise_levels_train.shape}")


In [None]:
N_train = len(base_train) 
N_test = len(base_test)

samples_per_level = N_train + N_test

X_train_list, y_train_list = [], []
X_test_list, y_test_list = [], []

for i in range(len(noise_levels)):
    start = i * samples_per_level
    end = start + samples_per_level

    X_level = augmented_data[start:end]
    y_level = augmented_targets[start:end]

    X_train_list.append(X_level[:N_train])
    y_train_list.append(y_level[:N_train])
    X_test_list.append(X_level[N_train:])
    y_test_list.append(y_level[N_train:])

X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

In [None]:
noise_levels_train_list = []

for i, level in enumerate(noise_levels):
    num_train_samples = N_train
    noise_level_tensor = torch.full((num_train_samples,), fill_value=i)
    
    noise_levels_train_list.append(noise_level_tensor)

noise_levels_train = torch.cat(noise_levels_train_list, dim=0)

print(f"noise_levels_train shape: {noise_levels_train.shape}")

## Analysis

### (0) Base Case

In [None]:
batch_size = 64
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # random sampling baseline
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss, total_correct = 0, 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    train_acc = total_correct / len(train_dataset)
    print(f"Epoch {epoch+1}/{epochs} - Train Loss: {total_loss/len(train_dataset):.4f}, Train Acc: {train_acc:.4f}")

model.eval()
correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        correct += (outputs.argmax(1) == labels).sum().item()
test_acc = correct / len(test_dataset)
print(f"Test Accuracy: {test_acc:.4f}")

### (1) Curriculum Learning

#### (1.1) Cumulative Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Using noise levels <= {stage}")
    
    stage_mask = noise_levels_train <= stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

#### (1.2) Strict Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Using noise level {stage}")
    
    stage_mask = noise_levels_train == stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (2) Self-Paced Learning

# FAUDRA SANS DOUTE MODIF INITIAL_LAMBDA & LAMBDA_INCREMENT

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss(reduction='none')  # Important: per-sample loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs = 15
initial_lambda = 1  # initial difficulty threshold
lambda_increment = 2  # increase per epoch

full_train_dataset = TensorDataset(X_train, y_train)
full_train_loader = DataLoader(full_train_dataset, batch_size=batch_size, shuffle=False)

for epoch in range(epochs):
    model.eval()
    all_losses = []

    with torch.no_grad():
        for images, labels in full_train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            losses = criterion(outputs, labels)
            all_losses.append(losses.cpu())

    all_losses = torch.cat(all_losses)
    print(torch.min(all_losses))
    print(torch.max(all_losses))

    lambda_threshold = initial_lambda + epoch * lambda_increment

    selected_indices = (all_losses <= lambda_threshold).nonzero(as_tuple=True)[0]

    if len(selected_indices) == 0:
        print(f"Epoch {epoch+1}: No samples selected for training (lambda={lambda_threshold:.3f}), stopping early.")
        break

    print(f"Epoch {epoch+1}: lambda={lambda_threshold:.3f}, selected {len(selected_indices)}/{len(X_train)} samples")

    train_subset = TensorDataset(X_train[selected_indices], y_train[selected_indices])
    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)

    model.train()
    total_loss, total_correct = 0, 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels).mean()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(train_subset)
    print(f"  Training Loss: {total_loss/len(train_subset):.4f}, Accuracy: {acc:.4f}")

    if len(selected_indices) == len(X_train):
        print(f"  All samples were selected, stopping early.")
        break

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")


### (3) Hard-Example Mining

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs = 10

hard_mask = noise_levels_train >= 3

X_hard = X_train[hard_mask]
y_hard = y_train[hard_mask]
print(f"Selected {len(X_hard)} hard examples out of {len(X_train)}")

hard_dataset = TensorDataset(X_hard, y_hard)
hard_loader = DataLoader(hard_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(epochs):
    model.train()
    total_loss, total_correct = 0, 0

    for images, labels in hard_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(hard_dataset)
    print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(hard_dataset):.4f}, Accuracy: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (4) Reverse Curriculum Learning

#### (4.1) Cumulative Reverse Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in reversed(range(num_stages)):
    print(f"\nStage {stage+1}/{num_stages}: Using noise levels >= {stage}")
    
    stage_mask = noise_levels_train >= stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

#### (4.2) Strict Reverse Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in reversed(range(num_stages)):
    print(f"\nStage {stage+1}/{num_stages}: Using noise level {stage}")
    
    stage_mask = noise_levels_train == stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (5) Stratified Monte-Carlo Sampling

**Stratified Monte Carlo Sampling** is a variance reduction technique where the input space is divided into distinct strata (subregions), and samples are drawn from each stratum. This ensures more uniform coverage of the space compared to standard Monte Carlo sampling, leading to more accurate and stable estimates with fewer samples.

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5
samples_per_stage = 10000

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Sampling from stage level = {stage}")
    
    stage_mask = (noise_levels_train == stage).nonzero(as_tuple=True)[0]
    
    # Randomly sample without replacement
    if len(stage_mask) < samples_per_stage:
        print(f"  Warning: only {len(stage_mask)} samples available, using all.")
        selected_indices = stage_mask
    else:
        selected_indices = stage_mask[torch.randperm(len(stage_mask))[:samples_per_stage]]
    
    X_stage = X_train[selected_indices]
    y_stage = y_train[selected_indices]
    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

# CIFAR-10 Dataset with Impulse Noise

We initialize a range of increasing difficulty. 
- 0.0: no noise — easiest samples
- 0.2: very noisy — hardest samples
- 0.25+ usually makes Cifar unreadable

In [12]:
base_train = torchvision.datasets.CIFAR10(root='../data', train=True, download=True, transform=transform)
base_test  = torchvision.datasets.CIFAR10(root='../data', train=False, download=True, transform=transform)

full_data = torch.cat([torch.tensor(base_train.data), torch.tensor(base_test.data)], dim=0).float() / 255.0
full_targets = torch.cat([
    torch.tensor(base_train.targets), 
    torch.tensor(base_test.targets)
], dim=0)

full_data = full_data.permute(0, 3, 1, 2)

noise_levels = [0.0, 0.05, 0.1, 0.15, 0.2]
sigma = 0.02

augmented_data, augmented_targets, noise_labels = [], [], []

for level in noise_levels:
    noisy_data = full_data.clone()
    if level > 0:
        N, C, H, W = noisy_data.shape
        total_pixels = H * W

        num_noisy = int(level * total_pixels)

        for i in range(N):
            coords = torch.randperm(total_pixels)[:num_noisy]
            flat_image = noisy_data[i].view(C, -1)
            flat_image[:, coords] = torch.randint(0, 2, (C, num_noisy), dtype=torch.float32)    
            flat_image[:, coords].clamp_(0.0, 1.0)

    augmented_data.append(noisy_data)
    augmented_targets.append(full_targets)
    noise_labels.append(torch.full_like(full_targets, fill_value=level))

augmented_data = torch.cat(augmented_data, dim=0)
augmented_targets = torch.cat(augmented_targets, dim=0)
noise_labels = torch.cat(noise_labels, dim=0)

N_train = len(base_train)
N_test = len(base_test)
samples_per_level = N_train + N_test

X_train_list, y_train_list = [], []
X_test_list, y_test_list = [], []

for i in range(len(noise_levels)):
    start = i * samples_per_level
    end = start + samples_per_level

    X_level = augmented_data[start:end]
    y_level = augmented_targets[start:end]

    X_train_list.append(X_level[:N_train])
    y_train_list.append(y_level[:N_train])
    X_test_list.append(X_level[N_train:])
    y_test_list.append(y_level[N_train:])

X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

# Create noise level labels for training set
noise_levels_train_list = []

for i, level in enumerate(noise_levels):
    noise_level_tensor = torch.full((N_train,), fill_value=i)
    noise_levels_train_list.append(noise_level_tensor)

noise_levels_train = torch.cat(noise_levels_train_list, dim=0)
print(f"noise_levels_train shape: {noise_levels_train.shape}")


Files already downloaded and verified
Files already downloaded and verified
X_train: torch.Size([250000, 3, 32, 32]), y_train: torch.Size([250000])
X_test: torch.Size([50000, 3, 32, 32]), y_test: torch.Size([50000])
noise_levels_train shape: torch.Size([250000])


In [13]:
N_train = len(base_train) 
N_test = len(base_test)

samples_per_level = N_train + N_test

X_train_list, y_train_list = [], []
X_test_list, y_test_list = [], []

for i in range(len(noise_levels)):
    start = i * samples_per_level
    end = start + samples_per_level

    X_level = augmented_data[start:end]
    y_level = augmented_targets[start:end]

    X_train_list.append(X_level[:N_train])
    y_train_list.append(y_level[:N_train])
    X_test_list.append(X_level[N_train:])
    y_test_list.append(y_level[N_train:])

X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

X_train: torch.Size([250000, 3, 32, 32]), y_train: torch.Size([250000])
X_test: torch.Size([50000, 3, 32, 32]), y_test: torch.Size([50000])


In [14]:
noise_levels_train_list = []

for i, level in enumerate(noise_levels):
    num_train_samples = N_train
    noise_level_tensor = torch.full((num_train_samples,), fill_value=i)
    
    noise_levels_train_list.append(noise_level_tensor)

noise_levels_train = torch.cat(noise_levels_train_list, dim=0)

print(f"noise_levels_train shape: {noise_levels_train.shape}")

noise_levels_train shape: torch.Size([250000])


## Analysis

### (0) Base Case

In [None]:
batch_size = 64
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # random sampling baseline
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss, total_correct = 0, 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    train_acc = total_correct / len(train_dataset)
    print(f"Epoch {epoch+1}/{epochs} - Train Loss: {total_loss/len(train_dataset):.4f}, Train Acc: {train_acc:.4f}")

model.eval()
correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        correct += (outputs.argmax(1) == labels).sum().item()
test_acc = correct / len(test_dataset)
print(f"Test Accuracy: {test_acc:.4f}")

### (1) Curriculum Learning

#### (1.1) Cumulative Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Using noise levels <= {stage}")
    
    stage_mask = noise_levels_train <= stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

#### (1.2) Strict Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Using noise level {stage}")
    
    stage_mask = noise_levels_train == stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (2) Self-Paced Learning

# SAME FAUDRA SANS DOUTE MODIF INITIAL_LAMBDA & LAMBDA_INCREMENT

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss(reduction='none')  # Important: per-sample loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs = 15
initial_lambda = 1  # initial difficulty threshold
lambda_increment = 2  # increase per epoch

full_train_dataset = TensorDataset(X_train, y_train)
full_train_loader = DataLoader(full_train_dataset, batch_size=batch_size, shuffle=False)

for epoch in range(epochs):
    model.eval()
    all_losses = []

    with torch.no_grad():
        for images, labels in full_train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            losses = criterion(outputs, labels)
            all_losses.append(losses.cpu())

    all_losses = torch.cat(all_losses)
    print(torch.min(all_losses))
    print(torch.max(all_losses))

    lambda_threshold = initial_lambda + epoch * lambda_increment

    selected_indices = (all_losses <= lambda_threshold).nonzero(as_tuple=True)[0]

    if len(selected_indices) == 0:
        print(f"Epoch {epoch+1}: No samples selected for training (lambda={lambda_threshold:.3f}), stopping early.")
        break

    print(f"Epoch {epoch+1}: lambda={lambda_threshold:.3f}, selected {len(selected_indices)}/{len(X_train)} samples")

    train_subset = TensorDataset(X_train[selected_indices], y_train[selected_indices])
    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)

    model.train()
    total_loss, total_correct = 0, 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels).mean()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(train_subset)
    print(f"  Training Loss: {total_loss/len(train_subset):.4f}, Accuracy: {acc:.4f}")

    if len(selected_indices) == len(X_train):
        print(f"  All samples were selected, stopping early.")
        break

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")


### (3) Hard-Example Mining

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs = 10

hard_mask = noise_levels_train >= 3

X_hard = X_train[hard_mask]
y_hard = y_train[hard_mask]
print(f"Selected {len(X_hard)} hard examples out of {len(X_train)}")

hard_dataset = TensorDataset(X_hard, y_hard)
hard_loader = DataLoader(hard_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(epochs):
    model.train()
    total_loss, total_correct = 0, 0

    for images, labels in hard_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        total_correct += (outputs.argmax(1) == labels).sum().item()

    acc = total_correct / len(hard_dataset)
    print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(hard_dataset):.4f}, Accuracy: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (4) Reverse Curriculum Learning

#### (4.1) Cumulative Reverse Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in reversed(range(num_stages)):
    print(f"\nStage {stage+1}/{num_stages}: Using noise levels >= {stage}")
    
    stage_mask = noise_levels_train >= stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

#### (4.2) Strict Reverse Curriculum Learning

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5

for stage in reversed(range(num_stages)):
    print(f"\nStage {stage+1}/{num_stages}: Using noise level {stage}")
    
    stage_mask = noise_levels_train == stage
    X_stage = X_train[stage_mask]
    y_stage = y_train[stage_mask]

    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")

### (5) Stratified Monte-Carlo Sampling

**Stratified Monte Carlo Sampling** is a variance reduction technique where the input space is divided into distinct strata (subregions), and samples are drawn from each stratum. This ensures more uniform coverage of the space compared to standard Monte Carlo sampling, leading to more accurate and stable estimates with fewer samples.

In [None]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = SmallCNN().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

batch_size = 64
epochs_per_stage = 10
num_stages = 5
samples_per_stage = 10000

for stage in range(num_stages):
    print(f"\nStage {stage+1}/{num_stages}: Sampling from stage level = {stage}")
    
    stage_mask = (noise_levels_train == stage).nonzero(as_tuple=True)[0]
    
    # Randomly sample without replacement
    if len(stage_mask) < samples_per_stage:
        print(f"  Warning: only {len(stage_mask)} samples available, using all.")
        selected_indices = stage_mask
    else:
        selected_indices = stage_mask[torch.randperm(len(stage_mask))[:samples_per_stage]]
    
    X_stage = X_train[selected_indices]
    y_stage = y_train[selected_indices]
    train_dataset = TensorDataset(X_stage, y_stage)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs_per_stage):
        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            total_correct += (outputs.argmax(1) == labels).sum().item()

        acc = total_correct / len(train_dataset)
        print(f"  Epoch {epoch+1}/{epochs_per_stage} - Loss: {total_loss/len(train_dataset):.4f}, Acc: {acc:.4f}")

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

model.eval()
total_correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_correct += (outputs.argmax(1) == labels).sum().item()

test_acc = total_correct / len(test_dataset)
print(f"\nFinal Test Accuracy: {test_acc:.4f}")