**ReadMe:**

Please run the notebook cells sequentially, from top to bottom; later cells rely on the definitions and variables created by earlier ones.

In [None]:
import os
import copy
import random
import numpy as np
import pandas as pd
from collections import OrderedDict
import time

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.models
import torchvision.transforms as tt

from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

In [None]:
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    # Ask cuDNN to use deterministic algorithms.
    torch.backends.cudnn.deterministic = True

# 1. Dataset processing

In [None]:
class CustomTensorDataset(Dataset):
    """TensorDataset-like dataset that can apply a transform to each sample.

    Args:
      tensors: sequence of tensors sharing the same first dimension;
        ``tensors[0]`` holds the inputs and ``tensors[1]`` the labels.
      transform: optional callable applied to each input when it is fetched.
    """

    def __init__(self, tensors, transform=None):
        # Every tensor must contain the same number of samples as the first one.
        for candidate in tensors:
            assert candidate.size(0) == tensors[0].size(0)
        self.tensors = tensors
        self.transform = transform

    def __len__(self):
        return self.tensors[0].size(0)

    def __getitem__(self, index):
        sample = self.tensors[0][index]
        if self.transform:
            sample = self.transform(sample)
        # Labels are returned untouched; only the input goes through the transform.
        return sample, self.tensors[1][index]

def get_loader(train_X, val_X, test_X, train_y, val_y, test_y, train_tfms, valid_tfms, batch_size=128, num_workers=1):
    """Wrap the three data splits in DataLoaders.

    Args:
      train_X, val_X, test_X: input tensors of the three splits.
      train_y, val_y, test_y: label tensors of the three splits.
      train_tfms: transform applied to training inputs.
      valid_tfms: transform applied to validation and testing inputs.
      batch_size: (int) mini-batch size of the training loader.
      num_workers: (int) worker count passed to every loader.

    Returns:
      (train_loader, val_loader, test_loader). Only the training loader
      shuffles; the validation and testing loaders use batches of 100.
    """
    # Datasets: augmentation for training, the shared evaluation transform elsewhere.
    train_set = CustomTensorDataset((train_X, train_y), train_tfms)
    val_set = CustomTensorDataset((val_X, val_y), valid_tfms)
    test_set = CustomTensorDataset((test_X, test_y), valid_tfms)

    # Settings shared by the two evaluation loaders.
    eval_options = dict(batch_size=100, shuffle=False, num_workers=num_workers)

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_set, **eval_options)
    test_loader = DataLoader(test_set, **eval_options)

    return train_loader, val_loader, test_loader

# 2. Neural network structures

## 2.1 Fully connected network

