**Running OntoZSL on ImNet-A with Basic KG in the Standard ZSL setting**
---
To run other settings, change the parameters "dataset", "manual_seed", "batch_size", "lr", "noise_size" and "semantic_type".

The parameters for the other settings are listed at the end.


**1. Bind your Google Drive**

In [2]:
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


**2. Import Package**

In [3]:
import os
import sys
import time
import random
import argparse
import scipy.io as scio
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.autograd as autograd
import torch.optim as optim
import torch.backends.cudnn as cudnn
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler


**3. Data Preparation and Loading**

In [4]:
def load_semantic_embed(data_path, dataset, type):
    """
    Load Semantic Embeddings.

    Args:
        data_path: for 'AwA2' the dataset directory itself; for every other
            dataset the directory that contains `<dataset>/semantic_embeddings`.
        dataset: dataset name. 'AwA2' has its own file layout; 'ImNet_A' and
            'ImNet_O' additionally support the 'kge', 'kge_text' and
            'kge_facts' types.
        type: name of the semantic embedding to load (e.g. 'att', 'w2v', 'hie',
            'kge', ...). The parameter name shadows the builtin but is kept for
            backward compatibility with keyword callers.

    Returns:
        The array stored in the selected .mat file.

    Raises:
        ValueError: if `type` is not available for `dataset`. (Previously this
            case printed warnings and then failed with UnboundLocalError.)
    """
    if dataset == 'AwA2':
        file_path = os.path.join(data_path, 'semantic_embeddings')
        candidates = {
            # the attribute matrix is stored next to the splits, not in semantic_embeddings
            'att': os.path.join(data_path, 'binaryAtt_splits.mat'),
            'w2v': os.path.join(file_path, 'awa_w2v.mat'),
            'w2v-glove': os.path.join(file_path, 'awa_w2v_glove.mat'),
            'hie': os.path.join(file_path, 'awa_hierarchy_gae.mat'),
            'kge': os.path.join(file_path, 'kge_CH_AH_CA_60000.mat'),
            'kge_text': os.path.join(file_path, 'kge_CH_AH_CA_60000_text_140.mat'),
            'kge_facts': os.path.join(file_path, 'kge_CH_AH_CA_Facts_60000_80000.mat'),
            'kge_logics': os.path.join(file_path, 'kge_CH_AH_CA_Logics_70000.mat'),
        }
    else:
        file_path = os.path.join(data_path, dataset, 'semantic_embeddings')
        candidates = {
            'hie': os.path.join(file_path, 'hierarchy_gae.mat'),
            # w2v.mat is read from data_path itself, not from the per-dataset folder
            'w2v': os.path.join(data_path, 'w2v.mat'),
            'w2v-glove': os.path.join(file_path, 'w2v_glove.mat'),
            'att': os.path.join(file_path, 'atts_binary.mat'),
        }
        if dataset in ('ImNet_A', 'ImNet_O'):
            candidates.update({
                'kge': os.path.join(file_path, 'kge_CH_AH_CA_60000.mat'),
                'kge_text': os.path.join(file_path, 'kge_CH_AH_CA_60000_text_nei_140.mat'),
                'kge_facts': os.path.join(file_path, 'kge_CH_AH_CA_Facts_60000_70000.mat'),
            })

    file_name = candidates.get(type)
    if file_name is None:
        raise ValueError(
            "invalid semantic embeddings type %r for dataset %r; expected one of %s"
            % (type, dataset, sorted(candidates)))

    matcontent = scio.loadmat(file_name)
    if dataset == 'AwA2' and type == 'att':
        # stored transposed relative to the other embeddings
        return matcontent['att'].T
    if dataset != 'AwA2' and type == 'w2v':
        # only the first 2549 rows are used
        # NOTE(review): presumably the number of classes covered by the ImageNet splits - confirm
        return matcontent['w2v'][:2549]
    return matcontent['embeddings']

def load_imagenet(data_path, dataset):
    """
    Load the ImageNet visual features and labels for the seen/unseen split of `dataset`.

    Reads `split.mat` (key 'allwnids') from `data_path`, the class lists
    `<dataset>/seen.txt` and `<dataset>/unseen.txt`, and one
    `<1-based class index>.mat` file (key 'features') per class from the
    `Res101_Features` sub-folders.

    Args:
        data_path: root directory of the ImageNet data.
        dataset: name of the sub-directory holding seen.txt / unseen.txt.

    Returns:
        A tuple of eight numpy arrays:
        (train_seen_features, train_seen_labels,
         train_seen_features_sub, train_seen_labels_sub,
         test_unseen_features, test_unseen_labels,
         test_seen_features, test_seen_labels).
        Labels are the 0-based position of the class in 'allwnids'.
    """

    def load_classes(file_name):
        # One class id per line. The deprecated 'rU' mode was removed in
        # Python 3.11, and slicing off the last character corrupted a final
        # line that had no trailing newline, so strip explicitly instead.
        with open(file_name, 'r') as handle:
            stripped = (line.strip() for line in handle)
            return [name for name in stripped if name]

    def read_features(file_path, inds, split_type, nsample=None):
        # Stack the per-class feature files; for seen classes optionally keep
        # only the first `nsample` rows of each class.
        fea_set = list()
        label_set = list()
        for idx in inds:
            file = os.path.join(file_path, str(idx) + '.mat')
            feature = np.array(scio.loadmat(file)['features'])
            if split_type == 'seen' and nsample and feature.shape[0] > nsample:
                feature = feature[:nsample]
            # file names are 1-based, labels are 0-based
            label = np.array((idx - 1), dtype=int).repeat(feature.shape[0])
            fea_set.append(feature)
            label_set.append(label)

        return np.vstack(tuple(fea_set)), np.hstack(tuple(label_set))

    # split.mat : wnids, words
    matcontent = scio.loadmat(os.path.join(data_path, 'split.mat'))
    wnids = matcontent['allwnids'].squeeze().tolist()

    seen_classes = load_classes(os.path.join(data_path, dataset, 'seen.txt'))
    unseen_classes = load_classes(os.path.join(data_path, dataset, 'unseen.txt'))
    # +1 because the feature files are numbered from 1
    seen_index = [wnids.index(wnid) + 1 for wnid in seen_classes]
    unseen_index = [wnids.index(wnid) + 1 for wnid in unseen_classes]

    train_seen_feat_file = os.path.join(data_path, 'Res101_Features', 'ILSVRC2012_train')
    test_seen_feat_file = os.path.join(data_path, 'Res101_Features', 'ILSVRC2012_val')
    test_unseen_feat_file = os.path.join(data_path, 'Res101_Features', 'ILSVRC2011')

    train_seen_features, train_seen_labels = read_features(train_seen_feat_file, seen_index, 'seen')
    # extract a subset with at most 300 images per class for training the classifier
    train_seen_features_sub, train_seen_labels_sub = read_features(train_seen_feat_file, seen_index, 'seen', 300)
    test_unseen_features, test_unseen_labels = read_features(test_unseen_feat_file, unseen_index, 'unseen')
    test_seen_features, test_seen_labels = read_features(test_seen_feat_file, seen_index, 'seen')

    return train_seen_features, train_seen_labels, \
           train_seen_features_sub, train_seen_labels_sub, \
           test_unseen_features, test_unseen_labels, \
           test_seen_features, test_seen_labels


