# Core ML Task

# 1. Data preparation
We load the CIFAR-10 dataset, inject symmetric label noise at rates η ∈ {0.2, 0.4, 0.6, 0.8}, and train our model at each noise level.

In [None]:
import math

import torch
import torchvision
from torch import nn


In [None]:
# This function adds symmetric noise to the dataset labels.
def add_noise(y, noise_rate, num_classes=10):
  """Inject symmetric label noise into a tensor of integer class labels.

  A random subset of ``int(len(y) * noise_rate)`` positions is selected and
  each selected label is replaced by a different class drawn uniformly from
  the remaining ``num_classes - 1`` classes.

  Args:
    y: Integer label tensor with values in ``[0, num_classes)``. It is
      modified in place (assumed 1-D -- TODO confirm against callers).
    noise_rate: Fraction of labels to corrupt, between 0 and 1 inclusive.
    num_classes: Total number of classes. Defaults to 10.

  Returns:
    The same tensor object ``y``, with the selected labels flipped.

  Raises:
    ValueError: If ``noise_rate`` is outside ``[0, 1]`` or ``num_classes``
      is smaller than 2 (no alternative label exists).
  """
  if not 0 <= noise_rate <= 1:
    raise ValueError("noise_rate must be between 0 and 1")
  if num_classes < 2:
    raise ValueError("num_classes must be at least 2 to flip a label")

  num_samples = len(y)
  num_noise = int(num_samples * noise_rate)

  noisy_index = torch.randperm(num_samples, device=y.device)[:num_noise]

  # Shifting a label by an offset in [1, num_classes - 1] modulo num_classes
  # always lands on a different class, and every other class is equally likely.
  offsets = torch.randint(1, num_classes, noisy_index.shape, device=y.device)
  y[noisy_index] = ((y[noisy_index] + offsets) % num_classes).to(y.dtype)

  # Return once, after every selected label has been processed, so callers
  # always get the tensor back (even when nothing was selected).
  return y

# 2. Making a Simple Model

In [None]:
class SimpleCNN(nn.Module):
  """Small CNN: two 3x3 conv + ReLU layers, one 2x2 max-pool, one linear layer.

  Args:
    input_shape: Number of channels in the input images.
    output_shape: Number of output logits produced by the classifier.
    hidden_units: Number of feature maps produced by each convolution.
    image_size: Height and width of the (square) input images. Defaults to
      32, which together with ``hidden_units=32`` gives the 8192 flattened
      features that used to be hard-coded.
  """

  def __init__(self, input_shape: int, output_shape: int, hidden_units: int,
               image_size: int = 32):
    super().__init__()
    self.block_1 = nn.Sequential(
      nn.Conv2d(in_channels=input_shape,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=1),
      nn.ReLU(),
      nn.Conv2d(in_channels=hidden_units,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2)
    )

    # The 3x3 convolutions use padding=1 / stride=1 and therefore keep the
    # spatial size; only the 2x2 max-pool halves it (rounding down).
    pooled_size = image_size // 2
    flattened_features = hidden_units * pooled_size * pooled_size

    self.classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=flattened_features, out_features=output_shape),
    )

  def forward(self, x: torch.Tensor):
    """Return the class logits for a batch of images ``x``."""
    x = self.block_1(x)
    x = self.classifier(x)
    return x


# Everything in this notebook runs on the CPU.
device = "cpu"

# 3 input channels and 10 output logits, with 32 feature maps per convolution.
model0 = SimpleCNN(
  input_shape=3,
  output_shape=10,
  hidden_units=32,
)

# Kept as the cell's last expression so the notebook echoes the architecture.
model0.to(device)


SimpleCNN(
  (block_1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=8192, out_features=10, bias=True)
  )
)

# 3. Making Custom Loss functions
We implement the loss functions we need that are not provided by torch.nn.

In [None]:
class NormalizedCrossEntropyLoss(nn.Module):
  """Cross-entropy divided by ``log(num_classes)``.

  ``log(num_classes)`` is the cross-entropy of a uniform prediction, so a
  model that predicts uniformly scores exactly 1.0. Cross-entropy itself is
  unbounded, so the result can exceed 1.

  NOTE(review): this is a constant rescaling of cross-entropy, not a
  per-sample normalization over all classes -- confirm this is the intended
  definition of "normalized" for the noisy-label experiments.

  Args:
    num_classes: Number of classes; must be at least 2.
    reduction: ``'mean'`` or ``'sum'``; any other value returns the
      per-sample losses unreduced.

  Raises:
    ValueError: If ``num_classes`` is smaller than 2, because
      ``log(1) == 0`` would make the normalizer zero.
  """

  def __init__(self, num_classes=10, reduction='mean'):
    super(NormalizedCrossEntropyLoss, self).__init__()
    if num_classes < 2:
      raise ValueError("num_classes must be at least 2")
    self.num_classes = num_classes
    self.reduction = reduction
    # Per-sample losses are needed so the reduction can be applied after scaling.
    self.ce = nn.CrossEntropyLoss(reduction='none')

  def forward(self, inputs, targets):
    """Return the scaled cross-entropy of logits ``inputs`` against ``targets``."""
    ce_loss = self.ce(inputs, targets)
    # Cross-entropy of a uniform prediction over num_classes classes.
    norm_loss = ce_loss / math.log(self.num_classes)
    if self.reduction == 'mean':
      return norm_loss.mean()
    elif self.reduction == 'sum':
      return norm_loss.sum()
    else:
      return norm_loss

