# model preparation

In [1]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dsets
from torch.autograd import Variable
import torch.nn.functional as F

class linear_model(nn.Module):
    """Single affine layer applied to inputs flattened to ``(batch, features)``."""

    def __init__(self, input_dim, output_dim):
        super(linear_model, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Collapse every non-leading dimension of ``x`` and apply the linear layer."""
        flat = x.view(-1, self.num_flat_features(x))
        return self.linear(flat)

    def num_flat_features(self, x):
        """Return the product of all dimensions of ``x`` except the first (1 if none)."""
        return x.size()[1:].numel()
    
class mlp_model(nn.Module):
    """One-hidden-layer perceptron (Linear -> ReLU -> Linear) on flattened inputs."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(mlp_model, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Flatten ``x`` to two dimensions and return the raw (pre-softmax) scores."""
        flat = x.view(-1, self.num_flat_features(x))
        hidden = self.relu1(self.fc1(flat))
        return self.fc2(hidden)

    def num_flat_features(self, x):
        """Return the product of all dimensions of ``x`` except the first (1 if none)."""
        return x.size()[1:].numel()


ModuleNotFoundError: No module named 'torch'

# data algorithm

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

def accuracy_check(loader, model, device):
    """Return the fraction of samples whose highest-scoring class equals the label.

    Args:
        loader: iterable yielding ``(images, labels)`` batches.
        model: callable mapping a batch of images to per-class scores.
        device: device both tensors of each batch are moved to before the call.

    Returns:
        ``correct / seen`` as a Python float; gradients are not tracked.
    """
    correct = 0
    seen = 0
    with torch.no_grad():
        for images, labels in loader:
            labels = labels.to(device)
            images = images.to(device)
            scores = model(images)
            # index of the maximum along the class axis
            predicted = torch.max(scores.data, 1)[1]
            correct += (predicted == labels).sum().item()
            seen += labels.size(0)
    return correct / seen

ModuleNotFoundError: No module named 'torch'

# data preparation

In [None]:
import sys
import numpy as np
import torch
import torchvision.transforms as transforms
import torchvision.datasets as dsets
import torchvision.models as models
from scipy.special import comb

def generate_uniform_comp_labels(dataname, labels):
    """Build a partial-label matrix with a random number of complementary labels per row.

    For every instance the number of complementary labels ``m`` (1 .. K-1) is
    drawn with probability ``comb(K, m) / (2**K - 2)``; then ``m`` classes other
    than the true one are picked from a random permutation and zeroed out.

    Args:
        dataname: unused; kept for interface compatibility.
        labels: 1-D integer tensor of class labels, 0- or 1-indexed.

    Returns:
        Float tensor of shape ``(n, K)`` where 1 marks a candidate label and 0 a
        complementary label. The true label of each row is always 1.

    Raises:
        RuntimeError: if the smallest label is greater than 1.
    """
    smallest = torch.min(labels).item()
    if smallest > 1:
        raise RuntimeError('testError')
    if smallest == 1:
        labels = labels - 1  # shift 1-indexed labels down to 0-indexed

    # Plain Python int so range()/comb()/permutation() get an ordinary integer.
    K = int((torch.max(labels) - torch.min(labels) + 1).item())
    n = labels.shape[0]
    cardinality = 2**K - 2

    # Cumulative distribution over the number of complementary labels (1 .. K-1).
    subset_counts = torch.tensor([comb(K, size) for size in range(1, K)], dtype=torch.float64)
    prob_dis = torch.cumsum(subset_counts / cardinality, dim=0).float()

    random_n = torch.from_numpy(np.random.uniform(0, 1, n)).float()
    # The first threshold that covers the draw decides the count. Counting the
    # thresholds strictly below the draw gives that index because prob_dis is
    # non-decreasing. Clamping covers the case where float32 rounding leaves the
    # last threshold just under a draw, which previously reused a stale count.
    num_comp = (random_n.unsqueeze(1) > prob_dis.unsqueeze(0)).sum(dim=1) + 1
    num_comp = torch.clamp(num_comp, max=K - 1).tolist()

    partialY = torch.ones(n, K)
    for j in range(n):
        if j % 1000 == 0:
            print("current index:", j)
        candidates = torch.from_numpy(np.random.permutation(K)).long()
        candidates = candidates[candidates != labels[j]]  # never exclude the true class
        partialY[j, candidates[:num_comp[j]]] = 0
    return partialY

def generate_multi_comp_labels(data, labels, s):
    """Build a partial-label matrix with exactly ``s`` complementary labels per row.

    In each of ``s`` rounds one class is drawn uniformly from the classes that
    are neither the true label nor already chosen for that row, and marked 0.

    Args:
        data: unused; kept for interface compatibility. (Earlier code
            concatenated it ``s`` times into a tensor that was never returned.)
        labels: 1-D integer CPU tensor of 0-indexed class labels.
        s: number of complementary labels per instance, at most ``k - 1``.

    Returns:
        Float tensor of shape ``(n, k)`` with 1 for candidate labels and 0 for
        complementary labels, where ``k = max(labels) + 1``.

    Raises:
        ValueError: if ``s`` exceeds ``k - 1`` (no class left to draw from).
    """
    k = int(torch.max(labels)) + 1
    n = labels.shape[0]
    if s > k - 1:
        raise ValueError(
            "cannot draw {} complementary labels from {} non-true classes".format(s, k - 1)
        )

    rows_np = np.arange(n)
    rows_t = torch.arange(n)
    partialY = torch.ones(n, k)

    candidates = np.repeat(np.arange(k).reshape(1, k), n, 0)
    mask = np.ones((n, k), dtype=bool)  # True = class still available for drawing
    excluded = labels.clone().numpy()   # classes to remove before the next draw
    for i in range(s):
        mask[rows_np, excluded] = False
        remaining = candidates[mask].reshape(n, k - 1 - i)
        idx = np.random.randint(0, k - 1 - i, n)
        comp_labels = remaining[rows_np, idx]
        partialY[rows_t, torch.from_numpy(comp_labels).long()] = 0
        excluded = comp_labels
    return partialY
        
def class_prior(complementary_labels):
    """Return the empirical frequency of each label value 0 .. max(label)."""
    counts = np.bincount(complementary_labels)
    total = len(complementary_labels)
    return counts / total

def prepare_mnist_data(batch_size):
    """Create MNIST datasets and loaders rooted at ``./data/mnist``.

    The training split is requested with ``download=True``; the test split is
    opened from the same root without a download flag.

    Returns:
        ``(full_train_loader, train_loader, test_loader, ordinary_train_dataset,
        test_dataset, num_classes)``. ``full_train_loader`` uses the whole
        training set as one shuffled batch; ``num_classes`` is the length of the
        dataset's ``classes`` list.
    """
    root = './data/mnist'
    ordinary_train_dataset = dsets.MNIST(root=root, train=True, transform=transforms.ToTensor(), download=True)
    test_dataset = dsets.MNIST(root=root, train=False, transform=transforms.ToTensor())

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = make_loader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    whole_set = len(ordinary_train_dataset.data)
    full_train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=whole_set, shuffle=True, num_workers=0)

    num_classes = len(ordinary_train_dataset.classes)
    return full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, num_classes