def load_dataset(data_path):
    """
    Load ResNet features, labels and the train/test split stored under `data_path`.

    Reads 'res101.mat' (keys 'features', 'labels') and 'binaryAtt_splits.mat'
    (keys 'trainval_loc', 'test_seen_loc', 'test_unseen_loc').

    Returns:
        (train features, train labels,
         test-unseen features, test-unseen labels,
         test-seen features, test-seen labels) as numpy arrays.
    """
    resnet = scio.loadmat(os.path.join(data_path, 'res101.mat'))
    splits = scio.loadmat(os.path.join(data_path, 'binaryAtt_splits.mat'))

    # features are stored column-wise; labels and locations are 1-based (MATLAB)
    all_features = resnet['features'].T
    all_labels = resnet['labels'].astype(int).squeeze() - 1

    def zero_based(key):
        return splits[key].squeeze() - 1

    train_rows = zero_based('trainval_loc')
    seen_rows = zero_based('test_seen_loc')
    unseen_rows = zero_based('test_unseen_loc')

    return (
        all_features[train_rows], all_labels[train_rows],
        all_features[unseen_rows], all_labels[unseen_rows],
        all_features[seen_rows], all_labels[seen_rows],
    )


def GetNowTime():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    local_now = time.localtime(time.time())
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", local_now)
    return stamp

def map_label(label, classes):
    """
    Re-index `label` so that each value becomes its position in `classes`.

    Args:
        label: integer tensor of class ids.
        classes: 1-D tensor of class ids; `classes[i]` is mapped to `i`.

    Returns:
        A LongTensor with the same size as `label`. Entries whose class id does
        not occur in `classes` are set to -1 (they used to be left as
        uninitialised memory).
    """
    mapped_label = torch.full(label.size(), -1, dtype=torch.long)
    for i in range(classes.size(0)):
        mapped_label[label == classes[i]] = i
    return mapped_label

class DATA_LOADER(object):
    """
    Hold the train/test visual features, labels and class semantic embeddings
    as torch tensors and serve random training batches from them.
    """

    def __init__(self, args):
        """
        Load the data selected by `args.dataset` ('AwA2' is read by
        `read_dataset`, every other name by `read_imagenet`) and derive the
        sizes and class lists used for batching.
        """
        if args.dataset == 'AwA2':
            self.read_dataset(args)
        else:
            self.read_imagenet(args)

        self.index_in_epoch = 0
        self.epochs_completed = 0

        self.feat_dim = self.train_seen_feature.shape[1]  # visual feature dimension
        self.sem_dim = self.semantic.shape[1]  # semantic embedding dimension

        self.ntrain = self.train_seen_feature.size()[0]  # number of training samples

        self.seenclasses = torch.from_numpy(np.unique(self.train_seen_label.numpy()))
        self.unseenclasses = torch.from_numpy(np.unique(self.test_unseen_label.numpy()))

        # Class pool for next_batch_one_class / next_batch_uniform_class.
        # These attributes were read by both samplers but never assigned.
        self.train_class = self.seenclasses.clone()
        self.ntrain_class = self.seenclasses.size(0)

    def read_imagenet(self, args):
        """Load ImageNet features from `<args.data_dir>/ImageNet` and min-max scale them."""
        data_path = os.path.join(args.data_dir, 'ImageNet')

        # read seen features
        train_seen_features, train_seen_labels,\
            train_seen_features_sub, train_seen_labels_sub, \
            test_unseen_features, test_unseen_labels, \
            test_seen_features, test_seen_labels = load_imagenet(data_path, args.dataset)

        # the scaler is fitted on the training features and re-used for both test splits
        scaler = preprocessing.MinMaxScaler()
        self.train_seen_feature = torch.from_numpy(scaler.fit_transform(train_seen_features)).float()
        self.train_seen_label = torch.from_numpy(train_seen_labels).long()

        self.test_unseen_feature = torch.from_numpy(scaler.transform(test_unseen_features)).float()
        self.test_unseen_label = torch.from_numpy(test_unseen_labels).long()

        self.test_seen_feature = torch.from_numpy(scaler.transform(test_seen_features)).float()
        self.test_seen_label = torch.from_numpy(test_seen_labels).long()

        # NOTE(review): the per-class subset is stored without scaling - confirm this is intended
        self.train_seen_feature_sub = torch.from_numpy(train_seen_features_sub).float()
        self.train_seen_label_sub = torch.from_numpy(train_seen_labels_sub).long()

        embeddings = load_semantic_embed(data_path, args.dataset, args.semantic_type)
        self.semantic = torch.from_numpy(embeddings).float()

    def read_dataset(self, args):
        """Load features from `<args.data_dir>/<args.dataset>` and min-max scale them."""
        data_path = os.path.join(args.data_dir, args.dataset)
        # read seen features
        train_seen_features, train_seen_labels, \
        test_unseen_features, test_unseen_labels, \
        test_seen_features, test_seen_labels = load_dataset(data_path)

        scaler = preprocessing.MinMaxScaler()
        self.train_seen_feature = torch.from_numpy(scaler.fit_transform(train_seen_features)).float()
        self.train_seen_label = torch.from_numpy(train_seen_labels).long()
        mx = self.train_seen_feature.max()
        self.train_seen_feature.mul_(1 / mx)

        # NOTE(review): the scaler is re-fitted on each test split here, whereas
        # read_imagenet only applies the scaler fitted on the training data.
        # Left unchanged so existing results stay comparable - confirm intent.
        self.test_unseen_feature = torch.from_numpy(scaler.fit_transform(test_unseen_features)).float()
        self.test_unseen_feature.mul_(1 / mx)
        self.test_unseen_label = torch.from_numpy(test_unseen_labels).long()

        self.test_seen_feature = torch.from_numpy(scaler.fit_transform(test_seen_features)).float()
        self.test_seen_feature.mul_(1 / mx)
        self.test_seen_label = torch.from_numpy(test_seen_labels).long()

        embeddings = load_semantic_embed(data_path, args.dataset, args.semantic_type)
        self.semantic = torch.from_numpy(embeddings).float()

    def next_batch_one_class(self, batch_size):
        """
        Return up to `batch_size` randomly chosen training samples of a single class.

        Classes are visited in the order of `self.train_class`; once every
        class has been served the order is reshuffled.

        Returns:
            (features, labels, semantic embeddings of those labels).
        """
        if self.index_in_epoch == self.ntrain_class:
            self.index_in_epoch = 0
            perm = torch.randperm(self.ntrain_class)
            # the previous `train_class[perm] = train_class[perm]` left the order unchanged
            self.train_class = self.train_class[perm]

        iclass = self.train_class[self.index_in_epoch]
        # view(-1) keeps the index tensor 1-D even when the class has a single sample
        idx = self.train_seen_label.eq(iclass).nonzero().view(-1)
        idx = idx[torch.randperm(idx.size(0))]
        iclass_feature = self.train_seen_feature[idx]
        iclass_label = self.train_seen_label[idx]
        self.index_in_epoch += 1
        return iclass_feature[0:batch_size], iclass_label[0:batch_size], self.semantic[iclass_label[0:batch_size]]

    def next_batch(self, batch_size):
        """Return `batch_size` training samples drawn without replacement, with their semantics."""
        idx = torch.randperm(self.ntrain)[0:batch_size]
        batch_feature = self.train_seen_feature[idx]
        batch_label = self.train_seen_label[idx]
        batch_sem = self.semantic[batch_label]
        return batch_feature, batch_label, batch_sem

    # select batch samples by randomly drawing batch_size classes
    def next_batch_uniform_class(self, batch_size):
        """
        Return `batch_size` samples obtained by first drawing a class uniformly
        (with replacement) and then one random sample of that class.

        Returns:
            (features, labels, semantic embeddings of those labels).
        """
        picks = torch.randint(0, self.ntrain_class, (batch_size,))
        batch_class = self.train_class[picks]

        batch_feature = torch.empty(batch_size, self.train_seen_feature.size(1))
        batch_label = torch.empty(batch_size, dtype=torch.long)
        batch_sem = torch.empty(batch_size, self.semantic.size(1))
        for i in range(batch_size):
            # view(-1) keeps the index tensor 1-D even when the class has a single sample
            idx_iclass = self.train_seen_label.eq(batch_class[i]).nonzero().view(-1)
            idx_in_iclass = torch.randint(0, idx_iclass.size(0), (1,)).item()
            idx_file = idx_iclass[idx_in_iclass]
            batch_feature[i] = self.train_seen_feature[idx_file]
            batch_label[i] = self.train_seen_label[idx_file]
            batch_sem[i] = self.semantic[batch_label[i]]
        return batch_feature, batch_label, batch_sem

