# original examples

In [1]:
from transformers import AutoProcessor, AutoModel
from datasets import load_dataset
# Local Hugging Face cache directory; passed as `cache_dir` to the model and dataset loads below.
# NOTE(review): absolute, machine-specific path — this cell only runs unchanged on the author's machine.
cachedir = '/home/dijinli/Disk/Workspace/multi-modal-relation-extraction/data/hf'
# apply_ocr=False because words and boxes are supplied explicitly in the processor call below.
# NOTE(review): unlike the two loads that follow, this call is not given `cache_dir`.
processor = AutoProcessor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False, )
model = AutoModel.from_pretrained("microsoft/layoutlmv3-base", cache_dir = cachedir)

# Training split of the FUNSD dataset; the first record is used as a worked example.
dataset = load_dataset("nielsr/funsd-layoutlmv3", split="train",cache_dir = cachedir)
example = dataset[0]
image = example["image"]
words = example["tokens"]
boxes = example["bboxes"]

# Encode image + words + boxes into PyTorch tensors.
encoding = processor(image, words, boxes=boxes, return_tensors="pt")

# Forward pass; keep the hidden states of the last layer.
outputs = model(**encoding)
last_hidden_states = outputs.last_hidden_state
# Last expression of the cell: displays the Python type of the example image.
type(image)

  from .autonotebook import tqdm as notebook_tqdm
Found cached dataset funsd-layoutlmv3 (/home/dijinli/Disk/Workspace/multi-modal-relation-extraction/data/hf/nielsr___funsd-layoutlmv3/funsd/1.0.0/0e3f4efdfd59aa1c3b4952c517894f7b1fc4d75c12ef01bcc8626a69e41c1bb9)


PIL.PngImagePlugin.PngImageFile

In [3]:
# Show which model inputs the processor produced for the example above.
encoding.keys()

dict_keys(['input_ids', 'attention_mask', 'bbox', 'pixel_values'])

# Data process

## load and show

In [1]:
from datasets import load_from_disk
# Load the pre-processed dataset that was saved to disk.
# NOTE(review): absolute, machine-specific path; `data` is reused by the later test cells.
data = load_from_disk('/home/dijinli/Disk/Workspace/multi-modal-relation-extraction/data/processed')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# NOTE(review): this expression is not the last statement of the cell, so its value is not displayed.
data.features
import pickle
# Serialize the first record to `data.pkl` in the current working directory.
with open('data.pkl', 'wb') as f:
    pickle.dump(data[0], f)

## process

### feature extract

In [6]:
from typing import Dict, Iterable, Optional, Union

import numpy as np

from transformers.utils import is_vision_available
from transformers.utils.generic import TensorType

from transformers.image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict
from transformers.image_transforms import normalize, rescale, resize, to_channel_dimension_format, to_pil_image
from transformers.image_utils import (
    IMAGENET_STANDARD_MEAN,
    IMAGENET_STANDARD_STD,
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    infer_channel_dimension_format,
    is_batched,
    to_numpy_array,
    valid_images,
)
from transformers.utils import is_pytesseract_available, logging, requires_backends


if is_vision_available():
    import PIL

# soft dependency
if is_pytesseract_available():
    import pytesseract

logger = logging.get_logger(__name__)


def normalize_box(box, width, height):
    """Scale a pixel box `(x0, y0, x1, y1)` onto a 0-1000 integer grid.

    Args:
        box: Indexable with at least four entries; x coordinates at positions 0 and 2,
            y coordinates at positions 1 and 3.
        width: Value the x coordinates are divided by.
        height: Value the y coordinates are divided by.

    Returns:
        A new list of four ints, each `int(1000 * (coordinate / extent))` (truncated towards zero).
    """
    # Pair every coordinate with the extent it is measured against.
    pairs = ((box[0], width), (box[1], height), (box[2], width), (box[3], height))
    scaled = []
    for coordinate, extent in pairs:
        scaled.append(int(1000 * (coordinate / extent)))
    return scaled


def apply_tesseract(image: np.ndarray, lang: Optional[str], tesseract_config: Optional[str]):
    """Run Tesseract OCR on a document image and return the recognized words with normalized boxes.

    Args:
        image: Image array; converted to a PIL image before being handed to Tesseract.
        lang: Forwarded as the `lang` argument of `pytesseract.image_to_data`.
        tesseract_config: Forwarded as the `config` argument of `pytesseract.image_to_data`.

    Returns:
        A tuple `(words, normalized_boxes)` of equal length. `words` holds every recognized
        string that is not empty after stripping whitespace; `normalized_boxes` holds the
        matching `[left, top, left + width, top + height]` box passed through `normalize_box`
        with the image width and height.
    """
    # apply OCR
    pil_image = to_pil_image(image)
    image_width, image_height = pil_image.size
    data = pytesseract.image_to_data(pil_image, lang=lang, output_type="dict", config=tesseract_config)

    words = []
    normalized_boxes = []
    # Single pass over the parallel OCR columns: skip blank words, and for the rest convert
    # (left, top, width, height) into corner format and normalize. Words and boxes are appended
    # together, so they stay aligned by construction. This replaces five comprehensions that each
    # tested list membership per element (quadratic in the number of OCR entries).
    for word, x, y, w, h in zip(data["text"], data["left"], data["top"], data["width"], data["height"]):
        if not word.strip():
            continue
        words.append(word)
        normalized_boxes.append(normalize_box([x, y, x + w, y + h], image_width, image_height))

    return words, normalized_boxes


