# Component Test: Train Distributed with TFJob

## Author
- Sebastian Lehrig <sebastian.lehrig1@ibm.com>

## License
Apache-2.0 License

## Imports & Constants

In [None]:
# In case packages cannot be found, do:
# mamba uninstall transformers tokenizers
# pip install transformers tf_utils

In [None]:
from datasets import load_dataset
import kfp
import kfp.dsl as dsl
from minio import Minio
import tarfile
from transformers import AutoTokenizer
import yaml

%load_ext lab_black

In [None]:
DATASET_NAME = "glue-dataset"
DATASET_DIR = f"./{DATASET_NAME}"
DATASET_FILE = f"{DATASET_NAME}.tar.gz"
MODEL_NAME = "glue-model"
MODEL_FILE = f"{MODEL_NAME}.tar.gz"

# In-cluster MinIO endpoint (plain HTTP). The host
# minio-service-kubeflow.apps.b2s001.pbm.ihost.com is presumably an external
# route to the same service for use outside the cluster - confirm.
MINIO_URL = "minio-service.kubeflow:9000"
# Test-only credentials. They are also handed to the training job as plain
# pipeline arguments, so do not reuse them for real deployments.
MINIO_USER = "minio"
MINIO_PASS = "minio123"
DATASETS_BUCKET = "datasets"
MODELS_BUCKET = "models"

# Namespace of the pod running this notebook, taken from the mounted service
# account. Strip it so stray whitespace/newlines never reach the KFP API calls.
with open(
    "/var/run/secrets/kubernetes.io/serviceaccount/namespace", encoding="utf-8"
) as f:
    NAMESPACE = f.read().strip()
NAMESPACE

## Client objects for interaction

In [None]:
# No host argument: kfp.Client falls back to its default endpoint resolution
# (presumably the in-cluster KFP API when run from a Kubeflow notebook - confirm).
kfp_client = kfp.Client()

In [None]:
# MinIO client for uploading the dataset; secure=False means plain HTTP.
minio_client = Minio(
    MINIO_URL,
    secure=False,
    access_key=MINIO_USER,
    secret_key=MINIO_PASS,
)

## Load test data

In [None]:
# Training split of GLUE's MRPC task: sentence pairs ("sentence1"/"sentence2")
# with an integer "label".
glue_config, glue_split = "mrpc", "train"
dataset = load_dataset("glue", glue_config, split=glue_split)
# Show one raw example for a quick sanity check.
dataset[0]

## Preprocess data

In [None]:
# Must be the same checkpoint that train_model fine-tunes
# (TFAutoModelForSequenceClassification.from_pretrained("bert-base-cased")),
# otherwise the token ids would not match the model's vocabulary.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

In [None]:
def encode(examples):
    """Tokenize a batch of sentence pairs for BERT.

    ``examples`` is a batch (``batched=True``) whose "sentence1" and
    "sentence2" columns are tokenized together as pairs. Over-long pairs are
    truncated and every row is padded (``padding="max_length"``) so all
    encodings have the same length.
    """
    first_sentences = examples["sentence1"]
    second_sentences = examples["sentence2"]
    return tokenizer(
        first_sentences, second_sentences, padding="max_length", truncation=True
    )


dataset = dataset.map(encode, batched=True)

In [None]:
# Copy the "label" column into "labels", the column name train_model selects
# (together with the token columns) when converting to TensorFlow format.
dataset = dataset.map(lambda examples: {"labels": examples["label"]}, batched=True)

## Save data to disk & tar.gz it

In [None]:
def make_tarfile(output_filename, source_dir):
    """Pack the contents of *source_dir* into a gzip-compressed tarball.

    The directory is stored under the archive name ``"."``, so extracting the
    tarball places its contents directly in the target directory, without an
    extra top-level folder.
    """
    archive_root = "."
    with tarfile.open(name=output_filename, mode="w:gz") as archive:
        archive.add(name=source_dir, arcname=archive_root)


# Persist the tokenized dataset with save_to_disk (train_model reads it back
# with load_from_disk) and pack the directory into DATASET_FILE for upload.
dataset.save_to_disk(DATASET_DIR)
make_tarfile(DATASET_FILE, DATASET_DIR)

In [None]:
# NOTE(review): `features` is not referenced by any later cell in this
# notebook. train_model rebuilds its own features from the downloaded dataset
# after set_format("tensorflow"). Consider removing this cell.
features = {x: dataset[x] for x in ["input_ids", "token_type_ids", "attention_mask"]}

