In [1]:
!pip install datasets



In [2]:
import abc

from dataclasses import dataclass
from datasets import Dataset
from PIL import Image


@dataclass
class TaskConfig:
    max_dataset_len: int | None = None
    rotate_choices: bool = False


class Task(abc.ABC):
    def __init__(self, config: TaskConfig):
        self.config = config

        if self.config.max_dataset_len is not None:
            self.dataset = self._prepare_dataset().select(
                range(self.config.max_dataset_len)
            )
        else:
            self.dataset = self._prepare_dataset()

    @abc.abstractmethod
    def _prepare_dataset(self) -> Dataset:
        """Prepares the dataset."""
        pass

    @abc.abstractmethod
    def doc_to_text(self, doc) -> str:
        """Converts a document to text."""
        pass

    @abc.abstractmethod
    def doc_to_visual(self, doc) -> list[Image.Image]:
        """Converts a document to visual."""
        pass

    @abc.abstractmethod
    def doc_to_id(self, doc) -> str:
        """Converts a document to id."""
        pass

    @abc.abstractmethod
    def doc_to_answer(self, doc) -> str:
        """Converts a document to answer."""
        pass


In [3]:
from datasets import load_dataset, Dataset

In [4]:
class JapaneseHeronBench(Task):
    default_metric = "heron-bench"

    @staticmethod
    def _prepare_dataset() -> Dataset:
        ds = load_dataset("Silviase/Japanese-Heron-Bench", split="train")
        ds = ds.rename_column("text", "input_text")
        return ds

    @staticmethod
    def doc_to_text(doc) -> str:
        return doc["input_text"]

    @staticmethod
    def doc_to_visual(doc) -> list[Image.Image]:
        return [doc["image"]]

    @staticmethod
    def doc_to_id(doc) -> str:
        return str(doc["question_id"])

    @staticmethod
    def doc_to_answer(doc) -> str:
        return doc["answer"]["gpt-4-0125-preview"]

In [5]:
from typing import Callable


class TaskRegistry:
    """Registry to map metrics to their corresponding scorer classes."""

    _tasks: dict[str, Callable[[TaskConfig], Task]] = {
        "japanese-heron-bench": JapaneseHeronBench,
    }

    @classmethod
    def get_task_list(cls):
        return list(cls._tasks.keys())

    @classmethod
    def load_task(cls, task_name: str, task_config: TaskConfig = TaskConfig()) -> Task:
        try:
            return cls._tasks[task_name](task_config)  # type: ignore
        except KeyError:
            raise ValueError(f"Task '{task_name}' is not supported.")

In [6]:
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class AggregateOutput:
    overall_score: float
    details: dict[str, float]


@dataclass
class ScorerConfig:
    docs: dict | None = None
    judge_model: str | None = None
    batch_size: int = 10
    random_choice: bool = False


class Scorer(ABC):
    def __init__(self, config: ScorerConfig):
        self.config = config

    @abstractmethod
    def score(self, refs: list[str], preds: list[str]) -> list:
        raise NotImplementedError

    @abstractmethod
    def aggregate(self, scores: list) -> AggregateOutput:
        raise NotImplementedError

In [7]:
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class AggregateOutput:
    overall_score: float
    details: dict[str, float]


@dataclass
class ScorerConfig:
    docs: dict | None = None
    judge_model: str | None = None
    batch_size: int = 10
    random_choice: bool = False


class Scorer(ABC):
    def __init__(self, config: ScorerConfig):
        self.config = config

    @abstractmethod
    def score(self, refs: list[str], preds: list[str]) -> list:
        raise NotImplementedError

    @abstractmethod
    def aggregate(self, scores: list) -> AggregateOutput:
        raise NotImplementedError

In [8]:
from collections import defaultdict
import numpy as np
import re
import json
from collections import defaultdict
import numpy as np
import re
import json


