# [가제] Transformers의 데이터파이프라인

최근 Huggingface(🤗)의 transformers가 NLP의 democratization을 이끌고 있다. transformers는 SoTA 모델을 가장 쉽고 빠르게 모든 사람이 쓸 수 있도록 만들어졌다. 하지만, transformers의 간단함의 감동을 하고, 막상 실제 문제를 풀려고 하면, 복잡하고 추상화된 코드 구조 때문에 어려움을 겪는 경우가 많다.(물론 버전이 올라가면서 코드가 엄청나가 간소화되고 있다.)

특히, 가장 고민해야되는 부분이 데이터파이프라인이다. processor, convert_examples_to_features를 처음 보면 뭐지?라는 생각이 든다. 이에 대한 정리를 하겠다.

---

## transformers의 데이터 파이프라인 이해하기
허깅페이스의 transformers의 코드를 보면, PLM(Pretrained Language Model)을 학습할 때, 어떤 부분을 사람들이 고민해왔는지 간접적으로 이해할 수 있습니다. 특히, RNN 계열의 모델링을 해왔던 사람들에게 transformers의 `Dataset`은 처음 보기엔 생소하고 너무 복잡할 수 있습니다. 하지만 실제 코드를 하나씩 뜯어보면 PLM을 학습할 때, 데이터 파이프라인 측면에서 어떤 점을 고민하는지 이해할 수 있습니다. 

### Input 데이터에 대한 표준화
transformers는 데이터 파이프라인을 표준화하기 위해서 `InputExample`과 `InputFeatures`라는 데이터클래스를 정의했습니다. 코드를 살펴보면 아래와 같습니다.

- **`InputExample`**
    - Raw 데이터를 다루는 역할을 합니다.
    - 일반적으로 text, token classification 문제와 같이 single sequence 데이터에 대한 문제를 해결할 경우 `text_a`만 사용합니다.
    - 반면, STS, SQuad와 같이 pair형태의 sequence에 대한 문제를 해결할 경우, `text_a`와 `text_b`를 함께 사용합니다.
    
```python
@dataclass
class InputExample:
    """
    A single training/test example for simple sequence classification.
    Args:
        guid: Unique id for the example.
        text_a: string. The untokenized text of the first sequence. For single
            sequence tasks, only this sequence must be specified.
        text_b: (Optional) string. The untokenized text of the second sequence.
            Only must be specified for sequence pair tasks.
        label: (Optional) string. The label of the example. This should be
            specified for train and dev examples, but not for test examples.
    """

    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[str] = None

    def to_json_string(self):
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self), indent=2) + "\n"

```


- **`InputFeatures`**
    - Feature화된 데이터를 다루는 역할을 합니다.
        - NLP에서 가장 대표적인 Feature화는 tokenization입니다.

```python
@dataclass(frozen=True)
class InputFeatures:
    """
    A single set of features of data.
    Property names are the same names as the corresponding inputs to a model.
    Args:
        input_ids: Indices of input sequence tokens in the vocabulary.
        attention_mask: Mask to avoid performing attention on padding token indices.
            Mask values selected in ``[0, 1]``:
            Usually  ``1`` for tokens that are NOT MASKED, ``0`` for MASKED (padded) tokens.
        token_type_ids: (Optional) Segment token indices to indicate first and second
            portions of the inputs. Only some models use them.
        label: (Optional) Label corresponding to the input. Int for classification problems,
            float for regression problems.
    """

    input_ids: List[int]
    attention_mask: Optional[List[int]] = None
    token_type_ids: Optional[List[int]] = None
    label: Optional[Union[int, float]] = None

    def to_json_string(self):
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self)) + "\n"
```

위 코드를 살펴보면, 공통적으로 `to_json_string`이라는 메서드가 존재함을 확인할 수 있습니다. 이는 tokenization을 적용한 데이터를 캐싱하여 사용하기 위함입니다.





In [3]:
import dataclasses
import json

In [5]:
from dataclasses import dataclass

In [7]:
from typing import List, Optional, Union

In [8]:
@dataclass
class InputExample:
    """
    A single training/test example for simple sequence classification.
    Args:
        guid: Unique id for the example.
        text_a: string. The untokenized text of the first sequence. For single
            sequence tasks, only this sequence must be specified.
        text_b: (Optional) string. The untokenized text of the second sequence.
            Only must be specified for sequence pair tasks.
        label: (Optional) string. The label of the example. This should be
            specified for train and dev examples, but not for test examples.
    """

    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[str] = None

    def to_json_string(self):
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self), indent=2) + "\n"