In [None]:
class FCNet(nn.Module):
    """Fully connected classifier.

    The input is flattened and passed through six
    Linear -> ReLU -> BatchNorm -> Dropout(0.1) stages
    (1024, 512, 256, 128, 64 and 32 units) followed by a linear output layer.

    Args:
      input_dim: (int) number of features per sample after flattening.
      num_classes: (int) number of output classes.
    """

    def __init__(self, input_dim=784, num_classes=3):
        super(FCNet, self).__init__()
        # Attribute names are kept so that existing state dicts still load.
        self.linear_layer_1 = nn.Linear(input_dim, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.drop1 = nn.Dropout(p=0.1)

        self.hidden_layer_1 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.drop2 = nn.Dropout(p=0.1)

        self.hidden_layer_2 = nn.Linear(512, 256)
        self.bn3 = nn.BatchNorm1d(256)
        self.drop3 = nn.Dropout(p=0.1)

        self.hidden_layer_3 = nn.Linear(256, 128)
        self.bn4 = nn.BatchNorm1d(128)
        self.drop4 = nn.Dropout(p=0.1)

        self.hidden_layer_4 = nn.Linear(128, 64)
        self.bn5 = nn.BatchNorm1d(64)
        self.drop5 = nn.Dropout(p=0.1)

        self.hidden_layer_5 = nn.Linear(64, 32)
        self.bn6 = nn.BatchNorm1d(32)
        self.drop6 = nn.Dropout(p=0.1)

        self.output_layer = nn.Linear(32, num_classes)

    def forward(self, x):
        """Return ``(logits, probas)`` for a batch ``x``; both are sized [N, num_classes]."""
        stages = (
            (self.linear_layer_1, self.bn1, self.drop1),
            (self.hidden_layer_1, self.bn2, self.drop2),
            (self.hidden_layer_2, self.bn3, self.drop3),
            (self.hidden_layer_3, self.bn4, self.drop4),
            (self.hidden_layer_4, self.bn5, self.drop5),
            (self.hidden_layer_5, self.bn6, self.drop6),
        )

        # Keep the batch dimension and flatten everything else.
        out = torch.flatten(x, 1)
        for linear, norm, drop in stages:
            out = drop(norm(F.relu(linear(out))))
        out = self.output_layer(out)

        probas = F.softmax(out, dim=1)

        # Return the logit and the probability
        return out, probas

## 2.2 CNN

In [None]:
class CNN(nn.Module):
    """Small convolutional classifier: three conv stages followed by five linear layers.

    The size of the first linear layer is fixed per mode: ``128*4*4`` features
    for ``grayscale=True`` (what the conv stack produces for 1x28x28 inputs)
    and ``128*5*5`` otherwise (what it produces for 3x32x32 inputs).

    Args:
      grayscale: (bool) True for single-channel inputs, False for 3-channel inputs.
      num_classes: (int) number of output classes.
    """

    def __init__(self, grayscale=True, num_classes=3):
        super(CNN, self).__init__()

        # Only the number of input channels differs between the two modes.
        in_channels = 1 if grayscale else 3
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # Flattened feature count after layer3 for the two supported input sizes.
        flat_features = 128*4*4 if grayscale else 128*5*5
        self.fc1 = nn.Linear(in_features=flat_features, out_features=600)
        # The activations here are 2-D (N, features), so element-wise Dropout is
        # the right module; Dropout2d on 2-D inputs is deprecated in PyTorch.
        self.drop1 = nn.Dropout(0.1)
        self.fc2 = nn.Linear(in_features=600, out_features=120)
        self.drop2 = nn.Dropout(0.1)
        self.fc3 = nn.Linear(in_features=120, out_features=120)
        self.drop3 = nn.Dropout(0.1)
        self.fc4 = nn.Linear(in_features=120, out_features=120)
        self.fc5 = nn.Linear(in_features=120, out_features=num_classes)

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``(logits, probas)`` for a batch ``x``; both are sized [N, num_classes]."""
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        # Flatten the feature maps for the linear layers.
        out = out.view(out.size(0), -1)
        out = self.relu(self.fc1(out))
        out = self.drop1(out)
        out = self.relu(self.fc2(out))
        out = self.drop2(out)
        out = self.relu(self.fc3(out))
        out = self.drop3(out)
        out = self.relu(self.fc4(out))
        out = self.fc5(out)
        probas = F.softmax(out, dim=1)

        # Return the logit and the probability
        return out, probas

## 2.3 Resnet

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, as used by ResNet-18/34.

    Args:
      in_planes: (int) number of input channels.
      planes: (int) number of channels produced by both convolutions.
      stride: (int) stride of the first convolution.
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        if stride == 1 and in_planes == out_planes:
            # Shapes already match: the shortcut is the identity.
            self.shortcut = nn.Sequential()
        else:
            # Project the input with a 1x1 convolution so it can be added to the output.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

    def forward(self, x):
        """Apply conv-bn-relu, conv-bn, add the shortcut and finish with a ReLU."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        hidden += self.shortcut(x)
        return F.relu(hidden)


# Bottleneck block for the deeper ResNet variants (e.g. ResNet-50).
# The experiments only use ResNet-18, to save computation time.
class Bottleneck(nn.Module):
    """Residual block with a 1x1 -> 3x3 -> 1x1 convolution stack.

    Args:
      in_planes: (int) number of input channels.
      planes: (int) width of the inner convolutions; the block outputs
        ``expansion * planes`` channels.
      stride: (int) stride of the 3x3 convolution.
    """
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        if stride == 1 and in_planes == out_planes:
            # Shapes already match: the shortcut is the identity.
            self.shortcut = nn.Sequential()
        else:
            # Project the input with a 1x1 convolution so it can be added to the output.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

    def forward(self, x):
        """Apply the three conv-bn stages, add the shortcut and finish with a ReLU."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = F.relu(self.bn2(self.conv2(hidden)))
        hidden = self.bn3(self.conv3(hidden))
        hidden += self.shortcut(x)
        return F.relu(hidden)


class ResNet(nn.Module):
    """ResNet backbone with a 3x3 stem and four residual stages.

    Args:
      block: residual block class; it must expose an ``expansion`` attribute
        and accept ``(in_planes, planes, stride)``.
      num_blocks: sequence with the number of blocks in each of the four stages.
      num_classes: (int) number of output classes.
      grayscale: (bool) True for single-channel inputs, False for 3-channel inputs.
    """

    def __init__(self, block, num_blocks, num_classes=3, grayscale=True):
        super(ResNet, self).__init__()
        # Running channel count, updated by _make_layer as the stages are built.
        self.in_planes = 64

        # The stem only differs in its number of input channels.
        stem_channels = 1 if grayscale else 3
        self.conv1 = nn.Conv2d(stem_channels, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``, the rest use 1."""
        stage = []
        for block_stride in [stride] + [1]*(num_blocks-1):
            stage.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return ``(logits, probas)`` for a batch ``x``."""
        out = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)
        # Fixed 4x4 average pooling, then flatten for the classifier.
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        logits = self.linear(out)
        # Return the logit and the probability
        return logits, F.softmax(logits, dim=1)

# 3. Training and testing

## 3.1 Helper functions for training and validation 

In [None]:
def compute_accuracy_noisy(probas, target):
    r"""Top-1 accuracy of a batch of predictions against the given labels.

    Args:
      probas: (tensor) class scores, sized [N, #classes].
      target: (LongTensor) class labels, sized [N,].

    Returns:
      (float) fraction of samples whose highest-scoring class equals the
      label; 0.0 for an empty batch.
    """
    total = target.numel()
    if total == 0:
        return 0.0
    predicted = torch.max(probas, 1)[1]
    # Count the matches on the tensors' own device: one scalar is transferred
    # instead of copying both tensors to the host.
    correct = (predicted == target.to(predicted.device)).sum().item()
    return correct / total

# A helper class that keeps a running weighted average of a metric.
class AverageMeter(object):
    """Track the latest value, the weighted sum, the total weight and their ratio.

    Attributes set by ``reset``/``update``: ``val`` (last value), ``sum``
    (sum of ``val * num``), ``count`` (sum of ``num``) and ``avg``
    (``sum / count``).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all statistics."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, num):
        """Record ``val`` observed over ``num`` samples and refresh the average."""
        weighted = val * num
        self.val = val
        self.sum += weighted
        self.count += num
        self.avg = self.sum / self.count
        
# A helper function to calculate the testing Top-1 accuracy
def compute_accuracy(model, data_loader, Ta):
    r"""Top-1 accuracy after correcting the model posteriors with ``Ta^{-1}``.

    For every sample the predicted probability vector ``p`` is replaced by
    ``Ta^{-1} p`` and the class with the largest corrected score is compared
    with the target.

    The model's train/eval mode is not changed here; the caller is expected
    to have selected it.

    Args:
      model: callable returning ``(logits, probas)`` for a batch of features.
      data_loader: iterable of ``(features, targets)`` batches.
      Ta: (tensor) square transition matrix, sized [#classes, #classes].
        Batches are moved to its device before the forward pass.

    Returns:
      (tensor) scalar float accuracy; 0 when ``data_loader`` yields no samples.
    """
    run_device = Ta.device
    # The inverse does not depend on the batch, so compute it once.
    inverse_T = torch.inverse(Ta)

    correct_pred = torch.zeros((), dtype=torch.long, device=run_device)
    num_examples = 0
    with torch.no_grad():
        for features, targets in data_loader:
            features = features.to(run_device)
            targets = targets.to(run_device)
            _, probas = model(features)

            # Treat each sample as a column vector and multiply with T^{-1}.
            corrected = torch.matmul(inverse_T, probas.T).T

            # Predicted label = class with the highest corrected score.
            _, predicted_labels = torch.max(corrected, 1)

            num_examples += targets.size(0)
            correct_pred += (predicted_labels == targets).sum()

    if num_examples == 0:
        # Avoid a 0/0 division when the loader is empty.
        return correct_pred.float()
    return correct_pred.float()/num_examples

In [None]:
# Seed every random number generator used in the notebook so runs can be repeated.
def seed_torch(seed=1029):
    r"""Seed Python, NumPy and PyTorch (CPU and CUDA) and configure cuDNN.

    Args:
      seed: (int) value used for every generator and for ``PYTHONHASHSEED``.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        # Covers the multi-GPU case.
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)

    # Disable cuDNN auto-tuning and request deterministic algorithms.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

## 3.2 Functions for evaluating transition matrix

In [None]:
# Convert integer class labels to their one-hot representation
def one_hot_embedding(labels, num_classes):
    r"""Embed labels in one-hot form.

    Args:
      labels: (LongTensor) class labels, sized [N,].
      num_classes: (int) number of classes.

    Returns:
      (tensor) encoded labels, sized [N, #classes].
    """
    # Row i of the identity matrix is the one-hot vector of class i.
    return torch.eye(num_classes)[labels]

In [None]:
# The evaluation function for the transition matrix by the anchor points method
def eval_tran_mat(model, train_loader):
    r"""Estimate the transition matrix from the predictions of a trained
    classifier (the anchor points assumption).

    For every class ``j`` the sample with the highest predicted probability of
    class ``j`` is taken as the anchor point, and its full probability vector
    becomes column ``j`` of the estimate.

    The model is switched to evaluation mode as a side effect. Batches are
    moved to the device of the model's first parameter (left in place when the
    model has no parameters).

    Args:
      model: module returning ``(logits, probas)`` for a batch of inputs.
      train_loader: iterable of ``(data, targets)`` batches; the targets are
        not used.

    Returns:
      (ndarray) float64 estimate, sized [#classes, #classes].

    Raises:
      ValueError: if ``train_loader`` yields no batches.
    """
    model.eval()
    first_param = next(model.parameters(), None)

    batches = []
    with torch.no_grad():
        for data, _ in train_loader:
            if first_param is not None:
                data = data.to(first_param.device)
            _, probas = model(data)
            batches.append(probas.cpu().numpy())

    if not batches:
        raise ValueError("train_loader yielded no batches; cannot estimate the transition matrix")

    # float64 matches the precision of the previous list-based accumulation.
    posteriors = np.concatenate(batches, axis=0).astype(np.float64)

    # Index of the anchor sample of every class (first maximum of each column).
    anchors = np.argmax(posteriors, axis=0)

    # Row j of posteriors[anchors] is anchor j's probability vector; transpose
    # so that it becomes column j.
    return np.ascontiguousarray(posteriors[anchors].T)

In [None]:
# The evaluation function for the T^A matrix by the dual-T estimator
def eval_tran_mat_TA(model, train_loader, num_classes=3):
    r"""Estimate the T^A matrix from the predictions of a trained classifier
    (the dual-T estimator).

    Entry ``[i, j]`` is the fraction of samples predicted as class ``i`` whose
    label in ``train_loader`` is ``j``. Rows of classes that are never
    predicted stay all-zero.

    The model is switched to evaluation mode as a side effect. Batches are
    moved to the device of the model's first parameter (left in place when the
    model has no parameters).

    Args:
      model: module returning ``(logits, probas)`` for a batch of inputs.
      train_loader: iterable of ``(data, targets)`` batches.
      num_classes: (int) number of classes; pairs with a label outside
        ``[0, num_classes)`` are ignored.

    Returns:
      (ndarray) float64 matrix, sized [num_classes, num_classes].
    """
    model.eval()
    first_param = next(model.parameters(), None)

    predicted_batches, label_batches = [], []
    with torch.no_grad():
        for data, targets in train_loader:
            if first_param is not None:
                data = data.to(first_param.device)
            _, probas = model(data)
            predicted_batches.append(torch.max(probas, 1)[1].cpu().numpy().reshape(-1))
            label_batches.append(targets.cpu().numpy().reshape(-1))

    counts = np.zeros((num_classes, num_classes))
    if predicted_batches:
        predicted = np.concatenate(predicted_batches).astype(np.int64)
        labels = np.concatenate(label_batches).astype(np.int64)
        in_range = (
            (predicted >= 0) & (predicted < num_classes)
            & (labels >= 0) & (labels < num_classes)
        )
        # Encode every (predicted, label) pair as one integer and count them
        # all in a single pass.
        flat = predicted[in_range] * num_classes + labels[in_range]
        counts = np.bincount(flat, minlength=num_classes * num_classes)
        counts = counts.reshape(num_classes, num_classes).astype(np.float64)

    # Normalise every row that has at least one sample; empty rows keep zeros.
    row_totals = counts.sum(axis=1, keepdims=True)
    Ta = np.divide(counts, row_totals, out=counts.copy(), where=row_totals != 0)

    return Ta

## 3.3 Function for training, validation and testing

In [None]:
# The training, validating and testing function
def train_val_test(model, train_loader, val_loader, test_loader, optimizer, Ta, num_epoch = 4):
    r"""Train ``model``, then validate it, estimate the transition matrices and test it.

    Args:
      model: module returning ``(logits, probas)``.
      train_loader: training batches ``(features, targets)``.
      val_loader: validation batches ``(features, targets)``.
      test_loader: testing batches ``(features, targets)``.
      optimizer: optimizer already bound to the parameters of ``model``.
      Ta: (tensor) transition matrix used to correct the test predictions.
      num_epoch: (int) number of passes over ``train_loader``.

    Returns:
      (average_acc_val, average_acc_test, estimated_TB, estimated_dual_T):
      the validation accuracy, the test accuracy computed with ``Ta``, the
      anchor-point estimate of the transition matrix, and the matrix product
      of the T^A estimate with the anchor-point estimate.
    """
    # Training process
    for _ in range(num_epoch):
        model.train()
        for features, targets in train_loader:
            features = features.to(device)
            targets = targets.to(device)

            # Forward and back propagation
            logits, _ = model(features)
            cost = F.cross_entropy(logits, targets)
            optimizer.zero_grad()
            cost.backward()

            # Update the model parameters
            optimizer.step()

            # The training accuracy is not reported anywhere, so it is not computed.

    # Validation process.
    # Switch to evaluation mode so that Dropout is disabled and BatchNorm uses
    # its running statistics instead of updating them on validation data.
    model.eval()
    ave_meter = AverageMeter()
    with torch.no_grad():
        for features, targets in val_loader:
            features = features.to(device)
            targets = targets.to(device)

            # Forward prediction
            _, probas = model(features)

            # Store the validation accuracy
            acc = compute_accuracy_noisy(probas, targets)
            ave_meter.update(acc, targets.size(0))

    # Get the average validation accuracy
    average_acc_val = ave_meter.avg

    # The transition matrix estimated by the anchor points method
    estimated_TB = eval_tran_mat(model, train_loader)

    # The T^A matrix estimated by the dual-T method
    estimated_TA = eval_tran_mat_TA(model, train_loader)

    # No gradients are needed during inference; this also saves memory.
    with torch.no_grad():
        # Compute the testing accuracy using the provided transition matrix
        average_acc_test = compute_accuracy(model, test_loader, Ta)

    return average_acc_val, average_acc_test, estimated_TB, np.matmul(estimated_TA, estimated_TB)

In [None]:
# The CIFAR dataset has no known ground-truth transition matrix, so this function
# evaluates the test accuracy obtained with each of the two estimated matrices.
def train_val_test_CIFAR(model, train_loader, val_loader, test_loader, optimizer, Ta, Ta_dual, num_epoch = 4):
    r"""Train ``model``, validate it and test it with two transition matrices.

    Args:
      model: module returning ``(logits, probas)``.
      train_loader: training batches ``(features, targets)``.
      val_loader: validation batches ``(features, targets)``.
      test_loader: testing batches ``(features, targets)``.
      optimizer: optimizer already bound to the parameters of ``model``.
      Ta: (tensor) transition matrix estimated by the anchor points method.
      Ta_dual: (tensor) transition matrix estimated by the dual-T method.
      num_epoch: (int) number of passes over ``train_loader``.

    Returns:
      (average_acc_val, average_acc_test, average_acc_dual_test): the
      validation accuracy and the test accuracies computed with ``Ta`` and
      ``Ta_dual`` respectively.
    """
    # Training process
    for _ in range(num_epoch):
        model.train()
        for features, targets in train_loader:
            features = features.to(device)
            targets = targets.to(device)

            # Forward and back propagation
            logits, _ = model(features)
            cost = F.cross_entropy(logits, targets)
            optimizer.zero_grad()
            cost.backward()

            # Update the model parameters
            optimizer.step()

            # The training accuracy is not reported anywhere, so it is not computed.

    # Validation process.
    # Switch to evaluation mode so that Dropout is disabled and BatchNorm uses
    # its running statistics instead of updating them on validation data.
    model.eval()
    ave_meter = AverageMeter()
    with torch.no_grad():
        for features, targets in val_loader:
            features = features.to(device)
            targets = targets.to(device)

            # Forward prediction
            _, probas = model(features)

            # Store the validation accuracy
            acc = compute_accuracy_noisy(probas, targets)
            ave_meter.update(acc, targets.size(0))

    # Get the average validation accuracy
    average_acc_val = ave_meter.avg

    # The transition matrices are supplied by the caller (Ta, Ta_dual), so they
    # are not re-estimated here; the model is already in evaluation mode.

    # No gradients are needed during inference; this also saves memory.
    with torch.no_grad():
        # Compute the testing accuracy using the estimated transition matrix (anchor points method)
        average_acc_test = compute_accuracy(model, test_loader, Ta)

        # Compute the testing accuracy using the estimated transition matrix (dual-T method)
        average_acc_dual_test = compute_accuracy(model, test_loader, Ta_dual)

    return average_acc_val, average_acc_test, average_acc_dual_test


# 4. Experiment results

## 4.1 FashionMNIST0.5.npz

In [None]:
# Experiment on FashionMNIST0.5: over 10 random train/validation splits, train the
# FNN, CNN and ResNet models, record their test accuracy, and keep the transition
# matrix estimates of the model with the highest validation accuracy.

# Load the dataset
dataset = np.load("./data/FashionMNIST0.5.npz")
# Reshape the training/validation images to (N, channels, height, width)
Xtr_val = dataset['Xtr'].reshape([18000, 1, 28, 28])
Str_val = dataset['Str']
# Reshape the testing images to (N, channels, height, width)
Xts = dataset['Xts'].reshape([3000, 1, 28, 28])
Yts = dataset['Yts']

# The transition matrix provided
Ta = torch.tensor([[0.5,0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]).to(device)

# Lists to store the testing accuracy of every split
test_acc_FNN = []
test_acc_CNN = []
test_acc_Resnet = []

# The model with the highest validation accuracy and its two matrix estimates
best_model = None
best_T_estimated = 0
best_T_dual_estimated = 0
best_val_acc = 0

# Loop for 10 random splits
for i in range(10):
    # A different random seed for every split
    seed_torch(2 ** i)

    # Splitting the training and validation set (80% / 20%)
    X_train, X_val, y_train, y_val = train_test_split(Xtr_val, Str_val, test_size=0.2)

    # Convert the dataset to tensor
    # NOTE(review): tt.ToPILImage() multiplies floating-point tensors by 255 before
    # casting to uint8; if the stored pixels are already in 0-255 this float32
    # conversion may distort the intensities -- confirm the value range of Xtr/Xts.
    X_train = torch.tensor(X_train, dtype=torch.float32)
    X_val = torch.tensor(X_val, dtype=torch.float32)
    X_test = torch.tensor(Xts, dtype=torch.float32)

    y_train = torch.tensor(y_train, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)
    y_test = torch.tensor(Yts, dtype=torch.long)

    # Mean and standard deviation used for normalization
    stats = ((0.1307,), (0.3081,))


    # Training transformation: random horizontal flip, then normalization
    train_tfms = tt.Compose([tt.ToPILImage(),
                            tt.RandomHorizontalFlip(), 
                            tt.ToTensor(), 
                            tt.Normalize(*stats,inplace=True)])
    # Validation and testing transformation: normalization only
    valid_tfms = tt.Compose([tt.ToPILImage(), tt.ToTensor(), tt.Normalize(*stats)])

    # Get the dataloaders
    train_loader, val_loader, test_loader = get_loader(X_train, X_val, X_test, y_train, y_val, y_test, train_tfms, valid_tfms, batch_size = 100, num_workers = 0)

    # FNN model
    FNN_model = FCNet().to(device)
    FNN_optimizer = torch.optim.Adam(FNN_model.parameters(), lr=0.001)
    FNN_avg_acc_val, FNN_avg_acc_test, FNN_estimated_T, FNN_estimated_T_dual = train_val_test(FNN_model, train_loader, val_loader, test_loader, FNN_optimizer, Ta, num_epoch = 4)
    test_acc_FNN.append(float(FNN_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if FNN_avg_acc_val > best_val_acc:
        best_val_acc = FNN_avg_acc_val
        best_model = FNN_model
        best_T_estimated = FNN_estimated_T
        best_T_dual_estimated = FNN_estimated_T_dual

    # CNN model
    CNN_model = CNN(grayscale=True).to(device)
    CNN_optimizer = torch.optim.Adam(CNN_model.parameters(), lr=0.001)
    CNN_avg_acc_val, CNN_avg_acc_test, CNN_estimated_T, CNN_estimated_T_dual = train_val_test(CNN_model, train_loader, val_loader, test_loader, CNN_optimizer, Ta, num_epoch = 4)
    test_acc_CNN.append(float(CNN_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if CNN_avg_acc_val > best_val_acc:
        best_val_acc = CNN_avg_acc_val
        best_model = CNN_model
        best_T_estimated = CNN_estimated_T
        best_T_dual_estimated = CNN_estimated_T_dual
 
    # ResNet model (ResNet-18 layout: four stages of two basic blocks)
    Resnet_model = ResNet(block=BasicBlock, num_blocks=[2, 2, 2, 2], num_classes=3, grayscale=True).to(device)
    Resnet_optimizer = torch.optim.Adam(Resnet_model.parameters(), lr=0.001)
    Resnet_avg_acc_val, Resnet_avg_acc_test, Resnet_estimated_T, Resnet_estimated_T_dual = train_val_test(Resnet_model, train_loader, val_loader, test_loader, Resnet_optimizer, Ta, num_epoch = 4)
    test_acc_Resnet.append(float(Resnet_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if Resnet_avg_acc_val > best_val_acc:
        best_val_acc = Resnet_avg_acc_val
        best_model = Resnet_model
        best_T_estimated = Resnet_estimated_T
        best_T_dual_estimated = Resnet_estimated_T_dual

In [None]:
# Report the mean and standard deviation of the test accuracy over the splits, per model.
test_acc_FNN = np.array(test_acc_FNN)
print(f'The average testing accuracy FNN is {test_acc_FNN.mean():.4f}, and the std is {test_acc_FNN.std():.4f}' )

test_acc_CNN = np.array(test_acc_CNN)
print(f'The average testing accuracy CNN is {test_acc_CNN.mean():.4f}, and the std is {test_acc_CNN.std():.4f}' )

test_acc_Resnet = np.array(test_acc_Resnet)
print(f'The average testing accuracy ResNet is {test_acc_Resnet.mean():.4f}, and the std is {test_acc_Resnet.std():.4f}' )

The average testing accuracy FNN is 0.8435, and the std is 0.0195
The average testing accuracy CNN is 0.9332, and the std is 0.0080
The average testing accuracy ResNet is 0.8699, and the std is 0.0454


In [None]:
# Show the anchor-point estimate kept from the model with the best validation accuracy.
print('The best estimated transition matrix by loss correction is:')
print(best_T_estimated)

# The provided transition matrix
T_true = np.array([[0.5,0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])

# Mean squared error between the provided and the estimated matrix
print("The estimation error in transition matrix (mse) by loss correction is:")
print((np.square(T_true - best_T_estimated)).mean())

The best estimated transition matrix by loss correction is:
[[0.66529697 0.15440804 0.21152596]
 [0.20973    0.54397291 0.11993556]
 [0.12497307 0.30161905 0.66853851]]
The estimation error in transition matrix (mse) by loss correction is:
0.009750985322292405


In [None]:
# Show the dual-T estimate kept from the model with the best validation accuracy.
print('The best estimated transition matrix by dual T estimator is:')
print(best_T_dual_estimated)

# Mean squared error against T_true, which is defined in the previous cell
print("The estimation error in transition matrix (mse) by dual T estimator is:")
print((np.square(T_true - best_T_dual_estimated)).mean())

The best estimated transition matrix by dual T estimator is:
[[0.41141052 0.30109887 0.2869903 ]
 [0.27609988 0.39435128 0.30012959]
 [0.30818424 0.30917714 0.41031554]]
The estimation error in transition matrix (mse) by dual T estimator is:
0.00664760953341222


## 4.2 FashionMNIST0.6.npz

In [None]:
# Experiment on FashionMNIST0.6: over 10 random train/validation splits, train the
# FNN, CNN and ResNet models, record their test accuracy, and keep the transition
# matrix estimates of the model with the highest validation accuracy.

# Load the dataset
dataset = np.load("./data/FashionMNIST0.6.npz")
# Reshape the training/validation images to (N, channels, height, width)
Xtr_val = dataset['Xtr'].reshape([18000, 1, 28, 28])
Str_val = dataset['Str']
# Reshape the testing images to (N, channels, height, width)
Xts = dataset['Xts'].reshape([3000, 1, 28, 28])
Yts = dataset['Yts']

# The transition matrix provided
Ta = torch.tensor([[0.4, 0.3, 0.3], [0.3, 0.4, 0.3], [0.3, 0.3, 0.4]]).to(device)

# Lists to store the testing accuracy of every split
test_acc_FNN = []
test_acc_CNN = []
test_acc_Resnet = []

# The model with the best validation performance and its two matrix estimates
best_model = None
best_T_estimated = 0
best_T_dual_estimated = 0
best_val_acc = 0

# Loop for 10 random train/validation splits
for i in range(10):
    # A different random seed for every split
    seed_torch(3 ** i)
    # Training/validation splitting (80% / 20%)
    X_train, X_val, y_train, y_val = train_test_split(Xtr_val, Str_val, test_size=0.2)

    # Convert to tensor
    # NOTE(review): tt.ToPILImage() multiplies floating-point tensors by 255 before
    # casting to uint8; if the stored pixels are already in 0-255 this float32
    # conversion may distort the intensities -- confirm the value range of Xtr/Xts.
    X_train = torch.tensor(X_train, dtype=torch.float32)
    X_val = torch.tensor(X_val, dtype=torch.float32)
    X_test = torch.tensor(Xts, dtype=torch.float32)

    y_train = torch.tensor(y_train, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)
    y_test = torch.tensor(Yts, dtype=torch.long)

    # Mean and standard deviation used for normalization
    stats = ((0.1307,), (0.3081,))

    # Training transformation: random horizontal flip, then normalization
    train_tfms = tt.Compose([tt.ToPILImage(),
                            tt.RandomHorizontalFlip(), 
                            tt.ToTensor(), 
                            tt.Normalize(*stats,inplace=True)])
    # Validation and testing transformation: normalization only
    valid_tfms = tt.Compose([tt.ToPILImage(), tt.ToTensor(), tt.Normalize(*stats)])

    # Get the dataloaders
    train_loader, val_loader, test_loader = get_loader(X_train, X_val, X_test, y_train, y_val, y_test, train_tfms, valid_tfms, batch_size = 100, num_workers = 0)

    # The FNN model
    FNN_model = FCNet().to(device)
    FNN_optimizer = torch.optim.Adam(FNN_model.parameters(), lr=0.001)
    FNN_avg_acc_val, FNN_avg_acc_test, FNN_estimated_T, FNN_estimated_T_dual = train_val_test(FNN_model, train_loader, val_loader, test_loader, FNN_optimizer, Ta, num_epoch = 4)
    test_acc_FNN.append(float(FNN_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if FNN_avg_acc_val > best_val_acc:
        best_val_acc = FNN_avg_acc_val
        best_model = FNN_model
        best_T_estimated = FNN_estimated_T
        best_T_dual_estimated = FNN_estimated_T_dual

    # The CNN model
    CNN_model = CNN(grayscale=True).to(device)
    CNN_optimizer = torch.optim.Adam(CNN_model.parameters(), lr=0.001)
    CNN_avg_acc_val, CNN_avg_acc_test, CNN_estimated_T, CNN_estimated_T_dual = train_val_test(CNN_model, train_loader, val_loader, test_loader, CNN_optimizer, Ta, num_epoch = 4)
    test_acc_CNN.append(float(CNN_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if CNN_avg_acc_val > best_val_acc:
        best_val_acc = CNN_avg_acc_val
        best_model = CNN_model
        best_T_estimated = CNN_estimated_T
        best_T_dual_estimated = CNN_estimated_T_dual

    # The ResNet model (ResNet-18 layout: four stages of two basic blocks)
    Resnet_model = ResNet(block=BasicBlock, num_blocks=[2, 2, 2, 2], num_classes=3, grayscale=True).to(device)
    Resnet_optimizer = torch.optim.Adam(Resnet_model.parameters(), lr=0.001)
    Resnet_avg_acc_val, Resnet_avg_acc_test, Resnet_estimated_T, Resnet_estimated_T_dual = train_val_test(Resnet_model, train_loader, val_loader, test_loader, Resnet_optimizer, Ta, num_epoch = 4)
    test_acc_Resnet.append(float(Resnet_avg_acc_test.cpu()))

    # Keep this model's estimates if it has the best validation accuracy so far
    if Resnet_avg_acc_val > best_val_acc:
        best_val_acc = Resnet_avg_acc_val
        best_model = Resnet_model
        best_T_estimated = Resnet_estimated_T
        best_T_dual_estimated = Resnet_estimated_T_dual

In [None]:
# Report the mean and standard deviation of the test accuracy over the splits, per model.
test_acc_FNN = np.array(test_acc_FNN)
print(f'The average testing accuracy FNN is {test_acc_FNN.mean():.4f}, and the std is {test_acc_FNN.std():.4f}' )

test_acc_CNN = np.array(test_acc_CNN)
print(f'The average testing accuracy CNN is {test_acc_CNN.mean():.4f}, and the std is {test_acc_CNN.std():.4f}' )

test_acc_Resnet = np.array(test_acc_Resnet)
print(f'The average testing accuracy ResNet is {test_acc_Resnet.mean():.4f}, and the std is {test_acc_Resnet.std():.4f}' )

The average testing accuracy FNN is 0.7070, and the std is 0.0308
The average testing accuracy CNN is 0.8579, and the std is 0.0278
The average testing accuracy ResNet is 0.6950, and the std is 0.0988


In [None]:
# Show the anchor-point estimate kept from the model with the best validation accuracy.
print('The best estimated transition matrix by loss correction is:')
print(best_T_estimated)

# The provided transition matrix
T_true = np.array([[0.4, 0.3, 0.3], [0.3, 0.4, 0.3], [0.3, 0.3, 0.4]])

# Mean squared error between the provided and the estimated matrix
print("The estimation error in transition matrix (mse) by loss correction is:")
print((np.square(T_true - best_T_estimated)).mean())

The best estimated transition matrix by loss correction is:
[[0.45818657 0.28933543 0.32200667]
 [0.26656845 0.43239841 0.29609001]
 [0.27524495 0.27826613 0.38190329]]
The estimation error in transition matrix (mse) by loss correction is:
0.000842108992334572


In [None]:
# Show the dual-T estimate kept from the model with the best validation accuracy.
print('The best estimated transition matrix by dual T estimator is:')
print(best_T_dual_estimated)

# Mean squared error against T_true, which is defined in the previous cell
print("The estimation error in transition matrix (mse) by dual T estimator is:")
print((np.square(T_true - best_T_dual_estimated)).mean())

The best estimated transition matrix by dual T estimator is:
[[0.34532686 0.32872515 0.33246407]
 [0.32603551 0.34373554 0.32950672]
 [0.32869206 0.32849888 0.33746317]]
The estimation error in transition matrix (mse) by dual T estimator is:
0.0016809624247598593


## 4.3 CIFAR.npz

In [None]:
# Load the CIFAR data; the image arrays are used as stored (no reshape here).
dataset = np.load("./data/CIFAR.npz")
Xtr_val = dataset['Xtr']
Str_val = dataset['Str']
Xts = dataset['Xts']
Yts = dataset['Yts']

# No transition matrix is provided for this dataset, so it is estimated first.
# Until then the identity matrix is used, which leaves the predictions uncorrected.

Ta = torch.tensor([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]]).to(device)

# First, estimate the transition matrix with 10 random splits and trainings.
# The model with the best validation performance is used for the estimate.

# The model with the best validation performance and its two matrix estimates
best_model = None
best_T_estimated = 0
best_T_dual_estimated = 0
best_val_acc = 0
for i in range(10):
    # Random seed
    seed_torch(5 * i) 

    # Random traning validation splitting
    X_train, X_val, y_train, y_val = train_test_split(Xtr_val, Str_val, test_size=0.2)

    # Convert the dataset to tensor with the correct shape
    X_train = torch.tensor(X_train, dtype=torch.float32).permute(0,3,1,2)
    X_val = torch.tensor(X_val, dtype=torch.float32).permute(0,3,1,2)
    X_test = torch.tensor(Xts, dtype=torch.float32).permute(0,3,1,2)

    # Convert the dataset to tensor
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)
    y_test = torch.tensor(Yts, dtype=torch.long)

    # To nomalize
    stats = ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))

    # Training transform
    train_tfms = tt.Compose([tt.ToPILImage(),
                            tt.RandomHorizontalFlip(), 
                            tt.ToTensor(), 
                            tt.Normalize(*stats,inplace=True)])
    # Validation and testing transform    
    valid_tfms = tt.Compose([tt.ToPILImage(), tt.ToTensor(), tt.Normalize(*stats)])

    train_loader, val_loader, test_loader = get_loader(X_train, X_val, X_test, y_train, y_val, y_test, train_tfms, valid_tfms, batch_size = 100, num_workers = 0)

    # The FNN model
    FNN_model = FCNet(input_dim = 3072).to(device)
    FNN_optimizer = torch.optim.Adam(FNN_model.parameters(), lr=0.001)
    FNN_avg_acc_val, FNN_avg_acc_test, FNN_estimated_T, FNN_estimated_T_dual = train_val_test(FNN_model, train_loader, val_loader, test_loader, FNN_optimizer, Ta, num_epoch = 4)

    # Find the optimal model with highest validation accuracy
    if FNN_avg_acc_val > best_val_acc:
        best_val_acc = FNN_avg_acc_val
        best_model = FNN_model
        best_T_estimated = FNN_estimated_T
        best_T_dual_estimated = FNN_estimated_T_dual

    # The CNN model
    CNN_model = CNN(grayscale=False).to(device)
    CNN_optimizer = torch.optim.Adam(CNN_model.parameters(), lr=0.001)
    CNN_avg_acc_val, CNN_avg_acc_test, CNN_estimated_T, CNN_estimated_T_dual = train_val_test(CNN_model, train_loader, val_loader, test_loader, CNN_optimizer, Ta, num_epoch = 4)

    # Find the optimal model with highest validation accuracy
    if CNN_avg_acc_val > best_val_acc:
        best_val_acc = CNN_avg_acc_val
        best_model = CNN_model
        best_T_estimated = CNN_estimated_T
        best_T_dual_estimated = CNN_estimated_T_dual

    # The ResNet model
    Resnet_model = ResNet(block=BasicBlock, num_blocks=[2, 2, 2, 2], num_classes=3, grayscale=False).to(device)
    Resnet_optimizer = torch.optim.Adam(Resnet_model.parameters(), lr=0.001)
    Resnet_avg_acc_val, Resnet_avg_acc_test, Resnet_estimated_T, Resnet_estimated_T_dual = train_val_test(Resnet_model, train_loader, val_loader, test_loader, Resnet_optimizer, Ta, num_epoch = 4)

    # Find the optimal model with highest validation accuracy
    if Resnet_avg_acc_val > best_val_acc:
        best_val_acc = Resnet_avg_acc_val
        best_model = Resnet_model
        best_T_estimated = Resnet_estimated_T
        best_T_dual_estimated = Resnet_estimated_T_dual

In [None]:
# Show the transition matrix kept from the best-validation model (loss-correction estimate).
print('The best estimated transition matrix by loss correction is:', best_T_estimated, sep='\n')

The best estimated transition matrix by loss correction is:
[[0.46960118 0.09578571 0.35432899]
 [0.3438389  0.5333789  0.21702285]
 [0.18655995 0.37083539 0.42864814]]


In [None]:
# Show the transition matrix kept from the best-validation model (dual-T estimate).
print('The best estimated transition matrix by dual T estimator is:', best_T_dual_estimated, sep='\n')

The best estimated transition matrix by dual T estimator is:
[[0.3493624  0.32104207 0.32735027]
 [0.32317779 0.35741034 0.3283265 ]
 [0.32684683 0.31559371 0.3480214 ]]


In [None]:
# With both transition-matrix estimates available, apply them and compare
# the test accuracy each one leads to.
dataset = np.load("./data/CIFAR.npz")
Xtr_val, Str_val = dataset['Xtr'], dataset['Str']
Xts, Yts = dataset['Xts'], dataset['Yts']

# The two estimated matrices, as float tensors on the training device.
Ta = torch.from_numpy(best_T_estimated).float().to(device)
Ta_dual = torch.from_numpy(best_T_dual_estimated).float().to(device)

# Per-run test accuracies. The plain lists receive the second value returned by
# train_val_test_CIFAR and the *_dual_* lists the third
# (presumably the accuracy under Ta and under Ta_dual respectively; confirm
# against train_val_test_CIFAR).
test_acc_FNN, test_acc_CNN, test_acc_Resnet = [], [], []
test_acc_dual_FNN, test_acc_dual_CNN, test_acc_dual_Resnet = [], [], []

# Split-independent pieces are prepared once.
# permute(0, 3, 1, 2) moves the last axis to position 1
# (presumably NHWC -> NCHW; confirm against the archive layout).
X_test = torch.tensor(Xts, dtype=torch.float32).permute(0, 3, 1, 2)
y_test = torch.tensor(Yts, dtype=torch.long)

# Per-channel (mean, std) pair handed to tt.Normalize.
stats = ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))

# NOTE(review): ToPILImage receives float32 tensors here; confirm that the value
# range of Xtr/Xts matches what torchvision expects for floating-point input.
train_tfms = tt.Compose([
    tt.ToPILImage(),
    tt.RandomHorizontalFlip(),
    tt.ToTensor(),
    tt.Normalize(*stats, inplace=True),
])
valid_tfms = tt.Compose([tt.ToPILImage(), tt.ToTensor(), tt.Normalize(*stats)])

# (model factory, list for the plain accuracy, list for the dual accuracy),
# in the order the architectures are trained within a run.
evaluation_models = (
    (lambda: FCNet(input_dim=3072), test_acc_FNN, test_acc_dual_FNN),
    (lambda: CNN(grayscale=False), test_acc_CNN, test_acc_dual_CNN),
    (
        lambda: ResNet(block=BasicBlock, num_blocks=[2, 2, 2, 2], num_classes=3, grayscale=False),
        test_acc_Resnet,
        test_acc_dual_Resnet,
    ),
)

# 10 runs, each with its own seed and its own random 80/20 train/validation split.
for run in range(10):
    seed_torch(2 ** run)

    X_train, X_val, y_train, y_val = train_test_split(Xtr_val, Str_val, test_size=0.2)

    # Convert the split to tensors with the same axis order as X_test.
    X_train = torch.tensor(X_train, dtype=torch.float32).permute(0, 3, 1, 2)
    X_val = torch.tensor(X_val, dtype=torch.float32).permute(0, 3, 1, 2)
    y_train = torch.tensor(y_train, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)

    train_loader, val_loader, test_loader = get_loader(
        X_train, X_val, X_test, y_train, y_val, y_test,
        train_tfms, valid_tfms, batch_size=100, num_workers=0,
    )

    # Build and train each model only after the previous one has finished,
    # so the sequence of operations matches a hand-unrolled FNN -> CNN -> ResNet run.
    for build_model, plain_scores, dual_scores in evaluation_models:
        model = build_model().to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        _val_acc, test_acc, dual_test_acc = train_val_test_CIFAR(
            model, train_loader, val_loader, test_loader, optimizer, Ta, Ta_dual, num_epoch=4
        )
        # Store plain Python floats so the lists can be turned into NumPy arrays later.
        plain_scores.append(float(test_acc.cpu()))
        dual_scores.append(float(dual_test_acc.cpu()))

In [None]:
# Summary of the test accuracy obtained with the loss-correction matrix.
#
# The evaluation cell fills test_acc_FNN / test_acc_CNN / test_acc_Resnet from
# the *_avg_acc_test value returned by train_val_test_CIFAR, and the
# test_acc_dual_* lists from *_average_acc_dual_test. Ta is built from
# best_T_estimated (the loss-correction estimate), so this section reports the
# three non-dual series for every architecture. Previously FNN and CNN were
# read from the dual lists here, which mixed the two methods under one heading.
print('loss correction')
print('-------------------------------------------')

# Each list is rebound to a NumPy array so .mean() and .std() are available.
test_acc_FNN = np.array(test_acc_FNN)
print(f'The average testing accuracy FNN is {test_acc_FNN.mean():.4f}, and the std is {test_acc_FNN.std():.4f}' )

test_acc_CNN = np.array(test_acc_CNN)
print(f'The average testing accuracy CNN is {test_acc_CNN.mean():.4f}, and the std is {test_acc_CNN.std():.4f}' )

test_acc_Resnet = np.array(test_acc_Resnet)
print(f'The average testing accuracy ResNet is {test_acc_Resnet.mean():.4f}, and the std is {test_acc_Resnet.std():.4f}' )

loss correction
-------------------------------------------
The average testing accuracy FNN is 0.5182, and the std is 0.0191
The average testing accuracy CNN is 0.5736, and the std is 0.0951
The average testing accuracy ResNet is 0.5207, and the std is 0.0877


In [None]:
# Summary of the test accuracy obtained with the dual-T matrix.
#
# The evaluation cell fills test_acc_dual_FNN / test_acc_dual_CNN /
# test_acc_dual_Resnet from the *_average_acc_dual_test value returned by
# train_val_test_CIFAR, which is called with Ta_dual built from
# best_T_dual_estimated. This section therefore reports the three dual series
# for every architecture. Previously FNN and CNN were read from the non-dual
# lists here, which mixed the two methods under one heading.
print('dual T estimator')
print('-------------------------------------------')

# Each list is rebound to a NumPy array so .mean() and .std() are available.
test_acc_dual_FNN = np.array(test_acc_dual_FNN)
print(f'The average testing accuracy FNN is {test_acc_dual_FNN.mean():.4f}, and the std is {test_acc_dual_FNN.std():.4f}' )

test_acc_dual_CNN = np.array(test_acc_dual_CNN)
print(f'The average testing accuracy CNN is {test_acc_dual_CNN.mean():.4f}, and the std is {test_acc_dual_CNN.std():.4f}' )

test_acc_dual_Resnet = np.array(test_acc_dual_Resnet)
print(f'The average testing accuracy ResNet is {test_acc_dual_Resnet.mean():.4f}, and the std is {test_acc_dual_Resnet.std():.4f}' )

dual T estimator
-------------------------------------------
The average testing accuracy FNN is 0.5435, and the std is 0.0282
The average testing accuracy CNN is 0.6384, and the std is 0.0931
The average testing accuracy ResNet is 0.5053, and the std is 0.0666