def flip_channel_order(image: np.ndarray, data_format: Optional[ChannelDimension] = None) -> np.ndarray:
    """Reverse the order of the color channels of an image (e.g. RGB <-> BGR).

    Args:
        image: Image array whose channel layout is detected with `infer_channel_dimension_format`.
        data_format: If given, the flipped image is additionally converted to this channel
            layout with `to_channel_dimension_format`; otherwise the input layout is kept.

    Returns:
        The image with its channel axis reversed.

    Raises:
        ValueError: If the detected layout is neither channels-last nor channels-first.
    """
    input_data_format = infer_channel_dimension_format(image)
    if input_data_format == ChannelDimension.LAST:
        image = image[..., ::-1]
    elif input_data_format == ChannelDimension.FIRST:
        # The channel axis is the third axis from the end: axis 0 of an unbatched (C, H, W)
        # image and axis 1 of a batched (B, C, H, W) array. The previous `image[:, ::-1, ...]`
        # reversed axis 1 unconditionally, which mirrors the *height* of an unbatched image
        # instead of swapping its channels.
        image = image[..., ::-1, :, :]
    else:
        raise ValueError(f"Unsupported channel dimension: {input_data_format}")

    if data_format is not None:
        image = to_channel_dimension_format(image, data_format)
    return image


