In [1]:
!pip install -q datasets lightning
!pip install huggingface_hub  # Install if not already installed
!pip install rouge_score
!pip install zss

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.4/40.4 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m815.2/815.2 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge_score
  Building wheel for rouge_score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge_score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=6576f4b4701219107d5a79deee6687832fd12c98803eee57858cb3ed015f48e3
  Stored in directory: /root/.cache/pip/wheels/5f/dd/89/461065a73be61a532ff8599a28e9beef17985c9e9c31e541b4
Successfully built rouge_score
Installing collected packages: rouge_score
Successfully installed rouge_score-0.1.2
Collecting zss
  Downloading zss-1.2.0.tar.gz (9.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wh

In [2]:
import re
import json
from torch.utils.data import Dataset
from typing import Any, List, Dict
import random
from datasets import load_dataset
from huggingface_hub import login
from torch.utils.data import DataLoader
from transformers import AutoProcessor, PaliGemmaForConditionalGeneration
from nltk.metrics.distance import edit_distance
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer
from sklearn.metrics import f1_score
import zss  

In [None]:
# Constants
REPO_ID = "google/paligemma-3b-mix-224"
#FINETUNED_MODEL_ID_BASE = "xxxxxxxxxxxxxxxx"
#FINETUNED_MODEL_ID = "xxxxxxxxxxx"
MAX_LENGTH = 512
DATASET_NAME = "naver-clova-ix/cord-v2"
#TOKEN = "xxxxxxxxxxxxxxxxxxxxxx"
PROMPT = "extract JSON."

In [3]:
class CustomDataset(Dataset):
    def __init__(
        self,
        dataset_name_or_path: str,
        split: str = "train",
        sort_json_key: bool = True,
    ):
        super().__init__()

        self.split = split
        self.sort_json_key = sort_json_key

        self.dataset = load_dataset(dataset_name_or_path, split=self.split)
        self.dataset_length = len(self.dataset)

        self.gt_token_sequences = []
        for sample in self.dataset:
            ground_truth = json.loads(sample["ground_truth"])
            if "gt_parses" in ground_truth:  # when multiple ground truths are available, e.g., docvqa
                assert isinstance(ground_truth["gt_parses"], list)
                gt_jsons = ground_truth["gt_parses"]
            else:
                assert "gt_parse" in ground_truth and isinstance(ground_truth["gt_parse"], dict)
                gt_jsons = [ground_truth["gt_parse"]]

            self.gt_token_sequences.append(
                [
                    self.json2token(
                        gt_json,
                        sort_json_key=self.sort_json_key,
                    )
                    for gt_json in gt_jsons  # load json from list of json
                ]
            )

    def json2token(self, obj: Any, sort_json_key: bool = True):
        """
        Convert an ordered JSON object into a token sequence
        """
        if type(obj) == dict:
            if len(obj) == 1 and "text_sequence" in obj:
                return obj["text_sequence"]
            else:
                output = ""
                if sort_json_key:
                    keys = sorted(obj.keys(), reverse=True)
                else:
                    keys = obj.keys()
                for k in keys:
                    output += (
                        fr"<s_{k}>"
                        + self.json2token(obj[k], sort_json_key)
                        + fr"</s_{k}>"
                    )
                return output
        elif type(obj) == list:
            return r"<sep/>".join(
                [self.json2token(item, sort_json_key) for item in obj]
            )
        else:
            obj = str(obj)
            return obj

    def __len__(self) -> int:
        return self.dataset_length

    def __getitem__(self, idx: int) -> Dict:
        #Returns one item of the dataset.
        #Returns:
            #image : the original Receipt image
            #target_sequence : tokenized ground truth sequence
        sample = self.dataset[idx]

        # inputs
        image = sample["image"]
        target_sequence = random.choice(self.gt_token_sequences[idx])  # can be more than one, e.g., DocVQA Task 1
        print(target_sequence)
        return image, target_sequence

In [4]:
def run_inference(dataset, processor, model, test_custom_dataset):
    total_levenshtein_distance = 0
    total_bleu_score = 0
    total_rouge1 = 0
    total_rouge2 = 0
    total_rougeL = 0
    num_samples = 1 #len(dataset["test"])
    results = []

    for i in range(1):
        test_example = dataset["test"][i]
        test_image = test_example["image"]
        _,target_sequence = test_custom_dataset[i]
        
        inputs = processor(text=PROMPT, images=test_image, return_tensors="pt")
        for k,v in inputs.items():
            print(k,v.shape)
            
        generated_ids = model.generate(**inputs, max_new_tokens=MAX_LENGTH)    
        image_token_index = model.config.image_token_index
        num_image_tokens = len(generated_ids[generated_ids==image_token_index])
        num_text_tokens = len(processor.tokenizer.encode(PROMPT))
        num_prompt_tokens = num_image_tokens + num_text_tokens + 2
        generated_text = processor.batch_decode(generated_ids[:, num_prompt_tokens:], skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        print('generated_text',generated_text)
        
        generated_json = token2json(generated_text)
        actual_json = token2json(target_sequence)
        levenshtein_distance = edit_distance(str(generated_json), str(actual_json))
        bleu_score = compute_bleu(str(generated_json), str(actual_json))
        rouge_scores = compute_rouge(str(generated_json), str(actual_json))

        total_levenshtein_distance += levenshtein_distance
        total_bleu_score += bleu_score
        total_rouge1 += rouge_scores["rouge1"].fmeasure
        total_rouge2 += rouge_scores["rouge2"].fmeasure
        total_rougeL += rouge_scores["rougeL"].fmeasure

        results.append({
            "sample": i + 1,
            "generated_json": generated_json,
            "actual_json": actual_json,
            "levenshtein_distance": levenshtein_distance,
            "bleu_score": bleu_score,
            "rouge1": rouge_scores["rouge1"].fmeasure,
            "rouge2": rouge_scores["rouge2"].fmeasure,
            "rougeL": rouge_scores["rougeL"].fmeasure,
        })
    
    avg_levenshtein_distance = total_levenshtein_distance / num_samples
    avg_bleu_score = total_bleu_score / num_samples
    avg_rouge1 = total_rouge1 / num_samples
    avg_rouge2 = total_rouge2 / num_samples
    avg_rougeL = total_rougeL / num_samples
    
    return results, avg_levenshtein_distance, avg_bleu_score, avg_rouge1, avg_rouge2, avg_rougeL


def save_results(results, avg_metrics, filename="inference_results.txt"):
    with open(filename, "w") as f:
        for res in results:
            f.write(f"Sample {res['sample']}:\n")
            f.write(f"Generated JSON: {res['generated_json']}\n")
            f.write(f"Actual JSON: {res['actual_json']}\n")
            f.write(f"Levenshtein Distance: {res['levenshtein_distance']}\n")
            f.write(f"BLEU Score: {res['bleu_score']}\n")
            f.write(f"ROUGE Scores: R1={res['rouge1']}, R2={res['rouge2']}, RL={res['rougeL']}\n\n")
        f.write(f"Average Metrics: LD={avg_metrics[0]}, BLEU={avg_metrics[1]}, ROUGE-1={avg_metrics[2]}, ROUGE-2={avg_metrics[3]}, ROUGE-L={avg_metrics[4]}\n")
    print(f"Results saved to {filename}")

In [5]:

def token2json(tokens, is_inner_value=False, added_vocab=None):
        #Convert a (generated) token sequence into an ordered JSON format.
        if added_vocab is None:
            added_vocab = processor.tokenizer.get_added_vocab()

        output = {}

        while tokens:
            start_token = re.search(r"<s_(.*?)>", tokens, re.IGNORECASE)
            if start_token is None:
                break
            key = start_token.group(1)
            key_escaped = re.escape(key)

            end_token = re.search(rf"</s_{key_escaped}>", tokens, re.IGNORECASE)
            start_token = start_token.group()
            if end_token is None:
                tokens = tokens.replace(start_token, "")
            else:
                end_token = end_token.group()
                start_token_escaped = re.escape(start_token)
                end_token_escaped = re.escape(end_token)
                content = re.search(
                    f"{start_token_escaped}(.*?){end_token_escaped}", tokens, re.IGNORECASE | re.DOTALL
                )
                if content is not None:
                    content = content.group(1).strip()
                    if r"<s_" in content and r"</s_" in content:  # non-leaf node
                        value = token2json(content, is_inner_value=True, added_vocab=added_vocab)
                        if value:
                            if len(value) == 1:
                                value = value[0]
                            output[key] = value
                    else:  # leaf nodes
                        output[key] = []
                        for leaf in content.split(r"<sep/>"):
                            leaf = leaf.strip()
                            if leaf in added_vocab and leaf[0] == "<" and leaf[-2:] == "/>":
                                leaf = leaf[1:-2]  # for categorical special tokens
                            output[key].append(leaf)
                        if len(output[key]) == 1:
                            output[key] = output[key][0]

                tokens = tokens[tokens.find(end_token) + len(end_token) :].strip()
                if tokens[:6] == r"<sep/>":  # non-leaf nodes
                    return [output] + token2json(tokens[6:], is_inner_value=True, added_vocab=added_vocab)

        if len(output):
            return [output] if is_inner_value else output
        else:
            return [] if is_inner_value else {"text_sequence": tokens}
            
def compute_bleu(generated, actual):
    return sentence_bleu([actual.split()], generated.split())


def compute_rouge(generated, actual):
    scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)
    return scorer.score(generated, actual)


In [7]:
if __name__ == "__main__":
    test_custom_dataset = CustomDataset("naver-clova-ix/cord-v2", split="test")
    dataset = load_dataset(DATASET_NAME)
    login(TOKEN)
    processor = AutoProcessor.from_pretrained(REPO_ID)
    model = PaliGemmaForConditionalGeneration.from_pretrained(FINETUNED_MODEL_ID_BASE)
    results, *avg_metrics = run_inference(dataset, processor, model,test_custom_dataset)
    save_results(results, avg_metrics)
    print(f"Evaluation Metrics: {avg_metrics}")

adapter_config.json:   0%|          | 0.00/726 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.03k [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/62.6k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/3 [00:00<?, ?it/s]

model-00001-of-00003.safetensors:   0%|          | 0.00/4.95G [00:00<?, ?B/s]

model-00002-of-00003.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00003-of-00003.safetensors:   0%|          | 0.00/1.74G [00:00<?, ?B/s]

`config.hidden_act` is ignored, you should use `config.hidden_activation` instead.
Gemma's activation function will be set to `gelu_pytorch_tanh`. Please, use
`config.hidden_activation` if you want to override this behaviour.
See https://github.com/huggingface/transformers/pull/29402 for more details.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

adapter_model.safetensors:   0%|          | 0.00/45.3M [00:00<?, ?B/s]

You are passing both `text` and `images` to `PaliGemmaProcessor`. The processor expects special image tokens in the text, as many tokens as there are images per each text. It is recommended to add `<image>` tokens in the very beginning of your text. For this call, we will infer how many images each text has and add special tokens.


<s_total><s_total_price>60.000</s_total_price><s_menuqty_cnt>2.00</s_menuqty_cnt><s_creditcardprice>60.000</s_creditcardprice></s_total><s_sub_total><s_tax_price>5.455</s_tax_price><s_subtotal_price>60.000</s_subtotal_price><s_discount_price>-60.000</s_discount_price></s_sub_total><s_menu><s_price>60.000</s_price><s_num>901016</s_num><s_nm>-TICKET CP</s_nm><s_itemsubtotal>60.000</s_itemsubtotal><s_cnt>2</s_cnt></s_menu>
input_ids torch.Size([1, 261])
attention_mask torch.Size([1, 261])
pixel_values torch.Size([1, 3, 224, 224])
generated_text <s_total><s_total_price>60.000</s_total_price><s_menuqty_cnt>2</s_menuqty_cnt><s_cashprice>60.000</s_cashprice></s_total><s_sub_total><s_tax_price>5.400</s_tax_price><s_subtotal_price>54.400</s_subtotal_price></s_sub_total><s_menu><s_price>60.000</s_price><s_nm>TICKET CP</s_nm><s_cnt>2</s_cnt></s_menu>
Results saved to inference_results.txt
Evaluation Metrics: [89.0, 0.18777843231751812, 0.7428571428571429, 0.5882352941176471, 0.7428571428571429]
