# Scale the Experiment on Kubeflow

This Notebook scales the fine-tuning experiment to a [Kubeflow](https://www.kubeflow.org/) cluster by breaking the process into distinct steps and running them in a Directed Acyclic Graph (DAG) using [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/).

There are several advantages to this approach:

- 🚀 Scale and accelerate the experiment by leveraging more resources and more powerful machines.
- 🎏 Parallelize steps that can run independently.
- 🫙 Cache steps, such as data processing, to avoid repeating them on each run.
- 📅 Schedule recurring runs to retrain your model whenever new data is available.
- 📈 Track and visualize the experiment's parameters, outputs, and performance metrics, making it easier to compare and debug.
- ✨ Automate model deployment by integrating KFP with your CI/CD pipelines.

The following pipeline comprises three stages:

- 🔻 Download the data and the model to a shared PVC. These steps run in parallel.
- 🔨 Preprocess the `train` and `validation` data splits. These steps also run in parallel.
- 🎵 Fine-tune the model and store it in a different PVC.

In [None]:
from pathlib import Path
from typing import Optional, Union, List

from kfp import dsl, compiler, kubernetes, client

In [None]:
ROOT = Path("/")
SERVICE_ACCOUNT = ROOT / "var/run/secrets/kubernetes.io/serviceaccount"

# The Pod's service-account mount exposes the namespace the Pod runs in as a
# plain file; it is used below as the KFP namespace. read_text() closes the
# file handle, and strip() drops any trailing newline/whitespace so it cannot
# leak into the namespace name.
NAMESPACE = (SERVICE_ACCOUNT / "namespace").read_text().strip()

In [None]:
@dsl.component(packages_to_install=['datasets==2.21.0'])
def download_data(
    dataset_id: Optional[str] = "rajpurkar/squad") -> None:
    """Fetch a Hugging Face dataset into the shared ``/huggingface`` volume.

    The raw dataset files are downloaded under ``/huggingface/datasets/squad``
    (a fixed location, not configurable through arguments) so that later
    pipeline steps mounting the same volume can reuse them.

    Args:
        dataset_id: The Hub identifier of the dataset. Default is "rajpurkar/squad".

    Returns:
        None
    """
    import os

    downloads_dir = os.path.join("/huggingface", "datasets", "squad")
    # Set before importing `datasets` so the library picks up the location
    # when its configuration is loaded.
    os.environ["HF_DATASETS_DOWNLOADED_DATASETS_PATH"] = downloads_dir

    from datasets import load_dataset

    load_dataset(dataset_id)

In [None]:
@dsl.component(packages_to_install=['transformers[torch]==4.44.2'])
def download_model(model_id: Optional[str] = "google-bert/bert-base-uncased") -> None:
    """Download a question-answering model and its tokenizer to the shared volume.

    The files are stored in the Hugging Face Hub cache at the fixed location
    ``/huggingface/hub`` so that later steps mounting the same volume can load
    them from there.

    Args:
        model_id: The Hub identifier of the model. Default is
            "google-bert/bert-base-uncased".

    Returns:
        None
    """
    import os

    # Set before importing `transformers` so the hub cache resolves to the
    # mounted volume.
    os.environ["HF_HUB_CACHE"] = os.path.join("/huggingface", "hub")

    from transformers import AutoTokenizer, AutoModelForQuestionAnswering

    # Loading is done only for its side effect of populating the cache; the
    # returned objects are not needed, so they are not kept around.
    AutoModelForQuestionAnswering.from_pretrained(model_id)
    AutoTokenizer.from_pretrained(model_id, clean_up_tokenization_spaces=False)

In [None]:
@dsl.component(packages_to_install=['datasets==2.21.0', 'transformers==4.44.2'])
def process_training_data(
    dataset_id: Optional[str] = "rajpurkar/squad",
    model_id: Optional[str] = "google-bert/bert-base-uncased",
    huggingface_cache: Optional[str] = "/huggingface"
) -> None:
    """Tokenize the train split of a SQuAD-style dataset and save it to disk.

    Each (question, context) pair is tokenized into one or more windows of 384
    tokens, with 128 tokens of overlap between consecutive windows of a long
    context. Every window is labelled with the token positions of the answer
    span, or ``(0, 0)`` when the answer is not fully contained in that window.

    Args:
        dataset_id: The name of the dataset variant. Default is "rajpurkar/squad".
        model_id: The name of the model variant whose tokenizer is used.
            Default is "google-bert/bert-base-uncased".
        huggingface_cache: Root directory for the output; the processed split
            is written to ``<huggingface_cache>/datasets/squad_processed/train``.
            Default is "/huggingface".

    Returns:
        None
    """
    import os
    from functools import partial
    from pathlib import Path

    # NOTE: the caches point at the fixed /huggingface location used by the
    # download steps; `huggingface_cache` only controls the output location.
    os.environ["HF_HUB_CACHE"] = os.path.join("/huggingface", "hub")
    os.environ["HF_DATASETS_DOWNLOADED_DATASETS_PATH"] = os.path.join("/huggingface", "datasets", "squad")

    import datasets
    from transformers import AutoTokenizer

    data = datasets.load_dataset(dataset_id, split="train")
    tokenizer = AutoTokenizer.from_pretrained(model_id, clean_up_tokenization_spaces=False)

    output_path = Path(huggingface_cache) / "datasets" / "squad_processed"

    def _preprocess(examples, tokenizer, max_length, stride):
        """Tokenize a batch and attach start/end token labels for the answer."""
        questions = [q.strip() for q in examples["question"]]
        inputs = tokenizer(
            questions,
            examples["context"],
            truncation="only_second",
            padding="max_length",
            stride=stride,
            max_length=max_length,
            return_offsets_mapping=True,
            return_overflowing_tokens=True,
        )

        answers = examples["answers"]
        offset_mapping = inputs.pop("offset_mapping")
        # Maps every produced window back to the example it was cut from.
        sample_map = inputs.pop("overflow_to_sample_mapping")

        start_positions = []
        end_positions = []

        for i, offset in enumerate(offset_mapping):
            answer = answers[sample_map[i]]
            start_char = answer["answer_start"][0]
            end_char = start_char + len(answer["text"][0])
            sequence_ids = inputs.sequence_ids(i)

            # Locate the context tokens (sequence id 1) inside this window.
            idx = 0
            while sequence_ids[idx] != 1:
                idx += 1
            context_start = idx
            while sequence_ids[idx] == 1:
                idx += 1
            context_end = idx - 1

            # If the answer is not fully inside the context, label it (0, 0).
            # The window must start at or before the answer's first character
            # and end at or after its last one. Checking only for overlap is
            # not enough: a partially visible answer would then get a start
            # label on the separator token before the context.
            if offset[context_start][0] > start_char or offset[context_end][1] < end_char:
                start_positions.append(0)
                end_positions.append(0)
            else:
                # Otherwise it's the start and end token positions
                idx = context_start
                while idx <= context_end and offset[idx][0] <= start_char:
                    idx += 1
                start_positions.append(idx - 1)

                idx = context_end
                while idx >= context_start and offset[idx][1] >= end_char:
                    idx -= 1
                end_positions.append(idx + 1)

        inputs["start_positions"] = start_positions
        inputs["end_positions"] = end_positions

        return inputs

    preprocess_data = partial(_preprocess, tokenizer=tokenizer, max_length=384, stride=128)
    processed_data = data.map(preprocess_data, batched=True, remove_columns=data.column_names)
    processed_data.save_to_disk(os.path.join(output_path, "train"))

In [None]:
@dsl.component(packages_to_install=['datasets==2.21.0', 'transformers==4.44.2'])
def process_validation_data(
    dataset_id: Optional[str] = "rajpurkar/squad",
    model_id: Optional[str] = "google-bert/bert-base-uncased",
    huggingface_cache: Optional[str] = "/huggingface"
) -> None:
    """Tokenize the validation split of a SQuAD-style dataset and save it to disk.

    Windows are built exactly like the train split (384 tokens, 128 tokens of
    overlap) and labelled with answer token positions. In addition, each
    window keeps the id of its source example (``example_id``) and an
    ``offset_mapping`` in which the offsets of non-context tokens are
    replaced by ``None``.

    Args:
        dataset_id: The name of the dataset variant. Default is "rajpurkar/squad".
        model_id: The name of the model variant whose tokenizer is used.
            Default is "google-bert/bert-base-uncased".
        huggingface_cache: Root directory for the output; the processed split
            is written to ``<huggingface_cache>/datasets/squad_processed/validation``.
            Default is "/huggingface".

    Returns:
        None
    """
    import os
    from functools import partial
    from pathlib import Path

    # NOTE: the caches point at the fixed /huggingface location used by the
    # download steps; `huggingface_cache` only controls the output location.
    os.environ["HF_HUB_CACHE"] = os.path.join("/huggingface", "hub")
    os.environ["HF_DATASETS_DOWNLOADED_DATASETS_PATH"] = os.path.join("/huggingface", "datasets", "squad")

    import datasets
    from transformers import AutoTokenizer

    data = datasets.load_dataset(dataset_id, split="validation")
    tokenizer = AutoTokenizer.from_pretrained(model_id, clean_up_tokenization_spaces=False)

    output_path = Path(huggingface_cache) / "datasets" / "squad_processed"

    def _preprocess(examples, tokenizer, max_length, stride):
        """Tokenize a batch, attach answer labels and keep post-processing data."""
        questions = [q.strip() for q in examples["question"]]
        inputs = tokenizer(
            questions,
            examples["context"],
            truncation="only_second",
            padding="max_length",
            stride=stride,
            max_length=max_length,
            return_offsets_mapping=True,
            return_overflowing_tokens=True,
        )

        example_ids = []
        answers = examples["answers"]
        # Unlike the train split, offsets stay in `inputs` (masked below).
        offset_mapping = inputs["offset_mapping"]
        # Maps every produced window back to the example it was cut from.
        sample_map = inputs.pop("overflow_to_sample_mapping")

        start_positions = []
        end_positions = []

        for i, offset in enumerate(offset_mapping):
            sample_idx = sample_map[i]
            example_ids.append(examples["id"][sample_idx])
            answer = answers[sample_idx]
            start_char = answer["answer_start"][0]
            end_char = start_char + len(answer["text"][0])
            sequence_ids = inputs.sequence_ids(i)

            # Store a masked copy with None for non-context tokens. `offset`
            # still refers to the unmasked list, which the labelling uses.
            inputs["offset_mapping"][i] = [
                o if sequence_ids[k] == 1 else None for k, o in enumerate(offset)
            ]

            # Locate the context tokens (sequence id 1) inside this window.
            idx = 0
            while sequence_ids[idx] != 1:
                idx += 1
            context_start = idx
            while sequence_ids[idx] == 1:
                idx += 1
            context_end = idx - 1

            # If the answer is not fully inside the context, label it (0, 0).
            # The window must start at or before the answer's first character
            # and end at or after its last one. Checking only for overlap is
            # not enough: a partially visible answer would then get a start
            # label on the separator token before the context.
            if offset[context_start][0] > start_char or offset[context_end][1] < end_char:
                start_positions.append(0)
                end_positions.append(0)
            else:
                # Otherwise it's the start and end token positions
                idx = context_start
                while idx <= context_end and offset[idx][0] <= start_char:
                    idx += 1
                start_positions.append(idx - 1)

                idx = context_end
                while idx >= context_start and offset[idx][1] >= end_char:
                    idx -= 1
                end_positions.append(idx + 1)

        inputs["example_id"] = example_ids
        inputs["start_positions"] = start_positions
        inputs["end_positions"] = end_positions

        return inputs

    preprocess_data = partial(_preprocess, tokenizer=tokenizer, max_length=384, stride=128)
    processed_data = data.map(preprocess_data, batched=True, remove_columns=data.column_names)
    processed_data.save_to_disk(os.path.join(output_path, "validation"))

In [None]:
@dsl.component(packages_to_install=['datasets==2.21.0', 'transformers[torch]==4.44.2', 'tensorboard==2.14.0'])
def fine_tune(
    model_id: Optional[str] = "google-bert/bert-base-uncased",
    huggingface_cache: Optional[str] = "/huggingface",
    logging_steps: Optional[int] = 100,
    save_steps: Optional[int] = 200,
    eval_strategy: Optional[str] = "steps",
    logging_strategy: Optional[str] = "steps",
    save_strategy: Optional[str] = "steps",
    learning_rate: Optional[float] = 3e-5,
    num_train_epochs: Optional[int] = 2,
    weight_decay: Optional[float] = 1e-2,
    use_bf16: Optional[bool] = True,
    per_device_train_batch_size: Optional[int] = 32,
    per_device_eval_batch_size: Optional[int] = 64
) -> None:
    """Fine-tune BERT on the preprocessed SQuAD dataset.

    Reads the processed ``train`` and ``validation`` splits from
    ``<huggingface_cache>/datasets/squad_processed``. Logs, checkpoints and
    the final model are written under ``/runs`` (``logs``, ``checkpoints``
    and ``trained_model`` respectively).

    Args:
        model_id: The name of the model variant. Default is "google-bert/bert-base-uncased".
        huggingface_cache: The path to store the HuggingFace assets. Default is /huggingface.
        logging_steps: The number of steps between each log. Default is 100.
        save_steps: The number of steps between each model checkpoint save. Default is 200.
        eval_strategy: The evaluation strategy to use during training. Default is "steps".
        logging_strategy: The logging strategy to use during training. Default is "steps".
        save_strategy: The checkpoint saving strategy to use during training. Default is "steps".
        learning_rate: The initial learning rate for training. Default is 3e-5.
        num_train_epochs: The number of epochs to train the model. Default is 2.
        weight_decay: The weight decay to apply during optimization to prevent overfitting. Default is 1e-2.
        use_bf16: Whether to use bf16 (Brain Float 16) precision during training. Default is True.
        per_device_train_batch_size: The batch size for training per device (e.g., per GPU/TPU). Default is 32.
        per_device_eval_batch_size: The batch size for evaluation per device (e.g., per GPU/TPU). Default is 64.

    Returns:
        None
    """
    import os
    from pathlib import Path

    # Set before importing the HF libraries so they resolve the caches on the
    # mounted volume.
    os.environ["HF_HUB_CACHE"] = os.path.join("/huggingface", "hub")
    os.environ["HF_DATASETS_DOWNLOADED_DATASETS_PATH"] = os.path.join("/huggingface", "datasets", "squad")

    import datasets
    from transformers import Trainer, TrainingArguments
    from transformers import AutoTokenizer, AutoModelForQuestionAnswering

    model = AutoModelForQuestionAnswering.from_pretrained(model_id)
    tokenizer = AutoTokenizer.from_pretrained(model_id, clean_up_tokenization_spaces=False)

    data_path = Path(huggingface_cache) / "datasets" / "squad_processed"
    train_data = datasets.load_from_disk(os.path.join(data_path, "train"))
    valid_data = datasets.load_from_disk(os.path.join(data_path, "validation"))

    # Every run output lives under the absolute /runs mount point.
    runs_dir = "/runs"
    logs_dir = os.path.join(runs_dir, "logs")
    checkpoints_dir = os.path.join(runs_dir, "checkpoints")

    training_args = TrainingArguments(
        output_dir=checkpoints_dir,
        logging_dir=logs_dir,
        eval_strategy=eval_strategy,
        logging_steps=logging_steps,
        logging_strategy=logging_strategy,
        save_steps=save_steps,
        save_strategy=save_strategy,
        learning_rate=learning_rate,
        num_train_epochs=num_train_epochs,
        weight_decay=weight_decay,
        bf16=use_bf16,
        per_device_train_batch_size=per_device_train_batch_size,
        per_device_eval_batch_size=per_device_eval_batch_size,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=valid_data,
        tokenizer=tokenizer,
    )

    trainer.train()

    # Save under the absolute /runs mount. A relative "runs/..." path would
    # resolve against the container's working directory instead, and the
    # model would not persist beyond the container.
    trainer.save_model(os.path.join(runs_dir, "trained_model"))

In [None]:
@dsl.pipeline
def bert_squad_pipeline(
    namespace: str,
    dataset_id: Optional[str] = "rajpurkar/squad",
    model_id: Optional[str] = "google-bert/bert-base-uncased",
    huggingface_cache: Optional[str] = "/huggingface",
    logging_steps: Optional[int] = 100,
    save_steps: Optional[int] = 200,
    eval_strategy: Optional[str] = "steps",
    logging_strategy: Optional[str] = "steps",
    save_strategy: Optional[str] = "steps",
    learning_rate: Optional[float] = 3e-5,
    num_train_epochs: Optional[int] = 2,
    weight_decay: Optional[float] = 1e-2,
    use_bf16: Optional[bool] = True,
    per_device_train_batch_size: Optional[int] = 32,
    per_device_eval_batch_size: Optional[int] = 64
) -> None:
    """Define a KFP pipeline that fine-tunes BERT on SQuAD for question answering.

    The DAG downloads the dataset and the model in parallel to a shared
    ``huggingface`` PVC. Once both downloads are done, it preprocesses the
    train and validation splits in parallel. Finally, it fine-tunes the model
    on one GPU and writes logs, checkpoints and the trained model to a
    separate ``bert-squad`` PVC.

    Args:
        namespace: The Kubernetes namespace of the run. It is not consumed by
            any step; it is kept so existing run configurations that pass it
            keep working.
        dataset_id: The name of the dataset variant. Default is "rajpurkar/squad".
        model_id: The name of the model variant. Default is "google-bert/bert-base-uncased".
        huggingface_cache: The path to store the HuggingFace assets. Default is /huggingface.
            NOTE: the ``huggingface`` PVC is always mounted at /huggingface, so
            any other value places the processed data outside that volume.
        logging_steps: The number of steps between each log. Default is 100.
        save_steps: The number of steps between each model checkpoint save. Default is 200.
        eval_strategy: The evaluation strategy to use during training. Default is "steps".
        logging_strategy: The logging strategy to use during training. Default is "steps".
        save_strategy: The checkpoint saving strategy to use during training. Default is "steps".
        learning_rate: The initial learning rate for training. Default is 3e-5.
        num_train_epochs: The number of epochs to train the model. Default is 2.
        weight_decay: The weight decay to apply during optimization to prevent overfitting. Default is 1e-2.
        use_bf16: Whether to use bf16 (Brain Float 16) precision during training. Default is True.
        per_device_train_batch_size: The batch size for training per device (e.g., per GPU/TPU). Default is 32.
        per_device_eval_batch_size: The batch size for evaluation per device (e.g., per GPU/TPU). Default is 64.

    Returns:
        None
    """
    # create a PVC to store the dataset and the model
    huggingface_pvc = kubernetes.CreatePVC(
        pvc_name='huggingface',
        access_modes=['ReadWriteMany'],
        size='8.0Gi',
        storage_class_name='longhorn'
    )

    # create a PVC to store the run logs and assets
    bert_squad_pvc = kubernetes.CreatePVC(
        pvc_name='bert-squad',
        access_modes=['ReadWriteMany'],
        size='8.0Gi',
        storage_class_name='longhorn'
    )

    download_data_step = download_data(dataset_id=dataset_id).after(huggingface_pvc)
    download_data_step.set_caching_options(enable_caching=True)

    download_model_step = download_model(model_id=model_id).after(huggingface_pvc)
    download_model_step.set_caching_options(enable_caching=True)

    # Preprocessing needs the raw dataset AND the tokenizer files, so it waits
    # for both downloads. Otherwise it could race `download_model` and fetch
    # the tokenizer into the shared cache concurrently.
    process_training_data_step = process_training_data(
        dataset_id=dataset_id, model_id=model_id, huggingface_cache=huggingface_cache
    ).after(download_data_step, download_model_step)
    process_training_data_step.set_caching_options(enable_caching=True)

    process_validation_data_step = process_validation_data(
        dataset_id=dataset_id, model_id=model_id, huggingface_cache=huggingface_cache
    ).after(download_data_step, download_model_step)
    process_validation_data_step.set_caching_options(enable_caching=True)

    fine_tune_task = fine_tune(
        model_id=model_id,
        huggingface_cache=huggingface_cache,
        logging_steps=logging_steps,
        save_steps=save_steps,
        eval_strategy=eval_strategy,
        logging_strategy=logging_strategy,
        save_strategy=save_strategy,
        learning_rate=learning_rate,
        num_train_epochs=num_train_epochs,
        weight_decay=weight_decay,
        use_bf16=use_bf16,
        per_device_train_batch_size=per_device_train_batch_size,
        per_device_eval_batch_size=per_device_eval_batch_size
    ).after(
        bert_squad_pvc,
        download_model_step,  # explicit: training loads the cached model weights
        process_training_data_step,
        process_validation_data_step,
    )
    fine_tune_task.set_accelerator_type("nvidia.com/gpu")
    fine_tune_task.set_accelerator_limit(1)
    fine_tune_task.set_caching_options(enable_caching=False)

    # Every task reads or writes the shared Hugging Face volume.
    for task in (
        download_data_step,
        download_model_step,
        process_training_data_step,
        process_validation_data_step,
        fine_tune_task,
    ):
        kubernetes.mount_pvc(
            task,
            pvc_name=huggingface_pvc.outputs['name'],
            mount_path='/huggingface')

    # Only training writes run artifacts (logs, checkpoints, final model).
    kubernetes.mount_pvc(
        fine_tune_task,
        pvc_name=bert_squad_pvc.outputs['name'],
        mount_path='/runs')

In [None]:
# Serialize the pipeline definition into a YAML package for submission below.
compiler.Compiler().compile(
    pipeline_func=bert_squad_pipeline,
    package_path="bert-squad-pipeline.yaml",
)

In [None]:
# `client` is rebound to the Client instance below, which shadows the
# `kfp.client` module imported at the top. Import the class directly so this
# cell can be re-run without failing on `client.Client()`.
from kfp.client import Client

client = Client()

experiment = client.create_experiment(
    name="bert-squad-experiment",
    description="Fine-tune Bert on the SQuAD dataset, for QA tasks.",
    namespace=NAMESPACE)

In [None]:
# Submit the compiled package as a run of the experiment created above. The
# arguments mirror the pipeline defaults, except that checkpoints are saved
# once per epoch.
pipeline = client.create_run_from_pipeline_package(
    run_name="bert-squad-run",
    pipeline_file="bert-squad-pipeline.yaml",
    namespace=NAMESPACE,
    experiment_name=experiment.display_name,
    arguments=dict(
        namespace=NAMESPACE,
        dataset_id="rajpurkar/squad",
        model_id="google-bert/bert-base-uncased",
        huggingface_cache="/huggingface",
        logging_steps=100,
        save_steps=200,
        eval_strategy="steps",
        logging_strategy="steps",
        save_strategy="epoch",
        learning_rate=3e-5,
        num_train_epochs=2,
        weight_decay=1e-2,
        use_bf16=True,
        per_device_train_batch_size=32,
        per_device_eval_batch_size=64,
    ),
)