## Upload data to MinIO

In [None]:
# Create the datasets bucket if it does not exist yet. bucket_exists() checks
# just this bucket instead of listing every bucket on the server.
if not minio_client.bucket_exists(DATASETS_BUCKET):
    minio_client.make_bucket(bucket_name=DATASETS_BUCKET)

minio_client.fput_object(
    bucket_name=DATASETS_BUCKET,  # bucket name in Minio
    object_name=DATASET_FILE,  # file name in bucket of Minio
    file_path=DATASET_FILE,  # file path / name in local system
)

# Presumably the same object is browsable through the KFP UI artifact viewer, e.g.
# https://kubeflow.apps.b2s001.pbm.ihost.com/pipeline/artifacts/minio/datasets/glue-dataset.tar.gz?namespace=user-example-com
s3_address = f"s3://{MINIO_URL}/{DATASETS_BUCKET}/{DATASET_FILE}"
s3_address

## Specify train function

In [None]:
# Concrete values for train_model's inputs. The keys must equal train_model's
# parameter names, because the component's {"inputValue": <name>} placeholders
# are resolved by looking them up here.
train_parameters = dict(
    dataset_bucket=DATASETS_BUCKET,
    model_bucket=MODELS_BUCKET,
    dataset_file=DATASET_FILE,
    model_file=MODEL_FILE,
    storage_uri=MINIO_URL,
    storage_username=MINIO_USER,
    storage_password=MINIO_PASS,
)

