# COMP5328 - Advanced Machine Learning

## Tutorial - Learning with Noisy Data II: Label Noise

**Semester 2, 2025**

**Objectives:**

* To build a neural network classifier with PyTorch.
* To estimate the transition matrix.

**Instructions:**

* Exercises are to be completed in an IPython notebook, using either: 
   * IPython 3 (Jupyter) notebook installed on your computer http://jupyter.org/install (you need to have Python installed first https://docs.python.org/3/using/index.html )
   * A web-based IPython notebook service such as Google Colaboratory https://colab.research.google.com/ 
   
* If you are using Jupyter installed on your computer, go to File->Open, drag and drop the "week10_tutorial.ipynb" file onto the home interface, and click Upload. 
* If you are using Google Colaboratory, click File->Upload notebook and upload the "week10_tutorial.ipynb" file.
* Complete exercises in "week10_tutorial.ipynb".
* To run the cell you can press Ctrl-Enter or hit the Play button at the top.
* Complete all exercises marked with **TODO**.
* Save your file when you are done with the exercises, so you can show your tutor next week.

Lecturers: Tongliang Liu

## 1. Introduction

In this tutorial, we illustrate how to estimate the instance-independent, class-dependent transition matrix in a binary classification setting; the approach extends easily to multi-class classification.

Let $Y$ be the clean label, and $\tilde{Y}$ be the noisy label. Recall that the noisy class posteriors ${P}(\tilde{Y}|X)$ and the clean class posteriors ${P}(Y|X)$ are related via the transition matrix $T$, i.e.,

\begin{align}
    \begin{bmatrix}
    P(\tilde{Y}=0|X) \\
    P(\tilde{Y}=1|X) 
    \end{bmatrix} =
    \begin{bmatrix}
    P(\tilde{Y}=0|Y=0) &  P(\tilde{Y}=0|Y=1) \\
    P(\tilde{Y}=1|Y=0) &  P(\tilde{Y}=1|Y=1) 
    \end{bmatrix} \begin{bmatrix}
    P(Y=0|X) \\
    P(Y=1|X) 
    \end{bmatrix}
\end{align}
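
As a minimal numerical sketch of this relation, the noisy posterior is the matrix-vector product of $T$ and the clean posterior. The flip rates 0.15 and 0.25 match the defaults used later in this tutorial; the clean posterior values and the helper name `noisy_posterior` are hypothetical and chosen only for illustration.

```python
# Column-stochastic transition matrix: T[i][j] = P(noisy = i | clean = j).
rho_neg, rho_pos = 0.15, 0.25  # P(noisy=1 | clean=0), P(noisy=0 | clean=1)
T = [[1 - rho_neg, rho_pos],
     [rho_neg, 1 - rho_pos]]


def noisy_posterior(T, clean_posterior):
    """Return T @ clean_posterior for a 2x2 matrix and a length-2 vector."""
    return [sum(T[i][j] * clean_posterior[j] for j in range(2)) for i in range(2)]


clean = [0.9, 0.1]  # hypothetical P(Y=0|x), P(Y=1|x)
noisy = noisy_posterior(T, clean)
print(noisy)  # approximately [0.79, 0.21]
```

Because every column of $T$ sums to one, the resulting noisy posterior is again a valid probability vector.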

Suppose we have a point $x^0$, such that $P(Y=0|X=x^0)=1$ and $P(Y=1|X=x^0)=0$, then we can obtain the first column of the transition matrix as follows,

\begin{align}
    \begin{bmatrix}
    P(\tilde{Y}=0|X=x^0) \\
    P(\tilde{Y}=1|X=x^0) 
    \end{bmatrix} &=
    \begin{bmatrix}
    P(\tilde{Y}=0|Y=0) &  P(\tilde{Y}=0|Y=1) \\
    P(\tilde{Y}=1|Y=0) &  P(\tilde{Y}=1|Y=1) 
    \end{bmatrix} 
    \begin{bmatrix}
    P(Y=0|X=x^0) \\
    P(Y=1|X=x^0) 
    \end{bmatrix} \\
    &=\begin{bmatrix}
    P(\tilde{Y}=0|Y=0) &  P(\tilde{Y}=0|Y=1) \\
    P(\tilde{Y}=1|Y=0) &  P(\tilde{Y}=1|Y=1) 
    \end{bmatrix} 
    \begin{bmatrix}
    1 \\
    0 
    \end{bmatrix} \\
    &=\begin{bmatrix}
    P(\tilde{Y}=0|Y=0) \\
    P(\tilde{Y}=1|Y=0) 
    \end{bmatrix},
\end{align}
where the noisy class posteriors can be estimated directly by training a classifier with the noisy data.

Similarly, if we have a point $x^1$, such that $P(Y=0|X=x^1)=0$ and $P(Y=1|X=x^1)=1$, then we can obtain the second column of the transition matrix.

As we can see, the points $x^0$ and $x^1$ are the key to estimating the transition matrix $T$. Such points are called anchor points: an anchor point belongs to exactly one clean class with probability one, i.e., if $x^i$ is an anchor point of the clean class $i$, then it must satisfy $P(Y=i|X=x^i) = 1$.

When the noise rates are upper bounded by a constant, the anchor points can be found as follows [1]:
\begin{align}
x^i = \arg\max_{x\in \mathcal{X}} P(\tilde{Y}=i|X=x).
\end{align}
Empirically, we can estimate the anchor points on the noisy dataset and then use the estimated anchor points to estimate the transition matrix, as illustrated in the rest of this tutorial.
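
The argmax rule can be sketched on a toy example. Everything below is an assumption made for illustration: the five clean posteriors are hypothetical, and the noisy posteriors are computed exactly from known flip rates, whereas in practice they would come from a classifier trained on the noisy data.

```python
rho_neg, rho_pos = 0.15, 0.25  # assumed true flip rates (unknown in practice)

# Hypothetical clean posteriors P(Y=1|x) for a handful of points; the first and
# last points are true anchor points of class 0 and class 1 respectively.
clean_p1 = [0.0, 0.2, 0.5, 0.8, 1.0]

# Noisy posteriors P(noisy=1|x) implied by the transition matrix.
noisy_p1 = [rho_neg * (1 - p) + (1 - rho_pos) * p for p in clean_p1]

# The anchor of class 1 maximises P(noisy=1|x); the anchor of class 0
# maximises P(noisy=0|x) = 1 - P(noisy=1|x).
anchor_1 = max(range(len(noisy_p1)), key=lambda i: noisy_p1[i])
anchor_0 = max(range(len(noisy_p1)), key=lambda i: 1 - noisy_p1[i])

est_rho_neg = noisy_p1[anchor_0]      # estimate of P(noisy=1 | Y=0)
est_rho_pos = 1 - noisy_p1[anchor_1]  # estimate of P(noisy=0 | Y=1)
print(anchor_0, anchor_1, est_rho_neg, est_rho_pos)
```

In this idealised setting the selected points are the true anchors, so the flip rates are recovered exactly. With an estimated classifier and no exact anchor points in the sample, the estimates are only approximate.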

[1]. Jiacheng Cheng, Tongliang Liu, Kotagiri Ramamohanarao, and Dacheng Tao. Learning with bounded instance- and label-dependent label noise. In ICML, 2020.

## 2. Prepare Data

The data file, "input.data", is provided in the tutorial's root folder. It is in CSV format. Each row of the file represents an example: the first 10 columns are the attributes and the last column is the label.

### 2.1 Add class-conditional random label noise (CCN) to the data

As mentioned in the lecture,
* Class-conditional random label noise (CCN) is independent of the attributes but dependent on the labels, i.e., $P(\tilde{Y}|Y,X)=P(\tilde{Y}|Y)$.
* The negative flip rate and the positive flip rate may be asymmetric, i.e., $P(\tilde{Y}=1|Y=0) \neq P(\tilde{Y}=0|Y=1)$, where $Y$ is the clean label and $\tilde{Y}$ is the noisy label.
* The flip rates are usually bounded in [0,0.5).
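
The properties above can be sketched in a few lines of plain Python. This is an illustrative stand-alone sketch, not the exercise solution: the helper name `flip_labels`, the balanced toy labels, and the seed are all assumptions introduced here.

```python
import random


def flip_labels(labels, flip_rates=(0.15, 0.25), seed=0):
    """Flip each binary label y with probability flip_rates[y], ignoring the attributes."""
    rng = random.Random(seed)
    return [1 - y if rng.random() < flip_rates[y] else y for y in labels]


clean = [0] * 10000 + [1] * 10000  # hypothetical balanced clean labels
noisy = flip_labels(clean)

# Empirical flip rates, computed separately for each clean class.
neg_rate = sum(n == 1 for c, n in zip(clean, noisy) if c == 0) / 10000
pos_rate = sum(n == 0 for c, n in zip(clean, noisy) if c == 1) / 10000
print(neg_rate, pos_rate)
```

With this many samples the empirical rates should land close to 0.15 and 0.25, which is the same kind of check that `test_CCN_generator` performs.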

In [1]:
import os
import copy
import random
import numpy as np
import pandas as pd
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.models
import torchvision.transforms


# The data is a 2D array.
# Each row represents an example. 
# The first 10 columns represent the attributes. 
# The last column represents the label.
# The function should return the noisy data, which 
# has the same shape as the input data. Its last column holds the noisy labels.
def binary_CCN_generator(data, flip_rates=[0.15,0.25]):
    r"""Class-conditional random label noise generator.

    Args:
        data: A 2D array of the "clean" data; each row represents an example. 
        The last column of the data should hold the labels {0,1}.
        flip_rates: [P(Y^{~}=1|Y=0),P(Y^{~}=0|Y=1)]
        
    Returns:
        new_data: A 2D array of the noisy data.
    """
    # Please don't modify the variable "data".
    new_data = copy.deepcopy(data)
    
    # TODO Please complete this function.
    label_idx = len(new_data[0])-1
    for d in new_data:
        curr_label = int(d[label_idx])
        if random.uniform(0, 1) <= flip_rates[curr_label]:
            d[label_idx] = abs(curr_label - 1)
    return new_data



# You may test your binary_CCN_generator with the following function.
def test_CCN_generator(data_path="./input.data", flip_rates=[0.15,0.25]):
    
    assert all(fr >= 0 and fr < 0.5 for fr in flip_rates)
    assert len(flip_rates) == 2
    
    data = pd.read_csv(data_path, header = None).values.tolist()
    noisy_data = binary_CCN_generator(data = data, flip_rates = flip_rates)
    positive_len = 0
    postive_f = 0
    negative_f = 0
    negative_len = 0 

    for i in range(len(data)):
        d = data[i]
        n_d = noisy_data[i]
        if d[-1] == 1.0:
            positive_len += 1
        else:
            negative_len += 1
        if d[-1] != n_d[-1]:
            if n_d[-1] == 0:
                postive_f += 1
            else:
                negative_f += 1
    
    # Fail if nothing was flipped, or if either empirical flip rate is off by more than 0.01.
    if (data == noisy_data and sum(flip_rates) != 0) or abs(negative_f/negative_len-flip_rates[0]) > 0.01 or abs(postive_f/positive_len-flip_rates[1]) > 0.01:
        print("Test failed!")
    else:
        print("Test passed!")
    
    
test_CCN_generator()
        
    

Test passed!


### 2.2 Prepare the noisy data for training and validation

In [2]:
# A helper class, it is used as an input of the DataLoader object.
class DatasetArray(Dataset):
    r"""This is a child class of the pytorch Dataset object."""
    def __init__(self, data, labels=None, transform=None):
        # np.long was deprecated in NumPy 1.20 and later removed; np.int64 is used instead.
        if labels is not None:
            self.data_arr = np.asarray(data).astype(np.float32)
            self.label_arr = np.asarray(labels).astype(np.int64)
        else:
            tmp_arr = np.asarray(data)
            self.data_arr = tmp_arr[:,:-1].astype(np.float32)
            self.label_arr = tmp_arr[:,-1].astype(np.int64)
        self.transform = transform
        
    def __len__(self):
        return len(self.data_arr)
    
    def __getitem__(self, index):
     
        data = self.data_arr[index]
        label = self.label_arr[index]
        
        if self.transform is not None:
            data = self.transform(data)
            
        return (data, label)
    
    
# Splitting the data into three parts.
def train_val_test_random_split(data, fracs=[0.7,0.1,0.2]):
    r"""Split the data into training, validation and test set.
    Args:
        fracs: a list of length three
    """
    assert len(fracs) == 3
    # Compare with a tolerance: an exact equality test on floats is fragile.
    assert abs(sum(fracs) - 1) < 1e-9
    assert all(frac > 0 for frac in fracs)
    n = len(data)
    subset_lens = [int(n*frac) for frac in fracs]
    idxs = list(range(n))
    random.shuffle(idxs)
    data = np.array(data)
    new_data = []
    start_idx = 0
    for subset_len in subset_lens:
        end_idx = start_idx + subset_len
        cur_idxs = idxs[start_idx:end_idx]
        new_data.append(data[cur_idxs,:].tolist())
        start_idx = end_idx
    return new_data
    
    
# Preparation of the data for training, validation and testing a pytorch network. 
# Note that the test data is not in use for this lab.
def get_loader(batch_size =128, num_workers = 1, train_val_test_split = [0.7,0.1,0.2], data=None):
    r"""This function is used to read the data file and split the data into three subsets, i.e, 
    train data, validation data and test data. Their corresponding DataLoader objects are returned."""
    
    [train_data, val_data, test_data] = train_val_test_random_split(data, fracs = train_val_test_split)

    train_data = DatasetArray(data = train_data)
    val_data = DatasetArray(data = val_data)
    test_data = DatasetArray(data = test_data)

    #The pytorch built-in class DataLoader can help us to shuffle the data, draw mini-batch,
    #do transformations, etc. 
    train_loader = DataLoader(
        train_data,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
    )

    val_loader = DataLoader(
        val_data,
        batch_size=100,
        shuffle=False,
        num_workers=num_workers,
    )

    test_loader = DataLoader(
        test_data,
        batch_size=100,
        num_workers=num_workers,
        shuffle=False,
    )
    return train_loader, val_loader, test_loader
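
Note that `train_val_test_random_split` truncates each subset length with `int(n*frac)`, so a few rows may end up in none of the three subsets. The stand-alone sketch below reproduces only that arithmetic; the helper name `split_sizes` and the dataset size 1003 are hypothetical.

```python
def split_sizes(n, fracs=(0.7, 0.1, 0.2)):
    """Subset lengths as computed by int(n * frac), plus the number of unused rows."""
    sizes = [int(n * frac) for frac in fracs]
    return sizes, n - sum(sizes)


sizes, leftover = split_sizes(1003)
print(sizes, leftover)  # [702, 100, 200] 1
```

For this lab the handful of unused rows is harmless, but it explains why the subset sizes may not add up to the size of the full dataset.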

## 3. Define a Simple Network

In [3]:
# Define a fully connected network class.
class FCNet(nn.Module):
    def __init__(self, input_dim=10, hidden_dim=25, output_dim=2):
        super(FCNet, self).__init__()
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.hidden_layer = nn.Linear(hidden_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # TODO based on the class attributes (fields), define a fully connected network with one hidden layer.
        # Specifically, it should have this structure:
        # input_layer->relu->hidden_layer->relu->output_layer.
        out = F.relu(self.input_layer(x))
        out = F.relu(self.hidden_layer(out))
        out = self.output_layer(out)
        return out
    
def Predefined_FCNet(input_dim=10,hidden_dim=5,output_dim=2):
    model = FCNet(input_dim=input_dim,hidden_dim=hidden_dim,output_dim=output_dim)
    return model
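
One quick way to sanity-check the architecture is to count its trainable parameters by hand. The sketch below does this arithmetic in plain Python, assuming each `nn.Linear` layer has a weight matrix plus a bias vector (its default); the helper names `linear_params` and `fcnet_params` are hypothetical.

```python
def linear_params(in_features, out_features):
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features


def fcnet_params(input_dim=10, hidden_dim=25, output_dim=2):
    """Parameter count of input_layer -> hidden_layer -> output_layer."""
    return (linear_params(input_dim, hidden_dim)
            + linear_params(hidden_dim, hidden_dim)
            + linear_params(hidden_dim, output_dim))


print(fcnet_params())              # default FCNet sizes: 977
print(fcnet_params(hidden_dim=2))  # hidden_dim=2, as in the run configuration: 34
```

The same totals should be obtained from `sum(p.numel() for p in model.parameters())` on the corresponding PyTorch model.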



## 4. Training and validation

### 4.1 Some helper functions for training and validation

In [4]:
# Fixing all random seeds makes runs repeatable on the same machine and software setup
# (some GPU operations may still be nondeterministic).
def seed_torch(seed=1029):
    r"""Fix all random seeds to make the experiment results repeatable."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # If multi-GPUs are used. 
    torch.cuda.manual_seed_all(seed) 
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


# Embedding labels to one-hot form.
def one_hot_embedding(labels, num_classes):
    r"""Embedding labels to one-hot form.

    Args:
      labels: (LongTensor) class labels, sized [N,].
      num_classes: (int) number of classes.

    Returns:
      (tensor) encoded labels, sized [N, #classes].
    """
    y = torch.eye(num_classes) 
    return y[labels] 


# Calculate the accuracy from the predictions and the given labels.
def accuracy(output, target, topk=(1,)):
    r"""Computes the precision@k for the specified values of k."""
    maxk = max(topk)
    batch_size = target.size(0)

    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape (not view) also works when the slice is non-contiguous, e.g. for k > 1.
        correct_k = correct[:k].reshape(-1).float().sum(0)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res


# A helper function which is used to record the experiment results.
class AverageMeter(object):
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, num):
        self.val = val
        self.sum += val * num
        self.count += num
        self.avg = self.sum / self.count
        
        
# Load a NN model.
def load_model(m_config):
    model = globals()[m_config['model_name']](
        input_dim=m_config["input_dim"],
        hidden_dim=m_config["hidden_dim"],
        output_dim=m_config["output_dim"]
    )
    return model
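
The `AverageMeter` weights each recorded value by the number of examples it covers, which matters when the last mini-batch is smaller than the others. The sketch below repeats the same running-average logic in a stand-alone class so that it runs on its own; the two batch accuracies and sizes are hypothetical.

```python
class AverageMeter:
    """Same running-average logic as the helper above, repeated so the sketch runs alone."""

    def __init__(self):
        self.sum = 0.0
        self.count = 0
        self.avg = 0.0

    def update(self, val, num):
        self.sum += val * num
        self.count += num
        self.avg = self.sum / self.count


meter = AverageMeter()
meter.update(80.0, 100)  # hypothetical batch: 80% accuracy on 100 examples
meter.update(50.0, 20)   # hypothetical last, smaller batch
print(meter.avg)  # 75.0, whereas the unweighted mean of 80 and 50 is 65
```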



### 4.2 Training and validation functions

In [5]:
def train(epoch, model, optimizer, criterion, train_loader):
    """Training a pytorch nn_model."""
    top1_acc_meter = AverageMeter()
    loss_meter = AverageMeter()

    # switch the model to train mode
    model.train()
    for step, (data, targets) in enumerate(train_loader):
        # prepare the mini-batch
        data = data.to(device)
        targets = targets.to(device)

        # forward pass
        preds = model(data)

        # compute the loss
        loss = criterion(preds, targets)

        # set all gradients to zero
        optimizer.zero_grad()

        # backward pass
        loss.backward()

        # update all parameters
        optimizer.step()

        # calculate accuracy
        [top1_acc] = accuracy(preds.data, targets.data, topk=(1,))
        # record accuracy and cross-entropy loss
        min_batch_size = data.size(0)
        top1_acc_meter.update(top1_acc.item(), min_batch_size)
        loss_meter.update(loss.item(), min_batch_size)

    print("Train epoch ",epoch," Accuracy ",top1_acc_meter.avg)
    
    
def validate_and_test(epoch, model, criterion, val_loader, is_test=False):
    """Validation or testing of a nn_model."""
    top1_acc_meter = AverageMeter()
    loss_meter = AverageMeter()

    # switch the model to eval mode
    model.eval()
    for step, (data, targets) in enumerate(val_loader):
        # prepare the mini-batch
        data = data.to(device)
        targets = targets.to(device)

        # forward pass (no gradients are needed for evaluation)
        with torch.no_grad():
            preds = model(data)

        # compute the loss
        loss = criterion(preds, targets)
            
        # calculate accuracy
        [top1_acc] = accuracy(preds.data, targets.data, topk=(1,))
  
        # record accuracy and cross-entropy loss
        min_batch_size = data.size(0)
        top1_acc_meter.update(top1_acc.item(), min_batch_size)
        loss_meter.update(loss.item(), min_batch_size)

    top1_acc_avg = top1_acc_meter.avg
    if not is_test:
        print("Validate epoch ",epoch," Accuracy ",top1_acc_avg)
    else:
        print("Test epoch ",epoch," Accuracy ",top1_acc_avg)

    return top1_acc_avg


### 4.3 Run training and validation

In [6]:
# Some global variables
device = None
train_loader = None
model = None
model_config = OrderedDict([
    ('model_name','Predefined_FCNet'),
    ('input_dim',10),
    ('hidden_dim',2),
    ('output_dim',2)
])

optim_config = OrderedDict([
    ('epochs', 10),
    ('batch_size', 1280),
    ('base_lr', 0.1),
    ('weight_decay', 1e-5),
    ('momentum', 0.9),
    ('lr_decay', 0.1)
])


run_config = OrderedDict([
    ('seed', 1),
    ('outdir', 'trained_model'),
    ('num_workers', 1),
])

data_config = OrderedDict([
    ('flip_rates', [0.15,0.25]),
    ('data_path', "./input.data"),
    ('train_val_test_split', [0.7,0.1,0.2])
]) 

config = OrderedDict([
    ('model_config', model_config),
    ('optim_config', optim_config),
    ('run_config', run_config),
    ('data_config', data_config)
])


def run_train_val():
    global device
    global config
    global train_loader
    global model
    best_top1_acc = 0
    # check gpu
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # configurations
    run_config = config['run_config']
    optim_config = config['optim_config']
    data_config = config['data_config']
    
    # set random seed
    seed_torch(run_config['seed'])

    # create output directory
    outdir = run_config['outdir']
    if not os.path.exists(outdir):
        os.makedirs(outdir)

    # load data
    data = pd.read_csv(data_config["data_path"], header = None).values.tolist()
    data = binary_CCN_generator(data = data, flip_rates = data_config["flip_rates"])
    
    train_loader, val_loader, test_loader = get_loader(
        batch_size = optim_config['batch_size'], 
        num_workers = run_config['num_workers'],
        train_val_test_split = data_config['train_val_test_split'],
        data = data
    )
    
    # model
    model = load_model(config['model_config'])
    if torch.cuda.device_count() > 1:
        print(torch.cuda.device_count(), "GPUs are used!")
        model = nn.DataParallel(model)
    model.to(device)

    # criterion and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(
        params = model.parameters(),
        lr = optim_config['base_lr'], 
        momentum = optim_config['momentum'],
        weight_decay = optim_config['weight_decay']
    )

    for epoch in range(1, optim_config['epochs'] + 1):

        #train
        train(epoch, model, optimizer, criterion, train_loader)

        #validation
        top1_acc_avg = validate_and_test(epoch, model, criterion, val_loader)
        
        # save the best model so far
        if (top1_acc_avg > best_top1_acc):
            state = OrderedDict([
                ('config', config),
                ('state_dict', model.state_dict()),
                ('optimizer', optimizer.state_dict()),
                ('epoch', epoch),
                ('top1-accuracy', top1_acc_avg),
            ])
            best_model_path = os.path.join(outdir, 'model_best.pth')
            torch.save(state, best_model_path)
            best_top1_acc = top1_acc_avg
            
run_train_val()





2 GPUs are used!
Train epoch  1  Accuracy  54.33836514076783
Validate epoch  1  Accuracy  55.244646733162554
Train epoch  2  Accuracy  67.06197033819575
Validate epoch  2  Accuracy  71.07685941627308
Train epoch  3  Accuracy  70.73940252927092
Validate epoch  3  Accuracy  71.12340953365604
Train epoch  4  Accuracy  70.73275258926043
Validate epoch  4  Accuracy  71.22168200859748
Train epoch  5  Accuracy  70.79112451976829
Validate epoch  5  Accuracy  71.21133753613873
Train epoch  6  Accuracy  70.69063609148759
Validate epoch  6  Accuracy  71.41305471988329
Train epoch  7  Accuracy  70.778563445016
Validate epoch  7  Accuracy  71.00444812011088
Train epoch  8  Accuracy  70.78299677186038
Validate epoch  8  Accuracy  71.37167683636204
Train epoch  9  Accuracy  70.75048584979237
Validate epoch  9  Accuracy  71.26823212755889
Train epoch  10  Accuracy  70.81772437902579
Validate epoch  10  Accuracy  71.33547118670249


## 5. Evaluate Flip Rates

In [7]:
def eval_flip_rates():
    r"""Calculate the negative flip rate and the positive flip rate 
    by using the predictions of the trained classifier. 
    Recall that once we know the flip rates, we know the transition matrix, 
    which is [[1-negative_flip_rate, positive_flip_rate], [negative_flip_rate, 1-positive_flip_rate]].
    
    Returns:
        negative_flip_rate
        positive_flip_rate
    """
    global config
    global train_loader
    global model
    # TODO Please complete this function, three global variables defined above should be used. 
    # Hint: The structure of this function should be:
    # load the saved best model -> get the probability predictions of the training set ->
    # find the flip rates based on the probability predictions.
    best_model_path = os.path.join(config['run_config']['outdir'], 'model_best.pth')
    model.load_state_dict(torch.load(best_model_path, map_location=device)["state_dict"])
    model.eval()
    pos_condition_p = []
    for step, (data, targets) in enumerate(train_loader):
        data = data.to(device)

        with torch.no_grad():
            outputs = model(data)

        # Estimated noisy class posteriors P(noisy label = 1 | x) for this batch.
        probs = F.softmax(outputs, dim=1).cpu().numpy()
        pos_condition_p += probs[:,1].tolist() 
        
    # The smallest posterior is attained at the estimated anchor point of class 0,
    # the largest at the estimated anchor point of class 1.
    negative_flip_rate, positive_flip_rate = min(pos_condition_p), 1 - max(pos_condition_p)
    return negative_flip_rate, positive_flip_rate


est_neg_flip_rate, est_pos_flip_rate = eval_flip_rates()
print('negative flip rate:', config["data_config"]["flip_rates"][0])
print('estimated negative flip rate:', est_neg_flip_rate)
print('positive flip rate:', config["data_config"]["flip_rates"][1])
print('estimated positive flip rate:', est_pos_flip_rate)

negative flip rate: 0.15
estimated negative flip rate: 0.16661709547042847
positive flip rate: 0.25
estimated positive flip rate: 0.28584039211273193
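
As the docstring of `eval_flip_rates` notes, the two flip rates determine the transition matrix. The sketch below assembles the estimated matrix from the values printed above (rounded to four decimals) and, as one possible use, inverts the relation from Section 1 to map a noisy posterior back to a clean one. The helper names and the noisy posterior `[0.40, 0.60]` are hypothetical.

```python
def transition_matrix(neg_flip, pos_flip):
    """T[i][j] = P(noisy = i | clean = j), as in the eval_flip_rates docstring."""
    return [[1 - neg_flip, pos_flip],
            [neg_flip, 1 - pos_flip]]


def clean_posterior(T, noisy):
    """Solve T @ clean = noisy for a 2x2 T via the closed-form inverse."""
    det = T[0][0] * T[1][1] - T[0][1] * T[1][0]
    return [(T[1][1] * noisy[0] - T[0][1] * noisy[1]) / det,
            (T[0][0] * noisy[1] - T[1][0] * noisy[0]) / det]


T_hat = transition_matrix(0.1666, 0.2858)  # estimates printed above, rounded
noisy = [0.40, 0.60]                       # hypothetical noisy posterior for some x
clean = clean_posterior(T_hat, noisy)
print(T_hat)
print(clean)
```

Because the flip rates are only estimates, a corrected posterior obtained this way may fall slightly outside $[0,1]$ for some inputs and may need to be clipped and renormalised.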