class LayoutLMREImageProcessor(BaseImageProcessor):
    r"""
    Constructs a LayoutLMRE image processor.

    The processor optionally runs Tesseract OCR on each input image, then resizes, rescales and
    normalizes the images and returns them as `pixel_values` in a `BatchFeature`.

    Args:
        do_resize (`bool`, *optional*, defaults to `True`):
            Whether to resize the image's (height, width) dimensions to `(size["height"], size["width"])`. Can be
            overridden by `do_resize` in `preprocess`.
        size (`Dict[str, int]` *optional*, defaults to `{"height": 224, "width": 224}`):
            Size of the image after resizing. Can be overridden by `size` in `preprocess`.
        resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BILINEAR`):
            Resampling filter to use if resizing the image. Can be overridden by `resample` in `preprocess`.
        do_rescale (`bool`, *optional*, defaults to `True`):
            Whether to rescale the image's pixel values by `rescale_value`. Can be overridden by
            `do_rescale` in `preprocess`.
        rescale_value (`float`, *optional*, defaults to 1 / 255):
            Value by which the image's pixel values are rescaled. Stored on the instance as
            `self.rescale_factor`; can be overridden by `rescale_factor` in `preprocess`.
        do_normalize (`bool`, *optional*, defaults to `True`):
            Whether to normalize the image. Can be overridden by the `do_normalize` parameter in the `preprocess`
            method.
        image_mean (`Iterable[float]` or `float`, *optional*, defaults to `IMAGENET_STANDARD_MEAN`):
            Mean to use if normalizing the image. This is a float or list of floats the length of the number of
            channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method.
        image_std (`Iterable[float]` or `float`, *optional*, defaults to `IMAGENET_STANDARD_STD`):
            Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
            number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
        apply_ocr (`bool`, *optional*, defaults to `True`):
            Whether to apply the Tesseract OCR engine to get words + normalized bounding boxes. Can be overridden by
            the `apply_ocr` parameter in the `preprocess` method.
        ocr_lang (`str`, *optional*):
            The language, specified by its ISO code, to be used by the Tesseract OCR engine. By default, English is
            used. Can be overridden by the `ocr_lang` parameter in the `preprocess` method.
        tesseract_config (`str`, *optional*):
            Any additional custom configuration flags that are forwarded to the `config` parameter when calling
            Tesseract. For example: '--psm 6'. Can be overridden by the `tesseract_config` parameter in the
            `preprocess` method.
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        do_resize: bool = True,
        size: Dict[str, int] = None,
        resample: PILImageResampling = PILImageResampling.BILINEAR,
        do_rescale: bool = True,
        rescale_value: float = 1 / 255,
        do_normalize: bool = True,
        image_mean: Union[float, Iterable[float]] = None,
        image_std: Union[float, Iterable[float]] = None,
        apply_ocr: bool = True,
        ocr_lang: Optional[str] = None,
        tesseract_config: Optional[str] = "",
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        size = size if size is not None else {"height": 224, "width": 224}
        size = get_size_dict(size)

        self.do_resize = do_resize
        self.size = size
        self.resample = resample
        self.do_rescale = do_rescale
        # The constructor keyword is `rescale_value`, but the attribute (and the matching
        # `preprocess` argument) is called `rescale_factor`.
        self.rescale_factor = rescale_value
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else IMAGENET_STANDARD_MEAN
        self.image_std = image_std if image_std is not None else IMAGENET_STANDARD_STD
        self.apply_ocr = apply_ocr
        self.ocr_lang = ocr_lang
        self.tesseract_config = tesseract_config

    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BILINEAR,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs
    ) -> np.ndarray:
        """
        Resize an image to (size["height"], size["width"]) dimensions.
        Args:
            image (`np.ndarray`):
                Image to resize.
            size (`Dict[str, int]`):
                Size of the output image.
            resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BILINEAR`):
                Resampling filter to use when resizing the image.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        Raises:
            ValueError: If `size` does not contain both the `"height"` and `"width"` keys.
        """
        size = get_size_dict(size)
        if "height" not in size or "width" not in size:
            raise ValueError(f"The size dictionary must contain the keys 'height' and 'width'. Got {size.keys()}")
        output_size = (size["height"], size["width"])
        return resize(image, size=output_size, resample=resample, data_format=data_format, **kwargs)

    def rescale(
        self,
        image: np.ndarray,
        scale: Union[int, float],
        data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs
    ) -> np.ndarray:
        """
        Rescale an image by a scale factor. image = image * scale.
        Args:
            image (`np.ndarray`):
                Image to rescale.
            scale (`int` or `float`):
                Scale to apply to the image.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        """
        return rescale(image, scale=scale, data_format=data_format, **kwargs)

    def normalize(
        self,
        image: np.ndarray,
        mean: Union[float, Iterable[float]],
        std: Union[float, Iterable[float]],
        data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs
    ) -> np.ndarray:
        """
        Normalize an image.
        Args:
            image (`np.ndarray`):
                Image to normalize.
            mean (`float` or `Iterable[float]`):
                Mean values to be used for normalization.
            std (`float` or `Iterable[float]`):
                Standard deviation values to be used for normalization.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        """
        return normalize(image, mean=mean, std=std, data_format=data_format, **kwargs)

    def preprocess(
        self,
        images: ImageInput,
        do_resize: bool = None,
        size: Dict[str, int] = None,
        resample=None,
        do_rescale: bool = None,
        rescale_factor: float = None,
        do_normalize: bool = None,
        image_mean: Union[float, Iterable[float]] = None,
        image_std: Union[float, Iterable[float]] = None,
        apply_ocr: bool = None,
        ocr_lang: Optional[str] = None,
        tesseract_config: Optional[str] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: ChannelDimension = ChannelDimension.FIRST,
        **kwargs,
    ) -> BatchFeature:
        """
        Preprocess an image or batch of images.
        Args:
            images (`ImageInput`):
                Image to preprocess.
            do_resize (`bool`, *optional*, defaults to `self.do_resize`):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*, defaults to `self.size`):
                Desired size of the output image after applying `resize`.
            resample (`int`, *optional*, defaults to `self.resample`):
                Resampling filter to use if resizing the image. This can be one of the `PILImageResampling` filters.
                Only has an effect if `do_resize` is set to `True`.
            do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
                Whether to rescale the image pixel values between [0, 1].
            rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
                Rescale factor to apply to the image pixel values. Only has an effect if `do_rescale` is set to `True`.
            do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
                Whether to normalize the image.
            image_mean (`float` or `Iterable[float]`, *optional*, defaults to `self.image_mean`):
                Mean values to be used for normalization. Only has an effect if `do_normalize` is set to `True`.
            image_std (`float` or `Iterable[float]`, *optional*, defaults to `self.image_std`):
                Standard deviation values to be used for normalization. Only has an effect if `do_normalize` is set to
                `True`.
            apply_ocr (`bool`, *optional*, defaults to `self.apply_ocr`):
                Whether to apply the Tesseract OCR engine to get words + normalized bounding boxes.
            ocr_lang (`str`, *optional*, defaults to `self.ocr_lang`):
                The language, specified by its ISO code, to be used by the Tesseract OCR engine. By default, English is
                used.
            tesseract_config (`str`, *optional*, defaults to `self.tesseract_config`):
                Any additional custom configuration flags that are forwarded to the `config` parameter when calling
                Tesseract.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensors to return. Can be one of:
                    - Unset: Return a list of `np.ndarray`.
                    - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
                    - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
                    - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
                    - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
            data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
                The channel dimension format for the output image. Can be one of:
                    - `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                    - `ChannelDimension.LAST`: image in (height, width, num_channels) format.
        Returns:
            `BatchFeature`: Holds `pixel_values`, plus `words` and `boxes` (one list per image) when
            `apply_ocr` is enabled.
        Raises:
            ValueError: If the images are of an unsupported type, or if a step is enabled while the
                value it needs (`size`, `rescale_factor`, `image_mean`/`image_std`) is `None`.
        """
        # Per-call arguments take precedence; anything left as None falls back to the instance setting.
        do_resize = do_resize if do_resize is not None else self.do_resize
        size = size if size is not None else self.size
        size = get_size_dict(size)
        resample = resample if resample is not None else self.resample
        do_rescale = do_rescale if do_rescale is not None else self.do_rescale
        rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
        do_normalize = do_normalize if do_normalize is not None else self.do_normalize
        image_mean = image_mean if image_mean is not None else self.image_mean
        image_std = image_std if image_std is not None else self.image_std
        apply_ocr = apply_ocr if apply_ocr is not None else self.apply_ocr
        ocr_lang = ocr_lang if ocr_lang is not None else self.ocr_lang
        tesseract_config = tesseract_config if tesseract_config is not None else self.tesseract_config

        if not is_batched(images):
            images = [images]

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        if do_resize and size is None:
            raise ValueError("Size must be specified if do_resize is True.")

        if do_rescale and rescale_factor is None:
            raise ValueError("Rescale factor must be specified if do_rescale is True.")

        if do_normalize and (image_mean is None or image_std is None):
            raise ValueError("If do_normalize is True, image_mean and image_std must be specified.")

        # All transformations expect numpy arrays.
        images = [to_numpy_array(image) for image in images]

        # Tesseract OCR to get words + normalized bounding boxes.
        # OCR runs before any resizing, i.e. on the images as they were passed in.
        if apply_ocr:
            requires_backends(self, "pytesseract")
            words_batch = []
            boxes_batch = []
            for image in images:
                words, boxes = apply_tesseract(image, ocr_lang, tesseract_config)
                words_batch.append(words)
                boxes_batch.append(boxes)

        if do_resize:
            images = [self.resize(image=image, size=size, resample=resample) for image in images]

        if do_rescale:
            images = [self.rescale(image=image, scale=rescale_factor) for image in images]

        if do_normalize:
            images = [self.normalize(image=image, mean=image_mean, std=image_std) for image in images]

        # Convert to the requested channel layout. The color channel order itself is left
        # untouched here (no RGB -> BGR flip is performed).
        images = [to_channel_dimension_format(image, data_format) for image in images]

        data = BatchFeature(data={"pixel_values": images}, tensor_type=return_tensors)

        if apply_ocr:
            data["words"] = words_batch
            data["boxes"] = boxes_batch
        return data

### total process


In [45]:
from typing import List, Optional, Union

from transformers.processing_utils import ProcessorMixin
from transformers.tokenization_utils_base import BatchEncoding, PaddingStrategy, PreTokenizedInput, TextInput, TruncationStrategy
from transformers.utils import TensorType

class LayoutLMREProcessor():
    r"""
    Constructs a LayoutLMRE processor which combines a LayoutLMRE feature extractor and a LayoutLMRE tokenizer into a
    single processor.
    [`LayoutLMREProcessor`] offers all the functionalities you need to prepare data for the model.
    It first uses [`LayoutLMREFeatureExtractor`] to resize and normalize document images, and optionally applies OCR to
    get words and normalized bounding boxes. These are then provided to [`LayoutLMRETokenizer`] or
    [`LayoutLMRETokenizerFast`], which turns the words and bounding boxes into token-level `input_ids`,
    `attention_mask`, `token_type_ids`, `bbox`. Optionally, one can provide integer `word_labels`, which are turned
    into token-level `labels` for token classification tasks (such as FUNSD, CORD).
    When an entity `pair` is given, the two entity words are additionally wrapped in
    `<E1>`/`</E1>` and `<E2>`/`</E2>` marker words before tokenization.
    Args:
        feature_extractor (`LayoutLMREFeatureExtractor`):
            An instance of [`LayoutLMREFeatureExtractor`]. The feature extractor is a required input.
        tokenizer (`LayoutLMRETokenizer` or `LayoutLMRETokenizerFast`):
            An instance of [`LayoutLMRETokenizer`] or [`LayoutLMRETokenizerFast`]. The tokenizer is a required input.
    """
    feature_extractor_class = ("LayoutLMREFeatureExtractor", 'LayoutLMREImageProcessor')
    tokenizer_class = ("LayoutLMRETokenizer", "LayoutLMRETokenizerFast")

    def __init__(
        self,
        feature_extractor,
        tokenizer,
    ):
        self.feature_extractor = feature_extractor
        self.tokenizer = tokenizer

    def __call__(
        self,
        images,
        text: Union[TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]] = None,
        pair: Optional[str] = None,
        text_pair: Optional[Union[PreTokenizedInput, List[PreTokenizedInput]]] = None,
        boxes: Union[List[List[int]], List[List[List[int]]]] = None,
        word_labels: Optional[Union[List[int], List[List[int]]]] = None,
        add_special_tokens: bool = True,
        padding: Union[bool, str, PaddingStrategy] = False,
        truncation: Union[bool, str, TruncationStrategy] = None,
        max_length: Optional[int] = None,
        stride: int = 0,
        pad_to_multiple_of: Optional[int] = None,
        return_token_type_ids: Optional[bool] = None,
        return_attention_mask: Optional[bool] = None,
        return_overflowing_tokens: bool = False,
        return_special_tokens_mask: bool = False,
        return_offsets_mapping: bool = False,
        return_length: bool = False,
        verbose: bool = True,
        return_tensors: Optional[Union[str, TensorType]] = None,
        **kwargs
    ) -> BatchEncoding:
        """
        This method first forwards the `images` argument to [`~LayoutLMREFeatureExtractor.__call__`]. In case
        [`LayoutLMREFeatureExtractor`] was initialized with `apply_ocr` set to `True`, it passes the obtained words and
        bounding boxes along with the additional arguments to [`~LayoutLMRETokenizer.__call__`] and returns the output,
        together with resized and normalized `pixel_values`. In case [`LayoutLMREFeatureExtractor`] was initialized
        with `apply_ocr` set to `False`, it passes the words (`text`/``text_pair`) and `boxes` specified by the user
        along with the additional arguments to [`~LayoutLMRETokenizer.__call__`] and returns the output, together with
        resized and normalized `pixel_values`.
        Please refer to the docstring of the above two methods for more information.

        User-provided `boxes` are expected as four `(x, y)` corner points per word and are reduced to
        `[x0, y0, x1, y1]` before tokenization. If both `text` and `pair` are given, the two entity
        words in `pair` are wrapped in marker words via `process_text`.

        Raises:
            ValueError: If `boxes` or `word_labels` are given while the feature extractor applies OCR,
                or if `pair` is given together with `text` but no `boxes` are available.
        """
        # verify input
        if self.feature_extractor.apply_ocr and (boxes is not None):
            raise ValueError(
                "You cannot provide bounding boxes "
                "if you initialized the feature extractor with apply_ocr set to True."
            )

        if self.feature_extractor.apply_ocr and (word_labels is not None):
            raise ValueError(
                "You cannot provide word labels if you initialized the feature extractor with apply_ocr set to True."
            )

        # first, apply the feature extractor
        features = self.feature_extractor(images=images, return_tensors=return_tensors)

        # Reduce each 4-corner quadrilateral to [x0, y0, x1, y1] using the x of corners 1 and 3 and
        # the y of corners 2 and 4. Skipped when no boxes were passed (the OCR case), which used to
        # crash here while iterating over None.
        # NOTE(review): the values are forwarded as given; no 0-1000 normalization (see
        # `normalize_box`) is applied to user-provided boxes — confirm this is what the model expects.
        if boxes is not None:
            boxes = [[a[0], b[1], c[0], d[1]] for a, b, c, d in boxes]

        # second, apply the tokenizer
        if text is not None and self.feature_extractor.apply_ocr and text_pair is None:
            if isinstance(text, str):
                text = [text]  # add batch dimension (as the feature extractor always adds a batch dimension)
            text_pair = features["words"]
        if text and pair:
            if boxes is None:
                raise ValueError("Entity markers for `pair` require user-provided `boxes`.")
            # The marker positions returned as third element are currently not used.
            text, boxes, _marker_positions = self.process_text(text, boxes, pair)
        encoded_inputs = self.tokenizer(
            text=text if text is not None else features["words"],
            text_pair=text_pair if text_pair is not None else None,
            boxes=boxes if boxes is not None else features["boxes"],
            word_labels=word_labels,
            add_special_tokens=add_special_tokens,
            padding=padding,
            truncation=truncation,
            max_length=max_length,
            stride=stride,
            pad_to_multiple_of=pad_to_multiple_of,
            return_token_type_ids=return_token_type_ids,
            return_attention_mask=return_attention_mask,
            return_overflowing_tokens=return_overflowing_tokens,
            return_special_tokens_mask=return_special_tokens_mask,
            return_offsets_mapping=return_offsets_mapping,
            return_length=return_length,
            verbose=verbose,
            return_tensors=return_tensors,
            **kwargs,
        )

        # add pixel values
        images = features.pop("pixel_values")
        if return_overflowing_tokens is True:
            images = self.get_overflowing_images(images, encoded_inputs["overflow_to_sample_mapping"])
        encoded_inputs["pixel_values"] = images

        return encoded_inputs

    def process_text(self, text, boxes, pair):
        """
        Wrap the two entity words of `pair` in marker words and keep `boxes` aligned.

        The first occurrence of `pair[0]` is surrounded by `<E1>` / `</E1>` and then the first
        occurrence of `pair[1]` by `<E2>` / `</E2>`. Each inserted marker gets the same box as the
        entity word it surrounds. The inputs are copied first, so the caller's lists are left
        untouched (previously `text` was modified in place, so processing the same record twice
        inserted the markers twice).

        Args:
            text: List of words.
            boxes: List of boxes, one per word.
            pair: Indexable holding the two entity words.

        Returns:
            A tuple `(text, boxes, positions)` with the new word list, the new box list and
            `(index of '<E1>', index of '<E2>')` in the new word list.

        Raises:
            ValueError: If an entity word does not occur in `text` (raised by `list.index`).
        """
        text = list(text)
        boxes = list(boxes)
        # NOTE(review): entities are located by first occurrence of a single word; if both
        # entities are the same string the second lookup finds the word already wrapped in <E1>.
        for entity, open_tag, close_tag in ((pair[0], '<E1>', '</E1>'), (pair[1], '<E2>', '</E2>')):
            idx = text.index(entity)
            entity_box = boxes[idx]
            text[idx:idx + 1] = [open_tag, text[idx], close_tag]
            boxes[idx:idx + 1] = [entity_box, entity_box, entity_box]
        pair = (text.index('<E1>'), text.index('<E2>'))
        return text, boxes, pair

    def get_overflowing_images(self, images, overflow_to_sample_mapping):
        """
        Repeat images so that every overflowing `input_ids` sample has its corresponding image.

        Args:
            images: Indexable collection of images, one per original sample.
            overflow_to_sample_mapping: For every encoded sample, the index of the original sample.

        Returns:
            A list with one image per entry of `overflow_to_sample_mapping`.
        """
        # in case there's an overflow, ensure each `input_ids` sample is mapped to its corresponding image
        images_with_overflow = []
        for sample_idx in overflow_to_sample_mapping:
            images_with_overflow.append(images[sample_idx])

        if len(images_with_overflow) != len(overflow_to_sample_mapping):
            raise ValueError(
                "Expected length of images to be the same as the length of `overflow_to_sample_mapping`, but got"
                f" {len(images_with_overflow)} and {len(overflow_to_sample_mapping)}"
            )

        return images_with_overflow

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to PreTrainedTokenizer's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to PreTrainedTokenizer's [`~PreTrainedTokenizer.decode`]. Please refer
        to the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        """Names of the inputs this processor produces for the model."""
        return ["input_ids", "bbox", "attention_mask", "pixel_values"]

### test process

In [48]:
from transformers import AutoTokenizer
# Tokenizer of the base checkpoint; `cachedir` is defined in the first example cell.
tokenizer = AutoTokenizer.from_pretrained("microsoft/layoutlmv3-base", cache_dir = cachedir)
# data.push_to_hub('Hantao/ChemProcessed')

# NOTE(review): `example` is not assigned in this cell. The keys printed below match the records of
# `data`, so it presumably comes from the later cell that sets `example = data[1]` (out-of-order
# execution) — verify on a fresh kernel.
print(example.keys())

dict_keys(['ocr-path', 'relation-pair', 'ocr-token', 'ocr-bbox', 'image_path', 'image', 'pair', 'class-label', 'e1_e2'])


In [34]:
# Entity marker words; these are the same strings that `LayoutLMREProcessor.process_text` inserts.
token_list = [
    '<E1>',
    '<E2>',
    '</E1>',
    '</E2>',
]
# Register the markers as additional tokens of the tokenizer.
# NOTE(review): a model used with this tokenizer presumably needs its embedding matrix resized to
# cover the new ids — confirm.
tokenizer.add_tokens(token_list)
image = data[0]['image']
# Last expression of the cell: shows the raw boxes of the first record (four corner points per word).
data[0]['ocr-bbox']

[[[299.0, 41.0], [317.0, 41.0], [317.0, 53.0], [299.0, 53.0]],
 [[299.0, 83.0], [346.0, 85.0], [346.0, 101.0], [299.0, 98.0]],
 [[231.0, 171.0], [295.0, 171.0], [295.0, 188.0], [231.0, 188.0]],
 [[351.0, 170.0], [411.0, 167.0], [412.0, 185.0], [352.0, 189.0]],
 [[3.0, 216.0], [26.0, 216.0], [26.0, 227.0], [3.0, 227.0]],
 [[429.0, 215.0], [453.0, 215.0], [453.0, 227.0], [429.0, 227.0]],
 [[1.0, 244.0], [208.0, 244.0], [208.0, 258.0], [1.0, 258.0]],
 [[453.0, 244.0], [614.0, 244.0], [614.0, 258.0], [453.0, 258.0]]]

In [35]:
from detectron2.data.detection_utils import read_image, convert_PIL_to_numpy
from transformers.image_utils import infer_channel_dimension_format
# Convert the PIL image as-is (format argument None) to a numpy array.
vec = convert_PIL_to_numpy(image, None)
# infer_channel_dimension_format(vec) # cause some bug here
# NOTE(review): the next two expressions are not the last statement of the cell, so their values
# are not displayed.
vec.ndim
vec.shape[2]
# Work-around: convert the image to RGB first, after which the channel layout can be inferred.
# NOTE(review): presumably the original image is not 3-channel RGB — confirm.
new_image = image.convert('RGB')
vec = convert_PIL_to_numpy(new_image, None)
infer_channel_dimension_format(vec)

<ChannelDimension.LAST: 'channels_last'>

In [46]:

# Alias so that the name used in the processor's docstrings resolves to the image processor class.
LayoutLMREFeatureExtractor = LayoutLMREImageProcessor
extractor = LayoutLMREFeatureExtractor()
example = data[1]
# OCR is switched off because words and boxes are taken from the dataset record; the processor
# raises a ValueError if boxes are passed while `apply_ocr` is True.
extractor.apply_ocr = False
processor = LayoutLMREProcessor(feature_extractor = extractor, tokenizer = tokenizer)
# Encode one record: RGB image, OCR words, 4-corner boxes and the entity pair to be marked.
coded = processor(
    images = example['image'].convert('RGB'),
    text = example['ocr-token'],
    boxes = example['ocr-bbox'],
    pair = example['pair'],
    return_tensors='pt',
)

In [47]:
# Display the encoded inputs produced by the processor in the previous cell.
coded

{'input_ids': tensor([[    0,  1368,   705,  8765,  3411, 44174,  7561, 44174,   510,  2744,
         31528, 31528, 50266, 24892,  4306, 10159, 39305,   890,  1949,  1640,
           642,   611,   462,  1949, 50268, 50265, 36846, 39305,   890,  1949,
          1640,   611,   462,  1949,    43, 50267,     2]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]), 'bbox': tensor([[[  0.,   0.,   0.,   0.],
         [299.,  41., 317.,  53.],
         [299.,  41., 317.,  53.],
         [299.,  85., 346.,  98.],
         [299.,  85., 346.,  98.],
         [231., 171., 295., 188.],
         [231., 171., 295., 188.],
         [351., 167., 412., 189.],
         [351., 167., 412., 189.],
         [351., 167., 412., 189.],
         [  3., 216.,  26., 227.],
         [429., 215., 453., 227.],
         [  1., 244., 208., 258.],
         [  1., 244., 208., 258.],
         [  1., 244., 208., 258.],
     

## datamatching learned from plkmo/BRE

After a long search, and after stepping through the program with `debugpy` to understand it, it turned out that the first, complicated approach was just based on a wrong guess.


In [38]:
# Vocabulary ids of the two opening entity markers added to the tokenizer earlier.
e1_start_id = tokenizer.convert_tokens_to_ids('<E1>')
e2_start_id = tokenizer.convert_tokens_to_ids('<E2>')
text = example['ocr-token']
print(example.keys())
print(example['pair'])
# Position of the first entity word inside the OCR word list.
print(text.index(example['pair'][0]))
# text.insert(text.index(example['pair'][0]), '<E1>' )
print(example['ocr-token'])

dict_keys(['ocr-path', 'relation-pair', 'ocr-token', 'ocr-bbox', 'image_path', 'image', 'pair', 'class-label', 'e1_e2'])
['chlorophyllide(chlide)', 'protochlorophyllide(pchlide']
7
['hv', 'LPOR', 'NADPH', 'NADP+', 'HO', 'HO', 'protochlorophyllide(pchlide', 'chlorophyllide(chlide)']


# train and loss computations

## return data

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple

import torch

from transformers.file_utils import ModelOutput



@dataclass
class BaseModelOutput(ModelOutput):
    """
    Base class for model's outputs, with potential hidden states, attentions and a loss.

    Args:
        last_hidden_state (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length, hidden_size)`):
            Sequence of hidden-states at the output of the last layer of the model.
        hidden_states (:obj:`tuple(torch.FloatTensor)`, `optional`, returned when ``output_hidden_states=True`` is passed or when ``config.output_hidden_states=True``):
            Tuple of :obj:`torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer)
            of shape :obj:`(batch_size, sequence_length, hidden_size)`.

            Hidden-states of the model at the output of each layer plus the initial embedding outputs.
        attentions (:obj:`tuple(torch.FloatTensor)`, `optional`, returned when ``output_attentions=True`` is passed or when ``config.output_attentions=True``):
            Tuple of :obj:`torch.FloatTensor` (one for each layer) of shape :obj:`(batch_size, num_heads,
            sequence_length, sequence_length)`.

            Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
            heads.
        loss (:obj:`torch.FloatTensor`, `optional`):
            Loss value attached to the output; ``None`` when no loss is provided.
    """

    # The field order is kept as-is: positional/tuple access to the output follows it.
    last_hidden_state: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None
    # No trailing comma here: `= None,` would make the default the one-element tuple `(None,)`.
    loss: Optional[torch.FloatTensor] = None
    

## model writings

In [None]:
import math
from typing import Optional, Tuple, Union


import math
from typing import Optional, Tuple, Union

import torch
import torch.utils.checkpoint
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.activations import ACT2FN
from transformers.modeling_outputs import (
    BaseModelOutputWithPastAndCrossAttentions,
    BaseModelOutputWithPoolingAndCrossAttentions,
    MaskedLMOutput,
    QuestionAnsweringModelOutput,
    SequenceClassifierOutput,
    TokenClassifierOutput,
)
from transformers.modeling_utils import PreTrainedModel
from transformers.pytorch_utils import apply_chunking_to_forward, find_pruneable_heads_and_indices, prune_linear_layer
from transformers.utils import add_start_docstrings, add_start_docstrings_to_model_forward, logging, replace_return_docstrings
from transformers import LayoutLMConfig


# Module-level logger obtained from the transformers logging utilities.
logger = logging.get_logger(__name__)

# Config class name passed to `replace_return_docstrings` on the model's forward method.
_CONFIG_FOR_DOC = "LayoutLMConfig"
# NOTE(review): not referenced by the code in this cell — presumably kept from the upstream LayoutLM module.
_CHECKPOINT_FOR_DOC = "microsoft/layoutlm-base-uncased"

# NOTE(review): not referenced by the code in this cell — presumably the names of the pretrained checkpoints.
LAYOUTLM_PRETRAINED_MODEL_ARCHIVE_LIST = [
    "layoutlm-base-uncased",
    "layoutlm-large-uncased",
]


# Alias for torch's LayerNorm under a LayoutLM-specific name.
LayoutLMLayerNorm = nn.LayerNorm

from transformers import LayoutLMPreTrainedModel, LayoutLMModel
from transformers.utils import (
    add_start_docstrings, add_start_docstrings_to_model_forward, logging, replace_return_docstrings
)

class LayoutLMRE(LayoutLMPreTrainedModel):
    """
    LayoutLM encoder with a linear head that classifies the relation between two positions per sequence.

    For every sequence in the batch, the encoder hidden states at the two positions given by
    ``e1_e2_start`` are concatenated and fed to ``self.classifier``.
    """

    def __init__(self, config):
        """
        Build the LayoutLM encoder and the classification head.

        Args:
            config: LayoutLM configuration. ``num_labels``, ``hidden_size``, ``hidden_dropout_prob``
                and ``pad_token_id`` are read from it.
        """
        super().__init__(config)
        self.num_labels = config.num_labels
        self.layoutlm = LayoutLMModel(config)
        # NOTE(review): this dropout module is created but `forward` does not apply it.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # `forward` feeds the head the concatenation of two hidden vectors, hence 2 * hidden_size inputs.
        self.classifier = nn.Linear(2 * config.hidden_size, config.num_labels)
        self.task = 'classification'

        # Initialize weights and apply final processing
        self.post_init()
        # NOTE(review): `Two_Headed_Loss` is not defined in this cell — presumably provided elsewhere.
        # Only the (not yet implemented) `task is None` path would use this criterion.
        self.criterion = Two_Headed_Loss(lm_ignore_idx=config.pad_token_id, use_logits=True, normalize=False)

    def get_input_embeddings(self):
        """Return the word-embedding module of the wrapped LayoutLM encoder."""
        return self.layoutlm.embeddings.word_embeddings

    # @add_start_docstrings_to_model_forward(LAYOUTLM_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    # `replace_return_docstrings` fills in the bare "Returns:" placeholder line of the docstring; keep that line.
    @replace_return_docstrings(output_type=TokenClassifierOutput, config_class=_CONFIG_FOR_DOC)
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        bbox: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        e1_e2_start: Optional[torch.LongTensor] = None,
        mask_id: int = 0,
    ) -> Union[Tuple, TokenClassifierOutput]:
        r"""
        Encode the inputs with LayoutLM and classify the relation between two positions of each sequence.

        `input_ids`, `bbox`, `attention_mask`, `token_type_ids`, `position_ids`, `head_mask`, `inputs_embeds`,
        `output_attentions`, `output_hidden_states` and `return_dict` are forwarded unchanged to the wrapped
        `LayoutLMModel`.

        Args:
            labels (`torch.LongTensor`, *optional*):
                Class indices, one per sequence (flattened to `(batch_size,)`). When given, a cross-entropy loss
                between the logits and these labels is computed.
            e1_e2_start (`torch.LongTensor` of shape `(batch_size, 2)`):
                For every sequence, the two positions whose hidden states are concatenated into the
                representation that is classified. Required.
            mask_id (`int`, *optional*, defaults to 0):
                Kept for interface compatibility; not used by the classification path.

        Raises:
            NotImplementedError: If `self.task` is `None`; that path is not implemented in this class.
            ValueError: If `e1_e2_start` is missing or does not have shape `(batch_size, 2)`.

        Returns:
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if self.task is None:
            # This path needs modules (`self.activation`, `self.cls`) and inputs that this class does not define.
            raise NotImplementedError(
                "LayoutLMRE only implements the 'classification' task; `task=None` is not supported."
            )
        if e1_e2_start is None:
            raise ValueError("`e1_e2_start` is required to locate the two entity positions of each sequence.")

        outputs = self.layoutlm(
            input_ids=input_ids,
            bbox=bbox,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        batch_size = sequence_output.shape[0]

        positions = torch.as_tensor(e1_e2_start, dtype=torch.long, device=sequence_output.device)
        if positions.dim() != 2 or tuple(positions.shape) != (batch_size, 2):
            raise ValueError(
                f"`e1_e2_start` must have shape (batch_size, 2) = ({batch_size}, 2), got {tuple(positions.shape)}."
            )

        # Pick, for each sequence, the hidden states at its own two positions: (batch_size, 2, hidden_size).
        # Pairing the batch index with the positions avoids a (batch, batch, 2, hidden) intermediate.
        batch_index = torch.arange(batch_size, device=sequence_output.device).unsqueeze(1)
        entity_states = sequence_output[batch_index, positions]
        # Concatenate the two vectors of every sequence: (batch_size, 2 * hidden_size).
        v1v2 = entity_states.reshape(batch_size, -1)

        logits = self.classifier(v1v2)

        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

        