In [1]:
# Imports

from itertools import permutations, combinations
import os

from cvxopt import solvers, matrix
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from torchvision import transforms, models,datasets
from torch.utils.data import Dataset, DataLoader

In [2]:
# Functions

def multi_classifier_output(data_loader, list_classifiers, label_idx_in_imagenet, soft_max):
#     num_classifiers = len(list_classifiers)
#     num_classes = len(label_idx_in_imagenet)
    
    output_all_multi_classifier = []
    labels_all = []
    for i, (images, labels) in enumerate(data_loader):
        print(i)
        if i == 3:
            break
        output_list = []
        for j, classifier in enumerate(list_classifiers):
            print(j)
            output = classifier(images)
            output_simple = output[:, label_idx_in_imagenet]
            output_simple_sm = soft_max(output_simple)
            output_list.append(output_simple_sm)
        output_batch_multi_classifier = torch.stack(output_list, dim=2)
        output_all_multi_classifier.append(output_batch_multi_classifier)
        labels_all.append(labels)
    output_all_multi_classifier = torch.cat(output_all_multi_classifier, dim=0)
    labels_all = torch.cat(labels_all, dim=0)
#     labels_al_one_hot = make_one_hot(labels_all, 3)
    
    return output_all_multi_classifier, labels_all


def cal_chi(fm, x):
    """
    Calculates ChI with given fuzzy measure and input
    
    :param fm: Fuzzy measure
    :param x: Input
    :return: Single value Chi output
    """
    pi_i = np.argsort(-x) + 1 # Arg sort of input, with the smallest index9 being 1
    ch = x[pi_i[0] - 1] * (fm[str(pi_i[:1])])
    for i in range(1, len(x)):
        latt_pti = np.sort(pi_i[:i+1])
        latt_ptimin1 = np.sort(pi_i[:i])
        ch = ch + x[pi_i[i] - 1] * (fm[str(latt_pti)] - fm[str(latt_ptimin1)])
    return ch


def get_cal_chi(fm):
    return lambda x: cal_chi(fm, x)


def calculate_accuracy(target, output):
    acc = np.sum(target == output.round()) / len(target)
    return acc


def make_one_hot(labels, num_classes):
    one_hot = torch.zeros(len(labels), num_classes)
    for i, label in enumerate(labels):
        one_hot[i, label] = 1
    return one_hot

In [3]:
# ChIQP

# Silencing solvers.qp
from contextlib import redirect_stdout
from io import StringIO
class NullIO(StringIO):
    def write(self, txt):
        pass


def silent(fn):
    """Decorator to silence functions."""
    def silent_fn(*args, **kwargs):
        with redirect_stdout(NullIO()):
            return fn(*args, **kwargs)
    return silent_fn


class Choquet_Integral_QP:

    def __init__(self):
        """Instantiation of a ChoquetIntegral.
           This sets up the ChI. It doesn't take any input parameters
           because you may want to use pass your own values in(as opposed
           to learning from data). To instatiate, use
           chi = ChoquetIntegral.ChoquetIntegral()
        """
        self.trainSamples, self.trainLabels = [], []
        self.testSamples, self.testLabels = [], []
        self.N, self.numberConstraints, self.M = 0, 0, 0
        self.g = 0
        self.fm = []
        self.type = []


    def train_chi(self, x1, l1):
        """
        This trains this instance of your ChoquetIntegral w.r.t x1 and l1.
        :param x1: These are the training samples of size N x M(inputs x number of samples)
        :param l1: These are the training labels of size 1 x M(label per sample)
        """
        self.type = 'quad'
        self.trainSamples = x1
        self.trainLabels = l1
        self.N = self.trainSamples.shape[0]
        self.M = self.trainSamples.shape[1]
