# Deep Learning 基礎講座　最終課題: VQA

## 概要
画像と質問から，回答を予測するタスクです．．
- サンプル数: 訓練 19,873 サンプル，テスト 4,969 サンプル
- 入力: 画像データ（RGB，サイズは画像によって異なります），質問文（系列長はサンプルごとに異なります）
- 出力: 回答文（系列長はサンプルごとに異なります）
- 評価指標: VQA での評価指標（[こちら
](https://visualqa.org/evaluation.html)を参照）を利用しています．

### データセット ([VizWiz 2023 edition](https://www.kaggle.com/datasets/nqa112/vizwiz-2023-edition)) の詳細
- 24,842 枚の画像データセットと，各画像に対する 1 つの質問文と 10 人の回答者による回答文から構成されます．
  - 10 人の回答は全て同じとは限りません．
- 24.842 サンプルのうち，80 % (19.873) が訓練データ (train)，20 % (4969) がテストデータ (val) として与えられます．
  - テストデータに対する回答文を正解ラベルとし，配布していません．
  - データ提供元とは異なるデータ分割になっています．

### タスクの詳細
- 本コンペティションでは，与えられた画像と質問文に対して，適切な回答文を出力するモデルを作成していただきます．
- 評価は [VQA](https://visualqa.org/index.html) (Visual Question Answering) に基づいて，以下の式で計算されます．

$$\text{Acc}(ans) = \text{min}(\frac{humans \; that \; said \; ans}{3}, 1)$$

- 1 つのデータに対し， 10 人の回答のうち 9 人の回答を選択し上記の式で性能評価した， 10 パターンの Acc の平均をそのデータに対する Acc とします．
- 予測結果と正解ラベルを比較する前に，回答を lowercase にする，冠詞は削除するなどの前処理を行っています（[詳細](https://visualqa.org/evaluation.html)）．

## 考えられる工夫の例
- 事前学習モデルの fine-tuning
    - 画像特徴量，言語特徴量を取得するときに，事前学習モデルを fine-tuning することで性能向上が見込めます（今回のタスクと大きく異なるデータセットでの事前学習では効果が小さい可能性がありますので注意しましょう）．
- 質問文の表現
    - ベースラインでは，質問文をモデルに入力する際に，one-hot ベクトルにしています．これを tokenizer 等を利用して分散表現にすることで，モデル学習しやすくなります．
- ソフトラベルの利用
    - ベースラインでは 10 人の回答の中で最も多かった回答を正解ラベルとして訓練しています．この点を各回答の頻度に合わせてソフトラベルを利用することで，より多くの情報を利用して学習が可能になります．
- 画像の前処理
    - 画像の前処理には形状を同じにする Resize のみを利用しています．「畳み込みニューラルネットワーク」，「深層学習と画像認識」等で紹介されていたデータ拡張を追加することで，汎化性能の向上が見込めます．

## 修了要件を満たす条件
- ベースラインでは，omnicampus 上での性能評価において， 49.3% となります．したがって，ベースラインを超える 49.4% 以上の提出のみ，修了要件として認めます．
- ベースラインから改善を加えることで， 60% に性能向上することを運営で確認しています．こちらを 1 つの指標として取り組んでみてください．

## 注意点
- 学習するモデルについて制限はありませんが，必ず訓練データで学習したモデルで予測してください．
    - 事前学習済みモデルを利用して，訓練データを fine-tuning しても構いません．
    - 埋め込み抽出モデルなど，モデルの一部を訓練しないケースは構いません．
    - 学習を一切せずに，ChatGPT などの基盤モデルを利用することは禁止とします．

### データの準備
データをダウンロードした際に，google drive したため，利用するために google drive をマウントする必要があります．また， drive 上で展開することができないため，/content ディレクトリ下にコピーし "data.zip" を展開します．  
google drive 上に "data.zip" が配置されていない場合は実行できません．google drive 上に "data.zip" (**12GB**) を配置することが可能であれば，"data_download.ipynb" を先に実行してください．難しい場合は，omnicampus 演習環境を利用してください．．



In [None]:
# omnicampus 上では 4 セル目まで実行不要
# ドライブのマウント
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# データダウンロード用の notebook にてgoogle drive への保存後，
# 反映に時間がかかる可能性がありますので，google drive のマウント後，
# data.zip がディレクトリ内にあることを確認してから実行してください．
# data.zip を /content 下にコピーする
!cp "/content/drive/MyDrive/data.zip" "/content"

In [None]:
# カレントディレクトリ下のファイル群を確認
# data.zip が表示されれば問題ないです
%ls

In [None]:
# データを解凍する
!unzip data.zip

omnicampus 演習環境では，data_download.ipynb のマウント，zip 化，drive へのコピーを実行しないことで，"data.zip" を解凍した形で配置されます．したがって，data ディレクトリが存在するディレクトリをカレントディレクトリとするだけで良いです．



In [None]:
# omnicampus 実行用
# 以下の例では/workspace に data ディレクトリがあると想定
%cd /workspace/VQA

### 1. import library

In [None]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms

### 2. utils

In [None]:
def set_seed(seed):
    """
    シードを固定する．

    Parameters
    ----------
    seed : int
        乱数生成に用いるシード値．
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
def process_text(text):
    """
    入力文と回答のフォーマットを統一するための関数．

    Parameters
    ----------
    text : str
        入力文，もしくは回答．
    """
    # lowercase
    text = text.lower()

    # 数詞を数字に変換
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)

    # 小数点のピリオドを削除
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)

    # 冠詞の削除
    text = re.sub(r'\b(a|an|the)\b', '', text)

    # 短縮形のカンマの追加
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)

    # 句読点をスペースに変換
    text = re.sub(r"[^\w\s':]", ' ', text)

    # 句読点をスペースに変換
    text = re.sub(r'\s+,', ',', text)

    # 連続するスペースを1つに変換
    text = re.sub(r'\s+', ' ', text).strip()

    return text

In [None]:
class VQADataset(torch.utils.data.Dataset):
    """
    VQA データセットを扱うためのクラス．
    """
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform  # 画像の前処理
        self.image_dir = image_dir  # 画像ファイルのディレクトリ
        self.df = pandas.read_json(df_path)  # 画像ファイルのパス，question, answerを持つDataFrame
        self.answer = answer

        # question / answerの辞書を作成
        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        # 質問文に含まれる単語を辞書に追加
        for question in self.df["question"]:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}  # 逆変換用の辞書(question)

        if self.answer:
            # 回答に含まれる文章を辞書に追加
            for answers in self.df["answers"]:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}  # 逆変換用の辞書(answer)

    def update_dict(self, dataset):
        """
        検証用データ，テストデータの辞書を訓練データの辞書に更新する．

        Parameters
        ----------
        dataset : Dataset
            訓練データのDataset
        """
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        """
        対応するidxのデータ（画像，質問，回答）を取得．

        Parameters
        ----------
        idx : int
            取得するデータのインデックス

        Returns
        -------
        image : torch.Tensor  (C, H, W)
            画像データ
        question : torch.Tensor  (vocab_size)
            質問文をone-hot表現に変換したもの
        answers : torch.Tensor  (n_answer)
            10人の回答者の回答のid
        mode_answer_idx : torch.Tensor  (1)
            10人の回答者の回答の中で最頻値の回答のid
        """
        image = Image.open(f"{self.image_dir}/{self.df['image'][idx]}")
        image = self.transform(image)
        question = np.zeros(len(self.idx2question) + 1)  # 未知語用の要素を追加
        question_words = self.df["question"][idx].split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1  # one-hot表現に変換
            except KeyError:
                question[-1] = 1  # 未知語

        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][idx]]
            mode_answer_idx = mode(answers)  # 最頻値を取得（正解ラベル）

            return image, torch.Tensor(question), torch.Tensor(answers), int(mode_answer_idx)

        else:
            return image, torch.Tensor(question)

    def __len__(self):
        return len(self.df)

In [None]:
def VQA_criterion(batch_pred, batch_answers):
    """
    VQA タスクに用いられる評価関数．
    """
    total_acc = 0.

    for pred, answers in zip(batch_pred, batch_answers):
        acc = 0.
        for i in range(len(answers)):
            num_match = 0
            for j in range(len(answers)):
                if i == j:
                    continue
                if pred == answers[j]:
                    num_match += 1
            acc += min(num_match / 3, 1)
        total_acc += acc / 10

    return total_acc / len(batch_pred)

### 3. model

In [None]:
class BasicBlock(nn.Module):
    """
    ResNet の basic block
    """
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        """
        コンストラクタ．

        Parameters
        ----------
        in_channles: int
            入力のチャネル数
        out_channels:
            出力のチャネル数
        stride: int
            ストライド
        """
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """
        順伝播処理

        Parameters
        ----------
        x: torch.Tensor
            ブロックへの入力

        Returns
        -------
        out: torch.Tensor
            ブロックへの出力
        """
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class BottleneckBlock(nn.Module):
    """
    ResNet の bottleneck block
    """
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1):
        """
        コンストラクタ．

        Parameters
        ----------
        in_channles: int
            入力のチャネル数
        out_channels:
            出力のチャネル数
        stride: int
            ストライド
        """
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        """
        順伝播処理

        Parameters
        ----------
        x: torch.Tensor
            ブロックへの入力

        Returns
        -------
        out: torch.Tensor
            ブロックへの出力
        """
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.shortcut(residual)
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    """
    ResNet の実装
    """
    def __init__(self, block, layers):
        """
        コンストラクタ．

        Parameters
        ----------
        block: torch.nn.Module
            利用するブロックのクラス (BasicBlock / BottleneckBlock)
        layers: list
            各ブロックの層数
        """
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, layers[0], 64)
        self.layer2 = self._make_layer(block, layers[1], 128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], 256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, 512)

    def _make_layer(self, block, blocks, out_channels, stride=1):
        """
        同じ構成を繰り返す部分を生成する．

        Parameters
        ----------
        block: torch.nn.Module
            利用するブロックのクラス (BasicBlock / BottleneckBlock)
        blocks: int
            層数
        out_channels: int
            出力のチャネル数
        stride: int
            ストライド

        Returns
        -------
        layers: torch.nn.ModuleList
            生成した層
        """
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        """
        順伝播処理

        Parameters
        ----------
        x: torch.Tensor
            入力データ

        Returns
        -------
        x: torch.Tensor
            ResNet によって生成される特徴量
        """
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [None]:
def ResNet18():
    """
    ResNet18 を生成する関数．
    """
    return ResNet(BasicBlock, [2, 2, 2, 2])


def ResNet50():
    """
    ResNet50 を生成する関数．
    """
    return ResNet(BottleneckBlock, [3, 4, 6, 3])

In [None]:
class VQAModel(nn.Module):
    """
    VQA タスクを解くためのモデル例．
    """
    def __init__(self, vocab_size: int, n_answer: int):
        """
        コンストラクタ．

        Parameters
        ----------
        vocab_size: int
            入力文の語彙数
        n_answer: int
            出力のクラス数
        """
        super().__init__()
        self.resnet = ResNet18()
        self.text_encoder = nn.Linear(vocab_size, 512)

        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, n_answer)
        )

    def forward(self, image, question):

        image_feature = self.resnet(image)  # 画像の特徴量
        question_feature = self.text_encoder(question)  # テキストの特徴量

        x = torch.cat([image_feature, question_feature], dim=1)
        x = self.fc(x)

        return x

### 4. train

In [None]:
def train(model, dataloader, optimizer, criterion, device):
    """
    学習用の関数．

    Parameters
    ----------
    model: torch.nn.Module
        学習するモデル
    dataloader: torch.utils.data.DataLoader
        学習に利用するデータローダ
    optimizer: torch.optim.Optim
        最適化手法
    criterion: torch.nn.Module
        損失関数
    device: torch.device
        学習に利用するデバイス

    Returns
    -------
    total_loss: float
        平均損失
    total_acc: float
        平均正解率
    simple_acc: float
        最頻値に対する正解率（VQA の評価指標とは異なることに注意）
    time: float
        1 エポックの学習にかかった時間 (sec)
    """
    model.train()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        image, question, answer, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).float().mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start


def eval(model, dataloader, optimizer, criterion, device):
    """
    学習用の関数．

    Parameters
    ----------
    model: torch.nn.Module
        モデル
    dataloader: torch.utils.data.DataLoader
        評価に利用するデータローダ
    criterion: torch.nn.Module
        損失関数
    device: torch.device
        利用するデバイス

    Returns
    -------
    total_loss: float
        平均損失
    total_acc: float
        平均正解率
    simple_acc: float
        最頻値に対する正解率（VQA の評価指標とは異なることに注意）
    time: float
        1 エポックの評価にかかった時間 (sec)
    """
    model.eval()

    total_loss = 0
    total_acc = 0
    simple_acc = 0

    start = time.time()
    for image, question, answers, mode_answer in dataloader:
        image, question, answer, mode_answer = \
            image.to(device), question.to(device), answers.to(device), mode_answer.to(device)

        pred = model(image, question)
        loss = criterion(pred, mode_answer.squeeze())

        total_loss += loss.item()
        total_acc += VQA_criterion(pred.argmax(1), answers)  # VQA accuracy
        simple_acc += (pred.argmax(1) == mode_answer).mean().item()  # simple accuracy

    return total_loss / len(dataloader), total_acc / len(dataloader), simple_acc / len(dataloader), time.time() - start

### 5. make submission file

In [None]:
# deviceの設定
set_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"

# dataloader / model
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
train_dataset = VQADataset(df_path="./data/train.json", image_dir="./data/train", transform=transform)
test_dataset = VQADataset(df_path="./data/valid.json", image_dir="./data/valid", transform=transform, answer=False)
test_dataset.update_dict(train_dataset)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

model = VQAModel(vocab_size=len(train_dataset.question2idx)+1, n_answer=len(train_dataset.answer2idx)).to(device)

# optimizer / criterion
num_epoch = 4
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

In [None]:
# train model
for epoch in range(num_epoch):
    train_loss, train_acc, train_simple_acc, train_time = train(model, train_loader, optimizer, criterion, device)
    print(f"【{epoch + 1}/{num_epoch}】\n"
            f"train time: {train_time:.2f} [s]\n"
            f"train loss: {train_loss:.4f}\n"
            f"train acc: {train_acc:.4f}\n"
            f"train simple acc: {train_simple_acc:.4f}")

In [None]:
# make submission file
model.eval()
submission = []
for image, question in test_loader:
    image, question = image.to(device), question.to(device)
    pred = model(image, question)
    pred = pred.argmax(1).cpu().item()
    submission.append(pred)

submission = [train_dataset.idx2answer[id] for id in submission]
submission = np.array(submission)
torch.save(model.state_dict(), "model.pt")
np.save("submission.npy", submission)

## 提出方法

以下の3点をzip化し，Omnicampusの「最終課題 (VQA)」から提出してください．

- `submission.npy`
- `model.pt`や`model_best.pt`など，テストに使用した重み（拡張子は`.pt`のみ）
- 本Colab Notebook

In [None]:
from zipfile import ZipFile

model_path = "model.pt"
notebook_path = "/content/drive/MyDrive/Colab Notebooks/DL_Basic_2025_Competition_VQA_baseline.ipynb"

with ZipFile("submission.zip", "w") as zf:
    zf.write("submission.npy")
    zf.write(model_path)
    zf.write(notebook_path, arcname="DL_Basic_2025_Competition_VQA_baseline.ipynb")