def prepare_kmnist_data(batch_size):
    """Create KMNIST datasets and loaders rooted at ``./data/KMNIST``.

    The training split is requested with ``download=True``; the test split is
    opened from the same root without a download flag.

    Returns:
        ``(full_train_loader, train_loader, test_loader, ordinary_train_dataset,
        test_dataset, num_classes)``. ``full_train_loader`` uses the whole
        training set as one shuffled batch; ``num_classes`` is the length of the
        dataset's ``classes`` list.
    """
    root = './data/KMNIST'
    ordinary_train_dataset = dsets.KMNIST(root=root, train=True, transform=transforms.ToTensor(), download=True)
    test_dataset = dsets.KMNIST(root=root, train=False, transform=transforms.ToTensor())

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = make_loader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    whole_set = len(ordinary_train_dataset.data)
    full_train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=whole_set, shuffle=True, num_workers=0)

    num_classes = len(ordinary_train_dataset.classes)
    return full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, num_classes

def prepare_fashion_data(batch_size):
    """Create Fashion-MNIST datasets and loaders rooted at ``./data/FashionMnist``.

    The training split is requested with ``download=True``; the test split is
    opened from the same root without a download flag.

    Returns:
        ``(full_train_loader, train_loader, test_loader, ordinary_train_dataset,
        test_dataset, num_classes)``. ``full_train_loader`` uses the whole
        training set as one shuffled batch; ``num_classes`` is the length of the
        dataset's ``classes`` list.
    """
    root = './data/FashionMnist'
    ordinary_train_dataset = dsets.FashionMNIST(root=root, train=True, transform=transforms.ToTensor(), download=True)
    test_dataset = dsets.FashionMNIST(root=root, train=False, transform=transforms.ToTensor())

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = make_loader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    whole_set = len(ordinary_train_dataset.data)
    full_train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=whole_set, shuffle=True, num_workers=0)

    num_classes = len(ordinary_train_dataset.classes)
    return full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, num_classes