#         print("Number Inputs : ", self.N, "; Number Samples : ", self.M)
        self.fm = self.produce_lattice()

        return self



    def chi_quad(self, x2):
        """
        This will produce an output for this instance of the ChI
        This will use the learned(or specified) Choquet integral to
        produce an output w.r.t. to the new input.
        :param x2: testing sample
        :return: output of the choquet integral.
        """
        if self.type == 'quad':
            n = len(x2)
            pi_i = np.argsort(x2)[::-1][:n] + 1
            ch = x2[pi_i[0] - 1] * (self.fm[str(pi_i[:1])])
            for i in range(1, n):
                latt_pti = np.sort(pi_i[:i + 1])
                latt_ptimin1 = np.sort(pi_i[:i])
                ch = ch + x2[pi_i[i] - 1] * (self.fm[str(latt_pti)] - self.fm[str(latt_ptimin1)])
            return ch
        else:
            print("If using sugeno measure, you need to use chi_sugeno.")


    def produce_lattice(self):
        """
            This method builds is where the lattice(or FM variables) will be learned.
            The FM values can be found via a quadratic program, which is used here
            after setting up constraint matrices. Refer to papers for complete overview.
        :return: Lattice, the learned FM variables.
        """

        fm_len = 2 ** self.N - 1  # nc
        E = np.zeros((fm_len, fm_len))  # D
        L = np.zeros(fm_len)  # f
        index_keys = self.get_keys_index()
        for i in range(0, self.M):  # it's going through one sample at a time.
            l = self.trainLabels[i]  # this is the labels
            fm_coeff = self.get_fm_class_img_coeff(index_keys, self.trainSamples[:, i], fm_len)  # this is Hdiff
            # print(fm_coeff)
#             print(L, l, fm_coeff)
            L = L + (-2) * l * fm_coeff
            E = E + np.matmul(fm_coeff.reshape((fm_len, 1)), fm_coeff.reshape((1, fm_len)))

        G, h, A, b = self.build_constraint_matrices(index_keys, fm_len)
        solvers_qp = silent(solvers.qp)
        sol = solvers_qp(matrix(2 * E, tc='d'), matrix(L.T, tc='d'), matrix(G, tc='d'), matrix(h, tc='d'),
                         matrix(A, tc='d'), matrix(b, tc='d'))
        g = sol['x']
        Lattice = {}
        for key in index_keys.keys():
            Lattice[key] = g[index_keys[key]]
        return Lattice


    def build_constraint_matrices(self, index_keys, fm_len):
        """
        This method builds the necessary constraint matrices.
        :param index_keys: map to reference lattice components
        :param fm_len: length of the fuzzy measure
        :return: the constraint matrices
        """

        vls = np.arange(1, self.N + 1)
        line = np.zeros(fm_len)
        G = line
        line[index_keys[str(np.array([1]))]] = -1.
        h = np.array([0])
        for i in range(2, self.N + 1):
            line = np.zeros(fm_len)
            line[index_keys[str(np.array([i]))]] = -1.
            G = np.vstack((G, line))
            h = np.vstack((h, np.array([0])))
        for i in range(2, self.N + 1):
            parent = np.array(list(combinations(vls, i)))
            for latt_pt in parent:
                for j in range(len(latt_pt) - 1, len(latt_pt)):
                    children = np.array(list(combinations(latt_pt, j)))
                    for latt_ch in children:
                        line = np.zeros(fm_len)
                        line[index_keys[str(latt_ch)]] = 1.
                        line[index_keys[str(latt_pt)]] = -1.
                        G = np.vstack((G, line))
                        h = np.vstack((h, np.array([0])))

        line = np.zeros(fm_len)
        line[index_keys[str(vls)]] = 1.
        G = np.vstack((G, line))
        h = np.vstack((h, np.array([1])))

        # equality constraints
        A = np.zeros((1, fm_len))
        A[0, -1] = 1
        b = np.array([1]);

        return G, h, A, b


    def get_fm_class_img_coeff(self, Lattice, h, fm_len):  # Lattice is FM_name_and_index, h is the samples, fm_len
        """
        This creates a FM map with the name as the key and the index as the value
        :param Lattice: dictionary with FM
        :param h: sample
        :param fm_len: fm length
        :return: the fm_coeff
        """

        n = len(h)  # len(h) is the number of the samples
        fm_coeff = np.zeros(fm_len)
        pi_i = np.argsort(-h) + 1
        for i in range(1, n):
            fm_coeff[Lattice[str(np.sort(pi_i[:i]))]] = h[pi_i[i - 1] - 1] - h[pi_i[i] - 1]
        fm_coeff[Lattice[str(np.sort(pi_i[:n]))]] = h[pi_i[n - 1] - 1]
        np.matmul(fm_coeff, np.transpose(fm_coeff))
        return fm_coeff


    def get_keys_index(self):
        """
        Sets up a dictionary for referencing FM.
        :return: The keys to the dictionary
        """

        vls = np.arange(1, self.N + 1)
        count = 0
        Lattice = {}
        for i in range(0, self.N):
            Lattice[str(np.array([vls[i]]))] = count
            count = count + 1
        for i in range(2, self.N + 1):
            A = np.array(list(combinations(vls, i)))
            for latt_pt in A:
                Lattice[str(latt_pt)] = count
                count = count + 1
        return Lattice