def parse_score(llm_output: str) -> dict[str, int]:
    json_pattern = "r"

    if not matches:
        json_pattern = r"\{.*?\}"
        matches = re.findall(json_pattern, llm_output, re.DOTALL)

    for json_string in matches:
        json_string = json_string.strip()
        try:
            parsed_json = json.loads(json_string)
            if (
                isinstance(parsed_json, dict)
                and "score" in parsed_json
                and "score_gpt" in parsed_json
            ):
                return {
                    "score": int(parsed_json["score"]),
                    "score_gpt": int(parsed_json["score_gpt"]),
                }
        except json.JSONDecodeError:
            try:
                json_string_clean = re.sub(r"[\x00-\x1F\x7F]", "", json_string)
                parsed_json = json.loads(json_string_clean)
                if (
                    isinstance(parsed_json, dict)
                    and "score" in parsed_json
                    and "score_gpt" in parsed_json
                ):
                    return {
                        "score": int(parsed_json["score"]),
                        "score_gpt": int(parsed_json["score_gpt"]),
                    }
            except json.JSONDecodeError:
                continue

    return {"score": -1, "score_gpt": -1}


INSTRUCTION = """
You are an expert evaluator. You are given the following information:
- Context: A description of the image.
- Question: A question about the image.
- GPT-4o Answer: GPT-4o's answer to the question.
- Model Answer: The target model's answer to the question.

Your task is to evaluate each answer independently based on how well it answers the question given the context.

Please assign a score from 1 to 10 for each answer according to the following guideline:
- 10: Perfect — Completely correct, relevant, and fully addresses the question based on the context.
- 8-9: Very Good — Mostly correct with only minor inaccuracies or slight omissions.
- 6-7: Good — Generally correct but contains noticeable errors or lacks important details.
- 4-5: Poor — Significant errors or missing key points, but some relevance remains.
- 1-3: Very Poor — Mostly or completely incorrect, irrelevant, or nonsensical.

Output Format (JSON):
Return the result in the following JSON format:
```json
{{
    "score_gpt": int,
    "score": int
}}
```
Do not output anything other than the JSON.

Input:
{{
    "context": {context},
    "question": {question},
    "gpt4o_answer": {gpt4o_answer},
    "model_answer": {model_answer}
}}

Output:
"""



class HeronBenchScorer(Scorer):
    def score(self, refs, preds: list[str]) -> list[dict[str, int]]:
        docs = self.config.docs
        assert docs is not None
        assert self.config.client is not None
        assert self.config.judge_model is not None

        contents = [
            INSTRUCTION.format(
                context=doc["context"],
                question=doc["input_text"],
                gpt4o_answer=ref,
                model_answer=pred,
            )
            for doc, ref, pred in zip(docs, refs, preds)
        ]

        completions: list[str] = ask_gpt4_batch(
            contents, 1024, self.config.client, self.config.judge_model
        )

        scores: list[dict[str, nt]] = [parse_score(c) for c in completions]
        return scores

    def aggregate(self, scores: list[dict[str, int]]) -> AggregateOutput:
        docs = self.config.docs
        assert docs is not None
        category_list = ["conv", "detail", "complex"]
        heron_metrics = defaultdict(float)
        for category in category_list:
            score_owns = [
                score["score"]
                for score, doc in zip(scores, docs)
                if doc["category"] == category
            ]
            score_gpts = [
                score["score_gpt"]
                for score, doc in zip(scores, docs)
                if doc["category"] == category
            ]
            if len(score_owns) == 0 or np.mean(score_owns) == -1:
                continue
            avg_score = np.mean(score_owns)
            avs_score_rel = (
                100
                * np.mean(score_owns)
                / max(
                    0.01, np.mean(score_gpts)
                )  # divide by 0.01 when 0 division happens
            )
            heron_metrics[category] = avg_score
            heron_metrics[category + "_rel"] = avs_score_rel
        heron_metrics["parse_error_count"] = sum(
            score["score"] == -1 for score in scores
        )
        heron_metrics["overall"] = sum([score["score"] for score in scores]) / len(
            scores
        )
        heron_metrics["overall_rel"] = sum(
            [heron_metrics[category + "_rel"] for category in category_list]
        ) / len(category_list)
        output = AggregateOutput(
            overall_score=heron_metrics["overall_rel"],
            details=heron_metrics,
        )
        return output