def prepare_cifar10_data(batch_size):
    """Create CIFAR-10 datasets and loaders rooted at ``./data``.

    Both splits are converted to tensors and normalised per channel with the
    same constants. The training split is requested with ``download=True``.

    Returns:
        ``(full_train_loader, train_loader, test_loader, ordinary_train_dataset,
        test_dataset, 10)``. ``full_train_loader`` uses the whole training set
        as one shuffled batch; the class count is the literal 10.
    """
    channel_mean = (0.4914, 0.4822, 0.4465)
    channel_std = (0.247, 0.243, 0.261)

    # Train-time augmentation (RandomHorizontalFlip, RandomCrop(32, 4)) is disabled,
    # so the train and test pipelines are currently the same.
    train_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ])
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ])

    ordinary_train_dataset = dsets.CIFAR10(root='./data', train=True, transform=train_transform, download=True)
    test_dataset = dsets.CIFAR10(root='./data', train=False, transform=test_transform)

    make_loader = torch.utils.data.DataLoader
    train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = make_loader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    whole_set = len(ordinary_train_dataset.data)
    full_train_loader = make_loader(dataset=ordinary_train_dataset, batch_size=whole_set, shuffle=True, num_workers=0)
    return full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, 10

def prepare_train_loaders_for_multi_comp_labels(full_train_loader, batch_size, s):
    """Wrap the training data with ``s`` complementary labels per sample in a loader.

    Args:
        full_train_loader: iterable of ``(data, labels)`` batches, expected to
            yield the whole training set as a single batch. If it yields
            several, only the last one is used.
        batch_size: batch size of the returned loader.
        s: number of complementary labels per instance, forwarded to
            ``generate_multi_comp_labels``.

    Returns:
        ``(partial_matrix_train_loader, dim)``: a shuffled loader over
        ``(data, partialY)`` pairs and the number of features per sample.

    Raises:
        ValueError: if ``full_train_loader`` yields no batch.
    """
    data = labels = None
    for data, labels in full_train_loader:
        pass  # keep the last batch; a full-batch loader yields exactly one
    if data is None:
        raise ValueError("full_train_loader yielded no batches")

    partialY = generate_multi_comp_labels(data, labels, s)
    partial_matrix_dataset = torch.utils.data.TensorDataset(data, partialY.float())
    partial_matrix_train_loader = torch.utils.data.DataLoader(dataset=partial_matrix_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    dim = data.numel() // data.shape[0]  # elements per sample once flattened
    return partial_matrix_train_loader, dim

def prepare_train_loaders_for_uniform_comp_labels(dataname, full_train_loader, batch_size):
    """Wrap the training data with randomly sized complementary-label sets in a loader.

    Args:
        dataname: forwarded unchanged to ``generate_uniform_comp_labels``.
        full_train_loader: iterable of ``(data, labels)`` batches, expected to
            yield the whole training set as a single batch. If it yields
            several, only the last one is used.
        batch_size: batch size of the returned loader.

    Returns:
        ``(partial_matrix_train_loader, dim)``: a shuffled loader over
        ``(data, partialY)`` pairs and the number of features per sample.

    Raises:
        ValueError: if ``full_train_loader`` yields no batch.
    """
    data = labels = None
    for data, labels in full_train_loader:
        pass  # keep the last batch; a full-batch loader yields exactly one
    if data is None:
        raise ValueError("full_train_loader yielded no batches")

    partialY = generate_uniform_comp_labels(dataname, labels)
    partial_matrix_dataset = torch.utils.data.TensorDataset(data, partialY.float())
    partial_matrix_train_loader = torch.utils.data.DataLoader(dataset=partial_matrix_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    dim = data.numel() // data.shape[0]  # elements per sample once flattened
    return partial_matrix_train_loader, dim


# loss

In [None]:
import torch.nn as nn
import torch
import math
import torch.nn.functional as F

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

def mae_loss(outputs, partialY):
    """Per-sample L1 distance between ``softmax(outputs)`` and ``partialY``.

    Returns a 1-D tensor with one value per row (absolute errors summed over
    the last dimension); no reduction over the batch is applied.
    """
    probs = F.softmax(outputs, dim=1)
    abs_err = F.l1_loss(probs, partialY.float(), reduction='none')
    return abs_err.sum(dim=-1)
    
def mse_loss(outputs, Y):
    """Per-sample squared error between ``softmax(outputs)`` and ``Y``.

    Returns a 1-D tensor with one value per row (squared errors summed over
    the last dimension); no reduction over the batch is applied.
    """
    probs = F.softmax(outputs, dim=1)
    sq_err = F.mse_loss(probs, Y.float(), reduction='none')
    return sq_err.sum(dim=-1)

def gce_loss(outputs, Y, q=0.7):
    """Per-sample generalized cross-entropy ``(1 - sum_j Y_j * p_j**q) / q``.

    Args:
        outputs: raw scores of shape ``(n, k)``; softmax is applied over dim 1.
        Y: target weights of shape ``(n, k)`` (one-hot in this file's usage).
        q: exponent of the loss, must be positive. Defaults to the previously
            hard-coded 0.7, so existing two-argument calls are unaffected.

    Returns:
        1-D tensor of length ``n``; no reduction over the batch is applied.

    Raises:
        ValueError: if ``q`` is not positive (the loss divides by ``q``).
    """
    if q <= 0:
        raise ValueError("q must be positive, got {}".format(q))
    probs = F.softmax(outputs, dim=1)
    powered = torch.pow(probs, q)
    return (1 - (powered * Y).sum(dim=1)) / q

def phuber_ce_loss(outputs, Y, trunc_point=0.1):
    """Per-sample partially Huberised cross-entropy.

    With ``p = sum_j Y_j * softmax(outputs)_j`` the loss of a row is

    * the cross-entropy ``-sum_j Y_j * log_softmax(outputs)_j`` if ``p > trunc_point``;
    * the linear term ``-log(trunc_point) - p / trunc_point + 1`` if ``p <= trunc_point``.

    Args:
        outputs: raw scores of shape ``(n, k)``.
        Y: target weights of shape ``(n, k)`` (one-hot in this file's usage).
        trunc_point: confidence threshold between the two regimes. Defaults to
            the previously hard-coded 0.1.

    Returns:
        1-D tensor of length ``n`` on the same device as ``outputs``.
    """
    n = Y.shape[0]
    confidence = (F.softmax(outputs, dim=1) * Y).sum(dim=1)

    # Allocate next to `outputs` rather than on a module-global device, so the
    # function works wherever the caller placed its tensors.
    sample_loss = outputs.new_zeros(n)

    ce_index = confidence > trunc_point
    if ce_index.any():
        log_probs = F.log_softmax(outputs[ce_index, :], dim=-1)
        sample_loss[ce_index] = -(log_probs * Y[ce_index, :]).sum(dim=-1)

    linear_index = confidence <= trunc_point
    if linear_index.any():
        sample_loss[linear_index] = (
            -math.log(trunc_point) + (-1 / trunc_point) * confidence[linear_index] + 1
        )

    return sample_loss

def ce_loss(outputs, Y):
    """Per-sample cross-entropy ``-sum_j Y_j * log_softmax(outputs)_j``.

    Returns a 1-D tensor with one value per row; no batch reduction is applied.
    """
    log_probs = F.log_softmax(outputs, dim=1)
    return -(log_probs * Y).sum(dim=1)

def unbiased_estimator(loss_fn, outputs, partialY, device):
    """Batch-mean risk estimate built from a per-sample loss and partial labels.

    ``loss_fn`` is evaluated once per class against an all-rows one-hot target
    for that class. Per row, the losses of candidate classes (``partialY == 1``)
    are summed, and the summed losses of complementary classes are subtracted
    with weight ``(k - c - 1) / c``, where ``c`` is the row's number of
    complementary labels.

    Args:
        loss_fn: callable ``(outputs, Y) -> tensor of length n``.
        outputs: raw scores for the batch.
        partialY: ``(n, k)`` matrix with 1 for candidates, 0 for complementary labels.
        device: device on which the temporary tensors are created.

    Returns:
        Scalar tensor: the mean of the per-row estimates.
    """
    n, k = partialY.shape[0], partialY.shape[1]

    per_class_loss = torch.zeros(n, k).to(device)
    for cls in range(k):
        one_hot = torch.zeros(n, k).to(device)
        one_hot[:, cls] = 1.0
        per_class_loss[:, cls] = loss_fn(outputs, one_hot)

    comp_num = k - partialY.sum(dim=1)
    in_candidates = (per_class_loss * partialY).sum(dim=1)
    in_complement = (per_class_loss * (1 - partialY)).sum(dim=1)
    weight = (k - comp_num - 1.0) / comp_num
    per_row = in_candidates - weight * in_complement
    return per_row.mean()

def log_loss(outputs, partialY):
    """Batch-mean weighted negative log of the total candidate probability.

    Per row the loss is ``-(k - 1) / (k - can_num) * log(sum_j partialY_j * p_j)``
    with ``p = softmax(outputs)`` and ``can_num`` the row sum of ``partialY``.

    The logarithm is evaluated in log space, as
    ``logsumexp(log_softmax(outputs) + log(partialY))``. A candidate set with
    very small probability therefore yields a large finite loss where
    ``log(softmax(...))`` would underflow to ``-inf``.

    Args:
        outputs: raw scores of shape ``(n, k)``.
        partialY: ``(n, k)`` matrix with 1 for candidates, 0 for complementary
            labels. Every row needs at least one 0, otherwise the weight
            divides by zero.

    Returns:
        Scalar tensor: the mean loss over the batch.
    """
    k = partialY.shape[1]
    can_num = partialY.sum(dim=1).float()  # number of candidate labels per row

    log_probs = F.log_softmax(outputs, dim=1)
    # log(0) = -inf removes complementary classes from the log-sum-exp.
    log_mask = torch.log(partialY.to(log_probs.dtype))
    log_candidate_prob = torch.logsumexp(log_probs + log_mask, dim=1)

    average_loss = -((k - 1) / (k - can_num) * log_candidate_prob).mean()
    return average_loss

def exp_loss(outputs, partialY):
    """Batch-mean weighted ``exp(-q)``, ``q`` being the total candidate probability.

    Per row the loss is ``(k - 1) / (k - can_num) * exp(-sum_j partialY_j * p_j)``
    with ``p = softmax(outputs)`` and ``can_num`` the row sum of ``partialY``.

    Returns:
        Scalar tensor: the mean loss over the batch.
    """
    k = partialY.shape[1]
    can_num = partialY.sum(dim=1).float()  # number of candidate labels per row

    candidate_prob = (F.softmax(outputs, dim=1) * partialY).sum(dim=1)
    weight = (k - 1) / (k - can_num)
    return (weight * torch.exp(-candidate_prob)).mean()


# demo

In [None]:


# Demo: train an MLP on CIFAR-10 where every training image carries s=5
# complementary labels, and report train/test accuracy after each epoch.
#
# NOTE(review): `args` is not defined in any visible cell. The script assumes a
# namespace providing `seed`, `bs`, `lr`, `wd` and `ep` (e.g. from argparse) - confirm.
# NOTE(review): torch is seeded with the literal 0 while NumPy uses args.seed -
# confirm this is intended.
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)
np.random.seed(args.seed)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Dataset selection is currently fixed to CIFAR-10; the other branches are kept for reference.
#if args.ds == 'mnist':
#    full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, K = prepare_mnist_data(batch_size=args.bs)
#elif args.ds == 'kmnist':
#    full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, K = prepare_kmnist_data(batch_size=args.bs)
#elif args.ds == 'fashion':
#   full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, K = prepare_fashion_data(batch_size=args.bs)
#elif args.ds == 'cifar10':
full_train_loader, train_loader, test_loader, ordinary_train_dataset, test_dataset, K = prepare_cifar10_data(batch_size=args.bs)

partial_matrix_train_loader, dim = prepare_train_loaders_for_multi_comp_labels(full_train_loader=full_train_loader, batch_size=args.bs, s=5)

loss_fn = mae_loss
# exp_loss and log_loss consume the partial-label matrix directly and already
# return a batch mean. Every other loss is per-sample on one-hot targets and
# has to go through unbiased_estimator. Deciding on the function itself keeps
# this branch consistent with whichever loss_fn is selected above.
loss_takes_partial_labels = loss_fn in (exp_loss, log_loss)

model = mlp_model(input_dim=dim, hidden_dim=500, output_dim=K)
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr = args.lr, weight_decay = args.wd)