**4. Classifier for the testing stage, trained with generated unseen (and real seen) features**

In [5]:
class CLASSIFIER:
    """
    Softmax classifier trained on `_train_X` / `_train_Y` and evaluated on the
    test splits held by `data_loader`.

    Training starts from the constructor: `fit_gzsl` when `generalized` is
    true, otherwise `fit_zsl`. The best result over the epochs is printed.
    """

    # train_Y is integer
    def __init__(self, args, _train_X, _train_Y, data_loader, _nclass, _cuda, _lr=0.001, _beta1=0.5, _nepoch=20,
                 _batch_size=100, generalized=True, ratio=0.6, epoch=20):
        """
        Args:
            args: run configuration; only stored on the instance.
            _train_X: training features, one row per sample.
            _train_Y: integer training labels in `[0, _nclass)`.
            data_loader: object providing test_seen_feature/label,
                test_unseen_feature/label, seenclasses and unseenclasses.
            _nclass: number of output classes of the linear layer.
            _cuda: run the model on the GPU when true.
            _lr, _beta1: Adam learning rate and first beta.
            _nepoch: number of training epochs.
            _batch_size: mini-batch size for training and evaluation.
            generalized: choose the GZSL (True) or ZSL (False) protocol.
            ratio, epoch: stored on the instance; not used inside this class.
        """
        self.train_X = _train_X
        self.train_Y = _train_Y
        self.args = args

        self.test_seen_feature = data_loader.test_seen_feature
        self.test_seen_label = data_loader.test_seen_label
        self.test_unseen_feature = data_loader.test_unseen_feature
        self.test_unseen_label = data_loader.test_unseen_label
        self.seenclasses = data_loader.seenclasses
        self.unseenclasses = data_loader.unseenclasses
        self.batch_size = _batch_size
        self.nepoch = _nepoch
        self.nclass = _nclass
        self.input_dim = _train_X.size(1)
        self.cuda = _cuda
        self.model = LINEAR_LOGSOFTMAX(self.input_dim, self.nclass)
        self.model.apply(weights_init)
        self.criterion = nn.NLLLoss()

        self.data = data_loader

        # reusable mini-batch buffers
        self.input = torch.FloatTensor(_batch_size, self.input_dim)
        self.label = torch.LongTensor(_batch_size)

        self.lr = _lr
        self.beta1 = _beta1
        # setup optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=_lr, betas=(_beta1, 0.999))
        self.ratio = ratio
        self.epoch = epoch

        if self.cuda:
            self.model.cuda()
            self.criterion.cuda()
            self.input = self.input.cuda()
            self.label = self.label.cuda()

        self.index_in_epoch = 0
        self.epochs_completed = 0
        self.ntrain = self.train_X.size()[0]
        self.backup_X = _train_X
        self.backup_Y = _train_Y

        if generalized:
            self.fit_gzsl()
        else:
            self.fit_zsl()

    def pairwise_distances(self, x, y=None):
        '''
        Input: x is a Nxd matrix
               y is an optional Mxd matrix
        Output: dist is a NxM matrix where dist[i,j] is the square norm between x[i,:] and y[j,:]
                if y is not given then use 'y=x'.
        i.e. dist[i,j] = ||x[i,:]-y[j,:]||^2
        '''
        x_norm = (x ** 2).sum(1).view(-1, 1)
        if y is not None:
            y_t = torch.transpose(y, 0, 1)
            y_norm = (y ** 2).sum(1).view(1, -1)
        else:
            y_t = torch.transpose(x, 0, 1)
            y_norm = x_norm.view(1, -1)

        dist = x_norm + y_norm - 2.0 * torch.mm(x, y_t)
        # Ensure diagonal is zero if x=y (diag must be called; it was passed as a bound method)
        if y is None:
            dist = dist - torch.diag(dist.diag())
        return torch.clamp(dist, 0.0, np.inf)

    def _train_one_epoch(self):
        """Run one pass of mini-batch updates over the training set."""
        for _ in range(0, self.ntrain, self.batch_size):
            self.model.zero_grad()
            batch_input, batch_label = self.next_batch(self.batch_size)
            self.input.copy_(batch_input)
            self.label.copy_(batch_label)
            output = self.model(self.input)
            loss = self.criterion(output, self.label)
            loss.backward()
            self.optimizer.step()

    def _predict_log_probs(self, test_X):
        """Return the model output for every row of `test_X` as one CPU tensor, without autograd."""
        outputs = []
        # `volatile=True` no longer disables gradient tracking; no_grad does
        with torch.no_grad():
            for start in range(0, test_X.size(0), self.batch_size):
                batch = test_X[start:start + self.batch_size]
                if self.cuda:
                    batch = batch.cuda()
                outputs.append(self.model(batch).cpu())
        if not outputs:
            return torch.empty(0, self.nclass)
        return torch.cat(outputs, 0)

    def fit_zsl(self):
        """Train on the (synthetic) unseen-class data and print the best per-class accuracy on the unseen test set."""
        first_acc = 0
        for epoch in range(self.nepoch):
            # the classifier is trained on the fake unseen features and their labels
            self._train_one_epoch()

            # using real testing data (of unseen classes) to test the classifier; hit@1 only
            overall_acc, _ = self.val_zsl(self.test_unseen_feature, self.test_unseen_label,
                                          self.unseenclasses)

            # keep the highest evaluation result
            if overall_acc > first_acc:
                first_acc = overall_acc

        print('First Acc: {:.2f}%'.format(first_acc * 100))

    # for gzsl
    def fit_gzsl(self):
        """Train on seen + unseen data and print the seen/unseen accuracies of the epoch with the best harmonic mean."""
        # label space used for evaluation: seen classes first, then unseen classes
        all_classes = torch.cat((self.seenclasses, self.unseenclasses), 0)

        best_H = 0
        seen_acc = 0
        unseen_acc = 0
        for epoch in range(self.nepoch):
            self._train_one_epoch()

            acc_seen, _ = self.val_gzsl(self.test_seen_feature, self.test_seen_label, all_classes,
                                        self.seenclasses, 'seen')
            acc_unseen, _ = self.val_gzsl(self.test_unseen_feature, self.test_unseen_label, all_classes,
                                          self.unseenclasses, 'unseen')
            acc_sum = acc_seen + acc_unseen
            # harmonic mean; defined as 0 when both accuracies are 0
            H = 2 * acc_seen * acc_unseen / acc_sum if acc_sum > 0 else 0.0
            if H > best_H:
                best_H = H
                seen_acc = acc_seen
                unseen_acc = acc_unseen
        print('First Seen: {:.2f}%, Unseen: {:.2f}%, First H: {:.2f}%'.format(seen_acc * 100,
                                                                              unseen_acc * 100,
                                                                              best_H * 100))

    def val_zsl(self, test_X, test_label, target_classes):
        """
        Evaluate top-1 accuracy on `test_X`.

        Returns:
            (mean per-class accuracy, tensor with the accuracy of each class),
            classes being indexed by their position in `target_classes`.
        """
        log_probs = self._predict_log_probs(test_X)
        predicted_label = torch.max(log_probs, 1)[1]
        mapped_label = map_label(test_label, target_classes)
        nclass = target_classes.size(0)
        overall_acc = self.compute_acc_avg_per_class(mapped_label, predicted_label, nclass)
        acc_of_all = self.compute_each_class_acc(mapped_label, predicted_label, nclass)
        return overall_acc, acc_of_all

    def val_zsl_Hit(self, test_X, test_label, target_classes):
        """
        Evaluate top-1 accuracy and hit@{1,2,5} on `test_X`.

        Returns:
            (mean per-class top-1 accuracy, tensor with the mean per-class hit@1/2/5).
        """
        log_probs = self._predict_log_probs(test_X)
        predicted_label = torch.max(log_probs, 1)[1]
        # class indices ordered from most to least likely
        predicted_labels = log_probs.sort(1, descending=True)[1]
        mapped_label = map_label(test_label, target_classes)
        nclass = target_classes.size(0)
        overall_acc = self.compute_acc_avg_per_class(mapped_label, predicted_label, nclass)
        overall_acc_Hit = self.compute_acc_avg_per_class_Hit(mapped_label, predicted_labels, nclass)

        return overall_acc, overall_acc_Hit.squeeze()

    def val_gzsl(self, test_X, test_label, all_classes, target_classes, cls_type):
        """
        Evaluate on `test_X` in the joint label space `all_classes`.

        Returns:
            (mean per-class accuracy over the `cls_type` classes, predicted labels).
        """
        log_probs = self._predict_log_probs(test_X)
        predicted_label = torch.max(log_probs, 1)[1]
        overall_acc = self.compute_acc_avg_per_class_gzsl(map_label(test_label, all_classes), predicted_label,
                                                          target_classes.size(0), cls_type)
        return overall_acc, predicted_label

    def next_batch(self, batch_size):
        """Return the next `batch_size` training samples, reshuffling the data at every epoch boundary."""
        start = self.index_in_epoch
        # shuffle the data at the first epoch
        if self.epochs_completed == 0 and start == 0:
            perm = torch.randperm(self.ntrain)
            self.train_X = self.train_X[perm]
            self.train_Y = self.train_Y[perm]
        # the last batch
        if start + batch_size > self.ntrain:
            self.epochs_completed += 1
            rest_num_examples = self.ntrain - start
            if rest_num_examples > 0:
                X_rest_part = self.train_X[start:self.ntrain]
                Y_rest_part = self.train_Y[start:self.ntrain]
            # shuffle the data
            perm = torch.randperm(self.ntrain)
            self.train_X = self.train_X[perm]
            self.train_Y = self.train_Y[perm]
            # start next epoch: top the batch up with the head of the reshuffled data
            start = 0
            self.index_in_epoch = batch_size - rest_num_examples
            end = self.index_in_epoch
            X_new_part = self.train_X[start:end]
            Y_new_part = self.train_Y[start:end]
            if rest_num_examples > 0:
                return torch.cat((X_rest_part, X_new_part), 0), torch.cat((Y_rest_part, Y_new_part), 0)
            else:
                return X_new_part, Y_new_part
        else:
            self.index_in_epoch += batch_size
            end = self.index_in_epoch
            return self.train_X[start:end], self.train_Y[start:end]

    def compute_acc_avg_per_class_gzsl(self, test_label, predicted_label, nclass, cls_type):
        """
        Macro accuracy over `nclass` consecutive class indices of the joint label space.

        Seen classes start at index 0 and unseen classes right after them,
        matching the concatenation order used in `fit_gzsl`. Classes without
        test samples contribute 0.

        Raises:
            ValueError: if `cls_type` is neither 'seen' nor 'unseen'.
        """
        if cls_type == 'seen':
            offset = 0
        elif cls_type == 'unseen':
            offset = self.seenclasses.size(0)
        else:
            raise ValueError("cls_type must be 'seen' or 'unseen', got %r" % (cls_type,))

        acc_per_class = 0
        for i in range(offset, offset + nclass):
            idx = (test_label == i)
            n_samples = torch.sum(idx).float()
            if n_samples == 0:
                continue
            acc_per_class += torch.sum(test_label[idx] == predicted_label[idx]).float() / n_samples
        acc_per_class /= nclass
        return acc_per_class

    def compute_acc_avg_per_class(self, test_label, predicted_label, nclass):
        """Macro accuracy: the mean of the per-class accuracies (classes without samples count as 0)."""
        return self.compute_each_class_acc(test_label, predicted_label, nclass).mean()

    def compute_acc_avg_per_class_Hit(self, test_label, predicted_label, nclass):
        """
        Mean per-class hit@1, hit@2 and hit@5.

        Args:
            test_label: true class index of each sample.
            predicted_label: (n_samples, n_ranked) class indices, best first.
            nclass: number of classes to average over.

        Returns:
            Tensor of shape (1, 3); classes without samples count as 0.
        """
        top = [1, 2, 5]
        acc_per_class = torch.zeros(nclass, len(top))
        for i in range(nclass):
            # test on the number of samples, not on the sum of their indices,
            # so a class whose only sample sits at index 0 is not skipped
            idxs = (test_label == i).nonzero().view(-1)
            if idxs.numel() == 0:
                continue
            ranked = predicted_label[idxs]
            for j, k in enumerate(top):
                # slicing clips k to the number of ranked predictions available
                hit = (ranked[:, :k] == i).any(dim=1)
                acc_per_class[i, j] = hit.float().mean()

        return acc_per_class.mean(dim=0, keepdim=True)

    # get the accuracy of each class
    def compute_each_class_acc(self, test_label, predicted_label, nclass):
        """Return a tensor with the accuracy of each of the `nclass` classes (0 for classes without samples)."""
        acc_per_class = torch.FloatTensor(nclass).fill_(0)
        for i in range(nclass):
            idx = (test_label == i)
            n_samples = torch.sum(idx).float()
            if n_samples != 0:
                acc_per_class[i] = torch.sum(test_label[idx] == predicted_label[idx]).float() / n_samples
        return acc_per_class