#### `InputExample` 와 `InputFeatures`
`InputExample`과 `InputFeatures`는 PLM의 input 데이터를 다루는데 특화된 dataclass임.

- `InputExample`은 PLM과 관련된 raw 형태의 데이터를 다루는데 적합함.
    - Pair 형태의 데이터를 받는 것이 일반적
    - Pa
    - 많은 PLM이 



In [None]:

- `InputFeatures`은 InputExample에서 Feature화된 형태의 데이터임.
    - NLP에서 Feature화란 tokenization과 index화를 통해 PLM 모델의 input 형태로 변환해주는 것임.
    - 또한, attention_mask와 Segments와 관련된 ...


In [None]:
from transformers.data.processors.utils import InputExample, InputFeatures

In [None]:
`InputExample`, `InputFeatures`, `DataProcessor`


<!-- ### 주요특징
- `processor`라는 객체를 통해 data를 read함.
- `***_convert_examples_to_features`라는 함수를 통해 raw_data를 feature화함.
    - NLP에서 feature화라는 것은 tokenization과 거의 동치임.
 -->

In [1]:
from transformers import InputExample, InputFeatures, DataProcessor

In [None]:
class GlueDataset(Dataset):
    """
    This will be superseded by a framework-agnostic approach
    soon.
    """

    args: GlueDataTrainingArguments
    output_mode: str
    features: List[InputFeatures]

    def __init__(
        self,
        args: GlueDataTrainingArguments,
        tokenizer: PreTrainedTokenizer,
        limit_length: Optional[int] = None,
        mode: Union[str, Split] = Split.train,
        cache_dir: Optional[str] = None,
    ):
        self.args = args
        self.processor = glue_processors[args.task_name]()
        self.output_mode = glue_output_modes[args.task_name]
        if isinstance(mode, str):
            try:
                mode = Split[mode]
            except KeyError:
                raise KeyError("mode is not a valid split name")
        # Load data features from cache or dataset file
        cached_features_file = os.path.join(
            cache_dir if cache_dir is not None else args.data_dir,
            "cached_{}_{}_{}_{}".format(
                mode.value, tokenizer.__class__.__name__, str(args.max_seq_length), args.task_name,
            ),
        )
        label_list = self.processor.get_labels()
        if args.task_name in ["mnli", "mnli-mm"] and tokenizer.__class__ in (
            RobertaTokenizer,
            RobertaTokenizerFast,
            XLMRobertaTokenizer,
            BartTokenizer,
            BartTokenizerFast,
        ):
            # HACK(label indices are swapped in RoBERTa pretrained model)
            label_list[1], label_list[2] = label_list[2], label_list[1]
        self.label_list = label_list

        # Make sure only the first process in distributed training processes the dataset,
        # and the others will use the cache.
        lock_path = cached_features_file + ".lock"
        with FileLock(lock_path):

            if os.path.exists(cached_features_file) and not args.overwrite_cache:
                start = time.time()
                self.features = torch.load(cached_features_file)
                logger.info(
                    f"Loading features from cached file {cached_features_file} [took %.3f s]", time.time() - start
                )
            else:
                logger.info(f"Creating features from dataset file at {args.data_dir}")

                if mode == Split.dev:
                    examples = self.processor.get_dev_examples(args.data_dir)
                elif mode == Split.test:
                    examples = self.processor.get_test_examples(args.data_dir)
                else:
                    examples = self.processor.get_train_examples(args.data_dir)
                if limit_length is not None:
                    examples = examples[:limit_length]
                self.features = glue_convert_examples_to_features(
                    examples,
                    tokenizer,
                    max_length=args.max_seq_length,
                    label_list=label_list,
                    output_mode=self.output_mode,
                )
                start = time.time()
                torch.save(self.features, cached_features_file)
                # ^ This seems to take a lot of time so I want to investigate why and how we can improve.
                logger.info(
                    "Saving features into cached file %s [took %.3f s]", cached_features_file, time.time() - start
                )

    def __len__(self):
        return len(self.features)

    def __getitem__(self, i) -> InputFeatures:
        return self.features[i]

    def get_labels(self):
        return self.label_list