In [4]:
# Models, Dataset & Dataloader

# Download pretrained models on imagenet
resnet18 = models.resnet18(pretrained=True)
alexnet = models.alexnet(pretrained=True)
squeezenet = models.squeezenet1_0(pretrained=True)
vgg16 = models.vgg16(pretrained=True)
densenet = models.densenet161(pretrained=True)
inception = models.inception_v3(pretrained=True)
googlenet = models.googlenet(pretrained=True)
shufflenet = models.shufflenet_v2_x1_0(pretrained=True)
mobilenet_v2 = models.mobilenet_v2(pretrained=True)
mobilenet_v3_large = models.mobilenet_v3_large(pretrained=True)
mobilenet_v3_small = models.mobilenet_v3_small(pretrained=True)
resnext50_32x4d = models.resnext50_32x4d(pretrained=True)
wide_resnet50_2 = models.wide_resnet50_2(pretrained=True)
mnasnet = models.mnasnet1_0(pretrained=True)

# Dataset & Dataloader
batch_size = 100
transform = transforms.Compose(
    [transforms.Resize(256),
     transforms.CenterCrop(224),
     transforms.ToTensor(),
     transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225])
])  
img_path = r'./data/LJ-Test'

train_set = datasets.ImageFolder(os.path.join(img_path, 'train'), transform)
test_set = datasets.ImageFolder(os.path.join(img_path, 'val'), transform)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
train_iter = iter(train_loader)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
test_iter = iter(test_loader)

In [5]:
# Get train&test model output from multiple classifiers

list_classifiers = [mobilenet_v3_small, mobilenet_v3_large, squeezenet, ]
label_idx_in_imagenet = [217, 574, 701]
soft_max = torch.nn.Softmax(dim=1)

train_output_all_multi_classifier, train_labels_all = multi_classifier_output(train_loader, list_classifiers, label_idx_in_imagenet, soft_max)
test_output_all_multi_classifier, test_labels_all = multi_classifier_output(test_loader, list_classifiers, label_idx_in_imagenet, soft_max)

0
0
1
2


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


1
0
1
2
2
0
1
2
3
0
0
1
2
1
0
1
2
2
0
1
2
3


In [6]:
# Train ChIQP
train_labels_all_one_hot = make_one_hot(train_labels_all, 3)

if isinstance(train_output_all_multi_classifier, (torch.Tensor, torch.FloatTensor)):
    train_output_all_multi_classifier = train_output_all_multi_classifier.detach().numpy()
if isinstance(train_labels_all, (torch.Tensor, torch.FloatTensor)):
    train_labels_all = train_labels_all.detach().numpy()
if isinstance(train_labels_all_one_hot, (torch.Tensor, torch.FloatTensor)):
    train_labels_all_one_hot = train_labels_all_one_hot.detach().numpy()


