# DRIVE



In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/VQA

/content/drive/MyDrive/VQA


# DATA

## AOKVQA

In [None]:
AOKVQA_DIR="/content/drive/MyDrive/VQA/data/aokvqa/"

In [None]:
!mkdir -p $AOKVQA_DIR

In [None]:
!curl -fsSL https://prior-datasets.s3.us-east-2.amazonaws.com/aokvqa/aokvqa_v1p0.tar.gz | tar xvz -C $AOKVQA_DIR

aokvqa_v1p0_train.json
aokvqa_v1p0_val.json
aokvqa_v1p0_test.json
large_vocab_train.csv
specialized_vocab_train.csv


In [None]:
!ls /content/drive/MyDrive/VQA/data/aokvqa/

aokvqa_v1p0_test.json	aokvqa_v1p0_val.json   specialized_vocab_train.csv
aokvqa_v1p0_train.json	large_vocab_train.csv


## F-VQA

# CODE


## Cmd


In [None]:
!ls

cfgs  code  data  kg  run.ipynb


In [None]:
!mkdir -p /content/drive/MyDrive/VQA/cfgs

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/data

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code/utils

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code/model

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code/model/fusion_net

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code/model/answer_net

In [None]:
!mkdir -p /content/drive/MyDrive/VQA/code/data

## Data


###### preprocess.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/data/preprocess.py
import os
import os.path as osp
import re
import random
import itertools
import h5py
import torch
import torch.utils.data as data
import pdb
from torch.utils.data.dataloader import default_collate
from collections import Counter
from PIL import Image
# this is used for normalizing questions
_special_chars = re.compile('[^a-z0-9 ]*')

# these try to emulate the original normalization scheme for answers
_period_strip = re.compile(r'(?!<=\d)(\.)(?!\d)')
_comma_strip = re.compile(r'(\d)(,)(\d)')
_punctuation_chars = re.escape(r';/[]"{}()=+\_-><@`,?!')
_punctuation = re.compile(r'([{}])'.format(re.escape(_punctuation_chars)))
_punctuation_with_a_space = re.compile(r'(?<= )([{0}])|([{0}])(?= )'.format(_punctuation_chars))


def invert_dict(d): return {v: k for k, v in d.items()}


def process_punctuation(s):
    # the original is somewhat broken, so things that look odd here might just be to mimic that behaviour
    # this version should be faster since we use re instead of repeated operations on str's
    original_s = s
    if _punctuation.search(s) is None:
        return s
    s = _punctuation_with_a_space.sub('', s)
    if re.search(_comma_strip, s) is not None:
        s = s.replace(',', '')
    s = _punctuation.sub(' ', s)
    s = _period_strip.sub('', s)
    if s.strip() == '':
        return original_s.strip()
    else:
        return s.strip()


def extract_vocab(iterable, top_k=None, start=0, input_vocab=None):
    """ Turns an iterable of list of tokens into a vocabulary.
        These tokens could be single answers or word tokens in questions.
    """
    all_tokens = itertools.chain.from_iterable(iterable)
    counter = Counter(all_tokens)
    if top_k:
        most_common = counter.most_common(top_k)
        most_common = (t for t, c in most_common)
    else:
        most_common = counter.keys()
    # descending in count, then lexicographical order
    tokens = sorted(most_common, key=lambda x: (counter[x], x), reverse=True)

    vocab = {t: i for i, t in enumerate(tokens, start=start)}
    return vocab


class CocoImages(data.Dataset):
    def __init__(self, path, transform=None):
        super(CocoImages, self).__init__()
        self.path = path
        self.id_to_filename = self._find_images()
        self.sorted_ids = sorted(self.id_to_filename.keys())  # used for deterministic iteration order
        print('found {} images in {}'.format(len(self), self.path))
        self.transform = transform

    def _find_images(self):
        id_to_filename = {}
        for filename in os.listdir(self.path):
            if not filename.endswith('.jpg'):
                continue
            id_and_extension = filename.split('_')[-1]
            id = int(id_and_extension.split('.')[0])
            id_to_filename[id] = filename
        return id_to_filename

    def __getitem__(self, item):
        id = self.sorted_ids[item]
        path = os.path.join(self.path, self.id_to_filename[id])
        img = Image.open(path).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)
        return id, img

    def __len__(self):
        return len(self.sorted_ids)


class Composite(data.Dataset):
    """ Dataset that is a composite of several Dataset objects. Useful for combining splits of a dataset. """

    def __init__(self, *datasets):
        self.datasets = datasets

    def __getitem__(self, item):
        current = self.datasets[0]
        for d in self.datasets:
            if item < len(d):
                return d[item]
            item -= len(d)
        else:
            raise IndexError('Index too large for composite dataset')

    def __len__(self):
        return sum(map(len, self.datasets))

    def _get_answer_vectors(self, answer_indices):
        return self.datasets[0]._get_answer_vectors(answer_indices)

    def _get_answer_sequences(self, answer_indices):
        return self.datasets[0]._get_answer_sequences(answer_indices)

    @property
    def vector(self):
        return self.datasets[0].vector

    @property
    def token_to_index(self):
        return self.datasets[0].token_to_index

    @property
    def answer_to_index(self):
        return self.datasets[0].answer_to_index

    @property
    def index_to_answer(self):
        return self.datasets[0].index_to_answer

    @property
    def num_tokens(self):
        return self.datasets[0].num_tokens

    @property
    def num_answer_tokens(self):
        return self.datasets[0].num_answer_tokens

    @property
    def vocab(self):
        return self.datasets[0].vocab


def eval_collate_fn(batch):
    # put question lengths in descending order so that we can use packed sequences later
    batch.sort(key=lambda x: x[-1], reverse=True)
    return data.dataloader.default_collate(batch)

Writing /content/drive/MyDrive/VQA/code/data/preprocess.py


###### base.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/data/base.py
import json
import os
import os.path as osp
import nltk
import h5py
import torch
import torch.utils.data as data
import pdb
from nltk import word_tokenize, pos_tag
import re
import numpy as np
import sys
import pickle as pkl

################
from .preprocess import invert_dict