# Accuracy of the untrained model, reported as epoch 0.
train_accuracy = accuracy_check(loader=train_loader, model=model, device=device)
test_accuracy = accuracy_check(loader=test_loader, model=model, device=device)

print('Epoch: 0. Tr Acc: {}. Te Acc: {}'.format(train_accuracy, test_accuracy))

test_acc_list = []
train_acc_list = []

for epoch in range(args.ep):
    model.train()
    for i, (images, labels) in enumerate(partial_matrix_train_loader):
        # `labels` here is the partial-label matrix, not ordinary class indices.
        X, partialY = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(X)

        if loss_takes_partial_labels:
            average_loss = loss_fn(outputs, partialY.float())
        else:
            average_loss = unbiased_estimator(loss_fn, outputs, partialY.float(), device)
        average_loss.backward()
        optimizer.step()

    model.eval()
    train_accuracy = accuracy_check(loader=train_loader, model=model, device=device)
    test_accuracy = accuracy_check(loader=test_loader, model=model, device=device)

    print('Epoch: {}. Tr Acc: {}. Te Acc: {}.'.format(epoch+1, train_accuracy, test_accuracy))

    # Only the final ten epochs contribute to the reported averages.
    if epoch >= (args.ep-10):
        test_acc_list.append(test_accuracy)
        train_acc_list.append(train_accuracy)

avg_test_acc = np.mean(test_acc_list)

avg_train_acc = np.mean(train_acc_list)

print("Average Test Accuracy over Last 10 Epochs:", avg_test_acc)

print("Average Training Accuracy over Last 10 Epochs:", avg_train_acc)