In [25]:
from typing import Callable


class ScorerRegistry:
    """Registry to map metrics to their corresponding scorer classes."""

    _scorers: dict[str, Callable[[ScorerConfig], Scorer]] = {
        "heron-bench": HeronBenchScorer,
        "rougel": RougeLScorer,
    }

    @classmethod
    def get_metric_list(cls) -> list[str]:
        """Get a list of supported metrics."""
        return list(cls._scorers.keys())

    @classmethod
    def load_scorer(
        cls, metric: str, scorer_config: ScorerConfig = ScorerConfig()
    ) -> Scorer:
        """Load a scorer instance from the scorer registry."""
        try:
            return cls._scorers[metric](scorer_config)  # type: ignore
        except KeyError:
            raise ValueError(f"Metric '{metric}' is not supported.")

In [26]:
!pip install fugashi



In [27]:
!pip install rouge_score



In [28]:
import re
from rouge_score import rouge_scorer, scoring
from fugashi import Tagger
import unicodedata
from concurrent.futures import ProcessPoolExecutor, Future

In [29]:
class RougeLScorer(Scorer):
    @staticmethod
    def score(refs: list[str], preds: list[str]) -> list[float]:
        futures: list[Future[dict[str, float]]] = []
        with ProcessPoolExecutor() as executor:
            for ref, pred in zip(refs, preds):
                future = executor.submit(rouge_ja, [ref], [pred])
                futures.append(future)
        scores = [f.result()["rougeL"] for f in futures]
        return scores

    @staticmethod
    def aggregate(scores: list[float]) -> AggregateOutput:
        mean = sum(scores) / len(scores)
        return AggregateOutput(mean, {"rougel": mean})

In [30]:

def rouge_ja(refs: list[str], preds: list[str]) -> dict:
    """Compute ROUGE-L scores for Japanese text.
    Args:
        refs: list of reference strings
        preds: list of predicted strings
    Returns:
        dict: dictionary with keys: { 'rouge1', 'rouge2', 'rougeL' }
        Each value is a float representing the ROUGE score (f-measure) * 100.
    """
    assert isinstance(refs, list) and isinstance(
        preds, list
    ), "refs and preds must be lists."
    tokenizer = MecabTokenizer()
    rouge_types = ["rouge1", "rouge2", "rougeL"]
    # mecab-based rouge
    scorer = rouge_scorer.RougeScorer(
        rouge_types,
        tokenizer=tokenizer,
    )

    # Accumulate confidence intervals.
    aggregator = scoring.BootstrapAggregator()
    for ref, pred in zip(refs, preds):
        aggregator.add_scores(scorer.score(ref, pred))
    result = aggregator.aggregate()
    return {type: result[type].mid.fmeasure * 100 for type in rouge_types}

In [31]:
class MecabTokenizer:
    def __init__(self) -> None:
        self.tagger = Tagger("-Owakati")

    def normalize_answer(self, text: str) -> str:
        """Lower case text, remove punctuation and extra whitespace, etc."""

        def white_space_fix(text: str) -> str:
            return " ".join(text.split())

        def remove_emoji(text: str) -> str:
            text = "".join(["" if emoji.is_emoji(c) else c for c in text])
            emoji_pattern = re.compile(
                "["
                "\U0001f600-\U0001f64f"  # emoticons
                "\U0001f300-\U0001f5ff"  # symbols & pictographs
                "\U0001f680-\U0001f6ff"  # transport & map symbols
                "\U0001f1e0-\U0001f1ff"  # flags (iOS)
                "\U00002702-\U000027b0"
                "]+",
                flags=re.UNICODE,
            )
            return emoji_pattern.sub(r"", text)

        text = remove_emoji(text)
        # see neologdn docs for details, but handles things like full/half width variation
        # text = neologdn.normalize(text) FIXME: fix c++12 error when installing neologdn
        text = unicodedata.normalize("NFKC", text)
        text = white_space_fix(text)
        return text

    def tokenize(self, text):
        return self.tagger.parse(self.normalize_answer(text)).split()