In [None]:
class FocalLoss(nn.Module):
  """Focal loss: cross-entropy down-weighted by ``(1 - p_t) ** gamma``.

  ``p_t`` is the probability the model assigns to the target class; it is
  recovered from the per-sample cross-entropy as ``exp(-CE)``.

  Args:
    gamma: Exponent of the ``(1 - p_t)`` weighting factor.
    num_classes: Stored on the instance; not used in the computation.
    reduction: ``'mean'`` or ``'sum'``; any other value returns the
      per-sample losses unreduced.
  """

  def __init__(self, gamma=2, num_classes=10, reduction='mean'):
    super().__init__()
    # Unreduced cross-entropy, so the focal weight can be applied per sample.
    self.ce = nn.CrossEntropyLoss(reduction='none')
    self.reduction = reduction
    self.num_classes = num_classes
    self.gamma = gamma

  def forward(self, inputs, targets):
    """Return the focal loss of logits ``inputs`` against ``targets``."""
    per_sample_ce = self.ce(inputs, targets)
    target_prob = torch.exp(-per_sample_ce)
    weight = (1 - target_prob) ** self.gamma
    per_sample_focal = weight * per_sample_ce

    if self.reduction == 'mean':
      return per_sample_focal.mean()
    if self.reduction == 'sum':
      return per_sample_focal.sum()
    return per_sample_focal


In [None]:
  class NormalizedFocalLoss(nn.Module):
    """Focal loss divided by ``log(num_classes)``.

    Args:
      gamma: Exponent of the focal weighting factor, forwarded to FocalLoss.
      num_classes: Number of classes; must be at least 2.
      reduction: ``'mean'`` or ``'sum'``; any other value returns the
        per-sample losses unreduced. Defaults to ``'mean'``.

    Raises:
      ValueError: If ``num_classes`` is smaller than 2, because
        ``log(1) == 0`` would make the normalizer zero.
    """

    def __init__(self, gamma=2, num_classes=10, reduction='mean'):
      super(NormalizedFocalLoss, self).__init__()
      if num_classes < 2:
        raise ValueError("num_classes must be at least 2")
      self.gamma = gamma
      self.num_classes = num_classes
      # forward() branches on this attribute, so it must exist on the instance.
      self.reduction = reduction
      # Ask for per-sample values; the reduction is applied here after scaling,
      # otherwise 'sum' and 'none' would operate on an already-averaged scalar.
      self.focal_loss = FocalLoss(gamma=gamma, num_classes=num_classes,
                                  reduction='none')

    def forward(self, inputs, targets):
      """Return the scaled focal loss of logits ``inputs`` against ``targets``."""
      focal_loss = self.focal_loss(inputs, targets)
      norm_loss = focal_loss / math.log(self.num_classes)
      if self.reduction == 'sum':
        return norm_loss.sum()
      elif self.reduction == 'mean':
        return norm_loss.mean()
      else:
        return norm_loss



# 4. Making training and testing functions

In [None]:
def train(model, device, train_loader, optimizer, loss_fn, epoch):
  """Train ``model`` for one pass over ``train_loader``.

  Each batch gets its own zero_grad -> forward -> backward -> step cycle, so
  the optimizer always steps on the gradients of the batch just seen.

  Args:
    model: Module to train; switched to training mode.
    device: Device the batches are moved to before the forward pass.
    train_loader: Iterable yielding ``(data, target)`` batches.
    optimizer: Optimizer built over ``model``'s parameters.
    loss_fn: Callable ``loss_fn(predictions, target)`` returning a scalar.
    epoch: Epoch number, used only in the progress message.

  Returns:
    The mean of the per-batch losses as a float (0.0 for an empty loader).
  """
  model.train()
  running_loss = 0.0
  num_batches = 0
  for data, target in train_loader:
    data, target = data.to(device), target.to(device)

    # Clear stale gradients before computing this batch's gradients.
    optimizer.zero_grad()
    loss = loss_fn(model(data), target)
    loss.backward()
    optimizer.step()

    # .item() detaches the value so the batch's autograd graph can be freed.
    running_loss += loss.item()
    num_batches += 1

  avg_loss = running_loss / num_batches if num_batches else 0.0
  print(f"Epoch: {epoch}, Loss: {avg_loss}")
  return avg_loss

