In [None]:
!pip install git+https://github.com/cosmoquester/transformers-tf-finetune.git

In [None]:
import json
import random
import urllib.request
from typing import Dict

import tensorflow as tf
from transformers import AdamWeightDecay, AutoTokenizer, TFAutoModelForSequenceClassification

from transformers_tf_finetune.utils import (
    LRScheduler,
    get_device_strategy,
    path_join,
    set_random_seed,
    tfbart_sequence_classifier_to_transformers,
)

In [None]:
# NOTE(review): presumably makes the project's TF BART sequence-classification model
# available to transformers so TFAutoModelForSequenceClassification can resolve it —
# confirm against transformers_tf_finetune.utils.
tfbart_sequence_classifier_to_transformers()

# Config

In [None]:
# ---------------------------------------------------------------- model / tokenizer
#: Hugging Face id (or path) of the pretrained model weights
pretrained_model = "cosmoquester/bart-ko-small"
#: Hugging Face id (or path) of the pretrained fast tokenizer
pretrained_tokenizer = "cosmoquester/bart-ko-small"
#: forwarded as `from_pt`: set True to load the weights from a PyTorch checkpoint
from_pytorch = False
#: Hugging Face credential forwarded to `from_pretrained` (for private models)
use_auth_token = ""

# ---------------------------------------------------------------- data / output
#: KLUE NLI v1.1 training split (url or local path)
train_dataset_path = (
    "https://raw.githubusercontent.com/KLUE-benchmark/KLUE/main/"
    "klue_benchmark/klue-nli-v1.1/klue-nli-v1.1_train.json"
)
#: KLUE NLI v1.1 dev split (url or local path)
dev_dataset_path = (
    "https://raw.githubusercontent.com/KLUE-benchmark/KLUE/main/"
    "klue_benchmark/klue-nli-v1.1/klue-nli-v1.1_dev.json"
)
#: where logs and model checkpoints are written; must be a GCS path when running on TPU.
#: None disables checkpointing and TensorBoard logging.
output_path = None

# ---------------------------------------------------------------- training
#: number of passes over the training set
epochs = 10
#: learning rate values handed to the LRScheduler
learning_rate = 5e-5
min_learning_rate = 1e-5
#: warmup settings handed to the LRScheduler
warmup_rate = 0.06
warmup_steps = None
#: batch size for training
batch_size = 128
#: batch size for the validation and dev sets
dev_batch_size = 512
#: number of training examples held out as validation set
num_valid_dataset = 3000
#: `update_freq` of the TensorBoard callback
tensorboard_update_freq = 1

# ---------------------------------------------------------------- runtime
#: device to use (TPU or GPU or CPU)
device = "TPU"
#: enable mixed precision computation
mixed_precision = False
#: random seed, None leaves the random generators unseeded
seed = None

In [None]:
# Outputs that go to a Google Cloud Storage bucket ("gs://...") require the Colab
# user to be authenticated first; any other (or no) output path needs nothing.
writes_to_gcs = output_path is not None and output_path.startswith("gs://")
if writes_to_gcs:
    from google.colab import auth

    auth.authenticate_user()

In [None]:
def load_dataset(
    dataset_path: str, tokenizer: AutoTokenizer, label2id: Dict[str, int], shuffle: bool = False
) -> tf.data.Dataset:
    """
    Load KLUE NLI dataset from local file or web

    Every example is rendered as one string ``<start>premise<sep>hypothesis<end>``
    and the whole corpus is tokenized in a single call, padded to the longest sequence.

    :param dataset_path: local file path or http(s) url of the json file
    :param tokenizer: PreTrainedTokenizer for tokenizing
    :param label2id: dictionary for mapping label to index
    :param shuffle: whether shuffling examples (once, at load time) or not
    :returns: unbatched KLUE NLI dataset of (tokenizer outputs dict, label index) pairs
    :raises ValueError: if the tokenizer has neither a sep token nor an eos token
    :raises KeyError: if an example has a gold label that is missing from ``label2id``
    """
    if dataset_path.startswith(("http://", "https://")):
        with urllib.request.urlopen(dataset_path) as response:
            data = response.read().decode("utf-8")
    else:
        # Read as UTF-8 explicitly, matching the decoding of the remote branch; the
        # platform default encoding is not guaranteed to be UTF-8.
        with open(dataset_path, encoding="utf-8") as f:
            data = f.read()
    examples = json.loads(data)
    if shuffle:
        random.shuffle(examples)

    # Special tokens are written into the text itself; start/end degrade to an
    # empty string when the tokenizer does not define them.
    start_token = tokenizer.bos_token or tokenizer.cls_token or ""
    end_token = tokenizer.eos_token or tokenizer.sep_token or ""
    sep = tokenizer.sep_token or tokenizer.eos_token
    if sep is None:
        # Without a separator premise and hypothesis cannot be told apart; fail with a
        # clear message instead of a TypeError from the string concatenation below.
        raise ValueError("tokenizer must define a sep_token or an eos_token to separate premise and hypothesis")

    sentences = [
        start_token + example["premise"] + sep + example["hypothesis"] + end_token for example in examples
    ]
    labels = [label2id[example["gold_label"]] for example in examples]

    # padding=True pads every sequence to the longest one so the result forms dense tensors
    inputs = dict(
        tokenizer(
            sentences,
            padding=True,
            return_tensors="tf",
            return_token_type_ids=False,
            return_attention_mask=True,
        )
    )

    dataset = tf.data.Dataset.from_tensor_slices((inputs, labels))
    return dataset

