In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import numpy as np
# Report the installed library versions so the environment can be reproduced
print(
    f'Torch version: {torch.__version__}',
    f'Torchvision version: {torchvision.__version__}',
    sep='\n',
)


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"\nUsing device: {device}")

In [None]:
# Convert images to tensors, then rescale every channel from [0, 1] to [-1, 1]
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# CIFAR-10 training split (downloaded into ./data when missing), served in
# shuffled mini-batches of 4 images
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform,
)
trainloader = DataLoader(trainset, batch_size=4, shuffle=True, num_workers=2)

# CIFAR-10 test split, served in fixed order in batches of 256 images
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform,
)
testloader = DataLoader(testset, batch_size=256, shuffle=False, num_workers=2)

# Report how many samples each split contains
print(f'Training set size: {len(trainset)}')
print(f'Test set size: {len(testset)}')


In [None]:
# Function to unnormalize and display images
def imshow(img):
    """Display a normalized, channel-first image tensor with matplotlib.

    Args:
        img (torch.Tensor): Image (or image grid) laid out as (C, H, W) whose
            values were normalized to [-1, 1] with mean 0.5 and std 0.5.

    The tensor may live on any device and may be attached to an autograd
    graph; it is detached and moved to the CPU before conversion to NumPy.
    """
    img = img / 2 + 0.5  # Unnormalize from [-1, 1] to [0, 1]
    # A bare .numpy() raises for CUDA tensors and for tensors that require
    # grad, so detach and move to host memory first.
    npimg = img.detach().cpu().numpy()
    # matplotlib expects channel-last (H, W, C) arrays.
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

# Get some random training images
dataiter = iter(trainloader)
images, labels = next(dataiter)

# Show images and corresponding labels
imshow(torchvision.utils.make_grid(images))
# Walk over the labels actually present in the batch instead of assuming a
# batch size of 4, so the caption stays correct if batch_size is changed.
print(' '.join(trainset.classes[int(label)] for label in labels))


In [22]:
def add_symmetric_noise(labels, noise_rate=0.2, num_classes=10, seed=None):
    """
    Introduce symmetric noise by randomly flipping labels to any other class.

    Exactly ``int(len(labels) * noise_rate)`` distinct positions are selected
    and each selected label is replaced by a class drawn uniformly from the
    ``num_classes - 1`` classes different from the original one.

    Args:
        labels (list or np.array): Original labels in ``[0, num_classes)``.
        noise_rate (float): Fraction of labels to flip, in ``[0, 1]``.
        num_classes (int): Total number of classes (at least 2).
        seed (int or None): When given, a private ``RandomState(seed)`` is
            used so the corruption is reproducible. When ``None`` the global
            ``np.random`` state is used, as before.

    Returns:
        np.array: Copy of ``labels`` with symmetric noise; the input is not
        modified.

    Raises:
        ValueError: If ``noise_rate`` is outside ``[0, 1]`` or
            ``num_classes < 2``.
    """
    if not 0.0 <= noise_rate <= 1.0:
        raise ValueError(f"noise_rate must be in [0, 1], got {noise_rate}")
    if num_classes < 2:
        raise ValueError(f"num_classes must be at least 2, got {num_classes}")

    noisy_labels = np.array(labels)
    num_noisy = int(len(noisy_labels) * noise_rate)
    if num_noisy == 0:
        # Nothing to flip (also covers empty input).
        return noisy_labels

    # np.random.RandomState exposes the same choice/randint API as the
    # np.random module, so both can be used interchangeably below.
    rng = np.random if seed is None else np.random.RandomState(seed)
    noisy_indices = rng.choice(len(noisy_labels), num_noisy, replace=False)

    # Adding an offset in [1, num_classes - 1] modulo num_classes lands on a
    # uniformly chosen class that is guaranteed to differ from the original.
    offsets = rng.randint(1, num_classes, size=num_noisy)
    noisy_labels[noisy_indices] = (noisy_labels[noisy_indices] + offsets) % num_classes

    return noisy_labels


