# Pipeline

In this Jupyter Notebook you will create a Kubeflow Pipeline that automates the workflow for the [ISIC 2024 - Skin Cancer Detection with 3D-TBP Kaggle](https://www.kaggle.com/competitions/isic-2024-challenge) competition. The Pipeline consists of two primary steps:

1. **📥 Download the Competition Dataset**: The first step downloads the specified Kaggle competition dataset into a PersistentVolumeClaim (PVC). It uses the Kaggle CLI package and the Kubernetes secret
   you created previously to authenticate, then extracts the archive so the training step can read it.

2. **🚀 Launch a Distributed Training Job (PyTorchJob)**: The second step launches a distributed training job, specifically a PyTorchJob, through the Kubeflow Training Operator.
   The job trains the model across several replicas using PyTorch Distributed.

Combining these steps into a Kubeflow Pipeline makes the workflow reproducible and automated: every run downloads the dataset (or reuses the cached result) before the training job starts, and both steps execute inside the Kubeflow environment.

In [None]:
from pathlib import Path
from typing import Optional, Union, List

from kfp import dsl, compiler, kubernetes, client

First, instantiate the Kubeflow Pipelines (KFP) client that you will use to submit the Pipeline.

> ❗Ensure that your Notebook has access to the Kubeflow Pipelines component.

In [None]:
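kaggle_secret = "kaggle-secret"
root = Path("/")
sa = root / "var/run/secrets/kubernetes.io/serviceaccount"
# read the namespace this Notebook runs in; strip any trailing whitespace
ns = (sa / "namespace").read_text().strip()
# note: this rebinds `client` from the kfp module to a Client instance,
# so re-run the import cell before re-running this one
client = client.Client()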
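
If you also want to run the cell above outside a cluster, where the service account files are not mounted, a small helper with a fallback can be handy. The sketch below is only an illustration: `read_namespace`, its `default` argument, and the `kubeflow-user` value are hypothetical names that do not appear elsewhere in this Notebook.

```python
from pathlib import Path
import tempfile

# Location where Kubernetes mounts the service account files inside a Pod.
SA_DIR = Path("/var/run/secrets/kubernetes.io/serviceaccount")


def read_namespace(sa_dir: Path = SA_DIR, default: str = "default") -> str:
    """Return the Pod's namespace, or `default` when the file is missing."""
    ns_file = sa_dir / "namespace"
    if not ns_file.is_file():
        # hypothetical fallback, e.g. when running outside a cluster
        return default
    # strip() guards against a trailing newline in the file
    return ns_file.read_text(encoding="utf-8").strip()


# Usage: a temporary directory stands in for the mounted service account.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "namespace").write_text("kubeflow-user\n", encoding="utf-8")
    demo_ns = read_namespace(Path(tmp))
    print(demo_ns)
```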

Each step of the Pipeline is a KFP component. A Pipeline component is a self-contained set of code that performs one step in the ML workflow (Pipeline), such as data preprocessing, data transformation, or model training. A component is analogous to a function, in that it has a name, parameters, return values, and a body. Read the [docs](https://www.kubeflow.org/docs/components/pipelines/concepts/component/) to learn more about KFP components.

The first component leverages the Kaggle secret you created before to download the dataset to a specified location. You will mount a PVC to this location so you can keep the dataset even after the completion of the Pipeline run.

In [None]:
@dsl.component(packages_to_install=['kaggle==1.6.14'])
def download_data(competition: str, data_path: Optional[str] = "/data") -> None:
    """Download the competition dataset from Kaggle to a specified location.

    Args:
        competition: The name of the Kaggle competition.
        data_path: The path where the dataset will be downloaded and extracted. Default is "/data".

    Returns:
        None
    """
    import os
    import json
    import zipfile
    import subprocess
    
    def init_kaggle() -> None:
        # create the Kaggle config directory
        kaggle_config_dir = os.path.join(
            os.path.expandvars('$HOME'), '.kaggle')
        os.makedirs(kaggle_config_dir, exist_ok = True)

        # write the `kaggle.json` config file
        api_dict = {
            "username": os.environ['KAGGLE_USERNAME'],
            "key":os.environ['KAGGLE_KEY']}
        with open(os.path.join(kaggle_config_dir, "kaggle.json"), "w", encoding='utf-8') as f:
            json.dump(api_dict, f)

        # restrict `kaggle.json` to owner read/write (mode 600)
        os.chmod(os.path.join(kaggle_config_dir, "kaggle.json"), 0o600)
        
    init_kaggle()
    
    import kaggle
    
    # download the competition files
    kaggle.api.competition_download_files(competition, path=data_path)
    with zipfile.ZipFile(os.path.join(data_path, f"{competition}.zip"), 'r') as zip_ref:
        zip_ref.extractall(data_path)
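
The credential handling inside `download_data` can be tried in isolation, without Kaggle or a cluster. The following is a minimal sketch using only the standard library; `write_kaggle_config` is a hypothetical helper name, and the username and key are placeholders, not real credentials. It assumes a POSIX file system, where file modes are enforced.

```python
import json
import os
import stat
import tempfile


def write_kaggle_config(config_dir: str, username: str, key: str) -> str:
    """Write `kaggle.json` readable and writable by the owner only."""
    os.makedirs(config_dir, exist_ok=True)
    config_path = os.path.join(config_dir, "kaggle.json")
    # Request mode 600 at creation time so the key is never world-readable.
    fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"username": username, "key": key}, f)
    # Enforce the mode even if the file already existed with other permissions.
    os.chmod(config_path, 0o600)
    return config_path


# Usage: a temporary directory stands in for `$HOME/.kaggle`.
with tempfile.TemporaryDirectory() as tmp:
    path = write_kaggle_config(os.path.join(tmp, ".kaggle"), "placeholder-user", "placeholder-key")
    demo_mode = stat.S_IMODE(os.stat(path).st_mode)
    with open(path, encoding="utf-8") as f:
        demo_config = json.load(f)
    print(oct(demo_mode), sorted(demo_config))
```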

The second component launches a distributed training job using the Kubeflow Training Operator Python client. It builds the PyTorchJob Custom Resource (CR) with the SDK's model classes and submits it to Kubernetes. The resulting PyTorchJob comprises one `Master` and one `Worker` replica, which share the training load and so speed up training. Note that the component only submits the job; it does not wait for training to finish.

In [None]:
@dsl.component(packages_to_install=["kubeflow-training==1.8.0"])
def launch_training(
    run_name: str,
    namespace: str,
    data_vol: str,
    logs_vol: str,
    image: str,
    image_cmd: Optional[List[str]] = list(),
    image_args: Optional[List[str]] = list(),
    data_mount_path: Optional[str] = "/data",
    logs_mount_path: Optional[str] = "/logs",
) -> None:
    """Launch a distributed training job using the Kubeflow Training Operator.

    Args:
        run_name: The name of the training run.
        namespace: The Kubernetes namespace where the job will be created.
        data_vol: The name of the Persistent Volume Claim (PVC) for the data.
        logs_vol: The name of the Persistent Volume Claim (PVC) for the logs.
        image: The Docker image to use for the training job.
        image_cmd: The command to run in the Docker image. Default is an empty list.
        image_args: The arguments to pass to the command. Default is an empty list.
        data_mount_path: The path to mount the data volume inside the container. Default is "/data".
        logs_mount_path: The path to mount the logs volume inside the container. Default is "/logs".

    Returns:
        None
    """
    from kubeflow.training import TrainingClient, constants
    from kubernetes.client import (V1ObjectMeta,
                                   V1PodTemplateSpec,
                                   V1PodSpec,
                                   V1Volume,
                                   V1PersistentVolumeClaimVolumeSource,
                                   V1EmptyDirVolumeSource,
                                   V1Container,
                                   V1VolumeMount,
                                   V1ResourceRequirements)
    from kubeflow.training.models import (KubeflowOrgV1PyTorchJob,
                                          KubeflowOrgV1PyTorchJobSpec,
                                          KubeflowOrgV1ReplicaSpec,
                                          KubeflowOrgV1RunPolicy)
    
    training_client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
    
    def _get_metadata(name: str = None, annotations: dict = None) -> V1ObjectMeta:
        return V1ObjectMeta(name=name, annotations=annotations)
    
    def _get_volume(
        name: str, 
        persistent_volume_claim: V1PersistentVolumeClaimVolumeSource = None,
        empty_dir: V1EmptyDirVolumeSource = None
    ) -> V1Volume:
        return V1Volume(name=name, persistent_volume_claim=persistent_volume_claim, empty_dir=empty_dir)
    
    def _get_volume_mount(name: str, mount_path: str) -> V1VolumeMount:
        return V1VolumeMount(name=name, mount_path=mount_path)
    
    # define job's metadata
    pytorch_job_metadata = _get_metadata(name=run_name)
    pytorch_replica_metadata = _get_metadata(
        annotations={"sidecar.istio.io/inject": "false"})
    
    # define volumes
    data_volume = _get_volume(
        data_vol, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=data_vol))
    logs_volume = _get_volume(
        logs_vol, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=logs_vol))
    shm_volume = _get_volume(
        "dshm", empty_dir=V1EmptyDirVolumeSource(medium="Memory", size_limit="2Gi"))
    
    # define volume mounts
    data_volume_mount = _get_volume_mount(data_vol, data_mount_path)
    logs_volume_mount = _get_volume_mount(logs_vol, logs_mount_path)
    dshm_volume_mount = _get_volume_mount("dshm", "/dev/shm")
    
    # define job's container
    pytorch_replica_container = V1Container(
        name="pytorch",
        image=image,
        command=image_cmd,
        args=image_args,
        resources=V1ResourceRequirements(
            limits={"nvidia.com/gpu": "1"}),
        volume_mounts=[data_volume_mount, logs_volume_mount, dshm_volume_mount])
    
    # define job's replica spec
    pytorch_replica_template_spec = V1PodSpec(
        volumes=[data_volume, logs_volume, shm_volume],
        containers=[pytorch_replica_container])
    pytorch_replica_template = V1PodTemplateSpec(
        metadata=pytorch_replica_metadata,
        spec=pytorch_replica_template_spec)
    pytorch_replica_spec = KubeflowOrgV1ReplicaSpec(
        replicas=1,
        restart_policy="OnFailure",
        template=pytorch_replica_template)
    pytorch_replica_specs = {
        "Master": pytorch_replica_spec,
        "Worker": pytorch_replica_spec
    }
    
    # define PyTorchJob spec
    pytorch_job_spec = KubeflowOrgV1PyTorchJobSpec(
        pytorch_replica_specs=pytorch_replica_specs,
        run_policy=KubeflowOrgV1RunPolicy())
    
    pytorch_job = KubeflowOrgV1PyTorchJob(
        api_version="kubeflow.org/v1",
        kind="PyTorchJob",
        metadata=pytorch_job_metadata,
        spec=pytorch_job_spec)
    
    training_client.create_job(pytorch_job, namespace=namespace)
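
To see what the SDK objects above amount to, it can help to write the same Custom Resource as a plain dictionary, which is how it would look as a YAML or JSON manifest. The sketch below is an approximation built by hand from the component's code, not output captured from the SDK. `build_pytorch_job_manifest` is a hypothetical helper, `my-namespace` is a placeholder, and placing the namespace in `metadata` is an assumption (the component passes it to `create_job` instead).

```python
import copy
import json


def build_pytorch_job_manifest(
    run_name: str,
    namespace: str,
    image: str,
    data_vol: str,
    logs_vol: str,
    data_mount_path: str = "/data",
    logs_mount_path: str = "/logs",
) -> dict:
    """Build a PyTorchJob manifest mirroring the `launch_training` component."""
    replica = {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
            # disable the Istio sidecar, as the component does
            "metadata": {"annotations": {"sidecar.istio.io/inject": "false"}},
            "spec": {
                "containers": [{
                    "name": "pytorch",
                    "image": image,
                    "resources": {"limits": {"nvidia.com/gpu": "1"}},
                    "volumeMounts": [
                        {"name": data_vol, "mountPath": data_mount_path},
                        {"name": logs_vol, "mountPath": logs_mount_path},
                        {"name": "dshm", "mountPath": "/dev/shm"},
                    ],
                }],
                "volumes": [
                    {"name": data_vol, "persistentVolumeClaim": {"claimName": data_vol}},
                    {"name": logs_vol, "persistentVolumeClaim": {"claimName": logs_vol}},
                    # in-memory volume backing /dev/shm
                    {"name": "dshm", "emptyDir": {"medium": "Memory", "sizeLimit": "2Gi"}},
                ],
            },
        },
    }
    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        # assumption: namespace placed in metadata for a standalone manifest
        "metadata": {"name": run_name, "namespace": namespace},
        "spec": {
            "runPolicy": {},
            "pytorchReplicaSpecs": {
                # deep copies keep the two replica specs independent
                "Master": copy.deepcopy(replica),
                "Worker": copy.deepcopy(replica),
            },
        },
    }


manifest = build_pytorch_job_manifest(
    run_name="pytorch-dist-isic-efficientnet",
    namespace="my-namespace",  # placeholder
    image="dpoulopoulos/pytorch-dist-isic:61a89cd",
    data_vol="isic-data",
    logs_vol="isic-logs",
)
print(json.dumps(manifest, indent=2))
```

Copying the replica spec for `Master` and `Worker` means you can later change one (for example, the number of `Worker` replicas) without affecting the other.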

Finally, you are ready to create the Pipeline. This Pipeline includes two additional steps: one creates the PVC that stores the downloaded dataset, and the other creates the PVC where the distributed training job writes its logs. You can point a TensorBoard instance at the second PVC to monitor the training run.

Moreover, the Pipeline takes care of a few other details, such as passing the necessary environment variables to each Pod, defining the order in which each step should run, and determining whether to cache a step or not.

In [None]:
@dsl.pipeline
def isic_pipeline(
    namespace: str,
    competition_name: str,
    dist_run_name: str,
    data_vol: str,
    logs_vol: str,
    dist_run_image: str,
    data_path: Optional[str] = "/data",
    dist_image_cmd: Optional[List[str]] = list(),
    dist_image_args: Optional[List[str]] = list(),
    data_mount_path: Optional[str] = "/data",
    logs_mount_path: Optional[str] = "/logs",
) -> None:
    """Define a KFP Pipeline for downloading competition data and launching a distributed training job.

    Args:
        namespace: The Kubernetes namespace where the Pipeline will run.
        competition_name: The name of the Kaggle competition to download data from.
        dist_run_name: The name of the distributed training run.
        data_vol: The name of the Persistent Volume Claim (PVC) for the data.
        logs_vol: The name of the Persistent Volume Claim (PVC) for the logs.
        dist_run_image: The Docker image to use for the distributed training job.
        data_path: The path where the dataset will be downloaded and extracted. Default is "/data".
        dist_image_cmd: The command to run in the Docker image for the training job. Default is an empty list.
        dist_image_args: The arguments to pass to the command for the training job. Default is an empty list.
        data_mount_path: The path to mount the data volume inside the container. Default is "/data".
        logs_mount_path: The path to mount the logs volume inside the container. Default is "/logs".

    Returns:
        None
    """
    # create a PVC to store the dataset
    isic_data_pvc = kubernetes.CreatePVC(
        pvc_name='isic-data',
        access_modes=['ReadWriteMany'],
        size='8.0Gi',
        storage_class_name='longhorn'
    )
    
    # create a PVC to log the training progress
    isic_logs_pvc = kubernetes.CreatePVC(
        pvc_name='isic-logs',
        access_modes=['ReadWriteMany'],
        size='2.0Gi',
        storage_class_name='longhorn'
    )

    download_data_step = download_data(
        competition=competition_name,
        data_path=data_path).after(isic_data_pvc)
    download_data_step.set_caching_options(enable_caching=True)
    
    launch_training_step = launch_training(
        run_name=dist_run_name,
        namespace=namespace,
        data_vol=data_vol,
        logs_vol=logs_vol,
        image=dist_run_image,
        image_cmd=dist_image_cmd,
        image_args=dist_image_args,
        data_mount_path=data_mount_path,
        logs_mount_path=logs_mount_path).after(download_data_step, isic_logs_pvc)  # wait for the data and the logs PVC
    launch_training_step.set_caching_options(enable_caching=False)

    kubernetes.mount_pvc(
        download_data_step,
        pvc_name=isic_data_pvc.outputs['name'],
        mount_path='/data')
    # expose both keys of the Kaggle secret as environment variables
    kubernetes.use_secret_as_env(
        download_data_step,
        secret_name=kaggle_secret,
        secret_key_to_env={
            'username': 'KAGGLE_USERNAME',
            'key': 'KAGGLE_KEY'})
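
The `.after()` calls in the Pipeline declare dependency edges between steps, and the backend runs a step only once everything it depends on has finished. As a rough mental model, and not a description of how KFP schedules tasks internally, the ordering can be sketched with the standard library's `graphlib`. The step labels are hypothetical, and the edge from the logs PVC to the training step is an assumption: the training job needs that PVC to exist before it can mount it.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it must wait for.
dependencies = {
    "create-data-pvc": set(),
    "create-logs-pvc": set(),
    "download-data": {"create-data-pvc"},
    # the edge to create-logs-pvc is an assumption (see the text above)
    "launch-training": {"download-data", "create-logs-pvc"},
}

# static_order() yields one valid execution order that respects every edge.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Steps without an edge between them, such as the two PVC creations, have no guaranteed relative order and may run in parallel.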

You are almost ready to launch your experiment. First, compile the Pipeline into an intermediate representation YAML file. Then, create an Experiment to group several runs. Finally, create a Run from your Pipeline.

In [None]:
compiler.Compiler().compile(isic_pipeline, package_path='pipeline.yaml')

In [None]:
experiment = client.create_experiment(
    name="isic-experiment",
    description="Skin Cancer Detection with 3D-TBP",
    namespace=ns)

In [None]:
pipeline = client.create_run_from_pipeline_package(
    pipeline_file="pipeline.yaml",
    experiment_name=experiment.display_name,
    namespace=ns,
    run_name="isic-run",
    arguments={
        "namespace": ns,
        "competition_name": "isic-2024-challenge",
        "dist_run_name": "pytorch-dist-isic-efficientnet",
        "data_vol": "isic-data",
        "logs_vol": "isic-logs",
        "dist_run_image": "dpoulopoulos/pytorch-dist-isic:61a89cd",
    },
)