class LINEAR_LOGSOFTMAX(nn.Module):
    """A single linear layer followed by a log-softmax over dimension 1."""

    def __init__(self, input_dim, nclass):
        super().__init__()
        self.fc = nn.Linear(input_dim, nclass)
        self.logic = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return the log-probabilities of the `nclass` outputs for each row of `x`."""
        scores = self.fc(x)
        return self.logic(scores)


**5. Classifier pre-training, used for the supervised classification loss in the training stage**

In [6]:
class pretrain_CLASSIFIER:
    """
    Softmax classifier trained on `_train_X` / `_train_Y`.

    Training runs from the constructor; afterwards the fitted network is
    available as `self.model` and can be evaluated with `val`.
    """

    # train_Y is integer
    def __init__(self, _train_X, _train_Y, _nclass, _input_dim, _cuda, _lr=0.001, _beta1=0.5, _nepoch=20,
                 _batch_size=100):
        """
        Args:
            _train_X: training features, one row per sample.
            _train_Y: integer training labels in `[0, _nclass)`.
            _nclass: number of output classes.
            _input_dim: feature dimension.
            _cuda: run the model on the GPU when true.
            _lr, _beta1: Adam learning rate and first beta.
            _nepoch: number of training epochs.
            _batch_size: mini-batch size for training and evaluation.
        """
        self.train_X = _train_X
        self.train_Y = _train_Y
        self.batch_size = _batch_size
        self.nepoch = _nepoch
        self.nclass = _nclass
        self.input_dim = _input_dim
        self.cuda = _cuda
        self.model = LINEAR_LOGSOFTMAX(self.input_dim, self.nclass)
        self.model.apply(weights_init)
        self.criterion = nn.NLLLoss()

        # reusable mini-batch buffers
        self.input = torch.FloatTensor(_batch_size, self.input_dim)
        self.label = torch.LongTensor(_batch_size)

        self.lr = _lr
        self.beta1 = _beta1
        # setup optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=_lr, betas=(_beta1, 0.999))

        if self.cuda:
            self.model.cuda()
            self.criterion.cuda()
            self.input = self.input.cuda()
            self.label = self.label.cuda()

        self.index_in_epoch = 0
        self.epochs_completed = 0
        self.ntrain = self.train_X.size()[0]

        self.fit()

    def fit(self):
        """Train the model for `self.nepoch` epochs of mini-batch updates."""
        for epoch in range(self.nepoch):
            for _ in range(0, self.ntrain, self.batch_size):
                self.model.zero_grad()
                batch_input, batch_label = self.next_batch(self.batch_size)
                self.input.copy_(batch_input)
                self.label.copy_(batch_label)

                output = self.model(self.input)
                loss = self.criterion(output, self.label)
                loss.backward()
                self.optimizer.step()

    def next_batch(self, batch_size):
        """Return the next `batch_size` training samples, reshuffling the data at every epoch boundary."""
        start = self.index_in_epoch
        # shuffle the data at the first epoch
        if self.epochs_completed == 0 and start == 0:
            perm = torch.randperm(self.ntrain)
            self.train_X = self.train_X[perm]
            self.train_Y = self.train_Y[perm]
        # the last batch
        if start + batch_size > self.ntrain:
            self.epochs_completed += 1
            rest_num_examples = self.ntrain - start
            if rest_num_examples > 0:
                X_rest_part = self.train_X[start:self.ntrain]
                Y_rest_part = self.train_Y[start:self.ntrain]
            # shuffle the data
            perm = torch.randperm(self.ntrain)
            self.train_X = self.train_X[perm]
            self.train_Y = self.train_Y[perm]
            # start next epoch: top the batch up with the head of the reshuffled data
            start = 0
            self.index_in_epoch = batch_size - rest_num_examples
            end = self.index_in_epoch
            X_new_part = self.train_X[start:end]
            Y_new_part = self.train_Y[start:end]
            if rest_num_examples > 0:
                return torch.cat((X_rest_part, X_new_part), 0), torch.cat((Y_rest_part, Y_new_part), 0)
            else:
                return X_new_part, Y_new_part
        else:
            self.index_in_epoch += batch_size
            end = self.index_in_epoch
            # from index start to index end-1
            return self.train_X[start:end], self.train_Y[start:end]

    # test_label is integer
    def val(self, test_X, test_label, target_classes):
        """
        Return the mean per-class top-1 accuracy on `test_X`.

        `test_label` holds original class ids; they are re-indexed by their
        position in `target_classes` before being compared with the predictions.
        """
        ntest = test_X.size()[0]
        predicted_label = torch.LongTensor(test_label.size())
        # `volatile=True` no longer disables gradient tracking; no_grad does
        with torch.no_grad():
            for start in range(0, ntest, self.batch_size):
                end = min(ntest, start + self.batch_size)
                batch = test_X[start:end]
                if self.cuda:
                    batch = batch.cuda()
                output = self.model(batch)
                predicted_label[start:end] = torch.max(output, 1)[1].cpu()

        acc = self.compute_per_class_acc(map_label(test_label, target_classes), predicted_label,
                                         target_classes.size(0))
        return acc

    def compute_per_class_acc(self, test_label, predicted_label, nclass):
        """
        Macro accuracy: the mean of the per-class accuracies.

        A class without samples counts as 0 instead of producing 0/0 = NaN,
        which previously turned the whole mean into NaN.
        """
        acc_per_class = torch.FloatTensor(nclass).fill_(0)
        for i in range(nclass):
            idx = (test_label == i)
            n_samples = torch.sum(idx).float()
            if n_samples != 0:
                acc_per_class[i] = torch.sum(test_label[idx] == predicted_label[idx]).float() / n_samples
        return acc_per_class.mean()


class LINEAR_LOGSOFTMAX(nn.Module):
    """A single linear layer followed by a log-softmax over dimension 1."""

    def __init__(self, input_dim, nclass):
        super().__init__()
        self.fc = nn.Linear(input_dim, nclass)
        self.logic = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return the log-probabilities of the `nclass` outputs for each row of `x`."""
        scores = self.fc(x)
        return self.logic(scores)