In [None]:
def train_model(
    dataset_bucket: str,
    model_bucket: str,
    dataset_file: str,
    model_file: str,
    storage_uri: str,
    storage_username: str,
    storage_password: str,
):
    """Fine-tune bert-base-cased on the tokenized dataset with multi-worker TF.

    Intended to run in every replica of a TFJob (it reads TF_CONFIG). Each
    replica downloads and extracts the dataset tarball from MinIO and trains
    under ``MultiWorkerMirroredStrategy``. Every replica saves the model; only
    the chief packs its saved model into a tarball and uploads it to MinIO.

    The function is turned into a KFP component with
    ``kfp.components.func_to_component_text``, so all imports and helpers must
    be defined inside it.

    Args:
        dataset_bucket: MinIO bucket that holds the dataset tarball.
        model_bucket: MinIO bucket the model tarball is uploaded to; created
            if it does not exist.
        dataset_file: Object name of the gzipped dataset tarball (also used as
            the local download file name).
        model_file: Local file name and object name of the model tarball.
        storage_uri: MinIO endpoint (``host:port``), accessed over plain HTTP.
        storage_username: MinIO access key.
        storage_password: MinIO secret key.

    Based on
    https://github.com/kubeflow/training-operator/tree/master/examples/tensorflow/distribution_strategy/keras-API.
    """

    from datasets import load_from_disk
    import json
    from minio import Minio
    import os
    import tarfile
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from transformers import TFAutoModelForSequenceClassification

    def load_data(minio_client):
        """Fetch the dataset tarball and return it as a shuffled tf.data.Dataset.

        Elements are ``(features, labels)``, where ``features`` maps
        "input_ids", "token_type_ids" and "attention_mask" to tensors.
        """
        BUFFER_SIZE = 10000
        DATASET_PATH = "./dataset"

        print(f"Fetching data from {dataset_bucket}/{dataset_file}...")
        data = minio_client.get_object(dataset_bucket, dataset_file)
        try:
            with open(dataset_file, "wb") as file_data:
                for d in data.stream(32 * 1024):
                    file_data.write(d)
        finally:
            # Always return the HTTP connection, even if the download fails.
            data.close()
            data.release_conn()

        print(f"Extracting {dataset_file} to {DATASET_PATH}...")
        with tarfile.open(dataset_file, "r:gz") as tar_gz_ref:
            if hasattr(tarfile, "data_filter"):
                # Reject absolute paths, ".." escapes and special files in the
                # downloaded archive. The `filter` argument only exists in
                # newer Python releases, hence the feature check.
                tar_gz_ref.extractall(DATASET_PATH, filter="data")
            else:
                tar_gz_ref.extractall(DATASET_PATH)

        print("Result:")
        print(os.listdir(DATASET_PATH))

        print(f"Loading dataset from {DATASET_PATH}...")
        dataset = load_from_disk(DATASET_PATH)

        print("Transforming to TensorFlow format...")
        dataset.set_format(
            type="tensorflow",
            columns=["input_ids", "token_type_ids", "attention_mask", "labels"],
        )
        features = {
            x: dataset[x] for x in ["input_ids", "token_type_ids", "attention_mask"]
        }
        tf_dataset = tf.data.Dataset.from_tensor_slices((features, dataset["labels"]))

        return tf_dataset.cache().shuffle(BUFFER_SIZE)

    def make_tarfile(output_filename, source_dir):
        """Pack the contents of *source_dir* into a gzipped tarball.

        Entries are stored relative to "." so the archive unpacks directly into
        the target directory. Using os.path.basename(source_dir) here would
        yield "" for a path with a trailing slash (such as the saved-model
        directory) and write a bogus "/" entry.
        """
        with tarfile.open(output_filename, "w:gz") as tar:
            tar.add(source_dir, arcname=".")

    def upload_data(file, minio_client):
        """Upload local *file* to ``model_bucket`` under the same object name."""
        print(f"Uploading {file} to {model_bucket}/{file}...")

        # Create models bucket if it does not yet exist
        if not minio_client.bucket_exists(model_bucket):
            minio_client.make_bucket(bucket_name=model_bucket)

        minio_client.fput_object(
            bucket_name=model_bucket,  # bucket name in Minio
            object_name=file,  # file name in bucket of Minio
            file_path=file,  # file path / name in local system
        )

    def build_and_compile_model():
        """Load bert-base-cased with a classification head and compile it."""
        print("Building model...")
        model = TFAutoModelForSequenceClassification.from_pretrained("bert-base-cased")
        model.summary()

        opt = tf.keras.optimizers.Adam(learning_rate=3e-5)
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE, from_logits=True
        )
        model.compile(optimizer=opt, loss=loss_fn, metrics=["accuracy"])

        return model

    # Function for decaying the learning rate (used by LearningRateScheduler).
    # You can define any decay function you need.
    # NOTE(review): the scheduler replaces the optimizer's 3e-5 with these
    # values; 1e-3 is unusually high for BERT fine-tuning and looks inherited
    # from the linked example - confirm it is intended.
    def decay(epoch):
        if epoch < 3:
            return 1e-3
        if 3 <= epoch < 7:
            return 1e-4
        return 1e-5

    # Prepare distributed training with GPU support
    os.environ["NCCL_DEBUG"] = "INFO"
    tfds.disable_progress_bar()

    # TF_CONFIG (set by the TFJob operator) describes the cluster and this
    # replica's role; it is absent when running as a single process.
    tf_config = json.loads(os.environ.get("TF_CONFIG") or "{}")
    print(f"tf_config: {tf_config}")

    task = tf_config.get("task", {})
    TASK_TYPE = task.get("type")
    TASK_INDEX = task.get("index", 0)

    def is_chief():
        """Return True for the single replica that exports and uploads the model.

        Without TF_CONFIG the process is alone and therefore chief. Otherwise a
        dedicated "chief" task is chief; if the cluster has none, worker 0 is.
        """
        if TASK_TYPE is None or TASK_TYPE == "chief":
            return True
        has_chief_task = "chief" in tf_config.get("cluster", {})
        return TASK_TYPE == "worker" and TASK_INDEX == 0 and not has_chief_task

    # MultiWorkerMirroredStrategy creates copies of all variables in the model's
    # layers on each device across all workers.
    # RING uses gRPC-based ring all-reduce and does not need NCCL; switch to
    # CommunicationImplementation.NCCL on GPUs that support it.
    communication_options = tf.distribute.experimental.CommunicationOptions(
        implementation=tf.distribute.experimental.CommunicationImplementation.RING
    )
    strategy = tf.distribute.MultiWorkerMirroredStrategy(
        communication_options=communication_options
    )

    BATCH_SIZE_PER_REPLICA = 8
    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
    print(f"BATCH_SIZE_PER_REPLICA: {BATCH_SIZE_PER_REPLICA}")
    print(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
    print(f"BATCH_SIZE: {BATCH_SIZE}")

    with strategy.scope():
        minio_client = Minio(
            storage_uri,
            access_key=storage_username,
            secret_key=storage_password,
            secure=False,
        )
        ds_train = load_data(minio_client).batch(BATCH_SIZE).repeat()
        # Every replica loads the full dataset; shard it by data across workers.
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = (
            tf.data.experimental.AutoShardPolicy.DATA
        )
        ds_train = ds_train.with_options(options)
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_model()

    # Per-epoch weight checkpoints.
    checkpoint_dir = "/train/checkpoint"
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

    # Callback for printing the LR at the end of each epoch.
    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print(
                "\nLearning rate for epoch {} is {}".format(
                    epoch + 1, multi_worker_model.optimizer.lr.numpy()
                )
            )

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir="./logs"),
        tf.keras.callbacks.ModelCheckpoint(
            filepath=checkpoint_prefix, save_weights_only=True
        ),
        tf.keras.callbacks.LearningRateScheduler(decay),
        PrintLR(),
    ]

    # Keras' `model.fit()` trains the model with specified number of epochs and
    # number of steps per epoch. Note that the numbers here are for demonstration
    # purposes only and may not sufficiently produce a model with good quality.
    print("Training model...")
    multi_worker_model.fit(ds_train, epochs=10, steps_per_epoch=70, callbacks=callbacks)

    # Saving a model: every replica must call save(), because saving a
    # distributed model runs collective ops, but only the chief's copy is kept.
    saved_model_dir = "/train/saved_model/"
    if is_chief():
        model_path = saved_model_dir
    else:
        # Per-replica temporary location OUTSIDE saved_model_dir, so worker
        # copies are never packed into the chief's tarball. This matters if
        # /train is shared by all replicas (assumed; confirm the PVC setup).
        model_path = f"/train/worker_tmp_{TASK_TYPE}_{TASK_INDEX}/saved_model"

    multi_worker_model.save(model_path)

    if is_chief():
        # Upload to object store
        make_tarfile(model_file, model_path)
        upload_data(model_file, minio_client)
    else:
        # Discard the non-chief copy once the collective save has completed.
        tf.io.gfile.rmtree(os.path.dirname(model_path))