chiqps = []
for i in range(len(label_idx_in_imagenet)):
    # Initialize ChIQP
    chiqp = Choquet_Integral_QP()
    # Train 
    chiqp.train_chi(train_output_all_multi_classifier[:, i, :].transpose(), train_labels_all_one_hot[:, i])
    chiqps.append(chiqp)

In [9]:
# Accuracy comparison

# Single model accuracies
test_labels_all_one_hot = make_one_hot(test_labels_all, 3)
if isinstance(test_output_all_multi_classifier, (torch.Tensor, torch.FloatTensor)):
    test_output_all_multi_classifier = test_output_all_multi_classifier.detach().numpy()
if isinstance(test_labels_all, (torch.Tensor, torch.FloatTensor)):
    test_labels_all = test_labels_all.detach().numpy()
if isinstance(test_labels_all_one_hot, (torch.Tensor, torch.FloatTensor)):
    test_labels_all_one_hot = test_labels_all_one_hot.detach().numpy()
    


accuracies = []
for i in range(len(list_classifiers)):
    test_output_all = test_output_all_multi_classifier[:, :, i]
    test_predict = np.argmax(test_output_all, axis=1)
    acc = calculate_accuracy(target=test_labels_all, output=test_predict)
    accuracies.append(acc)
    print('Accuracy of single classifier', i, '=', acc)
    
    
# Test ChIQP, Fused model accuracy
fusions = []
for i in range(len(label_idx_in_imagenet)):
    chiqp = chiqps[i]
    fm = chiqp.fm
    fusion = np.apply_along_axis(get_cal_chi(fm), 1, test_output_all_multi_classifier[:, i, :])
    fusions.append(fusion)

fused_output_one_hot = np.stack(fusions, 1)
fused_output = np.argmax(fused_output_one_hot, axis=1)

acc = calculate_accuracy(target=test_labels_all, output=fused_output)
print('Fused accuracy =', acc)

Accuracy of single classifier 0 = 0.99
Accuracy of single classifier 1 = 0.9966666666666667
Accuracy of single classifier 2 = 0.9933333333333333
Fused accuracy = 1.0


In [11]:
# Unsupervised

# Imports

import itertools
from itertools import permutations
from math import tanh
import os
import pickle
import platform
import random
from tkinter import Tk

from cvxopt import solvers, matrix
import math
from matplotlib import animation
from  matplotlib import pyplot as plt
from matplotlib.ticker import FuncFormatter
import numpy as np
import torch
import torchvision
from torchvision import transforms, models,datasets
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Extend width of Jupyter Notebook Cell to the size of browser
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# OS related settings
if platform.system() == 'Windows':
    print('Windows')
#     %matplotlib tk
    %matplotlib qt
elif platform.system() == 'Darwin':
    print('macOS')
    Tk().withdraw()
    %matplotlib osx
elif platform == 'linux' or platform == 'linux2':
    print('Linux')
# This line of "print" must exist right after %matplotlib command, otherwise JN will hang on the first import statement after this.
print('Interactive plot activated')



# Functions


def cal_chi(fm, x):
    """
    Calculates ChI with given fuzzy measure and input
    
    :param fm: Fuzzy measure
    :param x: Input
    :return: Single value Chi output
    """
    pi_i = np.argsort(-x) + 1 # Arg sort of input, with the smallest index9 being 1
    ch = x[pi_i[0] - 1] * (fm[str(pi_i[:1])])
    for i in range(1, len(x)):
        latt_pti = np.sort(pi_i[:i+1])
        latt_ptimin1 = np.sort(pi_i[:i])
        ch = ch + x[pi_i[i] - 1] * (fm[str(latt_pti)] - fm[str(latt_ptimin1)])
    return ch


def get_cal_chi(fm):
    return lambda x: cal_chi(fm, x)



def get_keys_index(dim):
    """
    Sets up a dictionary for referencing FM.
    :return: The keys to the dictionary
    """
    vls = np.arange(1, dim + 1)
    Lattice = {}
    for i in range(1, dim + 1):
        A = np.array(list(itertools.combinations(vls, i)))
        for latt_pt in A:
            Lattice[str(latt_pt)] = 1
    return Lattice