def add_asymmetric_noise(labels, noise_rate=0.1, noise_mapping=None, seed=None):
    """
    Introduce asymmetric noise by flipping labels to specific incorrect classes.

    Exactly ``int(len(labels) * noise_rate)`` distinct positions are selected
    and each selected label ``c`` is replaced by ``noise_mapping[c]``.

    Args:
        labels (list or np.array): Original labels.
        noise_rate (float): Fraction of labels to flip, in ``[0, 1]``.
        noise_mapping (dict or None): Maps each source class to the class it
            is flipped to. Defaults to the cyclic shift over ten classes
            (0 -> 1, 1 -> 2, ..., 9 -> 0).
        seed (int or None): When given, a private ``RandomState(seed)`` is
            used so the corruption is reproducible. When ``None`` the global
            ``np.random`` state is used, as before.

    Returns:
        np.array: Copy of ``labels`` with asymmetric noise; the input is not
        modified.

    Raises:
        ValueError: If ``noise_rate`` is outside ``[0, 1]``.
        KeyError: If a selected label has no entry in ``noise_mapping``.
    """
    if not 0.0 <= noise_rate <= 1.0:
        raise ValueError(f"noise_rate must be in [0, 1], got {noise_rate}")

    if noise_mapping is None:
        # Default asymmetric mapping: every class flips to the next one.
        noise_mapping = {
            0: 1, 1: 2, 2: 3, 3: 4, 4: 5,
            5: 6, 6: 7, 7: 8, 8: 9, 9: 0
        }

    noisy_labels = np.array(labels)
    num_noisy = int(len(noisy_labels) * noise_rate)
    if num_noisy == 0:
        # Nothing to flip (also covers empty input).
        return noisy_labels

    rng = np.random if seed is None else np.random.RandomState(seed)
    noisy_indices = rng.choice(len(noisy_labels), num_noisy, replace=False)

    # Look every selected label up once, then write the flips back in bulk.
    flipped = [noise_mapping[int(label)] for label in noisy_labels[noisy_indices]]
    noisy_labels[noisy_indices] = flipped

    return noisy_labels



In [None]:
import matplotlib.pyplot as plt

# Example CIFAR-10 labels (0-9 classes)
original_labels = np.random.randint(0, 10, size=100)

# Corrupt the same label vector with each noise model
noisy_labels_symmetric = add_symmetric_noise(original_labels, noise_rate=0.2, num_classes=10)
noisy_labels_asymmetric = add_asymmetric_noise(original_labels, noise_rate=0.1)