In [None]:
# Serialize train_model into a KFP component spec (YAML text). Only the
# container image/command/args of this spec are used by the following cells,
# which pass them to the distributed-training component.
# NOTE(review): datasets and minio are not listed in packages_to_install;
# presumably the base image already provides them - confirm.
TRAIN_BASE_IMAGE = "quay.io/ibm/kubeflow-notebook-image-ppc64le@sha256:97695b7b4dfab12a65b3d9aaea65649bee1769e578c0965f96648aa55f81fb27"
train_model_comp_text = kfp.components.func_to_component_text(
    func=train_model,
    base_image=TRAIN_BASE_IMAGE,
    packages_to_install=["transformers", "tf_utils", "tensorflow_datasets"],
)

In [None]:
# Extract the container definition from the generated component spec so the
# same image/command/args can be handed to the distributed-training component.
train_model_comp_yaml = yaml.safe_load(train_model_comp_text)
container_yaml = train_model_comp_yaml["implementation"]["container"]

image = container_yaml["image"]
command = container_yaml["command"]
args = container_yaml["args"]
# Replace each {"inputValue": <param>} placeholder with its concrete value, so
# args contains only literal values before it is passed to the pipeline.
# The list is updated in place, so container_yaml reflects the resolved values.
for idx, arg in enumerate(args):
    if isinstance(arg, dict):
        if "inputValue" not in arg:
            raise ValueError(f"Unsupported placeholder in component args: {arg}")
        args[idx] = train_parameters[arg["inputValue"]]

## Load component from file

In [None]:
# Distributed-training component definition. "component.yaml" is a relative
# path, resolved against the notebook's working directory.
distributed_train_comp = kfp.components.load_component_from_file("component.yaml")

## Create pipeline

In [None]:
@dsl.pipeline(
    name="Component Test - Train Distributed with TFJob",
    description="A simple component test",
)
def train_pipeline(
    model_name: str,
    image: str,
    command: list,
    args: list,
    namespace: str,
):
    # Single step: run the train_model container through the distributed
    # training component (a TFJob, per this notebook's title). The worker count
    # and PVC size are fixed for this test.
    job_settings = {"number_of_workers": "2", "pvc_size": "10Gi"}
    distributed_train_comp(
        model_name=model_name,
        image=image,
        command=command,
        args=args,
        namespace=namespace,
        **job_settings,
    )

## Run the pipeline within an experiment

In [None]:
# Argument values for the pipeline run; the keys mirror train_pipeline's
# parameters.
arguments = dict(
    model_name=MODEL_NAME,
    image=image,
    command=command,
    args=args,
    namespace=NAMESPACE,
)

kfp_client.create_run_from_pipeline_func(
    pipeline_func=train_pipeline,
    arguments=arguments,
    namespace=NAMESPACE,
)