def get_min_fm_target(dim):
    fm = get_keys_index(dim)
    for key in fm.keys():
        if len(key.split()) != dim:
            fm[key] = 0
        else:
            fm[key] = 1
    return fm
    
    
def get_max_fm_target(dim):
    fm = get_keys_index(dim)
    return fm


def get_mean_fm_target(dim):
    fm = get_keys_index(dim)
    for key in fm.keys():
        fm[key] = len(key.split()) / dim
    return fm


def get_gmean_fm_target(dim):
    fm = get_mean_fm_target(dim)
    return fm



def create_synthetic_data(num_samples=100, accuracies=[0.9, 0.6, 0.5]):
    label = np.random.randint(0, 2, num_samples)

    flip_ind = []
    for acc in accuracies:
        flip_ind.append(np.random.choice(range(num_samples), round((1-acc)*num_samples), replace=False))

    outputs = []

    for ind in flip_ind:
        output_bin = np.copy(label)
        output_bin[ind] = 1 - output_bin[ind]
        output = np.asarray([(random.random()+1)/2 if o_b == 1 else random.random()/2 for o_b in output_bin])
        
        outputs.append(output)
    outputs = np.asarray(outputs)
    
    return(label, outputs)
    

def test_accuracy(target, output):
    acc = np.sum(target == output.round()) / len(target)
    return acc

macOS
Interactive plot activated


In [24]:
dim = len(list_classifiers)

pA = get_keys_index(dim)


for key in pA.keys():
    key_int = np.asarray(key[1:-1].split()).astype(int) - 1
    output_coalition = np.argmax(np.mean(test_output_all_multi_classifier[:, :, key_int], 2), 1)
    acc_coalition = calculate_accuracy(test_labels_all, output_coalition)
    pA[key] = acc_coalition
    
print(pA)

fm = get_mean_fm_target(dim)

for i in range(len(list(fm.keys())[-1].split())):
    for key in fm.keys():
        if len(key.split()) == i:
            pA_i_cardinality = [pA[k] for k in pA.keys() if len(k.split())==len(key.split())]
            fm[key] = fm[key] + tanh(100*(pA[key]-np.mean(pA_i_cardinality))) / (2*len(pA_i_cardinality))

print(fm)


# Test Unsup fm, Fused model accuracy
accuracies = []
for i in range(len(list_classifiers)):
    test_output_all = test_output_all_multi_classifier[:, :, i]
    test_predict = np.argmax(test_output_all, axis=1)
    acc = calculate_accuracy(target=test_labels_all, output=test_predict)
    accuracies.append(acc)
    print('Accuracy of single classifier', i, '=', acc)



fusions = []
for i in range(len(label_idx_in_imagenet)):
    fusion = np.apply_along_axis(get_cal_chi(fm), 1, test_output_all_multi_classifier[:, i, :])
    fusions.append(fusion)

fused_output_one_hot = np.stack(fusions, 1)
fused_output = np.argmax(fused_output_one_hot, axis=1)

acc = calculate_accuracy(target=test_labels_all, output=fused_output)
print('Fused accuracy =', acc)

{'[1]': 0.99, '[2]': 0.9966666666666667, '[3]': 0.9933333333333333, '[1 2]': 1.0, '[1 3]': 0.9966666666666667, '[2 3]': 1.0, '[1 2 3]': 1.0}
{'[1]': 0.2797478770780614, '[2]': 0.38691878958860687, '[3]': 0.3333333333333333, '[1 2]': 0.685109351754124, '[1 3]': 0.630227486052148, '[2 3]': 0.685109351754124, '[1 2 3]': 1.0}
Accuracy of single classifier 0 = 0.99
Accuracy of single classifier 1 = 0.9966666666666667
Accuracy of single classifier 2 = 0.9933333333333333
Fused accuracy = 1.0