def _plot_label_histograms(clean_labels, corrupted_labels, corrupted_title):
    """Draw the clean (left) and corrupted (right) class histograms in one figure."""
    class_bins = np.arange(11) - 0.5  # one bin centred on each class id 0..9
    panels = (
        (clean_labels, "Original Labels Distribution"),
        (corrupted_labels, corrupted_title),
    )
    plt.figure(figsize=(12, 6))
    for position, (panel_labels, panel_title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.hist(panel_labels, bins=class_bins, edgecolor='black', color='cyan')
        plt.title(panel_title)
        plt.xlabel("Class")
        plt.ylabel("Frequency")
    plt.tight_layout()
    plt.show()


# Visualize symmetric noise
_plot_label_histograms(original_labels, noisy_labels_symmetric, "Labels After Symmetric Noise")

# Visualize asymmetric noise
_plot_label_histograms(original_labels, noisy_labels_asymmetric, "Labels After Asymmetric Noise")


In [None]:
import torch
import torch.nn as nn

class CIFAR10_CNN(nn.Module):
    """Four-block convolutional classifier for 3x32x32 inputs.

    Each block is Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU; the first two
    blocks additionally halve the spatial size with 2x2 max pooling, so the
    feature map entering the classifier is 256 channels of 8x8.

    Args:
        num_classes (int): Size of the output layer. Defaults to 10 (CIFAR-10).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Conv blocks; spatial size: 32x32 -> 16x16 -> 8x8 -> 8x8 -> 8x8
        self.block1 = self._conv_block(3, 32, pool=True)      # Output: 16x16x32
        self.block2 = self._conv_block(32, 64, pool=True)     # Output: 8x8x64
        self.block3 = self._conv_block(64, 128, pool=False)   # Output: 8x8x128
        self.block4 = self._conv_block(128, 256, pool=False)  # Output: 8x8x256

        # Classifier
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 8 * 8, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    @staticmethod
    def _conv_block(in_channels, out_channels, pool):
        """Build Conv(3x3) -> BatchNorm -> ReLU, optionally followed by 2x2 max pooling."""
        layers = [
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(2, 2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a (batch, 3, 32, 32) input."""
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        return self.classifier(x)

    def to_device(self, target_device=None):
        """Move the model to ``target_device`` and return it for chaining.

        Args:
            target_device: Device to move to. When ``None`` the notebook-level
                global ``device`` is used, matching the previous behaviour.
        """
        if target_device is None:
            target_device = device  # global defined in the device-selection cell
        self.to(target_device)
        return self

# Smoke test: push a random CIFAR-10 sized batch through a fresh network
if __name__ == "__main__":
    model = CIFAR10_CNN()
    test_input = torch.randn(4, 3, 32, 32)  # four RGB 32x32 images
    output_shape = model(test_input).shape
    print("Output shape:", output_shape)  # expected: (4, 10)


In [None]:
import torch
import torch.nn as nn

# Fresh network (CIFAR10_CNN is defined in the model cell above)
model = CIFAR10_CNN()

# SGD with momentum 0.9; weight_decay applies an L2 penalty to the parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)

# Show the resulting optimizer configuration
print(optimizer)


In [None]:
import torch
import torch.nn.functional as F

# Phase 1: Standard Loss Functions --------------------------------------------------
def cross_entropy(outputs, targets):
    """Batch-mean softmax cross-entropy of the logits ``outputs`` against ``targets``."""
    loss = F.cross_entropy(input=outputs, target=targets, reduction='mean')
    return loss

def mean_absolute_error(outputs, targets):
    """Mean Absolute Error (MAE) between predicted probabilities and one-hot targets.

    The logits are first turned into probabilities with softmax; comparing raw
    logits with a one-hot vector would give an unbounded quantity that is not
    the MAE classification loss. For each sample the absolute differences are
    summed over classes, which equals ``2 * (1 - p_y)`` and lies in ``[0, 2]``;
    the result is then averaged over the batch.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, num_classes).
        targets (torch.Tensor): Integer class indices of shape (batch,).

    Returns:
        torch.Tensor: Scalar loss.
    """
    probs = F.softmax(outputs, dim=1)
    one_hot_targets = F.one_hot(targets, num_classes=outputs.size(1)).to(probs.dtype)
    per_sample = torch.sum(torch.abs(probs - one_hot_targets), dim=1)
    return torch.mean(per_sample)

def reversed_cross_entropy(outputs, targets, A=1.0):
    """Reverse Cross-Entropy (RCE).

    RCE swaps the roles of prediction and label in cross-entropy:
    ``RCE = -sum_k p_k * log(q_k)`` where ``p`` is the softmax prediction and
    ``q`` the one-hot label. ``log(1) = 0`` for the target class and
    ``log(0)`` is replaced by the finite constant ``-A`` for every other
    class, so the per-sample loss reduces to ``A * (1 - p_y)``.

    The previous implementation summed ``log p_k`` over the non-target
    classes, which decreases as probability mass moves away from the target.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, num_classes).
        targets (torch.Tensor): Integer class indices of shape (batch,).
        A (float): Magnitude substituted for ``log(0)``. The default of 1
            matches the ``A=1`` assumed by
            ``normalized_reversed_cross_entropy`` in this notebook.

    Returns:
        torch.Tensor: Scalar loss averaged over the batch.
    """
    probs = F.softmax(outputs, dim=1)
    one_hot_targets = F.one_hot(targets, num_classes=outputs.size(1)).to(probs.dtype)
    # Clamped log of the one-hot label: 0 at the target class, -A elsewhere.
    log_targets = -A * (1.0 - one_hot_targets)
    per_sample = -torch.sum(probs * log_targets, dim=1)
    return torch.mean(per_sample)

def focal_loss(outputs, targets, gamma=2):
    """Focal loss: cross-entropy down-weighted by ``(1 - p_target) ** gamma``.

    Args:
        outputs: Logits, one row per sample.
        targets: Integer class index per sample.
        gamma: Focusing exponent; larger values shrink the contribution of
            samples whose target class already has high probability.

    Returns:
        Scalar loss averaged over the batch.
    """
    per_sample_ce = F.cross_entropy(outputs, targets, reduction='none')
    # exp(-CE) recovers the probability assigned to the target class.
    target_prob = torch.exp(-per_sample_ce)
    modulating_factor = (1 - target_prob) ** gamma
    return (modulating_factor * per_sample_ce).mean()

# Test the loss functions -----------------------------------------------------------
if __name__ == "__main__":
    # Test case: 2 samples, 3 classes
    logits = torch.tensor([[2.0, 1.0, 0.1], [0.1, 2.0, 1.0]])
    labels = torch.tensor([0, 1])  # Class indices

    # Evaluate every Phase 1 loss on the same toy batch
    for tag, loss_fn in (
        ("CE:", cross_entropy),
        ("MAE:", mean_absolute_error),
        ("RCE:", reversed_cross_entropy),
        ("Focal:", focal_loss),
    ):
        print(tag, loss_fn(logits, labels).item())


In [None]:
import torch
import torch.nn.functional as F

# Phase 2: Optimized Normalized Loss Functions ----------------------------------------
def normalized_cross_entropy(outputs, targets):
    """NCE = log p(y|x) / sum_k log p(k|x), averaged over the batch.

    Log-probabilities come from ``log_softmax`` rather than
    ``log(softmax + eps)``: the epsilon floors every log-probability at about
    -18.4, which distorts the ratio (and flattens gradients) as soon as a
    class probability drops below ~1e-8.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, num_classes).
        targets (torch.Tensor): Integer class indices of shape (batch,).

    Returns:
        torch.Tensor: Scalar loss; each per-sample term lies in [0, 1].
    """
    log_probs = F.log_softmax(outputs, dim=1)
    # Pick log p(y|x) for every row.
    numerator = log_probs.gather(1, targets.unsqueeze(1)).squeeze(1)
    denominator = torch.sum(log_probs, dim=1)
    return torch.mean(numerator / denominator)

def normalized_mean_absolute_error(outputs, targets):
    """NMAE = MAE * 1/(2*(K-1)), computed on softmax probabilities.

    With MAE defined as the per-sample L1 distance between the softmax
    prediction and the one-hot label, ``MAE = 2 * (1 - p_y)`` and therefore
    ``NMAE = (1 - p_y) / (K - 1)``. The closed form is evaluated directly so
    the result is bounded in ``[0, 1/(K-1)]`` and never depends on raw logit
    magnitudes.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, K).
        targets (torch.Tensor): Integer class indices of shape (batch,).

    Returns:
        torch.Tensor: Scalar loss averaged over the batch.
    """
    K = outputs.size(1)
    probs = F.softmax(outputs, dim=1)
    target_probs = probs.gather(1, targets.unsqueeze(1)).squeeze(1)
    return torch.mean(1.0 - target_probs) / (K - 1)

def normalized_reversed_cross_entropy(outputs, targets):
    """NRCE = RCE * 1/(A*(K-1)), computed in closed form.

    RCE is ``-sum_k p_k * log(q_k)`` with ``log(0)`` replaced by ``-A``, which
    equals ``A * (1 - p_y)``. Dividing by ``A * (K - 1)`` cancels ``A``, so
    ``NRCE = (1 - p_y) / (K - 1)`` whatever constant is chosen. Evaluating
    that expression directly keeps the loss in ``[0, 1/(K-1)]`` and makes it
    decrease as the target class gains probability.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, K).
        targets (torch.Tensor): Integer class indices of shape (batch,).

    Returns:
        torch.Tensor: Scalar loss averaged over the batch.
    """
    K = outputs.size(1)
    probs = F.softmax(outputs, dim=1)
    target_probs = probs.gather(1, targets.unsqueeze(1)).squeeze(1)
    return torch.mean(1.0 - target_probs) / (K - 1)

def normalized_focal_loss(outputs, targets, gamma=2):
    """NFL = FL(p, y) / sum_k FL(p, k), averaged over the batch.

    ``FL(p, k) = -(1 - p_k)^gamma * log p_k`` is the focal loss the sample
    would incur if its label were ``k``, the same per-class quantity that
    ``focal_loss`` in this notebook evaluates at the true label. Normalizing
    by the sum over all candidate labels bounds each per-sample term to
    ``[0, 1]``.

    The previous version took ``log((1 - p_k)^gamma * p_k)``, i.e. the log of
    a product instead of the focal term itself; that expression is larger for
    confident correct predictions than for confident wrong ones.

    Args:
        outputs (torch.Tensor): Logits of shape (batch, num_classes).
        targets (torch.Tensor): Integer class indices of shape (batch,).
        gamma (float): Focusing exponent of the focal loss.

    Returns:
        torch.Tensor: Scalar loss.
    """
    log_probs = F.log_softmax(outputs, dim=1)
    probs = log_probs.exp()
    # Focal loss for every candidate class; each entry is non-negative.
    focal_terms = -((1 - probs) ** gamma) * log_probs
    numerator = focal_terms.gather(1, targets.unsqueeze(1)).squeeze(1)
    denominator = torch.sum(focal_terms, dim=1)
    return torch.mean(numerator / denominator)



In [28]:
class APLLoss:
    """Weighted sum of an active and a passive loss (APL framework):

        L_APL = α * L_Active + β * L_Passive
    """

    def __init__(self, active_loss, passive_loss, alpha=1.0, beta=1.0):
        """
        Store the two loss callables and their weights.

        Args:
            active_loss: One of [NCE, NFL] (normalized active losses)
            passive_loss: One of [NMAE, NRCE] (normalized passive losses)
            alpha: Weight for active loss term
            beta: Weight for passive loss term
        """
        self.active_loss, self.passive_loss = active_loss, passive_loss
        self.alpha, self.beta = alpha, beta

    def __call__(self, outputs, targets):
        """Evaluate both losses on ``(outputs, targets)`` and return their weighted sum."""
        weighted_active = self.alpha * self.active_loss(outputs, targets)
        weighted_passive = self.beta * self.passive_loss(outputs, targets)
        return weighted_active + weighted_passive

# Example usage with paper's first combination: αNCE + βMAE (both weights 1.0)
apl_nce_mae = APLLoss(
    normalized_cross_entropy,
    normalized_mean_absolute_error,
    alpha=1.0,
    beta=1.0,
)


In [None]:
# Test case --------------------------------------------------------
logits = torch.tensor([
    [3.0, 1.0, 0.1],  # Class 0 dominant
    [0.1, 3.0, 0.1],  # Class 1 dominant
    [0.1, 0.1, 3.0]   # Class 2 dominant
])
labels = torch.tensor([0, 1, 2])  # All correct labels

# Calculate individual components for verification
nce = normalized_cross_entropy(logits, labels)  # ≈ 0.022 for these logits
nmae = normalized_mean_absolute_error(logits, labels)

# Calculate APL loss
apl_loss = apl_nce_mae(logits, labels)

# Display the values (they were previously computed but never shown) and check
# that the combination with alpha = beta = 1 is the plain sum of its parts.
print(f"NCE:  {nce.item():.4f}")
print(f"NMAE: {nmae.item():.4f}")
print(f"APL:  {apl_loss.item():.4f}")
assert torch.isclose(apl_loss, nce + nmae), "APL(alpha=1, beta=1) should equal NCE + NMAE"



In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Subset
from copy import deepcopy

# Configuration
NUM_EPOCHS = 50    # epochs trained per (noise type, noise rate) setting
BATCH_SIZE = 128   # mini-batch size of the noisy training loader

# Noise rates evaluated for each noise model
NOISE_RATES = dict(
    symmetric=[0.2, 0.4, 0.6, 0.8],
    asymmetric=[0.1, 0.2, 0.3, 0.4],
)

# Loss name -> callable(outputs, targets); one model is trained per entry
LOSS_FUNCTIONS = dict(
    CE=cross_entropy,
    FL=lambda o, t: focal_loss(o, t, gamma=2),
    NCE=normalized_cross_entropy,
    NFL=lambda o, t: normalized_focal_loss(o, t, gamma=2),
)
# APL combinations are currently disabled:
#'APL(NCE+MAE)': APLLoss(normalized_cross_entropy, normalized_mean_absolute_error),
#'APL(NFL+RCE)': APLLoss(normalized_focal_loss, normalized_reversed_cross_entropy)

def run_experiments():
    """Train and evaluate every loss in LOSS_FUNCTIONS under every noise setting.

    For each noise type and rate in NOISE_RATES the CIFAR-10 training labels
    are corrupted, one model per loss is trained via
    ``train_single_experiment``, and the per-setting curves are plotted. A
    final comparison plot is produced after all settings have run.

    Relies on the notebook-level globals ``transform`` (and, through the
    training helper, ``device`` and ``testloader``).

    Returns:
        dict: ``final_results[noise_type][eta]`` holds the metrics dict
        returned by ``train_single_experiment`` for that setting.
    """
    from copy import copy as shallow_copy

    # Load clean dataset once
    original_trainset = torchvision.datasets.CIFAR10(
        root='./data', train=True, download=True, transform=transform)

    # Store final results across all experiments
    final_results = {ntype: {} for ntype in NOISE_RATES.keys()}

    for noise_type, rates in NOISE_RATES.items():
        for eta in rates:
            print(f"\n{'-'*40}\nTraining with {noise_type} noise η={eta}\n{'-'*40}")

            # Apply noise; both helpers return a new array and leave the
            # clean label list untouched.
            if noise_type == 'symmetric':
                noisy_targets = add_symmetric_noise(original_trainset.targets, eta)
            else:
                noisy_targets = add_asymmetric_noise(original_trainset.targets, eta)

            # Only the labels differ between settings, so a shallow copy that
            # shares the image array is enough; a deep copy would duplicate
            # the whole image array for every setting.
            noisy_trainset = shallow_copy(original_trainset)
            noisy_trainset.targets = noisy_targets
            train_loader = DataLoader(noisy_trainset, batch_size=BATCH_SIZE,
                                    shuffle=True, num_workers=2, pin_memory=True)

            # Train one model per loss function on this noisy loader
            results = train_single_experiment(train_loader, eta, noise_type)
            final_results[noise_type][eta] = results

            # Drop the loader together with the dataset it references
            del train_loader, noisy_trainset
            plot_metrics(results, eta, noise_type)

    # Plot final comparison
    plot_final_comparison(final_results)
    return final_results

def train_single_experiment(train_loader, eta, noise_type):
    """Train one CIFAR10_CNN per entry of LOSS_FUNCTIONS on ``train_loader``.

    Every epoch, each model makes its own full pass over ``train_loader`` and
    is then evaluated on the global ``testloader``. ``eta`` and ``noise_type``
    are accepted for interface symmetry with the caller and are not used here.

    Returns:
        dict: ``metrics[name]`` with per-epoch lists under the keys
        'train_loss', 'train_acc' and 'test_acc'.
    """
    loss_names = list(LOSS_FUNCTIONS.keys())
    models = {name: CIFAR10_CNN().to_device() for name in loss_names}
    optimizers = {
        name: torch.optim.SGD(net.parameters(), lr=0.01,
                              momentum=0.9, weight_decay=1e-4)
        for name, net in models.items()
    }
    metrics = {
        name: {'train_loss': [], 'train_acc': [], 'test_acc': []}
        for name in loss_names
    }

    for epoch in range(NUM_EPOCHS):
        for name, net in models.items():
            criterion = LOSS_FUNCTIONS[name]
            sgd = optimizers[name]
            history = metrics[name]

            # Training phase
            net.train()
            loss_sum, num_correct, num_seen = 0.0, 0, 0
            for inputs, labels in train_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                sgd.zero_grad()
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                sgd.step()

                # Weight the batch-mean loss by batch size to get a dataset mean later
                loss_sum += loss.item() * inputs.size(0)
                predicted = outputs.detach().argmax(dim=1)
                num_seen += labels.size(0)
                num_correct += (predicted == labels).sum().item()

            # Evaluation (training accuracy is measured against the noisy labels)
            train_acc = 100 * num_correct / num_seen
            test_acc = evaluate(net, testloader)
            mean_loss = loss_sum / len(train_loader.dataset)

            # Store metrics
            history['train_loss'].append(mean_loss)
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)

            # Print progress
            print(f"Epoch {epoch+1:02d}/{NUM_EPOCHS} | {name:12} | "
                  f"Loss: {mean_loss:.4f} | "
                  f"Train Acc: {train_acc:.2f}% | "
                  f"Test Acc: {test_acc:.2f}%")

    return metrics