**6. OntoZSL model with generator and discriminator**

In [7]:
def weights_init(m):
    """
    Initialise a module in place; intended to be passed to `nn.Module.apply`.

    Modules whose class name contains 'Linear' get weights ~ N(0, 0.02);
    otherwise, modules whose class name contains 'BatchNorm' get weights
    ~ N(1, 0.02). In both cases the bias is zeroed. Other modules are left
    untouched.
    """
    kind = m.__class__.__name__
    if 'Linear' in kind:
        weight_mean = 0.0
    elif 'BatchNorm' in kind:
        weight_mean = 1.0
    else:
        return
    m.weight.data.normal_(weight_mean, 0.02)
    m.bias.data.fill_(0)

class MLP_CRITIC(nn.Module):
    """
    Critic network: scores a visual feature together with a semantic embedding.

    Layer sizes are read from `args.feat_dim`, `args.sem_dim` and `args.NDH`.
    """

    def __init__(self, args):
        super().__init__()
        joint_dim = args.feat_dim + args.sem_dim
        self.fc1 = nn.Linear(joint_dim, args.NDH)
        self.fc2 = nn.Linear(args.NDH, 1)
        self.lrelu = nn.LeakyReLU(0.2, True)

        self.apply(weights_init)

    def forward(self, x, sem):
        """Return one score per row for the concatenation of `x` and `sem`."""
        joint = torch.cat((x, sem), 1)
        hidden = self.lrelu(self.fc1(joint))
        return self.fc2(hidden)