class VisualQA(data.Dataset):
    def __init__(self,
                 args,
                 vector):
        super(VisualQA, self).__init__()

        # vocab
        self.vector = vector
        self.args = args
        # process question
        # self.args.question_vocab_path = osp.join(project_root, 'data', 'question.vocab.json') # a joint question vocab across all dataset
        with open(self.args.question_vocab_path, 'r') as fd:
            question_vocab = json.load(fd)
        self.token_to_index = question_vocab['question']
        self._max_question_length = question_vocab['max_question_length']
        self.image_features_path = args.FVQA.feature_path
        self.index_to_token = invert_dict(self.token_to_index)

        answer_vocab_path = self.args.FVQA.answer_vocab_path
        fact_vocab_path = self.args.FVQA.fact_vocab_path
        relation_vocab_path = self.args.FVQA.relation_vocab_path

        if self.args.fact_map:
            with open(fact_vocab_path, 'r') as fd:
                answer_vocab = json.load(fd)
        elif self.args.relation_map:
            with open(relation_vocab_path, 'r') as fd:
                answer_vocab = json.load(fd)
        else:
            with open(answer_vocab_path, 'r') as fd:
                answer_vocab = json.load(fd)
        self.answer_to_index = answer_vocab['answer']
        self.index_to_answer = invert_dict(self.answer_to_index)

        self.cached_answers_g2v = {}  # 只编码KGE
        self.cached_answers_w2v = {}  # 只编码序列
        self.cached_answers_gae = {}
        self.cached_answers_bert = {}
        self.unk_vector = self.vector['UNK']
        if "KG" in self.args.method_choice:
            self._map_kg()
        if "GAE" in self.args.method_choice:
            # self._map_gae()
            self._map_bert()

    @property
    def max_question_length(self):
        return self._max_question_length

    @property
    def max_answer_length(self):
        assert hasattr(self, answers), 'Dataloader must have access to answers'
        if not hasattr(self, '_max_answer_length'):
            self._max_answer_length = max(map(len, self.answers))
        return self._max_answer_length

    @property
    def num_tokens(self):
        return len(self.token_to_index)

    @property
    def num_answers(self):
        return len(self.answer_to_index)

    def __len__(self):
        return len(self.questions)

    # Internal data utility---------------------------------------

    def _load_image(self, image_id):
        """ Load an image """
        # pdb.set_trace()
        index = self.image_id_to_index[image_id]
        spa = torch.zeros([1, 1])  # init

        if self.args.fusion_model == 'UD' or self.args.fusion_model == 'BAN':
            spatials = self.features_file['spatial_features']
            dataset = self.features_file['image_features']  # 直接读取特征文件
            spa = spatials[index].astype('float32')
            spa = torch.from_numpy(spa)
        else:
            dataset = self.features_file['features']  # 直接读取特征文件

        img = dataset[index].astype('float32')

        return torch.from_numpy(img), spa

    def _create_image_id_to_index(self):
        """ Create a mapping from a COCO image id into the corresponding index into the h5 file """
        if not hasattr(self, 'features_file'):
            # Loading the h5 file has to be done here and not in __init__ because when the DataLoader
            # forks for multiple works, every child would use the same file object and fail
            # Having multiple readers using different file objects is fine though, so we just init in here.
            self.features_file = h5py.File(self.image_features_path, 'r')

        if self.args.fusion_model == 'UD' or self.args.fusion_model == 'BAN':
            import _pickle as cPickle
            image_id_to_index = cPickle.load(open(self.args.FVQA.img_id2idx, "rb"))
            # pdb.set_trace()
            self.s_dim = self.features_file['spatial_features'].shape[2]
            self.v_dim = self.features_file['image_features'].shape[2]  # 直接读取特征文件

        else:
            with h5py.File(self.image_features_path, 'r') as features_file:
                image_ids = features_file['ids'][()]
            image_id_to_index = {id: i for i, id in enumerate(image_ids)}
        return image_id_to_index

    def _encode_question(self, question):
        """ Turn a question into a vector of indices and a question length """
        vec = torch.zeros(self.max_question_length).long()
        for i, token in enumerate(question):
            index = self.token_to_index.get(token, 0)
            vec[i] = index
        return vec, len(question)

    def _map_kg(self):
        if "KG" not in self.args.method_choice:
            return
        # print("using kg embedding")
        kg_path = self.args.FVQA.kg_path
        entity_path = self.args.FVQA.entity_path  # 来源中的词对应的向量
        relation_path = self.args.FVQA.relation_path  # 同上
        relation2id_path = self.args.FVQA.relation2id_path  # 搜寻候选答案的来源
        entity2id_path = self.args.FVQA.entity2id_path  # 搜寻候选答案的来源

        a = np.load(entity_path)
        b = np.load(relation_path)
        self.map_kg = np.vstack((a, b))

        # 随机得到一个矩阵，以模拟随机的情况
        # self.map_ran=torch.zeros(self.map_kg.shape)
        # self.map_ran = torch.rand(self.map_kg.shape)
        # self.map_ran = torch.randn(self.map_kg.shape)
        # self.map_kg = self.map_ran

        self.map_kg = torch.Tensor(self.map_kg).view(-1, 300)

        self.stoi_kg = {}
        with open(os.path.join(entity2id_path), "r") as f:
            while 1:
                line = f.readline()
                if not line:
                    break
                line = re.split('\t|\n', line)[:2]
                self.stoi_kg[line[0]] = int(line[1])
        sz = len(self.stoi_kg)
        with open(os.path.join(relation2id_path), "r") as f:
            while 1:
                line = f.readline()
                if not line:
                    break
                line = re.split('\t|\n', line)[:2]
                self.stoi_kg[line[0]] = int(line[1]) + sz

    def _map_gae(self):
        if "GAE" not in self.args.method_choice:
            return
        # print("using kg embedding")

        _gae_path = self.args.FVQA.gae_path
        gae_path = osp.join(_gae_path, str(self.args.FVQA.gae_node_num) + "_init_" + self.args.FVQA.gae_init + ".pkl")
        print("gae file:", gae_path)
        with open(gae_path, 'rb') as f:
            if sys.version_info > (3, 0):
                features = pkl.load(f, encoding='latin1')
            else:
                features = pkl.load(f)
        # 下标到gae向量的映射
        self.map_gae = torch.FloatTensor(np.array(features)).view(-1, 300)
        vertices_f = osp.join(_gae_path, "g_nodes_" + str(self.args.FVQA.gae_node_num) + ".json")
        self.stoi_gae = {}
        with open(vertices_f) as fp:
            vertices_list = json.load(fp)

        for i, vertex in enumerate(vertices_list):
            self.stoi_gae[vertex] = i
        # print("test map gae")
        # pdb.set_trace()

    def _map_bert(self):
        if "GAE" not in self.args.method_choice:
            return
        # print("using kg embedding")

        cache_path = osp.join(self.args.FVQA.bert_path, "map_bert.pt")
        if not osp.exists(cache_path):
            _bert_path = self.args.FVQA.bert_path

            bert_path = osp.join(_bert_path, "conceptnet_bert_embeddings.pt")
            print("bert file:", bert_path)
            _cache = torch.load(bert_path)  # torch.Size([78334, 1024])

            self.map_bert = torch.FloatTensor(self.args.FVQA.max_ans, self.args.bert_dim)
            # 下标到gae向量的映射
            all = []

            with open(osp.join(_bert_path, "cn_node_names_for_embeddings.txt"), 'r', encoding='utf-8') as f:
                while 1:
                    line = f.readline()
                    if not line:
                        break
                    line = re.split('\n', line)
                    all.append(line[0])

            self.stoi_bert = {}  # answer to vector文件的 id 下标
            for key, value in self.answer_to_index.items():
                self.stoi_bert[key] = value
                if key in all:
                    self.map_bert[value] = _cache[all.index(key), :]
                else:
                    cnt = 0.0
                    tmp = torch.zeros(1, self.args.bert_dim).cuda()
                    for i, j in enumerate(all):
                        if len(j) >= 4 and len(key) >= 3 and (key in j or j in key):
                            # pdb.set_trace()
                            tmp += _cache[i, :]  # 取平均
                            cnt += 1
                        if cnt >= 3:
                            break
                    if cnt == 0:
                        raise TypeError('cnt can not = 0 !!!')
                    self.map_bert[value] = tmp / (cnt + 1e-12)

            if (self.map_bert != self.map_bert).any():
                raise TypeError('cnt can not = 0 !!!')
            # pdb.set_trace()
            torch.save({'map_bert': self.map_bert, 'stoi_bert': self.stoi_bert}, cache_path)
        else:
            _cache = torch.load(cache_path)
            self.map_bert = _cache['map_bert']  # 词向量列表 + 长度
            self.stoi_bert = _cache['stoi_bert']  # 答案下标

        # print("test map gae")
        # pdb.set_trace()

    def _get_answer_vectors(self, ways, answer_indices):
        dim = self.vector.dim
        if ways == 'GAE':
            dim = self.args.bert_dim
            return self._encode_answer_vector(self._encode_answer_vector_bert, dim, answer_indices)
            # return self._encode_answer_vector(self._encode_answer_vector_gae, dim, answer_indices)
        elif ways == 'KG':
            return self._encode_answer_vector(self._encode_answer_vector_g2v, dim, answer_indices)
        elif ways == 'W2V':
            return self._encode_answer_vector(self._encode_answer_vector_w2v, dim, answer_indices)

    def _encode_answer_vector(self, encode_model, dim, answer_indices):
        if isinstance(answer_indices[0], list):
            N, C = len(answer_indices), len(answer_indices[0])
            vector = torch.zeros(N, C, dim)
            for i, answer_ids in enumerate(answer_indices):
                for j, answer_id in enumerate(answer_ids):
                    if answer_id != -1:
                        vector[i, j, :] = encode_model(self.index_to_answer[answer_id])
                    else:
                        vector[i, j, :] = self.unk_vector
        else:
            vector = torch.zeros(len(answer_indices), dim)
            for idx, answer_id in enumerate(answer_indices):

                if answer_id != -1:
                    if type(answer_id).__name__ == 'int':
                        vector[idx, :] = encode_model(self.index_to_answer[answer_id])
                    else:
                        vector[idx, :] = encode_model(self.index_to_answer[answer_id.item()])
                else:
                    vector[idx, :] = self.unk_vector
        return vector, []

    def _get_answer_sequences_w2v(self, answer_indices):
        seqs, lengths = [], []
        max_seq_length = 0
        if isinstance(answer_indices[0], list):
            N, C = len(answer_indices), len(answer_indices[0])
            for i, answer_ids in enumerate(answer_indices):
                _seqs = []
                for j, answer_id in enumerate(answer_ids):
                    if answer_id != -1:
                        _seqs.append(self._encode_answer_sequence_w2v(self.index_to_answer[answer_id]))
                    else:
                        _seqs.append([self.unk_vector])
                    if max_seq_length < len(_seqs[-1]):
                        max_seq_length = len(_seqs[-1])  # determing max length
                seqs.append(_seqs)

            vector = torch.zeros(N, C, max_seq_length, self.vector.dim)
            for i, _seqs in enumerate(seqs):
                for j, seq in enumerate(_seqs):
                    if len(seq) != 0:
                        vector[i, j, :len(seq), :] = torch.cat(seq, dim=0)
                    lengths.append(len(seq))
            assert len(lengths) == N * \
                C, 'Wrong lengths - length: {} vs N: {}, C: {} vs seqs: {}'.format(len(lengths), N, C, len(seqs))
        else:
            for idx, answer_id in enumerate(answer_indices):
                if answer_id != -1:
                    if type(answer_id).__name__ == 'int':
                        seqs.append(self._encode_answer_sequence_w2v(self.index_to_answer[answer_id]))
                    else:
                        seqs.append(self._encode_answer_sequence_w2v(self.index_to_answer[answer_id.item()]))
                else:
                    seqs.append([self.unk_vector])

                if max_seq_length < len(seqs[-1]):
                    max_seq_length = len(seqs[-1])  # determing max length

            vector = torch.zeros(len(answer_indices), max_seq_length, self.vector.dim)
            for idx, seq in enumerate(seqs):
                if len(seq) != 0:
                    vector[idx, :len(seq), :] = torch.cat(seq, dim=0)
                lengths.append(len(seq))

        return vector, lengths

    def _encode_answer_vector_bert(self, answer):  # 向量求平均

        if isinstance(self.cached_answers_bert.get(answer, -1), int):
            answer_vec = torch.zeros(1, self.args.bert_dim)
            idk = self.stoi_bert.get(answer, -1)
            if idk >= 0:
                answer_vec = self.map_bert[idk]
            self.cached_answers_bert[answer] = answer_vec
        return self.cached_answers_bert[answer]

    def _encode_answer_vector_gae(self, answer):  # 向量求平均
        if isinstance(self.cached_answers_gae.get(answer, -1), int):
            answer_vec = torch.zeros(1, self.vector.dim)
            idk = self.stoi_gae.get(answer, -1)
            if idk >= 0:
                answer_vec = self.map_gae[idk].reshape(1, 300)
            self.cached_answers_gae[answer] = answer_vec
        return self.cached_answers_gae[answer]

    def _encode_answer_vector_g2v(self, answer):  # 向量求平均
        if isinstance(self.cached_answers_g2v.get(answer, -1), int):
            answer_vec = torch.zeros(1, self.vector.dim)

            idk = self.stoi_kg.get(answer, -1)
            if idk >= 0:
                answer_vec = self.map_kg[idk].reshape(1, 300)
            self.cached_answers_g2v[answer] = answer_vec
        return self.cached_answers_g2v[answer]

    def _encode_answer_vector_w2v(self, answer):  # 向量求平均
        if isinstance(self.cached_answers_w2v.get(answer, -1), int):
            tokens = nltk.word_tokenize(answer)
            answer_vec = torch.zeros(1, self.vector.dim)
            cnt = 0
            for i, token in enumerate(tokens):
                if self.vector.check(token):
                    answer_vec += self.vector[token]
                    cnt += 1
            self.cached_answers_w2v[answer] = answer_vec / (cnt + 1e-12)
            # pdb.set_trace()
        return self.cached_answers_w2v[answer]

    def _encode_answer_sequence_w2v(self, answer):
        if isinstance(self.cached_answers_w2v.get(answer, -1), int):
            tokens = nltk.word_tokenize(answer)
            answer_seq = []
            for i, token in enumerate(tokens):
                if self.vector.check(token):
                    answer_seq.append(self.vector[token].view(1, self.vector.dim))
                else:
                    answer_seq.append(self.vector['<unk>'].view(1, self.vector.dim))
            self.cached_answers_w2v[answer] = answer_seq

        return self.cached_answers_w2v[answer]

    def _encode_multihot_labels(self, answers):
        """ Turn an answer into a vector """
        max_answer_index = self.args.TEST.max_answer_index
        answer_vec = torch.zeros(max_answer_index)
        for answer in answers:
            index = self.answer_to_index.get(answer)
            if index is not None:
                if index < max_answer_index:
                    answer_vec[index] += 1
        return answer_vec

    def evaluate(self, predictions):
        raise NotImplementedError

Writing /content/drive/MyDrive/VQA/code/data/base.py


###### fvqa.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/data/fvqa.py
import json
import os
import os.path as osp
import nltk
from collections import Counter
import torch
import torch.utils.data as data
import pdb

################
from .base import VisualQA
from .preprocess import process_punctuation


def get_loader(args, vector, train=False, val=False):
    """ Returns a data loader for the desired split """
    assert train + val == 1, 'need to set exactly one of {train, val, test} to True'  # 必须有一个为真
    id = args.FVQA.data_choice
    if train:
        filepath = "train" + id
        print("use train data:", id)
        filepath = os.path.join(args.FVQA.train_data_path, filepath)
    else:
        filepath = "test" + id
        filepath = os.path.join(args.FVQA.test_data_path, filepath)

    split = FVQA(  # 定义每一次训练的VQA输入 # ok
        args,
        path_for(args, train=train, val=val, filepath=filepath),  # train的问题
        vector,  # 对应的词向量
        file_path=filepath
    )
    batch_size = args.TRAIN.batch_size
    if val:
        batch_size = args.TEST.batch_size
    loader = torch.utils.data.DataLoader(  # 定义传统的DataLoader
        split,
        batch_size=batch_size,
        shuffle=True,  # only shuffle the data in training
        pin_memory=True,
        num_workers=args.TRAIN.data_workers,
    )

    return loader