def test(model, device, test_loader, loss_fn):
  """Evaluate ``model`` on ``test_loader``.

  Args:
    model: Module to evaluate; switched to evaluation mode.
    device: Device the batches are moved to before the forward pass.
    test_loader: Iterable yielding ``(data, target)`` batches.
    loss_fn: Callable ``loss_fn(predictions, target)`` returning a scalar.

  Returns:
    A ``(test_loss, accuracy)`` tuple of floats: the mean of the per-batch
    losses and the percentage of correctly classified samples. Both are 0.0
    when the loader yields nothing.
  """
  model.eval()
  total_loss = 0.0
  correct = 0
  num_batches = 0
  num_samples = 0
  with torch.inference_mode():
    for data, target in test_loader:
      data, target = data.to(device), target.to(device)
      test_preds = model(data)
      total_loss += loss_fn(test_preds, target).item()
      # The predicted class is the index of the largest logit.
      predicted = test_preds.argmax(dim=1)
      correct += (predicted == target).sum().item()
      num_batches += 1
      num_samples += target.shape[0]

  # Each loss_fn call contributes one value per batch, so average over batches;
  # accuracy is a per-sample quantity, so average over samples.
  test_loss = total_loss / num_batches if num_batches else 0.0
  accuracy = (correct / num_samples) * 100 if num_samples else 0.0
  return test_loss, accuracy


# The name starts with "test", so pytest would otherwise try to collect this
# helper as a test case whenever it is imported into a test module.
test.__test__ = False

# 5. Training the model on data with different noises for different losses

In [None]:
# Experiment configuration.
epochs = 3
batch_size = 16
learning_rate = 0.001
noise_rates = [0.2, 0.4, 0.6, 0.8]

loss_functions = {
  "Vanilla CE": nn.CrossEntropyLoss(),
  "Normalized CE": NormalizedCrossEntropyLoss(num_classes=10),
  "Focal Loss": FocalLoss(gamma=2),
  "Normalized Focal Loss": NormalizedFocalLoss(gamma=2, num_classes=10)
}

# results[loss_name] collects one (noise_rate, test_loss, accuracy) tuple per
# noise rate.
results = {loss_name: [] for loss_name in loss_functions}

transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

trainset = torchvision.datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)
testset = torchvision.datasets.CIFAR10(root="./data", train=False, transform=transform, download=True)

# Clean labels and raw images as tensors; y_train is the uncorrupted reference
# that every noise level starts from.
y_train = torch.tensor(trainset.targets)
x_train = torch.tensor(trainset.data)

y_test = torch.tensor(testset.targets)
x_test = torch.tensor(testset.data)

# The test set is never modified, so one loader is enough; evaluation does not
# depend on sample order, so it is not shuffled.
test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False)

for noise in noise_rates:
  # Build a separate training set for this noise level so the clean `trainset`
  # is never overwritten and noise does not accumulate across noise rates.
  noisy_trainset = torchvision.datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)

  # add_noise writes into the tensor it is given, so hand it a copy.
  y_train_with_noise = add_noise(y_train.clone(), noise)
  noisy_trainset.targets = y_train_with_noise.tolist()

  train_loader = torch.utils.data.DataLoader(noisy_trainset, batch_size=batch_size, shuffle=True)

  for loss_name, loss_fn in loss_functions.items():
    print(f"Training with loss: {loss_name}")

    # Start every (noise rate, loss) run from fresh weights and fresh optimizer
    # state; otherwise each run would continue from the previous run's model
    # and the results could not be compared.
    model0 = SimpleCNN(input_shape=3, output_shape=10, hidden_units=32).to(device)
    optimizer = torch.optim.Adam(model0.parameters(), lr=learning_rate)

    for epoch in range(epochs):
      model0.train()
      running_loss = 0.0
      for data, target in train_loader:
        data, target = data.to(device), target.to(device)

        # One full zero_grad -> forward -> backward -> step cycle per batch.
        optimizer.zero_grad()
        loss = loss_fn(model0(data), target)
        loss.backward()
        optimizer.step()

        # .item() keeps only the number, so the batch's graph can be freed.
        running_loss += loss.item()
      print(f"Epoch: {epoch}, Loss: {running_loss / len(train_loader)}")

    test_loss, accuracy = test(model0, device, test_loader, loss_fn)
    results[loss_name].append((noise, test_loss, accuracy))






Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Training with loss: Vanilla CE