class MLP_G(nn.Module):
    """Generator network: maps (noise, semantic embedding) to a feature vector."""

    def __init__(self, args):
        """Build the two-layer MLP.

        Args:
            args: namespace providing ``sem_dim``, ``noise_size``, ``NGH``
                (the hidden layer width) and ``feat_dim`` (the output size).
        """
        super().__init__()
        cond_dim = args.sem_dim + args.noise_size
        self.fc1 = nn.Linear(cond_dim, args.NGH)
        self.fc2 = nn.Linear(args.NGH, args.feat_dim)
        self.lrelu = nn.LeakyReLU(0.2, True)
        self.relu = nn.ReLU(True)

        self.apply(weights_init)

    def forward(self, noise, sem):
        """Concatenate ``noise`` and ``sem`` along dim 1 and return the generated features.

        The final ReLU makes every generated feature value non-negative.
        """
        cond = torch.cat((noise, sem), 1)
        hidden = self.lrelu(self.fc1(cond))
        return self.relu(self.fc2(hidden))

**7. Model training**

In [8]:
class Runner:
    """Train the conditional feature generator against a critic and evaluate it.

    The generator (``MLP_G``) and critic (``MLP_CRITIC``) are trained with a
    Wasserstein objective plus gradient penalty; the generator additionally
    receives a classification loss from a classifier pre-trained on the seen
    classes and frozen afterwards. After every epoch, features are synthesised
    for the unseen classes and handed to ``CLASSIFIER`` for evaluation.

    All hyper-parameters are read from ``self.args`` so the class does not
    depend on a module-level ``args`` variable.
    """

    def __init__(self, args):
        """Load the data, build the networks and pre-train the seen-class classifier.

        Args:
            args: parsed hyper-parameter namespace. ``feat_dim`` and ``sem_dim``
                are written onto it from the loaded data so the networks can
                read them.
        """
        self.args = args

        print('============ Params ============')
        print('\n'.join('%s: %s' % (k, str(v)) for k, v
                        in sorted(dict(vars(self.args)).items())))
        print('============================================')

        # load data
        self.data = DATA_LOADER(self.args)
        self.feat_dim = self.data.feat_dim
        self.sem_dim = self.data.sem_dim
        self.semantic = self.data.semantic
        print("Training samples: ", self.data.ntrain)  # number of training samples

        # the network constructors read these two sizes from args
        args.feat_dim = self.feat_dim
        args.sem_dim = self.sem_dim

        # initialize generator and discriminator
        self.netG = MLP_G(args)
        self.netD = MLP_CRITIC(args)
        # setup optimizer
        self.optimizerD = optim.Adam(self.netD.parameters(), lr=args.lr, betas=(args.beta, 0.999))
        self.optimizerG = optim.Adam(self.netG.parameters(), lr=args.lr, betas=(args.beta, 0.999))
        # classification loss (NLL; presumably applied to log-softmax outputs
        # of the pre-trained classifier -- confirm against pretrain_CLASSIFIER)
        self.cls_criterion = nn.NLLLoss()

        # pre-allocated batch buffers, refilled in place by sample()/train()
        self.input_fea = torch.FloatTensor(args.batch_size, self.feat_dim)  # (batch_size, feat_dim)
        self.input_sem = torch.FloatTensor(args.batch_size, self.data.sem_dim)  # (batch_size, sem_dim)
        self.noise = torch.FloatTensor(args.batch_size, args.noise_size)  # (batch_size, noise_size)
        self.input_label = torch.LongTensor(args.batch_size)

        if self.args.cuda:
            self.netD.cuda()
            self.netG.cuda()
            self.input_fea = self.input_fea.cuda()
            self.noise, self.input_sem = self.noise.cuda(), self.input_sem.cuda()
            self.cls_criterion.cuda()
            self.input_label = self.input_label.cuda()

        # train a classifier on seen classes, obtain \theta of Equation (4)
        self.pretrain_cls = pretrain_CLASSIFIER(self.data.train_seen_feature,
                                                      map_label(self.data.train_seen_label, self.data.seenclasses),
                                                      self.data.seenclasses.size(0), self.feat_dim, args.cuda, 0.001, 0.5,
                                                      100, 2 * args.batch_size)

        # freeze the classifier during the optimization
        for p in self.pretrain_cls.model.parameters():
            p.requires_grad = False

    def sample(self):
        """Fetch the next training batch and copy it into the pre-allocated buffers."""
        batch_feature, batch_label, batch_sem = self.data.next_batch(self.args.batch_size)
        self.input_fea.copy_(batch_feature)
        self.input_sem.copy_(batch_sem)
        # labels are remapped relative to the seen-class list
        self.input_label.copy_(map_label(batch_label, self.data.seenclasses))

    def generate_syn_feature(self, num):
        """Synthesise ``num`` features for every unseen class with the generator.

        Args:
            num: number of features to generate per unseen class.

        Returns:
            ``(syn_feature, syn_label)``: a CPU float tensor of shape
            ``(n_unseen * num, feat_dim)`` and a CPU long tensor of shape
            ``(n_unseen * num,)`` holding the class id of each generated row.
        """
        classes = self.data.unseenclasses
        nclass = classes.size(0)
        syn_feature = torch.FloatTensor(nclass * num, self.feat_dim)
        syn_label = torch.LongTensor(nclass * num)
        syn_sem = torch.FloatTensor(num, self.sem_dim)
        syn_noise = torch.FloatTensor(num, self.args.noise_size)
        if self.args.cuda:
            syn_sem = syn_sem.cuda()
            syn_noise = syn_noise.cuda()
        # inference only: no autograd graph is needed for the generated features
        with torch.no_grad():
            for i in range(nclass):
                iclass = classes[i]
                iclass_sem = self.semantic[iclass]
                # condition every sample of this class on the same class embedding
                syn_sem.copy_(iclass_sem.repeat(num, 1))
                syn_noise.normal_(0, 1)
                output = self.netG(syn_noise, syn_sem)
                syn_feature.narrow(0, i * num, num).copy_(output.cpu())
                syn_label.narrow(0, i * num, num).fill_(iclass)
        return syn_feature, syn_label

    # the last item of equation (2)
    def calc_gradient_penalty(self, real_data, fake_data, input_sem):
        """Compute the gradient penalty on random real/fake interpolates.

        Args:
            real_data: batch of real features.
            fake_data: batch of generated features, same shape as ``real_data``.
            input_sem: semantic embeddings paired with the rows of the batch.

        Returns:
            A scalar tensor: ``GP_weight * mean((||grad||_2 - 1) ** 2)`` where
            the gradient is that of the critic output with respect to the
            interpolated inputs. The result is differentiable with respect to
            the critic parameters.
        """
        # one mixing coefficient per row, broadcast over the feature dimension;
        # drawn on the CPU so the random stream is the same on CPU and GPU runs
        alpha = torch.rand(real_data.size(0), 1).to(real_data.device).expand_as(real_data)

        interpolates = alpha * real_data + (1 - alpha) * fake_data
        # treat the interpolates as fresh leaf inputs that we differentiate against
        interpolates = interpolates.detach().requires_grad_(True)

        disc_interpolates = self.netD(interpolates, input_sem)

        # create_graph=True keeps the penalty itself differentiable
        gradients = autograd.grad(outputs=disc_interpolates, inputs=interpolates,
                                  grad_outputs=torch.ones_like(disc_interpolates),
                                  create_graph=True, retain_graph=True)[0]
        gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean() * self.args.GP_weight
        return gradient_penalty

    def train(self):
        """Run the adversarial training loop and evaluate after every epoch."""
        args = self.args

        one = torch.tensor(1, dtype=torch.float)
        mone = one * -1
        if args.cuda:
            one = one.cuda()
            mone = mone.cuda()

        for epoch in range(args.epoch):
            for _ in range(0, self.data.ntrain, args.batch_size):
                # iteratively train the generator and discriminator
                for p in self.netD.parameters():
                    p.requires_grad = True

                # DISCRIMINATOR
                # several critic updates per generator update, following WGAN-GP
                for _iter_d in range(args.critic_iter):
                    self.sample()  # sample by batch
                    self.netD.zero_grad()

                    # loss of real data (backward with -1 maximises the real score)
                    criticD_real = self.netD(self.input_fea, self.input_sem).mean()
                    criticD_real.backward(mone)

                    # loss of generated data; the generator is not updated in
                    # this step, so its forward pass needs no autograd graph
                    self.noise.normal_(0, 1)
                    with torch.no_grad():
                        fake = self.netG(self.noise, self.input_sem)
                    criticD_fake = self.netD(fake, self.input_sem).mean()
                    criticD_fake.backward(one)

                    # loss with Lipschitz constraint
                    gradient_penalty = self.calc_gradient_penalty(self.input_fea, fake, self.input_sem)
                    gradient_penalty.backward()

                    # the accumulated gradients correspond to
                    # criticD_fake - criticD_real + gradient_penalty
                    self.optimizerD.step()

                for p in self.netD.parameters():  # reset requires_grad
                    p.requires_grad = False  # avoid computation

                # GENERATOR (reuses the semantics/labels of the last critic batch)
                self.netG.zero_grad()
                self.noise.normal_(0, 1)
                fake = self.netG(self.noise, self.input_sem)
                criticG_fake = self.netD(fake, self.input_sem).mean()
                G_cost = -criticG_fake
                # classification loss
                c_errG = self.cls_criterion(self.pretrain_cls.model(fake), self.input_label)

                errG = G_cost + args.cls_weight * c_errG

                errG.backward()
                self.optimizerG.step()

            print('EP[%d/%d]******************************************************' % (epoch, args.epoch))

            # evaluate the model, set G to evaluation mode
            self.netG.eval()
            # synthesize samples of unseen classes, for training classifier2 in testing stage
            syn_feature, syn_label = self.generate_syn_feature(args.syn_num)

            # Generalized zero-shot learning: classifier over seen + unseen classes
            if args.gzsl:
                if args.dataset == 'AwA2':
                    seen_feature = self.data.train_seen_feature
                    seen_label = self.data.train_seen_label
                    cls_batch_size = 2 * args.syn_num
                else:
                    seen_feature = self.data.train_seen_feature_sub
                    seen_label = self.data.train_seen_label_sub
                    cls_batch_size = 2 * args.batch_size
                train_X = torch.cat((seen_feature, syn_feature), 0)
                train_Y = torch.cat((seen_label, syn_label), 0)
                classes = torch.cat((self.data.seenclasses, self.data.unseenclasses), 0)
                nclass = classes.size(0)
                CLASSIFIER(args, train_X, map_label(train_Y, classes), self.data, nclass, args.cuda,
                           args.cls_lr, 0.5, 50, cls_batch_size, True)

            # Zero-shot learning: classifier over the unseen classes only
            else:
                CLASSIFIER(args, syn_feature, map_label(syn_label, self.data.unseenclasses), self.data,
                           self.data.unseenclasses.size(0), args.cuda, args.cls_lr, 0.5, 50,
                           10 * args.syn_num, False, args.ratio, epoch)

            self.netG.train()