class FVQA(VisualQA):  # ok
    """ FVQA dataset, open-ended """

    def __init__(self, args, qa_path, vector, file_path=None):
        self.args = args
        answer_vocab_path = self.args.FVQA.answer_vocab_path
        super(FVQA, self).__init__(args, vector)
        # load annotation
        with open(qa_path, 'r') as fd:
            self.qa_json = json.load(fd)

        # print('extracting answers...')

        # 把问题变成id向量+长度的表示, 答案变成id向量
        if args.fact_map:
            #  得到对应的名字
            name = "fact"
            self.answers = list(prepare_fact(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示
        elif args.relation_map:
            name = "relation"
            self.answers = list(prepare_relation(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示
        else:
            name = "answer"
            self.answers = list(prepare_answers(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示

        cache_filepath = self._get_cache_path(qa_path, file_path, name)

        # self.support_relation = list(prepare_relation(self.qa_json))
        self.questions, self.answer_indices = self._qa_id_represent(cache_filepath)
        # pdb.set_trace()
        # process images 处理图片

    def open_hdf5(self):
        self.image_features_path = self.args.FVQA.feature_path
        self.image_id_to_index = self._create_image_id_to_index()  # 得到图片编号到下标的表示
        # self.image_ids = [q['image_id'] for q in questions_json['questions']]
        self.image_ids = self._get_img_id()

    def __getitem__(self, item):  # ok
        if not hasattr(self, 'image_ids'):
            self.open_hdf5()
        # if item > len(self.answers):
        #     pdb.set_trace()

        question, question_length = self.questions[item]  # 问题向量列表
        # sample answers
        # self.answer_indices[item]：[1,2,3] or [-1, -1 ...]
        # answer_cands = Counter(self.answer_indices[item])  # 单个答案 返回类型：Counter({1: 1, 2: 1, 3: 1})
        # answer_indices = list(answer_cands.keys())  # 答案有哪几个（下标）[[1,2,3]]
        # counts = list(answer_cands.values())  # 这几个答案分别出现了多少次[10]

        label = self._encode_multihot_labels(self.answers[item])  # 答案的multihot表示 前百分之多少的答案
        image_id = self.image_ids[item]
        image, spa = self._load_image(image_id)  # 直接获得图片的特征
        # unique_answers, answer_vectors = self._generate_batch_answer(answer_indices, counts)
        # answer_vectors == label
        # assert answer_vectors == label
        # return image, spa, question, unique_answers, answer_vectors, label, item, question_length
        # pdb.set_trace()
        return image, spa, question, label, item, question_length

    def _get_cache_path(self, qa_path, file_path, name):
        w2v = ""
        if "KG" in self.args.method_choice:
            if "w2v" in self.args.FVQA.entity_path:
                w2v = "(w2vinit)_" + self.args.FVQA.entity_num + "_" + self.args.FVQA.KGE
            else:
                w2v = "_" + self.args.FVQA.entity_num + "_" + self.args.FVQA.KGE
        if "train" in qa_path:
            cache_filepath = osp.join(file_path, "fvqa_" + name + "_and_question_train_" +
                                      self.args.method_choice + w2v + "_" + str(self.args.FVQA.max_ans) + ".pt")
        else:
            cache_filepath = osp.join(file_path, "fvqa_" + name + "_and_question_test_" + self.args.method_choice + w2v + "_" + str(
                self.args.FVQA.max_ans) + ".pt")
        return cache_filepath

    def _qa_id_represent(self, cache_filepath):
        if not os.path.exists(cache_filepath):
            # print('encoding questions...')
            questions = list(prepare_questions(self.qa_json))  # 问题词列表的列表
            questions = [self._encode_question(q) for q in questions]  # 把问题变成id向量+长度的表示

            # 对于候选答案列表中的每一个问题对应的候选答案列表，转换成下标表示[[1,2,3],[2,3,4]......]  1——>一个答案
            answer_indices = [[self.answer_to_index.get(_a, -1) for _a in a] for a in self.answers]  # 如果没有匹配就是 -1
            torch.save({'questions': questions, 'answer_indices': answer_indices}, cache_filepath)

        else:
            # 已经有，对应这个训练/测试集 的问题w2v表，[train 和 test是不一样的]
            _cache = torch.load(cache_filepath)
            questions = _cache['questions']  # 词向量列表 + 长度
            answer_indices = _cache['answer_indices']  # 答案下标
            # self.answer_vectors = _cache['answer_vectors']  # 答案的向量表示[平均]

        return questions, answer_indices

    def _get_img_id(self):
        image_ids = []
        keys = list(self.qa_json.keys())
        for a in keys:
            filename = self.qa_json[a]["img_file"]
            id_and_extension = filename.split('_')[-1]
            id = int(id_and_extension.split('.')[0])
            if not filename.endswith('.jpg'):
                id += 1000000  # 把jpg和jpeg的分开
                # pdb.set_trace()
            image_ids.append(id)
        return image_ids

    # def _generate_batch_answer(self, indices, counts):  # 获得每一个batch的500个候选答案。
    #     unique_answers = list(range(0, self.args.FVQA.max_ans))
    #     # unique_answers = list(set( aid for aids in indices for aid in aids ))
    #     answer_dict = {k: i for i, k in enumerate(unique_answers)}
    #     answer_vector = torch.zeros(len(indices), len(unique_answers))  # 128,500
    #
    #     for i in range(len(counts)):  # 128
    #         for j, c in zip(indices[i], counts[i]):
    #             answer_vector[i, answer_dict[j]] = c  # 把出现的次数附上
    #
    #     return unique_answers, answer_vector


def path_for(args, train=False, val=False, filepath=""):
    # tra = "all_qs_dict_release_train_" + str(args.FVQA.max_ans) + ".json"
    # tes = "all_qs_dict_release_test_" + str(args.FVQA.max_ans) + ".json"
    tra = "all_qs_dict_release_train_500.json"
    tes = "all_qs_dict_release_test_500.json"
    if train == True:
        return os.path.join(args.FVQA.train_data_path, filepath, tra)
    else:
        return os.path.join(args.FVQA.test_data_path, filepath, tes)


def prepare_questions(questions_json):  # ok
    """ Tokenize and normalize questions from a given question json in the usual VQA format. """
    keys = list(questions_json.keys())
    questions = []
    for a in keys:
        questions.append(questions_json[a]['question'])  # question的list
    for question in questions:
        question = question.lower()[:-1]
        yield nltk.word_tokenize(process_punctuation(question))  # 得到一个词的list，例如['I', 'LOVE', 'YOU']


def prepare_answers(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    answers = []

    for a in keys:
        answer = answers_json[a]["answer"]
        answers.append([answer] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for answer_list in answers:
        ret = list(map(process_punctuation, answer_list))  # 去除标点等操作
        yield ret


def prepare_fact(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    support_facts = []
    for a in keys:
        answer = answers_json[a]["answer"]
        facts = answers_json[a]["fact"]
        f1 = facts[0]
        f2 = facts[2]
        if answer != f1 and answer != f2:
            pdb.set_trace()
        assert (answer == f1 or answer == f2)
        if answer == f1:
            fact = f2
        else:
            fact = f1
        support_facts.append([fact] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for support_facts_list in support_facts:
        ret = list(map(process_punctuation, support_facts_list))  # 去除标点等操作
        yield ret


def prepare_relation(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    relations = []
    for a in keys:
        facts = answers_json[a]["fact"]
        relation = facts[1]

        relations.append([relation] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for relation_list in relations:
        ret = list(map(process_punctuation, relation_list))  # 去除标点等操作
        yield ret

Writing /content/drive/MyDrive/VQA/code/data/fvqa.py


###### aokvqa.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/data/aokvqa.py
import json
import os
import os.path as osp
import nltk
from collections import Counter
import torch
import torch.utils.data as data
import pdb

################
from .base import VisualQA
from .preprocess import process_punctuation


def get_loader(args, vector, train=False, val=False):
    """ Returns a data loader for the desired split """
    assert train + val == 1, 'need to set exactly one of {train, val, test} to True'  # 必须有一个为真
    id = args.FVQA.data_choice
    if train:
        filepath = "train" + id
        print("use train data:", id)
        filepath = os.path.join(args.FVQA.train_data_path, filepath)
    else:
        filepath = "test" + id
        filepath = os.path.join(args.FVQA.test_data_path, filepath)

    split = FVQA(  # 定义每一次训练的VQA输入 # ok
        args,
        path_for(args, train=train, val=val, filepath=filepath),  # train的问题
        vector,  # 对应的词向量
        file_path=filepath
    )
    batch_size = args.TRAIN.batch_size
    if val:
        batch_size = args.TEST.batch_size
    loader = torch.utils.data.DataLoader(  # 定义传统的DataLoader
        split,
        batch_size=batch_size,
        shuffle=True,  # only shuffle the data in training
        pin_memory=True,
        num_workers=args.TRAIN.data_workers,
    )

    return loader


class FVQA(VisualQA):  # ok
    """ FVQA dataset, open-ended """

    def __init__(self, args, qa_path, vector, file_path=None):
        self.args = args
        answer_vocab_path = self.args.FVQA.answer_vocab_path
        super(FVQA, self).__init__(args, vector)
        # load annotation
        with open(qa_path, 'r') as fd:
            self.qa_json = json.load(fd)

        # print('extracting answers...')

        # 把问题变成id向量+长度的表示, 答案变成id向量
        if args.fact_map:
            #  得到对应的名字
            name = "fact"
            self.answers = list(prepare_fact(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示
        elif args.relation_map:
            name = "relation"
            self.answers = list(prepare_relation(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示
        else:
            name = "answer"
            self.answers = list(prepare_answers(self.qa_json))  # 候选答案列表的列表 [[answer1,answer2,...][....]] 每个问题对应的答案. 单词表示

        cache_filepath = self._get_cache_path(qa_path, file_path, name)

        # self.support_relation = list(prepare_relation(self.qa_json))
        self.questions, self.answer_indices = self._qa_id_represent(cache_filepath)
        # pdb.set_trace()
        # process images 处理图片

    def open_hdf5(self):
        self.image_features_path = self.args.FVQA.feature_path
        self.image_id_to_index = self._create_image_id_to_index()  # 得到图片编号到下标的表示
        # self.image_ids = [q['image_id'] for q in questions_json['questions']]
        self.image_ids = self._get_img_id()

    def __getitem__(self, item):  # ok
        if not hasattr(self, 'image_ids'):
            self.open_hdf5()
        # if item > len(self.answers):
        #     pdb.set_trace()

        question, question_length = self.questions[item]  # 问题向量列表
        # sample answers
        # self.answer_indices[item]：[1,2,3] or [-1, -1 ...]
        # answer_cands = Counter(self.answer_indices[item])  # 单个答案 返回类型：Counter({1: 1, 2: 1, 3: 1})
        # answer_indices = list(answer_cands.keys())  # 答案有哪几个（下标）[[1,2,3]]
        # counts = list(answer_cands.values())  # 这几个答案分别出现了多少次[10]

        label = self._encode_multihot_labels(self.answers[item])  # 答案的multihot表示 前百分之多少的答案
        image_id = self.image_ids[item]
        image, spa = self._load_image(image_id)  # 直接获得图片的特征
        # unique_answers, answer_vectors = self._generate_batch_answer(answer_indices, counts)
        # answer_vectors == label
        # assert answer_vectors == label
        # return image, spa, question, unique_answers, answer_vectors, label, item, question_length
        # pdb.set_trace()
        return image, spa, question, label, item, question_length

    def _get_cache_path(self, qa_path, file_path, name):
        w2v = ""
        if "KG" in self.args.method_choice:
            if "w2v" in self.args.FVQA.entity_path:
                w2v = "(w2vinit)_" + self.args.FVQA.entity_num + "_" + self.args.FVQA.KGE
            else:
                w2v = "_" + self.args.FVQA.entity_num + "_" + self.args.FVQA.KGE
        if "train" in qa_path:
            cache_filepath = osp.join(file_path, "fvqa_" + name + "_and_question_train_" +
                                      self.args.method_choice + w2v + "_" + str(self.args.FVQA.max_ans) + ".pt")
        else:
            cache_filepath = osp.join(file_path, "fvqa_" + name + "_and_question_test_" + self.args.method_choice + w2v + "_" + str(
                self.args.FVQA.max_ans) + ".pt")
        return cache_filepath

    def _qa_id_represent(self, cache_filepath):
        if not os.path.exists(cache_filepath):
            # print('encoding questions...')
            questions = list(prepare_questions(self.qa_json))  # 问题词列表的列表
            questions = [self._encode_question(q) for q in questions]  # 把问题变成id向量+长度的表示

            # 对于候选答案列表中的每一个问题对应的候选答案列表，转换成下标表示[[1,2,3],[2,3,4]......]  1——>一个答案
            answer_indices = [[self.answer_to_index.get(_a, -1) for _a in a] for a in self.answers]  # 如果没有匹配就是 -1
            torch.save({'questions': questions, 'answer_indices': answer_indices}, cache_filepath)

        else:
            # 已经有，对应这个训练/测试集 的问题w2v表，[train 和 test是不一样的]
            _cache = torch.load(cache_filepath)
            questions = _cache['questions']  # 词向量列表 + 长度
            answer_indices = _cache['answer_indices']  # 答案下标
            # self.answer_vectors = _cache['answer_vectors']  # 答案的向量表示[平均]

        return questions, answer_indices

    def _get_img_id(self):
        image_ids = []
        keys = list(self.qa_json.keys())
        for a in keys:
            filename = self.qa_json[a]["img_file"]
            id_and_extension = filename.split('_')[-1]
            id = int(id_and_extension.split('.')[0])
            if not filename.endswith('.jpg'):
                id += 1000000  # 把jpg和jpeg的分开
                # pdb.set_trace()
            image_ids.append(id)
        return image_ids

    # def _generate_batch_answer(self, indices, counts):  # 获得每一个batch的500个候选答案。
    #     unique_answers = list(range(0, self.args.FVQA.max_ans))
    #     # unique_answers = list(set( aid for aids in indices for aid in aids ))
    #     answer_dict = {k: i for i, k in enumerate(unique_answers)}
    #     answer_vector = torch.zeros(len(indices), len(unique_answers))  # 128,500
    #
    #     for i in range(len(counts)):  # 128
    #         for j, c in zip(indices[i], counts[i]):
    #             answer_vector[i, answer_dict[j]] = c  # 把出现的次数附上
    #
    #     return unique_answers, answer_vector


def path_for(args, train=False, val=False, filepath=""):
    # tra = "all_qs_dict_release_train_" + str(args.FVQA.max_ans) + ".json"
    # tes = "all_qs_dict_release_test_" + str(args.FVQA.max_ans) + ".json"
    tra = "all_qs_dict_release_train_500.json"
    tes = "all_qs_dict_release_test_500.json"
    if train == True:
        return os.path.join(args.FVQA.train_data_path, filepath, tra)
    else:
        return os.path.join(args.FVQA.test_data_path, filepath, tes)


def prepare_questions(questions_json):  # ok
    """ Tokenize and normalize questions from a given question json in the usual VQA format. """
    keys = list(questions_json.keys())
    questions = []
    for a in keys:
        questions.append(questions_json[a]['question'])  # question的list
    for question in questions:
        question = question.lower()[:-1]
        yield nltk.word_tokenize(process_punctuation(question))  # 得到一个词的list，例如['I', 'LOVE', 'YOU']


def prepare_answers(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    answers = []

    for a in keys:
        answer = answers_json[a]["answer"]
        answers.append([answer] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for answer_list in answers:
        ret = list(map(process_punctuation, answer_list))  # 去除标点等操作
        yield ret


def prepare_fact(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    support_facts = []
    for a in keys:
        answer = answers_json[a]["answer"]
        facts = answers_json[a]["fact"]
        f1 = facts[0]
        f2 = facts[2]
        if answer != f1 and answer != f2:
            pdb.set_trace()
        assert (answer == f1 or answer == f2)
        if answer == f1:
            fact = f2
        else:
            fact = f1
        support_facts.append([fact] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for support_facts_list in support_facts:
        ret = list(map(process_punctuation, support_facts_list))  # 去除标点等操作
        yield ret


def prepare_relation(answers_json):  # ok
    """ Normalize answers from a given answer json in the usual VQA format. """
    keys = list(answers_json.keys())
    relations = []
    for a in keys:
        facts = answers_json[a]["fact"]
        relation = facts[1]

        relations.append([relation] * 10)  # 双层list，内层的list对应一个问题的答案序列
    for relation_list in relations:
        ret = list(map(process_punctuation, relation_list))  # 去除标点等操作
        yield ret

Writing /content/drive/MyDrive/VQA/code/data/aokvqa.py


## Content


###### main.py


In [None]:
%%writefile /content/drive/MyDrive/VQA/code/main.py
import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import pdb
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataloader import default_collate
import warnings
from pprint import pprint

# self-defined
# import model.fusion_net as fusion_net
# import model.answer_net as answer_net
# from model import Vector, SimpleClassifier
# from config import cfg
# from torchlight import initialize_exp, set_seed, snapshot, get_dump_path, show_params
# from utils import unseen_mask, freeze_layer, cosine_sim, Metrics, instance_bce_with_logits
# from data import fvqa
# import copy
# torch.multiprocessing.set_start_method('spawn')

warnings.filterwarnings('ignore')


class Runner:
    def __init__(self, args):
        # prepare for: data , model, loss fuction, optimizer

        self.log_dir = get_dump_path(args)
        self.model_dir = os.path.join(self.log_dir, 'model')

        self.word2vec = Vector(args.FVQA.common_data_path)
        # data load
        self.train_loader = fvqa.get_loader(args, self.word2vec, train=True)
        self.val_loader = fvqa.get_loader(args, self.word2vec, val=True)

        self.avocab = default_collate(list(range(0, args.FVQA.max_ans)))

        # question_word2vec: get the word vector (for each word in question )
        # the id of which could map to the vector of corresponding token
        self.question_word2vec = self.word2vec._prepare(self.train_loader.dataset.token_to_index)

        # get the fusion_model and answer_net
        self._model_choice(args)

        # get the mask from zsl
        self.negtive_mux = unseen_mask(args, self.val_loader)

        # optimizer
        params_for_optimization = list(self.fusion_model.parameters()) + list(self.answer_net.parameters())
        self.optimizer = optim.Adam([p for p in params_for_optimization if p.requires_grad], lr=args.TRAIN.lr)

        # loss fuction
        self.log_softmax = nn.LogSoftmax(dim=1).cuda()

        # Recorder
        self.max_acc = [0, 0, 0, 0]
        self.max_zsl_acc = [0, 0, 0, 0]
        self.best_epoch = 0
        self.correspond_loss = 1e20

        self.early_stop = 0

        print("fusion_model:")
        pprint(self.fusion_model)
        print("Answer Model:")
        pprint(self.answer_net)

        self.args = args

        # test stage:
        if self.args.now_test:
            print("begin test! ...")
            print("loading model  ...")
            self._load_model(self.fusion_model, "fusion")
            self._load_model(self.answer_net, "embedding")

    def run(self):
        # 1. define the parameters which are out the epoch
        # 2. Update statistical indicator
        # 3. concate of answer embedding

        # Answer embedding :
        # choices belong to: ['CLS', 'W2V', 'KG', 'GAE', 'KG_W2V', 'KG_GAE', 'GAE_W2V', 'KG_GAE_W2V']
        # well, we recommend only use the parameter : 'CLS' or 'W2V'.
        # since that the resource of other choices need extra training.
        if args.method_choice != 'CLS':
            previous_var = None
            for method_choice in self.method_list:
                # get the corresponding choice embedding
                answer_var, answer_len = self.train_loader.dataset._get_answer_vectors(method_choice, self.avocab)

                # normalize in row and then concate then
                answer_var = F.normalize(answer_var, p=2, dim=1)
                if previous_var is not None:
                    previous_var = torch.cat([previous_var, answer_var], dim=1)
                else:
                    previous_var = answer_var
            self.answer_var = Variable(previous_var.float()).cuda()

        # warm up (ref: ramen)
        self.gradual_warmup_steps = [i * self.args.TRAIN.lr for i in torch.linspace(0.5, 2.0, 7)]
        self.lr_decay_epochs = range(14, 47, self.args.TRAIN.lr_decay_step)

        # if test:
        if self.args.now_test:
            self.args.TRAIN.epochs = 2

        for epoch in range(self.args.TRAIN.epochs):

            self.early_stop += 1
            if self.args.patience < self.early_stop:
                # early stop
                break
            # warm up
            if epoch < len(self.gradual_warmup_steps):
                self.optimizer.param_groups[0]['lr'] = self.gradual_warmup_steps[epoch]
            elif epoch in self.lr_decay_epochs:
                self.optimizer.param_groups[0]['lr'] *= self.args.TRAIN.lr_decay_rate

            self.train_metrics = Metrics()
            self.val_metrics = Metrics()
            self.zsl_metrics = Metrics()
            # use TOP50 metrics for fact mapping:
            if self.args.fact_map == 1:
                self.train_metrics = Metrics(topnum=50)
                self.val_metrics = Metrics(topnum=50)
                self.zsl_metrics = Metrics(topnum=50)

            # train
            if not self.args.now_test:
                ######## begin training!! #######
                self.train(epoch)
                #################################
                lr = self.optimizer.param_groups[0]['lr']
                # recode:
                logger.info(
                    f'Train Epoch {epoch}: LOSS={self.train_metrics.total_loss: .5f}, lr={lr: .6f}, acc1={self.train_metrics.acc_1: .2f},acc3={self.train_metrics.acc_3: .2f},acc10={self.train_metrics.acc_10: .2f}')
            # eval
            if epoch % 1 == 0 and epoch > 0:
                ######## begin evaling!! #######
                self.eval(epoch)
                #################################
                logger.info('#################################################################################################################')
                logger.info(f'Test Epoch {epoch}: LOSS={self.val_metrics.total_loss: .5f}, acc1={self.val_metrics.acc_1: .2f}, acc3={self.val_metrics.acc_3: .2f}, acc10={self.val_metrics.acc_10: .2f}')
                if args.ZSL and not self.args.fact_map and not args.relation_map:
                    logger.info(f'Zsl Epoch {epoch}: LOSS={self.zsl_metrics.total_loss: .5f}, acc1={self.zsl_metrics.acc_1: .2f}, acc3={self.zsl_metrics.acc_3: .2f}, acc10={self.zsl_metrics.acc_10: .2f}')
                logger.info('#################################################################################################################')

                # add 0.1 accuracy punishment, avoid for too much attention on hit@10 acc
                # 添加0.1的精读惩罚, 防止模型过多的关注hit@10 acc
                if self.val_metrics.total_loss < (self.correspond_loss - 1) or self.val_metrics.acc_all > (self.max_acc[3] + 0.2):
                    # reset early_stop and updata
                    self.early_stop = 0
                    self.best_epoch = epoch
                    self.correspond_loss = self.val_metrics.total_loss
                    self._updata_best_result(self.max_acc, self.val_metrics)

                    self.best_fusion_model = copy.deepcopy(self.fusion_model)
                    self.best_answer_net = copy.deepcopy(self.answer_net)

                    # ZSL result
                    if args.ZSL and not self.args.fact_map and not args.relation_map:
                        self._updata_best_result(self.max_zsl_acc, self.zsl_metrics)

                if not args.no_tensorboard and not self.args.now_test:
                    writer.add_scalar('loss', self.val_metrics.total_loss, epoch)
                    writer.add_scalar('acc1', self.val_metrics.acc_1, epoch)
                    writer.add_scalar('acc3', self.val_metrics.acc_3, epoch)
                    writer.add_scalar('acc10', self.val_metrics.acc_10, epoch)

        # save the model
        if not self.args.now_test and self.args.save_model:
            self.fusion_model_path = self._save_model(self.best_fusion_model, "fusion")
            self.answer_net_path = self._save_model(self.best_answer_net, "embedding")

    def train(self, epoch):
        self.fusion_model.train()
        self.answer_net.train()
        prefix = "train"
        tq = tqdm(self.train_loader, desc='{} E{:03d}'.format(prefix, epoch), ncols=0)

        for visual_features, boxes, question_features, answers, idx, q_len in tq:
            visual_features = Variable(visual_features.float()).cuda()
            boxes = Variable(boxes.float()).cuda()
            question_features = Variable(question_features).cuda()
            answers = Variable(answers).cuda()
            q_len = Variable(q_len).cuda()
            fusion_embedading = self.fusion_model(visual_features, boxes, question_features, q_len)

            # Classifier-based methods
            if args.method_choice == 'CLS':
                # TODO: Normalization?
                predicts = self.answer_net(fusion_embedading)
                loss = instance_bce_with_logits(predicts, answers / 10)
            # Mapping-based methods
            else:
                answer_embedding = self.answer_net(self.answer_var)
                # notice the temperature (correspoding to specific score)
                predicts = cosine_sim(fusion_embedading, answer_embedding) / self.args.loss_temperature
                predicts = predicts.to(torch.float64)
                nll = -self.log_softmax(predicts).to(torch.float64)
                # loss = (nll * answers[0] / answers[0].sum(1, keepdim=True)).sum(dim=1).mean()
                loss = (nll * answers / answers.sum(1, keepdim=True)).sum(dim=1).mean()

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            self.train_metrics.update_per_batch(loss, answers.data, predicts.data)
        self.train_metrics.update_per_epoch()

    def eval(self, epoch):
        self.fusion_model.eval()
        self.answer_net.eval()
        prefix = "eval"
        tq = tqdm(self.val_loader, desc='{} E{:03d}'.format(prefix, epoch), ncols=0)

        for visual_features, boxes, question_features, answers, idx, q_len in tq:
            with torch.no_grad():
                visual_features = Variable(visual_features.float()).cuda()
                boxes = Variable(boxes.float()).cuda()
                question_features = Variable(question_features).cuda()
                answers = Variable(answers).cuda()
                q_len = Variable(q_len).cuda()
                fusion_embedading = self.fusion_model(visual_features, boxes, question_features, q_len)

                if args.method_choice == 'CLS':
                    predicts = self.answer_net(fusion_embedading)
                    loss = instance_bce_with_logits(predicts, answers / 10)

                else:
                    answer_embedding = self.answer_net(self.answer_var)
                    predicts = cosine_sim(fusion_embedading, answer_embedding) / self.args.loss_temperature
                    predicts = predicts.to(torch.float64)
                    nll = -self.log_softmax(predicts).to(torch.float64)
                    loss = (nll * answers / answers.sum(1, keepdim=True)).sum(dim=1).mean()

                if args.ZSL == 1 and not self.args.fact_map and not args.relation_map:
                    # if predicts.shape[0] != self.negtive_mux.shape[0]:
                    #     pdb.set_trace()
                    zsl_predicts = predicts + self.negtive_mux[:predicts.shape[0], :]

            self.val_metrics.update_per_batch(loss, answers.data, predicts.data)
            if args.ZSL == 1 and not self.args.fact_map and not args.relation_map:
                self.zsl_metrics.update_per_batch(loss, answers.data, zsl_predicts.data)

        self.val_metrics.update_per_epoch()
        if args.ZSL == 1 and not self.args.fact_map and not args.relation_map:
            self.zsl_metrics.update_per_epoch()

    def _model_choice(self, args):
        assert args.fusion_model in ['SAN', 'MLP', 'BAN', 'UD']
        # models api
        self.fusion_model = getattr(fusion_net, args.fusion_model)(args, self.train_loader.dataset,
                                                                   self.question_word2vec).cuda()
        # freeze word embedding
        if args.freeze_w2v and args.fusion_model != 'MLP':
            freeze_layer(self.fusion_model.w_emb)

        # answer models
        assert args.method_choice in ['CLS', 'W2V', 'KG', 'GAE', 'KG_W2V', 'KG_GAE', 'GAE_W2V', 'KG_GAE_W2V']
        ans_len_table = {'W2V': 300, 'KG': 300, 'GAE': 1024, 'CLS': 0}
        self.method_list = args.method_choice.split('_')
        self.method_list.sort()
        for i in self.method_list:
            args.ans_feature_len += ans_len_table[i]
        # Mapping-based methods
        if args.method_choice != 'CLS':
            assert args.answer_embedding in ['MLP']
            self.answer_net = getattr(answer_net, args.answer_embedding)(args, self.train_loader.dataset).cuda()
        else:
            # Classifier-based methods
            self.answer_net = SimpleClassifier(args.embedding_size, 2 * args.hidden_size, args.FVQA.max_ans, 0.5).cuda()

    def _updata_best_result(self, max_acc, metrics):
        max_acc[3] = metrics.acc_all
        max_acc[2] = metrics.acc_10
        max_acc[1] = metrics.acc_3
        max_acc[0] = metrics.acc_1

    def _load_model(self, model, function):
        assert function == "fusion" or function == "embedding"
        # support entity mapping
        if self.args.fact_map:
            target = "fact"
        # relation mapping
        elif self.args.relation_map:
            target = "relation"
        else:
            target = "answer"
        model_name = type(model).__name__
        if not self.args.ZSL:
            target = "general_" + target
        save_path = os.path.join(self.args.FVQA.model_save_path, function)
        save_path = os.path.join(save_path, f'{target}_{model_name}_{self.args.FVQA.data_choice}.pkl')

        model.load_state_dict(torch.load(save_path))
        print(f"loading {function} model done!")

    def _save_model(self, model, function):
        assert function == "fusion" or function == "embedding"
        if self.args.fact_map:
            target = "fact"
        elif self.args.relation_map:
            target = "relation"
        else:
            target = "answer"
        model_name = type(model).__name__
        if not self.args.ZSL:
            target = "general_" + target
        save_path = os.path.join(self.args.FVQA.model_save_path, function)
        os.makedirs(save_path, exist_ok=True)
        save_path = os.path.join(save_path, f'{target}_{model_name}_{self.args.FVQA.data_choice}.pkl')

        torch.save(model.state_dict(), save_path)
        return save_path


if __name__ == '__main__':
    # Config loading...
    cfg = cfg()
    args = cfg.get_args()
    cfg.update_train_configs(args)
    set_seed(cfg.random_seed)

    # Environment initialization...
    logger = initialize_exp(cfg)
    logger_path = get_dump_path(cfg)
    if not cfg.no_tensorboard:
        writer = SummaryWriter(log_dir=os.path.join(logger_path, 'tensorboard'))

    torch.cuda.set_device(cfg.gpu_id)

    # Run...
    runner = Runner(cfg)
    runner.run()

    #  information output:
    logger.info(f"best performance = {runner.max_acc[0]: .2f},{runner.max_acc[1]: .2f},{runner.max_acc[2]: .2f}. best epoch = {runner.best_epoch}, correspond_loss={runner.correspond_loss: .4f}")
    if args.ZSL == 1 and not args.fact_map and not args.relation_map:
        logger.info(f" zsl performance = {runner.max_zsl_acc[0]: .2f},{runner.max_zsl_acc[1]: .2f},{runner.max_zsl_acc[2]: .2f}")
    if not cfg.now_test:
        logger.info(f" fusion_model_path = {runner.fusion_model_path}")
        logger.info(f" answer_net_path = {runner.answer_net_path}")
    if not cfg.no_tensorboard:
        writer.close()

Overwriting /content/drive/MyDrive/VQA/code/main.py


In [None]:
!ls

cfgs  code  data  kg  run.ipynb


###### config.py


In [None]:
%%writefile /content/drive/MyDrive/VQA/code/config.py
import argparse

Overwriting /content/drive/MyDrive/VQA/code/config.py


## Model


###### \_\_init__.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/__init__.py
from .attention import BiAttention
from .classifier import SimpleClassifier
from .counting import Counter
from .fc import FCNet, BCNet

Writing /content/drive/MyDrive/VQA/code/model/__init__.py


###### answer_net.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/answer_net.py
from answer_net.mlp import MLP

Writing /content/drive/MyDrive/VQA/code/model/answer_net.py


###### fusion_net.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/fusion_net.py
from fusion_net.updn import UD
from fusion_net.ban import BAN
from fusion_net.san import SAN
from fusion_net.mlp import MLP

###### vector.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/vector.py
import array
import zipfile
from tqdm import tqdm
from six.moves.urllib.request import urlretrieve
import os
import os.path as osp
import torch
import io

class Vector(object):
    def __init__(self, cache_path,
                 vector_type='glove.840B', unk_init=torch.Tensor.zero_) -> object:
        urls = {
            'glove.42B': 'http://nlp.stanford.edu/data/glove.42B.300d.zip',
            'glove.840B': 'http://nlp.stanford.edu/data/glove.840B.300d.zip',
            'glove.6B': 'http://nlp.stanford.edu/data/glove.6B.zip',
        }
        url = urls[vector_type] if urls.get(vector_type, False) != False else None
        name = osp.splitext(osp.basename(url))[0] + '.txt'  # glove.840B.300d.txt

        self.unk_init = unk_init
        self.cache(name, cache_path, url=url)

    def __getitem__(self, token):
        if self.stoi.get(token, -1) != -1:
            return self.vectors[self.stoi[token]]
        else:
            return self.unk_init(torch.Tensor(1, self.dim))

    def _prepare(self, vocab):
        word2vec = torch.Tensor(len(vocab), self.dim)
        for token, idx in vocab.items():
            word2vec[idx, :] = self[token]

        return word2vec

    def check(self, token):
        if self.stoi.get(token, -1) != -1:
            return True
        else:
            return False

    def cache(self, name, cache_path, url=None):
        # cache_path='.vector_cache',
        #name= "glove.840B.300d.txt"
        #url = 'http://nlp.stanford.edu/data/glove.840B.300d.zip'

        path = osp.join(cache_path, name)
        path_pt = "{}.pt".format(path)

        if not osp.isfile(path_pt):
            # download vocab file if it does not exists
            if not osp.exists(path) and url:
                dest = osp.join(cache_path, os.path.basename(url))
                if not osp.exists(dest):
                    print('[-] Downloading vectors from {}'.format(url))
                    if not osp.exists(cache_path):
                        os.mkdir(cache_path)

                    with tqdm(unit='B', unit_scale=True, miniters=1, desc=dest) as t:
                        urlretrieve(url, dest, reporthook=reporthook(t))

                print('[-] Extracting vectors into {}'.format(path))
                ext = os.path.splitext(dest)[1][1:]
                if ext == 'zip':
                    with zipfile.ZipFile(dest, "r") as zf:
                        zf.extractall(cache_path)

            if not os.path.isfile(path):
                raise RuntimeError('no vectors found at {}'.format(path))

            # build vocab list
            itos, vectors, dim = [], array.array(str('d')), None

            # Try to read the whole file with utf-8 encoding.
            binary_lines = False
            try:
                with io.open(path, encoding="utf8") as f:
                    lines = [line for line in f]
            # If there are malformed lines, read in binary mode
            # and manually decode each word from utf-8
            except:
                print("[!] Could not read {} as UTF8 file, "
                      "reading file as bytes and skipping "
                      "words with malformed UTF8.".format(path))
                with open(path, 'rb') as f:
                    lines = [line for line in f]
                binary_lines = True

            print("[-] Loading vectors from {}".format(path))  # 读取vector
            for line in tqdm(lines, total=len(lines)):
                # Explicitly splitting on " " is important, so we don't
                # get rid of Unicode non-breaking spaces in the vectors.
                entries = line.rstrip().split(" ")
                word, entries = entries[0], entries[1:]
                if dim is None and len(entries) > 1:
                    dim = len(entries)
                elif len(entries) == 1:
                    print("Skipping token {} with 1-dimensional "
                          "vector {}; likely a header".format(word, entries))
                    continue
                elif dim != len(entries):
                    raise RuntimeError(
                        "Vector for token {} has {} dimensions, but previously "
                        "read vectors have {} dimensions. All vectors must have "
                        "the same number of dimensions.".format(word, len(entries), dim))

                vectors.extend(float(x) for x in entries)
                itos.append(word)

            self.itos = itos
            self.stoi = {word: i for i, word in enumerate(itos)}
            self.vectors = torch.Tensor(vectors).view(-1, dim)
            self.dim = dim
            print('* Caching vectors to {}'.format(path_pt))
            torch.save((self.itos, self.stoi, self.vectors, self.dim), path_pt)
        else:
            print('* Loading vectors to {}'.format(path_pt))
            self.itos, self.stoi, self.vectors, self.dim = torch.load(path_pt)

Overwriting /content/drive/MyDrive/VQA/code/model/vector.py


###### attetion.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/attention.py
import torch
import torch.nn as nn
from torch.nn.utils.weight_norm import weight_norm
from .fc import FCNet, BCNet
import torch.nn.functional as F

class BaseAttention(nn.Module):
    def __init__(self, v_dim, q_dim, num_hid):
        super(BaseAttention, self).__init__()
        self.nonlinear = FCNet([v_dim + q_dim, num_hid])
        self.linear = weight_norm(nn.Linear(num_hid, 1), dim=None)

    def forward(self, v, q):
        """
        v: [batch, k, vdim]
        q: [batch, qdim]
        """
        logits = self.logits(v, q)
        w = nn.functional.softmax(logits, 1)
        return w

    def logits(self, v, q):
        num_objs = v.size(1)
        q = q.unsqueeze(1).repeat(1, num_objs, 1)
        vq = torch.cat((v, q), 2)
        joint_repr = self.nonlinear(vq)
        logits = self.linear(joint_repr)
        return logits


class UpDnAttention(nn.Module):
    def __init__(self, v_dim, q_dim, num_hid, dropout=0.2):
        super(UpDnAttention, self).__init__()

        self.v_proj = FCNet([v_dim, num_hid])
        self.q_proj = FCNet([q_dim, num_hid])
        self.dropout = nn.Dropout(dropout)
        self.linear = weight_norm(nn.Linear(num_hid, 1), dim=None)

    def forward(self, v, q):
        """
        v: [batch, k, vdim]
        q: [batch, qdim]
        """
        logits = self.logits(v, q)
        w = nn.functional.softmax(logits, 1)
        return w

    def logits(self, v, q):
        batch, k, _ = v.size()
        v_proj = self.v_proj(v)  # [batch, k, qdim]
        q_proj = self.q_proj(q).unsqueeze(1).repeat(1, k, 1)
        joint_repr = v_proj * q_proj
        joint_repr = self.dropout(joint_repr)
        logits = self.linear(joint_repr)
        return logits

class SanAttention(nn.Module):
  def __init__(self, v_features, q_features, mid_features, glimpses, drop=0.0):
    super(SanAttention, self).__init__()
    self.v_conv = nn.Conv2d(v_features, mid_features, 1, bias=False)  # let self.lin take care of bias
    self.q_lin = nn.Linear(q_features, mid_features)
    self.x_conv = nn.Conv2d(mid_features, glimpses, 1)

    self.drop = nn.Dropout(drop)
    self.relu = nn.LeakyReLU(inplace=True)

  def forward(self, v, q):
    v = self.v_conv(self.drop(v))
    q = self.q_lin(self.drop(q))
    q = tile_2d_over_nd(q, v)
    x = self.relu(v + q)
    x = self.x_conv(self.drop(x))
    return x

def tile_2d_over_nd(feature_vector, feature_map):
  """ Repeat the same feature vector over all spatial positions of a given feature map.
    The feature vector should have the same batch size and number of features as the feature map.
  """
  n, c = feature_vector.size()
  spatial_size = feature_map.dim() - 2
  tiled = feature_vector.view(n, c, *([1] * spatial_size)).expand_as(feature_map)
  return tiled

def apply_attention(input, attention):
  """ Apply any number of attention maps over the input.
    The attention map has to have the same size in all dimensions except dim=1.
  """
  # import pdb
  # pdb.set_trace()
  n, c = input.size()[:2]
  glimpses = attention.size(1)

  # flatten the spatial dims into the third dim, since we don't need to care about how they are arranged
  input = input.view(n, c, -1)
  attention = attention.view(n, glimpses, -1)
  s = input.size(2)

  # apply a softmax to each attention map separately
  # since softmax only takes 2d inputs, we have to collapse the first two dimensions together
  # so that each glimpse is normalized separately
  attention = attention.view(n * glimpses, -1)
  attention = F.softmax(attention)

  # apply the weighting by creating a new dim to tile both tensors over
  target_size = [n, glimpses, c, s]
  input = input.view(n, 1, c, s).expand(*target_size)
  attention = attention.view(n, glimpses, 1, s).expand(*target_size)
  weighted = input * attention
  # sum over only the spatial dimension
  weighted_mean = weighted.sum(dim=3)
  # the shape at this point is (n, glimpses, c, 1)
  return weighted_mean.view(n, -1)


class BiAttention(nn.Module):
    def __init__(self, x_dim, y_dim, z_dim, glimpse, dropout=[.2, .5]):
        super(BiAttention, self).__init__()

        self.glimpse = glimpse
        self.logits = weight_norm(BCNet(x_dim, y_dim, z_dim, glimpse, dropout=dropout, k=3),
                                  name='h_mat', dim=None)

    def forward(self, v, q, v_mask=True):
        """
        v: [batch, k, vdim]
        q: [batch, qdim]
        """
        p, logits = self.forward_all(v, q, v_mask)
        return p, logits

    def forward_all(self, v, q, v_mask=True):
        v_num = v.size(1)
        q_num = q.size(1)
        logits = self.logits(v, q)  # b x g x v x q

        if v_mask:
            mask = (0 == v.abs().sum(2)).unsqueeze(1).unsqueeze(3).expand(logits.size())
            logits.data.masked_fill_(mask.data, -float('inf'))

        p = nn.functional.softmax(logits.view(-1, self.glimpse, v_num * q_num), 2)
        return p.view(-1, self.glimpse, v_num, q_num), logits

Overwriting /content/drive/MyDrive/VQA/code/model/attention.py


###### language_model.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/language_model.py
import torch
import torch.nn as nn
from torch.autograd import Variable
import numpy as np
from torch.nn.utils.rnn import pack_padded_sequence
import torch.nn.init as init
import pdb


class WordEmbedding(nn.Module):
    """Word Embedding

    The ntoken-th dim is used for padding_idx, which agrees *implicitly*
    with the definition in Dictionary.
    """

    def __init__(self, ntoken, emb_dim, dropout=0):
        super(WordEmbedding, self).__init__()
        self.emb = nn.Embedding(ntoken + 1, emb_dim, padding_idx=ntoken)
        self.dropout = nn.Dropout(dropout)
        self.ntoken = ntoken
        self.emb_dim = emb_dim

    def init_embedding(self, np_file):
        # weight_init = torch.from_numpy(np.load(np_file))
        weight_init = np_file
        assert weight_init.shape == (self.ntoken, self.emb_dim)
        self.emb.weight.data[:self.ntoken] = weight_init

    def forward(self, x):
        emb = self.emb(x)
        emb = self.dropout(emb)
        return emb


class UpDnQuestionEmbedding(nn.Module):
    def __init__(self, in_dim, num_hid, nlayers, bidirect, dropout=0, rnn_type='GRU'):
        """Module for question embedding
        """
        super(UpDnQuestionEmbedding, self).__init__()
        assert rnn_type == 'LSTM' or rnn_type == 'GRU'
        rnn_cls = nn.LSTM if rnn_type == 'LSTM' else nn.GRU

        self.rnn = rnn_cls(
            in_dim, num_hid, nlayers,
            bidirectional=bidirect,
            dropout=dropout,
            batch_first=True)

        self.in_dim = in_dim
        self.num_hid = num_hid
        self.nlayers = nlayers
        self.rnn_type = rnn_type
        self.ndirections = 1 + int(bidirect)

    def init_hidden(self, batch):
        # just to get the type of tensor
        weight = next(self.parameters()).data
        hid_shape = (self.nlayers * self.ndirections, batch, self.num_hid)
        if self.rnn_type == 'LSTM':
            return (Variable(weight.new(*hid_shape).zero_()),
                    Variable(weight.new(*hid_shape).zero_()))
        else:
            return Variable(weight.new(*hid_shape).zero_())

    def forward(self, x):
        # x: [batch, sequence, in_dim]
        batch = x.size(0)
        hidden = self.init_hidden(batch)
        self.rnn.flatten_parameters()
        output, hidden = self.rnn(x, hidden)

        if self.ndirections == 1:
            return output[:, -1]

        forward_ = output[:, -1, :self.num_hid]
        backward = output[:, 0, self.num_hid:]
        return torch.cat((forward_, backward), dim=1)

    def forward_all(self, x):
        # x: [batch, sequence, in_dim]
        batch = x.size(0)
        hidden = self.init_hidden(batch)
        self.rnn.flatten_parameters()
        output, hidden = self.rnn(x, hidden)
        return output


class QuestionEmbedding(nn.Module):
    def __init__(self, in_dim, num_hid, nlayers=1, bidirect=True, dropout=0, rnn_type='GRU', words_dropout=None,
                 dropout_before_rnn=None,
                 dropout_after_rnn=None):
        """Module for question embedding
        """
        super(QuestionEmbedding, self).__init__()
        assert rnn_type == 'LSTM' or rnn_type == 'GRU'
        rnn_cls = nn.LSTM if rnn_type == 'LSTM' else nn.GRU
        self.bidirect = bidirect
        self.ndirections = 1 + int(bidirect)
        if bidirect:
            num_hid = int(num_hid / 2)
        self.words_dropout = words_dropout
        if dropout_before_rnn is not None:
            self.dropout_before_rnn = nn.Dropout(p=dropout_before_rnn)
        else:
            self.dropout_before_rnn = None
        self.rnn = rnn_cls(
            in_dim, num_hid, nlayers,
            bidirectional=bidirect,
            dropout=dropout,
            batch_first=True)
        if dropout_after_rnn is not None:
            self.dropout_after_rnn = nn.Dropout(p=dropout_after_rnn)
        else:
            self.dropout_after_rnn = None

        self.in_dim = in_dim
        self.num_hid = num_hid
        self.nlayers = nlayers
        self.rnn_type = rnn_type

    def init_hidden(self, batch):
        # just to get the type of tensor
        weight = next(self.parameters()).data
        hid_shape = (self.nlayers * self.ndirections, batch, self.num_hid)
        if self.rnn_type == 'LSTM':
            return (Variable(weight.new(*hid_shape).zero_()),
                    Variable(weight.new(*hid_shape).zero_()))
        else:
            return Variable(weight.new(*hid_shape).zero_())

    def forward(self, x, qlen=None):
        # x: [batch, sequence, in_dim]
        batch = x.size(0)
        num_tokens = x.size(1)
        if self.words_dropout is not None and self.words_dropout > 0:
            num_dropout = int(self.words_dropout * num_tokens)
            rand_ixs = np.random.randint(0, num_tokens, (batch, num_dropout))
            for bix, token_ixs in enumerate(rand_ixs):
                x[bix, token_ixs] *= 0
        hidden = self.init_hidden(batch)
        self.rnn.flatten_parameters()
        if self.dropout_before_rnn is not None:
            x = self.dropout_before_rnn(x)

        q_words_emb, hidden = self.rnn(x, hidden)  # q_words_emb: B x num_words x gru_dim, hidden: 1 x B x gru_dim

        out = None
        if self.bidirect:
            forward_ = q_words_emb[:, -1, :self.num_hid]
            backward = q_words_emb[:, 0, self.num_hid:]
            hid = torch.cat((forward_, backward), dim=1)
            out = hid
            # return q_words_emb, hid
        else:
            out = q_words_emb[:, -1]
            # return q_words_emb, q_words_emb[:, -1]

        if self.dropout_after_rnn is not None:
            out = self.dropout_after_rnn(out)
        return out

class Seq2SeqRNN(nn.Module):
  def __init__(self, input_features, rnn_features, num_layers=1, drop=0.0,
               rnn_type='LSTM', rnn_bidirectional=False):
    super(Seq2SeqRNN, self).__init__()
    self.bidirectional = rnn_bidirectional

    if rnn_type == 'LSTM':
      self.rnn = nn.LSTM(input_size=input_features,
                hidden_size=rnn_features, dropout=drop,
                num_layers=num_layers, batch_first=True,
                bidirectional=rnn_bidirectional)
    elif rnn_type == 'GRU':
      self.rnn = nn.GRU(input_size=input_features,
                hidden_size=rnn_features, dropout=drop,
                num_layers=num_layers, batch_first=True,
                bidirectional=rnn_bidirectional)
    else:
      raise ValueError('Unsupported Type')

    self.init_weight(rnn_bidirectional, rnn_type)

  def init_weight(self, bidirectional, rnn_type):
    self._init_rnn(self.rnn.weight_ih_l0, rnn_type)
    self._init_rnn(self.rnn.weight_hh_l0, rnn_type)
    self.rnn.bias_ih_l0.data.zero_()
    self.rnn.bias_hh_l0.data.zero_()

    if bidirectional:
      self._init_rnn(self.rnn.weight_ih_l0_reverse, rnn_type)
      self._init_rnn(self.rnn.weight_hh_l0_reverse, rnn_type)
      self.rnn.bias_ih_l0_reverse.data.zero_()
      self.rnn.bias_hh_l0_reverse.data.zero_()

  def _init_rnn(self, weight, rnn_type):
    chunk_size = 4 if rnn_type == 'LSTM' else 3
    for w in weight.chunk(chunk_size, 0):
      init.xavier_uniform(w)

  def forward(self, q_emb, q_len):
    lengths = torch.LongTensor(q_len)
    lens, indices = torch.sort(lengths, 0, True)

    packed = pack_padded_sequence(q_emb[indices.cuda()], lens.tolist(), batch_first=True)
    if isinstance(self.rnn, nn.LSTM):
        # pdb.set_trace()
        _, ( outputs, _ ) = self.rnn(packed)
    elif isinstance(self.rnn, nn.GRU):
        _, outputs = self.rnn(packed)

    if self.bidirectional:
      outputs = torch.cat([ outputs[0, :, :], outputs[1, :, :] ], dim=1)
    else:
      outputs = outputs.squeeze(0)

    _, _indices = torch.sort(indices, 0)
    outputs = outputs[_indices.cuda()]

    return outputs

Overwriting /content/drive/MyDrive/VQA/code/model/language_model.py


#### Fusion Network


###### mlp.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/fusion_net/mlp.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
from torch.autograd import Variable

# from .fc import GroupMLP
# from .language_model import WordEmbedding
from model import GroupMLP
from model import WordEmbedding
from utils import freeze_layer


class MLP(nn.Module):
    #args, self.train_loader.dataset, self.question_word2vec
    # def __init__(self, args, dataset, question_word2vec):
    def __init__(self, args, dataset, embedding_weights=None, rnn_bidirectional=True):
        super(MLP, self).__init__()
        embedding_requires_grad = not args.freeze_w2v  # freeze 则不需要grad
        question_features = 300
        vision_features = args.output_features  # 图片的

        # self.text = BagOfWordsMLPProcessor(
        self.text = BagOfWordsProcessor(
            embedding_tokens=embedding_weights.size(0) if embedding_weights is not None else dataset.num_tokens,
            embedding_weights=embedding_weights,
            embedding_features=300,
            embedding_requires_grad=embedding_requires_grad,
        )
        self.mlp = GroupMLP(
            in_features=vision_features + question_features,
            mid_features= 4 * args.hidden_size,
            out_features=args.embedding_size,
            drop=0.5,
            groups=64,
        )

        for m in self.modules():
            if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
                init.xavier_uniform(m.weight)
                if m.bias is not None:
                    m.bias.data.zero_()

    def forward(self, v, b, q, q_len):
        q = F.normalize(self.text(q, list(q_len.data)), p=2, dim=1)  # 问题向量求平均值
        v = F.normalize(F.avg_pool2d(v, (v.size(2), v.size(3))).squeeze(), p=2, dim=1)

        combined = torch.cat([v, q], dim=1)
        embedding = self.mlp(combined)
        return embedding


class BagOfWordsProcessor(nn.Module):
    def __init__(self, embedding_tokens, embedding_features,
                 embedding_weights, embedding_requires_grad):
        super(BagOfWordsProcessor, self).__init__()
        self.embedding = nn.Embedding(embedding_tokens, embedding_features, padding_idx=0)
        self.embedding.weight.data = embedding_weights
        self.embedding.weight.requires_grad = embedding_requires_grad

    def forward(self, q, q_len):
        embedded = self.embedding(q)
        q_len = Variable(torch.Tensor(q_len).view(-1, 1) + 1e-12, requires_grad=False).cuda()

        return torch.div(torch.sum(embedded, 1), q_len)

Writing /content/drive/MyDrive/VQA/code/model/fusion_net/mlp.py


###### ban.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/fusion_net/ban.py
"""
Bilinear Attention Networks
Jin-Hwa Kim, Jaehyun Jun, Byoung-Tak Zhang
https://arxiv.org/abs/1805.07932

This code is adapted from: https://github.com/jnhwkim/ban-vqa (written by Jin-Hwa Kim)
"""
import torch.nn as nn

# from .attention import BiAttention
# from .classifier import SimpleClassifier
# from .counting import Counter
# from .fc import FCNet, BCNet
# from .language_model import WordEmbedding, UpDnQuestionEmbedding
# from utils import freeze_layer

from model import BiAttention
from model import SimpleClassifier
from model import Counter
from model import FCNet, BCNet
from model import WordEmbedding, UpDnQuestionEmbedding
from utils import freeze_layer


class BAN(nn.Module):
    #args, self.train_loader.dataset, self.question_word2vec
    # def __init__(self, args, dataset, question_word2vec):
    def __init__(self, args, dataset, question_word2vec):
        super(BAN, self).__init__()
        self.args = args
        self.w_emb = WordEmbedding(question_word2vec.size(0), 300, .0)
        if args.freeze_w2v:
            self.w_emb.init_embedding(question_word2vec)
            freeze_layer(self.w_emb)
        self.q_emb = UpDnQuestionEmbedding(300, args.embedding_size, 1, False, .0)
        self.v_att = BiAttention(args.v_dim, self.q_emb.num_hid, self.q_emb.num_hid, args.glimpse)
        self.b_net = []
        self.q_prj = []
        self.c_prj = []
        self.objects = 10  # minimum number of boxes
        for i in range(args.glimpse):
            self.b_net.append(BCNet(args.v_dim, self.q_emb.num_hid, self.q_emb.num_hid, None, k=1))
            self.q_prj.append(FCNet([self.q_emb.num_hid, self.q_emb.num_hid], '', .2))
            self.c_prj.append(FCNet([self.objects + 1, self.q_emb.num_hid], 'ReLU', .0))

        self.b_net = nn.ModuleList(self.b_net)
        self.q_prj = nn.ModuleList(self.q_prj)
        self.c_prj = nn.ModuleList(self.c_prj)
        self.counter = Counter(self.objects)
        self.drop = nn.Dropout(.5)
        self.tanh = nn.Tanh()

    def forward(self, v, b, q, q_len):
        """Forward

        v: [batch, num_objs, obj_dim]
        b: [batch, num_objs, b_dim]
        q: [batch_size, seq_length]

        return: logits, not probs
        """
        w_emb = self.w_emb(q)
        q_emb = self.q_emb.forward_all(w_emb)  # [batch, q_len, q_dim]
        boxes = b[:, :, :4].transpose(1, 2)

        b_emb = [0] * self.args.glimpse
        att, logits = self.v_att.forward_all(v, q_emb)  # b x g x v x q

        for g in range(self.args.glimpse):
            b_emb[g] = self.b_net[g].forward_with_weights(v, q_emb, att[:, g, :, :])  # b x l x h

            atten, _ = logits[:, g, :, :].max(2)
            embed = self.counter(boxes, atten)

            q_emb = self.q_prj[g](b_emb[g].unsqueeze(1)) + q_emb
            q_emb = q_emb + self.c_prj[g](embed).unsqueeze(1)

        return q_emb.sum(1)

Writing /content/drive/MyDrive/VQA/code/model/fusion_net/ban.py


###### san.py

In [None]:
%%writefile %%writefile /content/drive/MyDrive/VQA/code/model/fusion_net/san.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init

from torch.autograd import Variable

# from .attention import SanAttention, apply_attention
# from .fc import GroupMLP
# from .language_model import Seq2SeqRNN, WordEmbedding
from model import SanAttention, apply_attention
from model import GroupMLP
from model import Seq2SeqRNN, WordEmbedding

import pdb
from utils import freeze_layer

class SAN(nn.Module):
    #args, self.train_loader.dataset, self.question_word2vec
    #def __init__(self, args, dataset, question_word2vec):
    def __init__(self, args, dataset,embedding_weights=None,rnn_bidirectional=True):
        super(SAN, self).__init__()
        embedding_requires_grad = not args.freeze_w2v
        question_features = 1024
        rnn_features = int(question_features // 2) if rnn_bidirectional else int(question_features)
        vision_features = args.output_features
        glimpses = 2

        # vocab_size = embedding_weights.size(0)
        # vector_dim = embedding_weights.size(1)
        # self.embedding = nn.Embedding(vocab_size, vector_dim, padding_idx=0)
        # self.embedding.weight.data = embedding_weights
        # self.embedding.weight.requires_grad = embedding_requires_grad
        self.w_emb = WordEmbedding(embedding_weights.size(0), 300, .0)
        if args.freeze_w2v:
            self.w_emb.init_embedding(embedding_weights)
            freeze_layer(self.w_emb)

        self.drop = nn.Dropout(0.5)
        self.text = Seq2SeqRNN(
            input_features=embedding_weights.size(1),
            rnn_features=int(rnn_features),
            rnn_type='LSTM',
            rnn_bidirectional=rnn_bidirectional,
        )
        self.attention = SanAttention(
            v_features=vision_features,
            q_features=question_features,
            mid_features=512,
            glimpses=2,
            drop=0.5,
        )
        self.mlp = GroupMLP(
            in_features=glimpses * vision_features + question_features,
            mid_features= 4 * args.hidden_size,
            out_features=args.embedding_size,
            drop=0.5,
            groups=64,
        )

        for m in self.modules():
            if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
                init.xavier_uniform(m.weight)
                if m.bias is not None:
                    m.bias.data.zero_()



    def forward(self, v, b, q, q_len):
        # pdb.set_trace()
        q = self.text(self.drop(self.w_emb(q)), list(q_len.data))
        # q = self.text(self.embedding(q), list(q_len.data))

        v = F.normalize(v, p=2, dim=1)
        a = self.attention(v, q)
        v = apply_attention(v, a)

        combined = torch.cat([v, q], dim=1)
        embedding = self.mlp(combined)
        return embedding

UsageError: unrecognized arguments: /content/drive/MyDrive/VQA/code/model/fusion_net/san.py


###### updn.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/fusion_net/updn.py

import torch
import torch.nn as nn

# from .language_model import WordEmbedding, UpDnQuestionEmbedding
# from .attention import UpDnAttention
# from .classifier import SimpleClassifier
# from .fc import FCNet
from model import WordEmbedding, UpDnQuestionEmbedding
from model import UpDnAttention
from model import SimpleClassifier
from model import FCNet
from utils import freeze_layer

class UD(nn.Module):
    def __init__(self, args, dataset, question_word2vec):
        super(UD, self).__init__()
        self.w_emb = WordEmbedding(question_word2vec.size(0), 300, 0.0)
        if args.freeze_w2v:
            self.w_emb.init_embedding(question_word2vec)
            freeze_layer(self.w_emb)
            # self.w_emb.weight.requires_grad = False

        self.q_emb = UpDnQuestionEmbedding(300, args.embedding_size, 1, False, 0.0)
        self.v_att = UpDnAttention(args.v_dim, self.q_emb.num_hid, args.embedding_size)
        self.q_net = FCNet([self.q_emb.num_hid, args.embedding_size])
        self.v_net = FCNet([args.v_dim, args.embedding_size])
        # self.classifier = SimpleClassifier(
        #     args.embedding_size, args.embedding_size * 2, args.num_ans_candidates, 0.5)

    def forward(self, v, b, q, qlen):
        """Forward

        v: [batch, num_objs, obj_dim]
        b: [batch, num_objs, b_dim]
        q: [batch_size, seq_length]

        return: logits, not probs
        """
        # print("q = {}".format(q))
        w_emb = self.w_emb(q)
        # print("w_emb = {}".format(w_emb))
        q_emb = self.q_emb(w_emb)  # [batch, q_dim]

        att = self.v_att(v, q_emb) # [spa, 1]
        v_emb = (att * v).sum(1)  # [batch, v_dim]

        q_repr = self.q_net(q_emb)
        v_repr = self.v_net(v_emb)
        joint_repr = q_repr * v_repr
       # logits = self.classifier(joint_repr)
        return joint_repr

Writing /content/drive/MyDrive/VQA/code/model/fusion_net/updn.py


#### Answer Network

###### fc.py

In [None]:
%%writefile /content/drive/MyDrive/VQA/code/model/fc.py
from __future__ import print_function
import torch.nn as nn
from torch.nn.utils.weight_norm import weight_norm
import torch


class FCNet(nn.Module):
    """Simple class for non-linear fully connect network
    """

    def __init__(self, dims, act='ReLU', dropout=0):
        super(FCNet, self).__init__()

        layers = []
        for i in range(len(dims) - 2):
            in_dim = dims[i]
            out_dim = dims[i + 1]
            if 0 < dropout:
                layers.append(nn.Dropout(dropout))
            layers.append(weight_norm(nn.Linear(in_dim, out_dim), dim=None))
            if '' != act:
                layers.append(getattr(nn, act)())
        if 0 < dropout:
            layers.append(nn.Dropout(dropout))
        layers.append(weight_norm(nn.Linear(dims[-2], dims[-1]), dim=None))
        if '' != act:
            layers.append(getattr(nn, act)())

        self.main = nn.Sequential(*layers)

    def forward(self, x):
        return self.main(x)


class BCNet(nn.Module):
    """Simple class for non-linear bilinear connect network
    """

    def __init__(self, v_dim, q_dim, h_dim, h_out, act='ReLU', dropout=[.2, .5], k=3):
        super(BCNet, self).__init__()

        self.c = 32
        self.k = k
        self.v_dim = v_dim
        self.q_dim = q_dim
        self.h_dim = h_dim
        self.h_out = h_out

        self.v_net = FCNet([v_dim, h_dim * self.k], act=act, dropout=dropout[0])
        self.q_net = FCNet([q_dim, h_dim * self.k], act=act, dropout=dropout[0])
        self.dropout = nn.Dropout(dropout[1])  # attention
        if 1 < k:
            self.p_net = nn.AvgPool1d(self.k, stride=self.k)

        if None == h_out:
            pass
        elif h_out <= self.c:
            self.h_mat = nn.Parameter(torch.Tensor(1, h_out, 1, h_dim * self.k).normal_())
            self.h_bias = nn.Parameter(torch.Tensor(1, h_out, 1, 1).normal_())
        else:
            self.h_net = weight_norm(nn.Linear(h_dim, h_out), dim=None)

    def forward(self, v, q):
        if None == self.h_out:
            v_ = self.v_net(v).transpose(1, 2).unsqueeze(3)
            q_ = self.q_net(q).transpose(1, 2).unsqueeze(2)
            d_ = torch.matmul(v_, q_)  # b x h_dim x v x q
            logits = d_.transpose(1, 2).transpose(2, 3)  # b x v x q x h_dim
            return logits

        # broadcast Hadamard product, matrix-matrix production
        # fast computation but memory inefficient
        # epoch 1, time: 157.84
        elif self.h_out <= self.c:
            v_ = self.dropout(self.v_net(v)).unsqueeze(1)
            q_ = self.q_net(q)
            h_ = v_ * self.h_mat  # broadcast, b x h_out x v x h_dim
            logits = torch.matmul(h_, q_.unsqueeze(1).transpose(2, 3))  # b x h_out x v x q
            logits = logits + self.h_bias
            return logits  # b x h_out x v x q

        # batch outer product, linear projection
        # memory efficient but slow computation
        # epoch 1, time: 304.87
        else:
            v_ = self.dropout(self.v_net(v)).transpose(1, 2).unsqueeze(3)
            q_ = self.q_net(q).transpose(1, 2).unsqueeze(2)
            d_ = torch.matmul(v_, q_)  # b x h_dim x v x q
            logits = self.h_net(d_.transpose(1, 2).transpose(2, 3))  # b x v x q x h_out
            return logits.transpose(2, 3).transpose(1, 2)  # b x h_out x v x q

    def forward_with_weights(self, v, q, w):
        v_ = self.v_net(v).transpose(1, 2).unsqueeze(2)  # b x d x 1 x v
        q_ = self.q_net(q).transpose(1, 2).unsqueeze(3)  # b x d x q x 1
        logits = torch.matmul(torch.matmul(v_, w.unsqueeze(1)), q_)  # b x d x 1 x 1
        logits = logits.squeeze(3).squeeze(2)
        if 1 < self.k:
            logits = logits.unsqueeze(1)  # b x 1 x d
            logits = self.p_net(logits).squeeze(1) * self.k  # sum-pooling
        return logits


class GroupMLP(nn.Module):
    def __init__(self, in_features, mid_features, out_features, drop=0.5, groups=1):
        super(GroupMLP, self).__init__()

        self.conv1 = nn.Conv1d(in_features, mid_features, 1)
        self.drop = nn.Dropout(p=drop)
        self.relu = nn.LeakyReLU()
        self.conv2 = nn.Conv1d(mid_features, out_features, 1, groups=groups)

    def forward(self, a):
        N, C = a.size()
        h = self.relu(self.conv1(a.view(N, C, 1)))
        return self.conv2(self.drop(h)).view(N, -1)


class GroupMLP_1lay(nn.Module):
    def __init__(self, in_features, mid_features, out_features, drop=0.5, groups=1):
        super(GroupMLP_1lay, self).__init__()

        self.conv1 = nn.Conv1d(in_features, mid_features, 1)
        self.batch_norm_fusion = nn.BatchNorm1d(mid_features, affine=False)
        self.drop = nn.Dropout(p=drop)
        self.relu = nn.LeakyReLU()
        self.conv2 = nn.Conv1d(mid_features, out_features, 1, groups=groups)

    def forward(self, a):
        N, C = a.size()
        h = self.conv1(a.view(N, C, 1))
        h = self.batch_norm_fusion(h)
        h = self.relu(h)
        return self.conv2(self.drop(h)).view(N, -1)


class GroupMLP_2lay(nn.Module):
    def __init__(self, in_features, mid_features, out_features, drop=0.5, groups=1):
        super(GroupMLP_2lay, self).__init__()

        self.conv1 = nn.Conv1d(in_features, mid_features, 1)
        self.batch_norm_fusion = nn.BatchNorm1d(mid_features, affine=False)
        self.drop = nn.Dropout(p=drop)
        self.relu = nn.LeakyReLU()
        self.conv2 = nn.Conv1d(mid_features, mid_features, 1, groups=groups)
        self.conv3 = nn.Conv1d(mid_features, out_features, 1, groups=groups)

    def forward(self, a):
        N, C = a.size()
        h = self.conv1(a.view(N, C, 1))
        h = self.relu(h)
        h = self.conv2(h)
        h = self.batch_norm_fusion(h)
        h = self.relu(h)
        return self.conv3(self.drop(h)).view(N, -1)

Writing /content/drive/MyDrive/VQA/code/model/fc.py


# Github


In [46]:
!more /content/token.txt | gh auth login --with-token

In [41]:
%%writefile /content/drive/MyDrive/VQA/.gitignore
kg/
data/

Overwriting /content/drive/MyDrive/VQA/.gitignore


In [3]:
!pwd

/content/drive/MyDrive/VQA


In [9]:
# !git init

[33mhint: Using 'master' as the name for the initial branch. This default branch name[m
[33mhint: is subject to change. To configure the initial branch name to use in all[m
[33mhint: [m
[33mhint: 	git config --global init.defaultBranch <name>[m
[33mhint: [m
[33mhint: Names commonly chosen instead of 'master' are 'main', 'trunk' and[m
[33mhint: 'development'. The just-created branch can be renamed via this command:[m
[33mhint: [m
[33mhint: 	git branch -m <name>[m
Initialized empty Git repository in /content/drive/MyDrive/VQA/.git/


In [10]:
# !git remote add origin https://github.com/LTBach/VQA.git

In [11]:
# !git branch -M main

In [13]:
!git add .

In [14]:
!git commit -m "Add code"

Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'root@07cd57bfa03d.(none)')


In [None]:
!

In [12]:
!git push -u origin main

error: src refspec main does not match any
[31merror: failed to push some refs to 'https://github.com/LTBach/VQA.git'
[m

In [5]:
git remote add origin https://github.com/LTBach/VQA.git
git branch -M main
git push -u origin main

usage: git [--version] [--help] [-C <path>] [-c <name>=<value>]
           [--exec-path[=<path>]] [--html-path] [--man-path] [--info-path]
           [-p | --paginate | -P | --no-pager] [--no-replace-objects] [--bare]
           [--git-dir=<path>] [--work-tree=<path>] [--namespace=<name>]
           [--super-prefix=<path>] [--config-env=<name>=<envvar>]
           <command> [<args>]

These are common Git commands used in various situations:

start a working area (see also: git help tutorial)
   clone     Clone a repository into a new directory
   init      Create an empty Git repository or reinitialize an existing one

work on the current change (see also: git help everyday)
   add       Add file contents to the index
   mv        Move or rename a file, a directory, or a symlink
   restore   Restore working tree files
   rm        Remove files from the working tree and from the index

examine the history and state (see also: git help revisions)
   bisect    Use binary search to find th