# How to make your custom Data Pipeline for 🤗 transformers
🤗 transformers has its own style of data pipeline. When you want to apply one of its models to your own NLP problem, you need to build a custom data pipeline in that style.

---

## Introduction to 🤗's Data Pipeline
**Main Considerations of 🤗's Data Pipeline**
- A uniform interface across multiple training sets (e.g. the GLUE tasks).
- A uniform interface across multiple pretrained language models (e.g. RoBERTa vs. BERT).
- In distributed training, only the first process builds the dataset; the other processes load it from the cache.

<br>

### Step 1. 🤗's core data structures
- 🤗 introduces its own data structures, `InputExample` and `InputFeatures`.
- Both are defined as dataclasses.

#### 1.1) `InputExample`

```python
@dataclass
class InputExample:
    
    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[str] = None

    def to_json_string(self):
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self), indent=2) + "\n"
```

- The base unit that stores **one raw example**
    - **guid** : Unique id of the example.
    - **text_a** : string. The untokenized text of the first sequence. For single-sequence tasks, only this sequence must be specified.
    - **text_b** : (Optional) string. The untokenized text of the second sequence. Only needs to be specified for sequence-pair tasks.
    - **label** : (Optional) string. The label of the example. This should be specified for train and dev examples, but not for test examples.

- It can be serialized to a JSON string (for caching).
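
To make the serialization point concrete, here is a minimal sketch that round-trips an example through JSON. It uses a stand-in dataclass that copies the fields shown above, so it runs without installing transformers; the sample values are illustrative only.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class InputExample:
    """Stand-in that mirrors the fields of the dataclass shown above."""
    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[str] = None

    def to_json_string(self) -> str:
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self), indent=2) + "\n"


example = InputExample(guid="train-0", text_a="Great product.", label="POSITIVE")

# Serialize to a JSON string, then rebuild an equal object from it.
serialized = example.to_json_string()
restored = InputExample(**json.loads(serialized))
print(restored == example)
```

Because every field is a plain string or `None`, the JSON form loses no information, which is what makes it suitable for caching.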

In [1]:
#### The usage of InputExample
from transformers.data.processors.utils import InputExample

# Sentiment Analysis
guid = "42"  # guid is declared as str in InputExample
text = "This foundation is HORRIBLE i dont know why i keep seeing positive reviews."
labels = "NEGATIVE"

# define InputExample object
example = InputExample(guid=guid, text_a=text, label=labels)
print(example.text_a)
print(example.label)


This foundation is HORRIBLE i dont know why i keep seeing positive reviews.
NEGATIVE


<br>

#### 1.2) `InputFeatures`

```python
@dataclass(frozen=True)
class InputFeatures:

    input_ids: List[int]
    attention_mask: Optional[List[int]] = None
    token_type_ids: Optional[List[int]] = None
    label: Optional[Union[int, float]] = None

    def to_json_string(self):
        """Serializes this instance to a JSON string."""
        return json.dumps(dataclasses.asdict(self)) + "\n"
```

- The base unit that stores **one featurized example** (the text is tokenized and converted into model inputs)
    - **input_ids** : Indices of input sequence tokens in the vocabulary.
    - **attention_mask** : Mask to avoid performing attention on padding token indices. Usually ``1`` for tokens that are NOT MASKED, ``0`` for MASKED (padded) tokens.
    - **token_type_ids** : (Optional) Segment token indices to indicate the first and second portions of the inputs. Only some models use them.
    - **label** : (Optional) Label corresponding to the input. Int for classification problems, float for regression problems.

- It can be serialized to a JSON string (for caching).
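
Unlike `InputExample`, this dataclass is declared with `frozen=True`. The sketch below shows what that implies in practice, again with a stand-in class that copies the fields above; the token ids are illustrative values.

```python
import dataclasses
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass(frozen=True)
class InputFeatures:
    """Stand-in that mirrors the fields of the dataclass shown above."""
    input_ids: List[int]
    attention_mask: Optional[List[int]] = None
    token_type_ids: Optional[List[int]] = None
    label: Optional[Union[int, float]] = None


feature = InputFeatures(input_ids=[101, 1188, 102], attention_mask=[1, 1, 1], label=0)

# frozen=True forbids attribute assignment after construction.
try:
    feature.label = 1
    mutated = True
except dataclasses.FrozenInstanceError:
    mutated = False

# dataclasses.replace builds a new instance with selected fields changed.
relabeled = dataclasses.replace(feature, label=1)
print(mutated, feature.label, relabeled.label)
```

Treating features as immutable is a reasonable fit for cached data: once built and saved, they are only read.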

In [2]:
#### The usage of InputFeatures
from transformers.data.processors.utils import InputFeatures

# load tokenizer
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

# encode text
encoded_input = tokenizer.encode_plus(example.text_a)

# define InputFeatures object
feature = InputFeatures(
    input_ids = encoded_input['input_ids'],
    attention_mask = encoded_input['attention_mask'],
    token_type_ids = encoded_input['token_type_ids'],
    label = labels  # kept as the raw string here; Step 2.2 maps labels to integers
)

print(f"input_ids -> {feature.input_ids}")
print(f"attention_mask -> {feature.attention_mask}")
print(f"token_type_ids -> {feature.token_type_ids}")
print(f"label -> {feature.label}")

input_ids -> [101, 1188, 4686, 1110, 145, 9565, 20595, 13360, 2036, 178, 1274, 1204, 1221, 1725, 178, 1712, 3195, 3112, 3761, 119, 102]
attention_mask -> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
token_type_ids -> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
label -> NEGATIVE


<br>

### Step 2. Code your own pipeline with `Processor` and `convert_examples_to_features`

#### 2.1) `Processor`

🤗 offers the `DataProcessor` base class.

**Features of a Processor**
- Reads the train, dev, and test files and returns lists of `InputExample`.
- Exposes the label list through `self.get_labels()`.
- To use it, subclass `DataProcessor` and override the methods you need.
- A processor can be used with TensorFlow in a similar manner.

```python
class DataProcessor:
    """Base class for data converters for sequence classification data sets."""

    def get_example_from_tensor_dict(self, tensor_dict):
        """Gets an example from a dict with tensorflow tensors.
        Args:
            tensor_dict: Keys and values should match the corresponding Glue
                tensorflow_dataset examples.
        """
        raise NotImplementedError()

    def get_train_examples(self, data_dir):
        """Gets a collection of :class:`InputExample` for the train set."""
        raise NotImplementedError()

    def get_dev_examples(self, data_dir):
        """Gets a collection of :class:`InputExample` for the dev set."""
        raise NotImplementedError()

    def get_test_examples(self, data_dir):
        """Gets a collection of :class:`InputExample` for the test set."""
        raise NotImplementedError()

    def get_labels(self):
        """Gets the list of labels for this data set."""
        raise NotImplementedError()

    def tfds_map(self, example):
        """Some tensorflow_datasets datasets are not formatted the same way the GLUE datasets are.
        This method converts examples to the correct format."""
        if len(self.get_labels()) > 1:
            example.label = self.get_labels()[int(example.label)]
        return example

    @classmethod
    def _read_tsv(cls, input_file, quotechar=None):
        """Reads a tab separated value file."""
        with open(input_file, "r", encoding="utf-8-sig") as f:
            return list(csv.reader(f, delimiter="\t", quotechar=quotechar))
        
```

**Ex) Processor for text classification**
- Sampled AmazonReview dataset.
    - A 1% sample of the `torchtext.datasets.AmazonReviewPolarity` dataset.
- The `get_*_examples()` methods are meant for hold-out validation.
    - Prepare your dataset split into train, dev, and test sets.
    - For fine-tuning, the TSV format is recommended.
- `get_labels` returns the label list.
- Possible customizations:
    - text preprocessing
    - cross-validation
    - and others.
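
As a rough illustration of the TSV layout, the sketch below writes a tiny `train.tsv` and reads it back the same way `DataProcessor._read_tsv` does. It assumes the label sits in the first column and the text in the second, which is the layout the example processor in this section reads; the two rows and the temporary directory are made up for the demonstration.

```python
import csv
import os
import tempfile

# Illustrative rows: <label>, <text>
rows = [
    ["NEGATIVE", "This foundation is HORRIBLE."],
    ["POSITIVE", "I love the movie."],
]

data_dir = tempfile.mkdtemp()
train_path = os.path.join(data_dir, "train.tsv")

# Write one example per line, columns separated by a tab.
with open(train_path, "w", encoding="utf-8") as f:
    for label, text in rows:
        f.write(f"{label}\t{text}\n")

# Read the file back with the same csv settings as DataProcessor._read_tsv.
with open(train_path, "r", encoding="utf-8-sig") as f:
    lines = list(csv.reader(f, delimiter="\t", quotechar=None))

print(lines[0][0], "|", lines[0][1])
```

The same layout applies to `dev.tsv` and `test.tsv`, which the processor looks for in the same directory.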

In [3]:
import os
import re
import emoji
from transformers import InputExample, DataProcessor

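class AmazonReviewSentimentAnalysisProcessor(DataProcessor):
    """Processor for the Amazon Review Sentiment Analysis Data."""

    def __init__(self):
        # Assumes a release of the `emoji` package that still exposes
        # UNICODE_EMOJI as a flat mapping keyed by emoji character.
        emojis = ''.join(emoji.UNICODE_EMOJI.keys())
        # Matches runs of characters that are not ASCII, listed punctuation, or emoji
        self.eng = re.compile(f'[^ .,?!/@$%~％·∼()\x00-\x7Fa-zA-Z{emojis}]+')
        # Matches http(s) URLs
        self.url = re.compile(
            r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)')
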
    def get_train_examples(self, data_dir):
        """[Overwrite] Gets a collection of :class:`InputExample` for the train set."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "train.tsv")), "train")

    def get_dev_examples(self, data_dir):
        """[Overwrite] Gets a collection of :class:`InputExample` for the dev set."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "dev.tsv")), "dev")
    
    def get_test_examples(self, data_dir):
        """[Overwrite] Gets a collection of :class:`InputExample` for the test set."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "test.tsv")), "test")
        
    def get_labels(self):
        """[Overwrite] Gets the list of labels for this data set."""
        return ["NEGATIVE", "POSITIVE"]
    
    def _create_examples(self, lines, set_type):
        """[Custom] Read dataset and return InputExample"""
        examples = []
        for (i, line) in enumerate(lines):
            guid = "%s-%s" % (set_type, i)
            text_a = self._preprocess(line[1])
            label = line[0]
            examples.append(InputExample(guid=guid, text_a=text_a, text_b=None, label=label))
        return examples
    
    def _preprocess(self, string):
        """[Custom] Preprocessing raw data with regular expression"""
        # Non-string values are returned unchanged
        if isinstance(string, str):
            string = self.url.sub(" ", string)
            string = self.eng.sub(" ", string)
        return string

In [4]:
# define processor
data_dir = 'sample' # this could be assigned by argparse.
processor = AmazonReviewSentimentAnalysisProcessor()

# labels
label_list = processor.get_labels()
print(label_list)

['NEGATIVE', 'POSITIVE']


In [5]:
# examples
train_examples = processor.get_train_examples(data_dir)
print(f"# of train set {len(train_examples)}")
dev_examples = processor.get_dev_examples(data_dir)
print(f"# of dev set {len(dev_examples)}")
test_examples = processor.get_test_examples(data_dir)
print(f"# of test set {len(test_examples)}")

# of train set 28800
# of dev set 3600
# of test set 3600


In [6]:
example = train_examples[42]
print(example.guid)
print(example.text_a)
print(example.label)

train-42
"A friend gave me a copy of ""Dresden"" and I watched it many times before giving it back and ordering my own copy. I watched the movie several times for different reasons: 1. History, 2. View from the German side, 3. View from the British side, 4. Acting skills. I love the movie, am impressed with F. Woll's in the main role. I think some scenes were too Hollywood, loved the honesty of the script, not too much violence, but one can start to feel the horror of the bombings and being trapped in cellars and on the ground by the devastating fires. I was a mistake, a big one, History was destroyed as it often is in wars. Did the Germans deserve this so much emotionally human reaction? Yes and No. Not only the Germans or the people of Dresden lost a jewel."
POSITIVE
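
Cross-validation was listed above as a possible customization. A minimal sketch of one way to do it, assuming you already hold a list of examples in memory: the helper `k_fold_splits` is hypothetical (not part of transformers) and simply assigns every k-th example to the dev fold.

```python
from typing import Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def k_fold_splits(examples: Sequence[T], k: int) -> Iterator[Tuple[List[T], List[T]]]:
    """Yield (train, dev) pairs; fold i uses every k-th example starting at index i as dev."""
    if k < 2:
        raise ValueError("k must be at least 2")
    for fold in range(k):
        dev = [ex for i, ex in enumerate(examples) if i % k == fold]
        train = [ex for i, ex in enumerate(examples) if i % k != fold]
        yield train, dev


# Integers stand in for InputExample objects here.
folds = list(k_fold_splits(list(range(10)), k=5))
for train, dev in folds:
    print(len(train), len(dev))
```

In a processor, such a helper could be applied to the output of `get_train_examples` instead of relying on a fixed `dev.tsv`; shuffling beforehand is advisable if the file is ordered by label.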


<br>

#### 2.2) `convert_examples_to_features`
- Tokenizes each `InputExample` and converts it to vocabulary indices, producing the `InputFeatures` that a pretrained language model (PLM) takes as input.

```python
def convert_examples_to_features(
        examples: List[InputExample],
        tokenizer: PreTrainedTokenizer,
        max_length: Optional[int] = None, # Maximum Sequence Length
        pad_on_left = False, # If set to ``True``, the examples will be padded on the left rather than on the right (default)
        pad_token = 0,
        mask_padding_with_zero = True, 
        return_tensors = None # if 'pt' torch.Tensor elif 'tf' else List[InputFeatures]
    ):
    
    ## tokenize the examples and convert them to indices (consider what inputs your PLM expects)
    
    ## transform labels for your modeling task (classification or regression or ??)

    return features
```

> [You can add convert_examples_to_features in the Processor class.](https://github.com/huggingface/transformers/blob/05810cd80a5ca83065e0dbe5335c030c4a435ddb/src/transformers/data/processors/utils.py#L124)

**Ex) convert_examples_to_features for Text classification**
- I highly recommend leaving the padding to the `data_collator`: padding each batch only to its own longest sequence saves GPU memory.

In [28]:
from typing import List, Optional, Union
from transformers import AutoTokenizer, InputFeatures, PreTrainedTokenizer

tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

def convert_examples_to_features(
    examples: List[InputExample],
    tokenizer: PreTrainedTokenizer,
    max_length: Optional[int] = None,
):
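    # Fall back to the tokenizer's limit when no max_length is given
    # (`max_len` is deprecated in favor of `model_max_length`)
    if max_length is None:
        max_length = tokenizer.model_max_length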
    
    # Define processor and labels
    processor = AmazonReviewSentimentAnalysisProcessor()
    
    label_list = processor.get_labels()
    label_map = {label: i for i, label in enumerate(label_list)}
    
    # transform features
    features = []
    for ex_index, example in enumerate(examples):
        
        tokenized_examples = tokenizer.encode_plus(example.text_a, max_length=max_length, truncation=True)
        
        input_ids = tokenized_examples['input_ids']
        token_type_ids = tokenized_examples['token_type_ids']
        attention_mask = tokenized_examples['attention_mask']
        
        features.append(
            InputFeatures(
                input_ids = input_ids, 
                attention_mask = attention_mask,
                token_type_ids = token_type_ids,
                label = label_map[example.label]
            )
        )
        if ex_index < 5:
            pass
            # define your own logger
#             logger.info("*** Example ***")
#             logger.info("guid: %s" % (example.guid))
#             logger.info("features: %s" % features[-1])

    return features

In [36]:
# The example of test features
test_features = convert_examples_to_features(test_examples, tokenizer=tokenizer, max_length=64)
test_features[12]

InputFeatures(input_ids=[101, 1188, 3317, 1110, 1141, 1104, 1103, 1436, 2865, 16601, 146, 1138, 1518, 1215, 119, 1135, 19819, 1139, 2555, 1121, 5917, 1272, 1122, 1144, 170, 3528, 6440, 1104, 26701, 1134, 1209, 1145, 18055, 1155, 7920, 1191, 1128, 1132, 170, 179, 8032, 2895, 119, 1109, 1902, 1110, 1145, 1304, 3505, 132, 1122, 1144, 170, 1822, 10845, 1204, 1134, 2144, 112, 189, 2845, 1229, 1128, 102], attention_mask=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], token_type_ids=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], label=1)
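
The features above are not padded, which is what makes batch-level padding in the collator possible. Below is a minimal sketch of that idea in plain Python. `Feature` is a stand-in with the same field names as `InputFeatures`, `pad_batch` is a hypothetical helper, and `pad_token_id=0` is an assumption that holds for BERT vocabularies but should be read from your tokenizer.

```python
from typing import Dict, List, NamedTuple


class Feature(NamedTuple):
    """Stand-in with the same field names as InputFeatures."""
    input_ids: List[int]
    attention_mask: List[int]
    token_type_ids: List[int]
    label: int


def pad_batch(batch: List[Feature], pad_token_id: int = 0) -> Dict[str, List]:
    """Pad every sequence to the longest one in this batch only."""
    longest = max(len(f.input_ids) for f in batch)

    def pad(seq: List[int], value: int) -> List[int]:
        return seq + [value] * (longest - len(seq))

    return {
        "input_ids": [pad(f.input_ids, pad_token_id) for f in batch],
        # Padding positions get 0 so attention ignores them.
        "attention_mask": [pad(f.attention_mask, 0) for f in batch],
        "token_type_ids": [pad(f.token_type_ids, 0) for f in batch],
        "labels": [f.label for f in batch],
    }


batch = pad_batch([
    Feature([101, 1188, 102], [1, 1, 1], [0, 0, 0], 1),
    Feature([101, 1188, 4686, 1110, 102], [1, 1, 1, 1, 1], [0, 0, 0, 0, 0], 0),
])
print(batch["input_ids"])
print(batch["attention_mask"])
```

Wrapping each list in `torch.tensor(...)` would turn this into a function usable as the `collate_fn` of a PyTorch `DataLoader`. A batch of short reviews then costs far fewer tokens than padding everything to `max_length`.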

<br>

### Step 3. Define Dataset

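It has been a long journey to define the pieces of a 🤗 data pipeline.

Now let's combine everything to define a `Dataset` (`torch.utils.data.dataset.Dataset`). In the class below, the names prefixed with `Blue`/`blue` (the training arguments, the processor and output-mode registries, and the conversion function) are assumed to be defined elsewhere in your project, in the same way as the processor and `convert_examples_to_features` from Step 2.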

In [None]:
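import logging
import os
import time
from typing import Optional

import torch
from filelock import FileLock
from torch.utils.data.dataset import Dataset
from transformers import InputFeatures, PreTrainedTokenizer

logger = logging.getLogger(__name__)

class BLueDataset(Dataset):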
    def __init__(
        self,
        args: BlueDataTrainingArguments,
        tokenizer: PreTrainedTokenizer,
        limit_length: Optional[int] = None,
        evaluate=False,
    ):
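        self.args = args
        processor = blue_processors[args.task_name]()
        self.output_mode = blue_output_modes[args.task_name]
        # Keep the label list so get_labels() also works when features come from the cache
        self.label_list = processor.get_labels()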
        
        # Load data features from cache or dataset file
        cached_features_file = os.path.join(
            args.data_dir,
            "cached_{}_{}_{}_{}".format(
                "dev" if evaluate else "train", tokenizer.__class__.__name__, str(args.max_seq_length), args.task_name,
            ),
        )
        
        # Make sure only the first process in distributed training processes the dataset,
        # and the others will use the cache.
        lock_path = cached_features_file + ".lock"
        with FileLock(lock_path):

            if os.path.exists(cached_features_file) and not args.overwrite_cache:
                start = time.time()
                self.features = torch.load(cached_features_file)
                logger.info(
                    "Loading features from cached file %s [took %.3f s]", cached_features_file, time.time() - start
                )
            else:
                logger.info(f"Creating features from dataset file at {args.data_dir}")
                label_list = processor.get_labels()
                examples = (processor.get_dev_examples(args.data_dir) 
                            if evaluate 
                            else processor.get_train_examples(args.data_dir))
                if limit_length is not None:
                    examples = examples[:limit_length]
                self.features = _blue_convert_examples_to_features(
                    examples,
                    tokenizer,
                    max_length=args.max_seq_length,
                    label_list=label_list,
                    output_mode=self.output_mode,
                )
                start = time.time()
                torch.save(self.features, cached_features_file)
                # ^ This seems to take a lot of time so I want to investigate why and how we can improve.
                logger.info(
                    "Saving features into cached file %s [took %.3f s]", cached_features_file, time.time() - start
                )
    
    def __len__(self):
        return len(self.features)

    def __getitem__(self, i) -> InputFeatures:
        return self.features[i]
    
    def get_labels(self):
        return self.label_list
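
The core of the constructor above is a load-or-build cache. The sketch below isolates that pattern under simplifying assumptions: `pickle` stands in for `torch.save`/`torch.load`, the `FileLock` is omitted, and `load_or_build` and `build_features` are hypothetical names used only for this illustration.

```python
import os
import pickle
import tempfile
from typing import Callable, List


def load_or_build(cache_path: str, build: Callable[[], List], overwrite: bool = False) -> List:
    """Return cached features if present, otherwise build them and write the cache."""
    if os.path.exists(cache_path) and not overwrite:
        with open(cache_path, "rb") as f:
            return pickle.load(f)
    features = build()
    with open(cache_path, "wb") as f:
        pickle.dump(features, f)
    return features


calls = []


def build_features() -> List:
    calls.append(1)  # count how often the expensive step runs
    return [[101, 102], [101, 1188, 102]]


cache_file = os.path.join(tempfile.mkdtemp(), "cached_train_features")
first = load_or_build(cache_file, build_features)   # builds and saves
second = load_or_build(cache_file, build_features)  # loads from disk
print(first == second, len(calls))
```

In the dataset class this whole branch runs inside `with FileLock(lock_path):`, so in distributed training the first process to take the lock builds the features and the others find the cache file once they acquire it.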