**8. Parameter Settings and Run Model**

In [None]:
def str2bool(value):
    """Parse a command-line boolean such as ``true``/``False``/``1``/``no``.

    Without this, argparse would keep ``--cuda False`` as the non-empty (and
    therefore truthy) string ``'False'``.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ('true', 't', 'yes', 'y', '1'):
        return True
    if lowered in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('boolean value expected, got %r' % (value,))


if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # ---- Data loading ----
    parser.add_argument('--data_dir', default='/content/drive/MyDrive/ISWC_demo/ZS_IMGC/data', help='path to save dataset')
    parser.add_argument('--dataset', default='ImNet_A', help='target datasets, options: {AwA2, ImNet_A, ImNet_O}')

    parser.add_argument('--semantic_type', default='kge', type=str, help='the type of class embedding to input, options: {att, w2v, w2v-glove, hie, kge (Basic KG), kge_text (Basic KG+literal), kge_facts (Basic KG+CN), kge_logics (Basic KG+logics)}')
    parser.add_argument('--noise_size', type=int, default=100, help='size of noise vectors')

    # ---- Generator and Discriminator Parameter ----
    # every numeric option declares its type so that command-line overrides
    # arrive as numbers, not strings
    parser.add_argument('--NGH', type=int, default=4096, help='size of the hidden units in generator')
    parser.add_argument('--NDH', type=int, default=4096, help='size of the hidden units in discriminator')
    parser.add_argument('--critic_iter', type=int, default=5, help='critic iteration of discriminator, default=5, following WGAN-GP setting')
    parser.add_argument('--GP_weight', type=float, default=10, help='gradient penalty regularizer, default=10, the completion of Lipschitz Constraint in WGAN-GP')
    parser.add_argument('--cls_weight', type=float, default=0.01, help='loss weight for the supervised classification loss')
    parser.add_argument('--syn_num', default=300, type=int, help='number of features generating for each unseen class; awa_default = 300')

    # ---- Training Parameter ----
    parser.add_argument('--gzsl', action='store_true', default=False, help='enable generalized zero-shot learning')
    parser.add_argument('--cuda', type=str2bool, default=True, help='')
    parser.add_argument("--gpu", type=int, default=0, help="Which GPU to use?")
    parser.add_argument('--manual_seed', default=9416, type=int, help='random seed')
    parser.add_argument('--batch_size', default=4096, type=int, help='training batch size')
    parser.add_argument('--epoch', type=int, default=100, help='training epoch')
    parser.add_argument('--lr', default=0.0001, type=float, help='learning rate to train GAN')
    parser.add_argument('--cls_lr', type=float, default=0.001, help='after generating unseen features, the learning rate for training softmax classifier')
    parser.add_argument('--ratio', type=float, default=0.1, help='ratio of easy samples')
    parser.add_argument('--beta', type=float, default=0.5, help='beta for adam, default=0.5')

    # parse_known_args tolerates the extra argv entries a notebook kernel injects
    args = parser.parse_known_args()[0]

    # requesting CUDA on a machine without it would otherwise crash at the
    # first .cuda() call
    if args.cuda and not torch.cuda.is_available():
        print('CUDA is not available, falling back to CPU')
        args.cuda = False

    if args.manual_seed is None:
        args.manual_seed = random.randint(1, 10000)
    print("Random Seed: ", args.manual_seed)

    # seed every random number generator that is used
    np.random.seed(args.manual_seed)
    random.seed(args.manual_seed)
    torch.manual_seed(args.manual_seed)
    if torch.cuda.is_available():
        torch.cuda.set_device(args.gpu)
        print('using gpu {}'.format(args.gpu))
        torch.cuda.manual_seed_all(args.manual_seed)
        torch.backends.cudnn.deterministic = True

    print(GetNowTime())
    print('Begin run!!!')

    run = Runner(args)
    run.train()

    print('End run!!!')
    print(GetNowTime())

Random Seed:  9416
using gpu 0
2022-10-12 16:16:23
Begin run!!!
GP_weight: 10
NDH: 4096
NGH: 4096
batch_size: 4096
beta: 0.5
cls_lr: 0.001
cls_weight: 0.01
critic_iter: 5
cuda: True
data_dir: /content/drive/MyDrive/ISWC_demo/ZS_IMGC/data
dataset: ImNet_A
epoch: 100
gpu: 0
gzsl: False
lr: 0.0001
manual_seed: 9416
noise_size: 100
ratio: 0.1
semantic_type: kge
syn_num: 300




Training samples:  35150
EP[0/100]******************************************************




First Acc: 3.87%
EP[1/100]******************************************************
First Acc: 6.21%
EP[2/100]******************************************************
First Acc: 12.57%
EP[3/100]******************************************************
First Acc: 15.73%
EP[4/100]******************************************************
First Acc: 20.73%
EP[5/100]******************************************************
First Acc: 23.40%
EP[6/100]******************************************************
First Acc: 28.33%
EP[7/100]******************************************************
First Acc: 31.13%
EP[8/100]******************************************************
First Acc: 31.61%
EP[9/100]******************************************************
First Acc: 31.07%
EP[10/100]******************************************************
First Acc: 33.16%
EP[11/100]******************************************************
First Acc: 33.75%
EP[12/100]******************************************************
First Acc: 34.6

**Parameters in other Settings**


---

*   **run OntoZSL on ImNet-A with Basic KG in the Generalized ZSL setting**

============ Params ============
GP_weight: 10;
NDH: 4096;
NGH: 4096;
batch_size: 4096;
beta: 0.5;
cls_lr: 0.001;
cls_weight: 0.01;
critic_iter: 5;
cuda: True;
data_dir: /content/drive/MyDrive/ISWC_demo/ZS_IMGC/data;
dataset: ImNet_A;
epoch: 100;
gpu: 0;
gzsl: True;
lr: 0.0001;
manual_seed: 9416;
noise_size: 100;
ratio: 0.1;
semantic_type: kge;
syn_num: 300;

*   **run OntoZSL on AwA2 with "Basic KG+literal" in the Standard ZSL setting**

============ Params ============
GP_weight: 10;
NDH: 4096;
NGH: 4096;
batch_size: 64;
beta: 0.5;
cls_lr: 0.001;
cls_weight: 0.01;
critic_iter: 5;
cuda: True;
data_dir: /content/drive/MyDrive/ISWC_demo/ZS_IMGC/data;
dataset: AwA2;
epoch: 100;
gpu: 0;
gzsl: False;
lr: 1e-05;
manual_seed: 9182;
noise_size: 100;
ratio: 0.1;
semantic_type: kge_text;
syn_num: 300;

*   **run OntoZSL on AwA2 with "Basic KG+literal" in the Generalized ZSL setting**

============ Params ============
GP_weight: 10;
NDH: 4096;
NGH: 4096;
batch_size: 64;
beta: 0.5;
cls_lr: 0.001;
cls_weight: 0.01;
critic_iter: 5;
cuda: True;
data_dir: /content/drive/MyDrive/ISWC_demo/ZS_IMGC/data;
dataset: AwA2;
epoch: 100;
gpu: 0;
gzsl: True;
lr: 1e-05;
manual_seed: 9182;
noise_size: 100;
ratio: 0.1;
semantic_type: kge_text;
syn_num: 1800;

*   **run OntoZSL on ImNet-O with "att" in the Standard ZSL setting**

============ Params ============
GP_weight: 10;
NDH: 4096;
NGH: 4096;
batch_size: 4096;
beta: 0.5;
cls_lr: 0.001;
cls_weight: 0.01;
critic_iter: 5;
cuda: True;
data_dir: /content/drive/MyDrive/ISWC_demo/ZS_IMGC/data;
dataset: ImNet_O;
epoch: 100;
gpu: 0;
gzsl: False;
lr: 0.0001;
manual_seed: 9416;
noise_size: 85;
ratio: 0.1;
semantic_type: att;
syn_num: 300;




---


**The best parameter of noise_size for AwA2**

hie, kge, kge_text, kge_facts and kge_logics are set to 100;
w2v is set to 500, w2v-glove is set to 300, and att is set to 85.

**The best parameter of noise_size for ImNet_A**

hie, kge, kge_text and kge_facts are set to 100;
w2v is set to 500, w2v-glove is set to 300, and att is set to 85.

**The best parameter of noise_size for ImNet_O**

hie, kge, kge_text and kge_facts are set to 100;
w2v is set to 500, w2v-glove is set to 300, and att is set to 40.