In [32]:
!pip install unidic-lite



In [33]:
!python -m unidic download

/usr/bin/python3: No module named unidic


In [34]:
!pip install emoji



In [35]:
import emoji

ここまでがllm-jp-eval-mmの必要な部分を動くように変更したもの

ここからが評価

In [36]:
from PIL import Image
from transformers import AutoModelForImageTextToText, AutoProcessor
import os

import torch

os.environ["TOKENIZERS_PARALLELISM"] = "false"


# 1. GemmaVLMWrapper クラス: マージ済みモデルをロードして generate() を実装
class GemmaVLMWrapper:
    def __init__(self, model_dir: str):
        self.model = AutoModelForImageTextToText.from_pretrained(
            model_dir, torch_dtype=torch.bfloat16, device_map="auto"
        )
        self.processor = AutoProcessor.from_pretrained(model_dir, use_fast=True)

    def generate(
        self, images: list[Image.Image], text: str, max_new_tokens: int = 50
    ) -> str:
        """
        images: PIL.Image のリスト。必ず1枚以上の画像が渡されることを前提とします。
        text: 入力テキスト（質問文など）。
        max_new_tokens: 生成する最大トークン数。
        """
        # チャット形式で入力プロンプトを作成します。
        # collate_fn の中と同様に、ユーザー発話で画像とテキストを組み合わせたメッセージリストを作ります。
        messages = [
            {
                "role": "user",
                "content": [{"type": "image", "image": img} for img in images]
                + [{"type": "text", "text": text}],
            }
        ]
        # ここでチャットテンプレートを適用し、画像トークンが含まれるプロンプト文字列を生成します。
        prompt = self.processor.apply_chat_template(
            messages, add_generation_prompt=False, tokenize=False
        )

        # processor に渡す際は、画像はもともとリストのまま（さらにリストでラップ）にする必要があります。
        inputs = self.processor(
            text=[prompt], images=[images], return_tensors="pt", padding=True
        )
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        with torch.no_grad():
            output_ids = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
        return self.processor.tokenizer.decode(output_ids[0], skip_special_tokens=True)

In [37]:
task = TaskRegistry.load_task("japanese-heron-bench")

# 評価用に全サンプルから予測と正解をリストに収集する
references = []
predictions = []

# Gemmaモデルのラッパーを初期化（保存済みマージ済みモデルのディレクトリ）
model_dir = "/content/drive/MyDrive/gemma3-jicvqa-finetuned"
model = GemmaVLMWrapper(model_dir)

# 2. タスクのすべてのサンプルをループ処理する
for example in task.dataset:
    # 各サンプルから入力テキスト、視覚入力、正解回答を取得
    input_text = task.doc_to_text(example)
    images = task.doc_to_visual(
        example
    )  # images は list[Image.Image] として返される前提
    reference = task.doc_to_answer(example)

    # モデルから予測を生成
    pred = model.generate(images, input_text)

    # 予測と正解をリストに追加
    predictions.append(pred)
    references.append(reference)

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

Evaluation result: AggregateOutput(overall_score=np.float64(31.956091217206772), details={'rougel': np.float64(31.956091217206772)})


In [38]:
# 3. スコアラーで評価を行う
scorer = ScorerRegistry.load_scorer("rougel", ScorerConfig(docs=task.dataset))
scores = scorer.score(references, predictions)
result = scorer.aggregate(scores)
print("Evaluation result:", result)

Evaluation result: AggregateOutput(overall_score=np.float64(31.956091217206772), details={'rougel': np.float64(31.956091217206772)})