def evaluate(model, loader):
    """Return the top-1 accuracy (in percent) of ``model`` over ``loader``.

    The model is switched to eval mode and gradients are disabled; batches
    are moved to the global ``device`` before the forward pass.
    """
    model.eval()
    num_correct, num_seen = 0, 0
    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            predictions = model(batch_inputs).argmax(dim=1)
            num_seen += batch_labels.size(0)
            num_correct += (predictions == batch_labels).sum().item()
    return 100 * num_correct / num_seen

def plot_metrics(metrics, eta, noise_type):
    """Plot per-epoch training and test accuracy of every loss for one noise setting.

    Draws two side-by-side panels (training accuracy left, test accuracy
    right), saves the figure as ``results_{noise_type}_eta{eta}.png`` and
    shows it.
    """
    panels = (
        ('train_acc', 'Training Accuracy'),
        ('test_acc', 'Test Accuracy'),
    )

    plt.figure(figsize=(15, 5))
    for position, (metric_key, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for name, data in metrics.items():
            plt.plot(data[metric_key], label=name)
        plt.title(f'{heading} ({noise_type} η={eta})')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy (%)')
        plt.legend()

    plt.tight_layout()
    plt.savefig(f'results_{noise_type}_eta{eta}.png')
    plt.show()

def plot_final_comparison(final_results):
    """Plot final-epoch test accuracy versus noise rate, one panel per noise type.

    Symmetric noise is drawn in the left panel and any other noise type in
    the right one; each loss in LOSS_FUNCTIONS contributes one line per
    panel. The figure is saved as ``final_comparison.png`` and shown.
    """
    plt.figure(figsize=(12, 8))
    for noise_type, rates in NOISE_RATES.items():
        panel_index = 1 if noise_type == 'symmetric' else 2
        plt.subplot(1, 2, panel_index)

        for name in LOSS_FUNCTIONS:
            # Last recorded test accuracy of this loss at every noise rate
            accuracies = []
            for eta in rates:
                accuracies.append(final_results[noise_type][eta][name]['test_acc'][-1])
            plt.plot(rates, accuracies, marker='o', label=name)

        plt.title(f'{noise_type.capitalize()} Noise Comparison')
        plt.xlabel('Noise Rate')
        plt.ylabel('Final Test Accuracy (%)')
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig('final_comparison.png')
    plt.show()


def plot_symmetric_only(final_results):
    """Plot final-epoch test accuracy versus symmetric noise rate for every loss.

    The figure is saved as ``symmetric_comparison.png`` and shown.
    """
    noise_type = 'symmetric'
    plt.figure(figsize=(8, 6))

    for name in LOSS_FUNCTIONS:
        rates = NOISE_RATES[noise_type]
        # Last recorded test accuracy of this loss at every symmetric rate
        accuracies = []
        for eta in rates:
            accuracies.append(final_results[noise_type][eta][name]['test_acc'][-1])
        plt.plot(rates, accuracies, marker='o', label=name)

    plt.title('Symmetric Noise Comparison')
    plt.xlabel('Noise Rate')
    plt.ylabel('Final Test Accuracy (%)')
    plt.legend()
    plt.grid(True)
    plt.savefig('symmetric_comparison.png')
    plt.show()





if __name__ == "__main__":
    # Prefer the GPU when available
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print(f"Using device: {device}")

    # Initialize datasets: tensors rescaled per channel from [0, 1] to [-1, 1]
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ])

    # Test split used by evaluate() during training
    testset = torchvision.datasets.CIFAR10(
        root='./data', train=False, download=True, transform=transform,
    )
    testloader = DataLoader(
        testset,
        batch_size=256,
        shuffle=False,
        num_workers=2,
        pin_memory=True,
    )

    # Run full experiment suite, then draw the symmetric-noise summary
    final_results = run_experiments()
    plot_symmetric_only(final_results)