In [None]:
# Compare against None explicitly so that 0 is honoured as a valid seed
# (a plain truthiness check would silently skip seeding for seed = 0).
if seed is not None:
    set_random_seed(seed)

In [None]:
# Build the strategy object for the configured `device` (presumably a tf.distribute
# strategy — confirm in transformers_tf_finetune.utils); the following cells run
# inside `strategy.scope()`.
strategy = get_device_strategy(device)

# Mixed Precision

In [None]:
with strategy.scope():
    if mixed_precision:
        # bfloat16 policy on TPU, float16 policy on every other device
        mixed_type = "mixed_bfloat16" if device == "TPU" else "mixed_float16"
        if hasattr(tf.keras.mixed_precision, "set_global_policy"):
            # Stable API; the experimental one is deprecated and missing from newer
            # TensorFlow releases.
            policy = tf.keras.mixed_precision.Policy(mixed_type)
            tf.keras.mixed_precision.set_global_policy(policy)
        else:
            # Older TensorFlow versions only ship the experimental API
            policy = tf.keras.mixed_precision.experimental.Policy(mixed_type)
            tf.keras.mixed_precision.experimental.set_policy(policy)

# Load Dataset

In [None]:
with strategy.scope():
    tokenizer = AutoTokenizer.from_pretrained(pretrained_tokenizer, use_auth_token=use_auth_token)

    # Class names listed in index order: neutral -> 0, entailment -> 1, contradiction -> 2
    label2id = {name: index for index, name in enumerate(("neutral", "entailment", "contradiction"))}

    # The training file is loaded shuffled and then split: the first `num_valid_dataset`
    # examples become the validation set, the remainder is used for training.
    dataset = load_dataset(train_dataset_path, tokenizer, label2id, shuffle=True)
    valid_dataset = dataset.take(num_valid_dataset).batch(dev_batch_size)
    train_dataset = dataset.skip(num_valid_dataset).batch(batch_size)

    # The dev file is kept in file order and only used for the final evaluation
    dev_dataset = load_dataset(dev_dataset_path, tokenizer, label2id, shuffle=False).batch(dev_batch_size)

# Load Model

In [None]:
with strategy.scope():
    # Classification head sized to the number of NLI labels
    model = TFAutoModelForSequenceClassification.from_pretrained(
        pretrained_model,
        from_pt=from_pytorch,
        use_auth_token=use_auth_token,
        num_labels=len(label2id),
    )
    # Keep both directions of the label mapping on the model config
    model.config.id2label = {index: name for name, index in label2id.items()}
    model.config.label2id = label2id

# Compile Model

In [None]:
with strategy.scope():
    # Call the model once on a symbolic int32 input of unspecified length, then wrap
    # it into a functional Keras model that maps `input_ids` to the classifier logits.
    # NOTE(review): only `input_ids` is declared as an input here, so the
    # `attention_mask` produced by load_dataset is not fed to the model — confirm that
    # padding is handled as intended.
    outputs = model(tf.keras.Input([None], dtype=tf.int32), return_dict=True)
    training_model = tf.keras.Model({"input_ids": model.input}, outputs.logits)

    # The scheduler receives the step count of the whole run: batches per epoch times epochs
    total_steps = len(train_dataset) * epochs
    lr_schedule = LRScheduler(total_steps, learning_rate, min_learning_rate, warmup_rate, warmup_steps)
    optimizer = AdamWeightDecay(
        lr_schedule,
        weight_decay_rate=0.01,
        exclude_from_weight_decay=["LayerNorm", "layer_norm", "bias"],
    )

    # The model outputs raw logits and labels are integer class ids
    training_model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy(name="accuracy")],
    )

# Model Training

In [None]:
with strategy.scope():
    # Checkpointing and TensorBoard logging need a place to write to, so they are
    # only enabled when an output path is configured.
    if output_path is None:
        fit_callbacks = None
    else:
        # Keep only the weights of the epoch with the highest validation accuracy
        best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
            path_join(output_path, "best_model.ckpt"),
            monitor="val_accuracy",
            mode="max",
            save_best_only=True,
            save_weights_only=True,
            verbose=1,
        )
        tensorboard = tf.keras.callbacks.TensorBoard(
            path_join(output_path, "logs"), update_freq=tensorboard_update_freq
        )
        fit_callbacks = [best_checkpoint, tensorboard]

    training_model.fit(
        train_dataset,
        validation_data=valid_dataset,
        epochs=epochs,
        callbacks=fit_callbacks,
    )

# Evaluate Model

In [None]:
with strategy.scope():
    # Score the compiled model on the dev set; the result is unpacked as the loss
    # followed by the single compiled metric (accuracy).
    loss, accuracy = training_model.evaluate(dev_dataset)