# Kubeflow and Prompt Tuning

This notebook demonstrates how Kubeflow can be leveraged to prompt tune a foundation LLM and serve it.

#### What's used?

1. [bigscience/bloomz-560m](https://huggingface.co/bigscience/bloomz-560m#model-summary) - the foundation model that is prompt tuned in this notebook
2. [PEFT](https://huggingface.co/docs/peft/index) - Parameter-Efficient Fine-Tuning (PEFT), a 🤗 Hugging Face library used for fine-tuning and prompt tuning open-source LLMs
3. [RAFT](https://huggingface.co/datasets/ought/raft) dataset - Real-world Annotated Few-shot Tasks (RAFT), an aggregation of English-language datasets found in the real world. We use the `twitter_complaints` subset, which is used to classify tweets as `complaint` or `no complaint`.

#### The Goal
<!-- Apply prompt tuning to train a bloomz-560m model on the twitter_complaints subset of the RAFT dataset. -->
Train and update a small set of prompt parameters to improve the performance of a pretrained model with frozen weights on a specific downstream task, instead of fully fine-tuning a separate model.
In our specific case:

1. Improve `bigscience/bloomz-560m`'s ability to determine tweets' sentiment by prompt tuning it. 
2. Automate the prompt tuning and serving processes via Kubeflow Pipelines.
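
To make the "small set of prompt parameters" concrete, the sketch below estimates how many values prompt tuning would train. It assumes the soft prompt is a single embedding matrix of shape `num_virtual_tokens x hidden_size`, that `bloomz-560m` has a hidden size of 1024, and that the frozen base model has roughly 559 million parameters. The hidden size and the parameter total are assumptions that are not stated in this notebook.

```python
def prompt_tuning_trainable_params(num_virtual_tokens: int, hidden_size: int) -> int:
    """Size of the soft-prompt embedding matrix, the only weights that get updated."""
    return num_virtual_tokens * hidden_size


NUM_VIRTUAL_TOKENS = 8            # same value as in the PromptTuningConfig used later in this notebook
HIDDEN_SIZE = 1024                # assumption: embedding width of bloomz-560m
BASE_MODEL_PARAMS = 559_000_000   # assumption: approximate size of the frozen model

trainable = prompt_tuning_trainable_params(NUM_VIRTUAL_TOKENS, HIDDEN_SIZE)
share = 100 * trainable / (BASE_MODEL_PARAMS + trainable)

print(f"trainable parameters: {trainable}")
print(f"share of all parameters: {share:.4f}%")
```

Under these assumptions only a few thousand values are trained, which is why the resulting prompt tuning configuration is small enough to publish separately from the base model.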

#### Helpful links
- [vml-2023](https://github.com/difince/vml-2023) - a GitHub repository that contains the code of this notebook and setup instructions.
- Other:
    - [PEFT-prompt-tuning](https://huggingface.co/docs/peft/task_guides/clm-prompt-tuning)
<!-- 
This notebook create Kubeflow pipeline that has the following steps: 
- Reads user huggingface token from the already kubernetes created secret
- Downloads an open source large language model (LLM) 
- Trains the prompt tuning configuration against the Hugging Face open source model (open source twitter_complaints prompt dataset).
- Publishes a trained configuration to Hugging Face.
- Runs inferencing on the Large Language Model with the new prompt tuning configuration
- Tests the model -->



### Run through the Notebook

After installing kfp, you must restart the kernel for the new version to take effect. After the restart, run through all cells.

In [1]:
!pip install kfp==2.3.0
# !pip install kfp 


Collecting kfp==2.3.0
  Using cached kfp-2.3.0-py3-none-any.whl
Collecting click<9,>=8.0.0 (from kfp==2.3.0)
  Obtaining dependency information for click<9,>=8.0.0 from https://files.pythonhosted.org/packages/00/2e/d53fa4befbf2cfa713304affc7ca780ce4fc1fd8710527771b58311a3229/click-8.1.7-py3-none-any.whl.metadata
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting google-cloud-storage<3,>=2.2.1 (from kfp==2.3.0)
  Obtaining dependency information for google-cloud-storage<3,>=2.2.1 from https://files.pythonhosted.org/packages/04/72/71b1b531cefa1daff8f6a2a70b4d4fa18dd4da851b5486d53578811b0838/google_cloud_storage-2.13.0-py2.py3-none-any.whl.metadata
  Using cached google_cloud_storage-2.13.0-py2.py3-none-any.whl.metadata (6.1 kB)
Collecting kfp-pipeline-spec==0.2.2 (from kfp==2.3.0)
  Using cached kfp_pipeline_spec-0.2.2-py3-none-any.whl (20 kB)
Collecting kfp-server-api<2.1.0,>=2.0.0 (from kfp==2.3.0)
  Using cached kfp_server_api-2.0.3-py3-none-any.whl
Collecting go

In [2]:
import kfp
import kfp.dsl as dsl
from kfp.dsl import component
from kfp.compiler import Compiler
import kfp.components as comp

print(kfp.__version__)

2.3.0


### Kubeflow Pipeline
Kubeflow Pipelines (KFP) is a platform for building and deploying portable and scalable machine learning (ML) workflows based on Docker containers. A pipeline is defined by a set of input parameters and a list of steps. Each step in a pipeline is an instance of a component.

With KFP you can:
- author components and pipelines using the KFP Python SDK,
- compile pipelines to an intermediate representation (IR) YAML,
- submit the pipeline to a KFP-conformant backend for execution.

The `dsl.component` and `dsl.pipeline` decorators turn your type-annotated Python functions into components and pipelines, respectively. 

Each pipeline component is executed in a separate container. For each component we need to define the `base_image` and, where extra Python packages are required, `packages_to_install`.
<!-- The PromptTuningConfig contains information about the task type, the text to initialize the prompt embedding, the number of virtual tokens, and the tokenizer to use -->

### Read huggingface-secret 

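A Kubernetes secret that contains the user's Hugging Face write token needs to be created before running this pipeline. Create it in the same namespace that `get_hf_token` reads from (`kubeflow-user-example-com` in the code below):

`kubectl create secret generic huggingface-secret --from-literal='token=<HuggingFace_WRITE_Token>' -n kubeflow-user-example-com`

The `get_hf_token` pipeline component reads this secret, extracts the Hugging Face token, and passes it as a parameter to the following pipeline steps. The value is returned as stored in the secret's `data` field, that is, base64-encoded, and it is decoded in the training step before use.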
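
Kubernetes returns the values under a Secret's `data` field base64-encoded, so the string handed to the next step still has to be decoded. The sketch below mimics that round trip with the standard library only; `hf_example_token` is a made-up placeholder, not a real token.

```python
import base64

# Hypothetical placeholder; a real token comes from your Hugging Face account settings.
plain_token = "hf_example_token"

# The form in which the Kubernetes API returns the value in secret.data["token"].
encoded_token = base64.b64encode(plain_token.encode()).decode()

# What a downstream step has to do before it can use the value.
decoded_token = base64.b64decode(encoded_token).decode()

print(encoded_token)
print(decoded_token)
```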

In [3]:
@component(
    packages_to_install=["kubernetes"],
    base_image='python:3.10'
)
def get_hf_token() -> str:
    from kubernetes import client, config

    config.load_incluster_config()
    core_api = client.CoreV1Api()
    secret = core_api.read_namespaced_secret(name="huggingface-secret", namespace="kubeflow-user-example-com")
    return secret.data["token"]

### Prompt Tuning 
"prompt_tuning_bloom" pipeline step defines the model and tokenizer, the dataset and the dataset columns to train on, some training hyperparameters, and the PromptTuningConfig. The PromptTuningConfig contains information about the task type, the text to initialize the prompt embedding, the number of virtual tokens, and the tokenizer to use.

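The part of this step that is easiest to get wrong is the preprocessing: prompt and label tokens are concatenated, the prompt positions in `labels` are set to `-100` so that the loss ignores them, and everything is left-padded to `max_length`. The sketch below reproduces that logic on plain Python lists. The token IDs, `PAD_ID`, and the helper name `build_example` are made-up values for illustration, not real tokenizer output.

```python
PAD_ID = 3            # hypothetical pad token id
IGNORE_INDEX = -100   # positions with this label are skipped by the cross-entropy loss
MAX_LENGTH = 8


def build_example(prompt_ids, label_ids, max_length=MAX_LENGTH):
    # Append the pad token to the label, mirroring the notebook's preprocess_function.
    label_ids = label_ids + [PAD_ID]
    input_ids = prompt_ids + label_ids
    labels = [IGNORE_INDEX] * len(prompt_ids) + label_ids
    attention_mask = [1] * len(input_ids)

    # Left-pad to max_length, then truncate anything that is still too long.
    pad = max(0, max_length - len(input_ids))
    input_ids = ([PAD_ID] * pad + input_ids)[:max_length]
    attention_mask = ([0] * pad + attention_mask)[:max_length]
    labels = ([IGNORE_INDEX] * pad + labels)[:max_length]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


example = build_example(prompt_ids=[11, 12, 13], label_ids=[21, 22])
print(example)
```

Only the label tokens keep their real IDs in `labels`, so the loss is computed on the predicted class text rather than on the tweet itself.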

In [4]:
@component(
    packages_to_install=["peft", "transformers", "datasets", "torch", "tqdm"],
    base_image='python:3.10'
)
def prompt_tuning_bloom(huggingface_name: str, peft_model_publish_id: str, model_name_or_path: str, num_epochs: int, hf_token: str):
    from transformers import AutoModelForCausalLM, AutoTokenizer, default_data_collator, get_linear_schedule_with_warmup
    from peft import get_peft_config, get_peft_model, PromptTuningInit, PromptTuningConfig, TaskType, PeftType
    import torch
    from datasets import load_dataset
    import os
    from torch.utils.data import DataLoader
    from tqdm import tqdm
    import base64

    peft_config = PromptTuningConfig(
        task_type=TaskType.CAUSAL_LM,
        prompt_tuning_init=PromptTuningInit.TEXT,
        num_virtual_tokens=8,
        prompt_tuning_init_text="Classify if the tweet is a complaint or not:",
        tokenizer_name_or_path=model_name_or_path, # bigscience/bloomz-560m
    )

    dataset_name = "twitter_complaints"
    text_column = "Tweet text"
    label_column = "text_label"
    max_length = 64
    lr = 3e-2
    batch_size = 8

    # Load the twitter_complaints subset of the RAFT dataset. This subset contains tweets that are labeled either complaint or no complaint
    dataset = load_dataset("ought/raft", dataset_name)
    dataset["train"][0]

    # Make the Label column more readable -> replace the Label value with the corresponding label text and store them in a `text_label` column. The `map` function makes it possible to apply this change over the entire dataset in one step.
    classes = [k.replace("_", " ") for k in dataset["train"].features["Label"].names]
    dataset = dataset.map(
        lambda x: {"text_label": [classes[label] for label in x["Label"]]},
        batched=True,
        num_proc=1,
    )

    # Preprocess dataset - by setting tokenizer.
    # Configure the appropriate padding token to use for padding sequences, and determine the maximum length of the tokenized labels
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id
        
    # Create a preprocess_function to:
    # Tokenize the input text and labels.
    # For each example in a batch, pad the labels with the tokenizer's pad_token_id.
    # Concatenate the input text and labels into the model_inputs.
    # Create a separate attention mask for labels and model_inputs.
    # Loop through each example in the batch again to pad the input ids, labels, and attention mask to the max_length and convert them to PyTorch tensors.
    def preprocess_function(examples):
        batch_size = len(examples[text_column])
        inputs = [f"{text_column} : {x} Label : " for x in examples[text_column]]
        targets = [str(x) for x in examples[label_column]]
        model_inputs = tokenizer(inputs)
        labels = tokenizer(targets)
        for i in range(batch_size):
            sample_input_ids = model_inputs["input_ids"][i]
            label_input_ids = labels["input_ids"][i] + [tokenizer.pad_token_id]
            model_inputs["input_ids"][i] = sample_input_ids + label_input_ids
            labels["input_ids"][i] = [-100] * len(sample_input_ids) + label_input_ids
            model_inputs["attention_mask"][i] = [1] * len(model_inputs["input_ids"][i])
        for i in range(batch_size):
            sample_input_ids = model_inputs["input_ids"][i]
            label_input_ids = labels["input_ids"][i]
            model_inputs["input_ids"][i] = [tokenizer.pad_token_id] * (
                max_length - len(sample_input_ids)
            ) + sample_input_ids
            model_inputs["attention_mask"][i] = [0] * (max_length - len(sample_input_ids)) + model_inputs[
                "attention_mask"
            ][i]
            labels["input_ids"][i] = [-100] * (max_length - len(sample_input_ids)) + label_input_ids
            model_inputs["input_ids"][i] = torch.tensor(model_inputs["input_ids"][i][:max_length])
            model_inputs["attention_mask"][i] = torch.tensor(model_inputs["attention_mask"][i][:max_length])
            labels["input_ids"][i] = torch.tensor(labels["input_ids"][i][:max_length])
        model_inputs["labels"] = labels["input_ids"]
        return model_inputs
    
    # Use the map function to apply the preprocess_function to the entire dataset. You can remove the unprocessed columns since the model won’t need them:
    processed_datasets = dataset.map(
        preprocess_function,
        batched=True,
        num_proc=1,
        remove_columns=dataset["train"].column_names,
        load_from_cache_file=False,
        desc="Running tokenizer on dataset",
    )

    # Create DataLoaders from the train and eval datasets. pin_memory is left at False because this component never moves batches to a GPU; pin_memory=True only helps when batches are copied from CPU to GPU.
    train_dataset = processed_datasets["train"]
    eval_dataset = processed_datasets["test"]

    train_dataloader = DataLoader(
        train_dataset, shuffle=True, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=False
    )
    eval_dataloader = DataLoader(eval_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=False)
    
    # Initialize a base model from AutoModelForCausalLM, and pass it and peft_config to the get_peft_model() function to create a PeftModel. 
    model = AutoModelForCausalLM.from_pretrained(model_name_or_path) #  bigscience/bloomz-560m
    model = get_peft_model(model, peft_config)
    
    # Print the PeftModel's trainable parameters to see how few are trained compared with the full model.
    # print_trainable_parameters() prints the summary itself and returns None, so it is not wrapped in print().
    model.print_trainable_parameters()
    
    # Setup an optimizer and learning rate scheduler:
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=(len(train_dataloader) * num_epochs),
    )

    # Define training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for step, batch in enumerate(tqdm(train_dataloader)):
            batch = {k: v for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            total_loss += loss.detach().float()
            loss.backward()
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        model.eval()
        eval_loss = 0
        eval_preds = []
        for step, batch in enumerate(tqdm(eval_dataloader)):
            batch = {k: v for k, v in batch.items()}
            with torch.no_grad():
                outputs = model(**batch)
            loss = outputs.loss
            eval_loss += loss.detach().float()
            eval_preds.extend(
                tokenizer.batch_decode(torch.argmax(outputs.logits, -1).detach().cpu().numpy(), skip_special_tokens=True)
            )

        eval_epoch_loss = eval_loss / len(eval_dataloader)
        eval_ppl = torch.exp(eval_epoch_loss)
        train_epoch_loss = total_loss / len(train_dataloader)
        train_ppl = torch.exp(train_epoch_loss)
        print("epoch=%s: train_ppl=%s train_epoch_loss=%s eval_ppl=%s eval_epoch_loss=%s" % (epoch, train_ppl, train_epoch_loss, eval_ppl, eval_epoch_loss))

    # Store and share your model 
    from huggingface_hub import login
    login(token=base64.b64decode(hf_token).decode())

    peft_model_id = "{}/{}".format(huggingface_name, peft_model_publish_id)
    model.save_pretrained("output_dir")
    
    # Use the push_to_hub function to upload the model to a repository on the Hub.
    # login() above has already stored the token, so no explicit token argument is passed.
    model.push_to_hub(peft_model_id)
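
The per-epoch log line in the training loop reports perplexity, which the loop computes as the exponential of the mean batch loss. The sketch below repeats that calculation with the standard library; the loss values are invented for illustration and are not results from this notebook.

```python
import math


def perplexity(batch_losses):
    """exp(mean cross-entropy loss), the same quantity as torch.exp(total_loss / num_batches)."""
    mean_loss = sum(batch_losses) / len(batch_losses)
    return math.exp(mean_loss)


# Invented per-batch losses, for illustration only.
losses = [0.9, 0.7, 0.5]

print(f"mean loss: {sum(losses) / len(losses):.3f}")
print(f"perplexity: {perplexity(losses):.3f}")
```

A perplexity close to 1 corresponds to a mean loss close to 0, so a falling `eval_ppl` across epochs is the signal to look for in the component logs.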

### Deploy Serving Runtime

KServe's ServingRuntime and ClusterServingRuntime API resources define the templates for Pods that can serve one or more particular model formats. Each ServingRuntime defines key information such as the container image of the runtime and a list of the model formats that the runtime supports. Other configuration settings for the runtime can be conveyed through environment variables in the container specification.

Several out-of-the-box ClusterServingRuntimes are provided with KServe so that users can quickly deploy common model formats without having to define the runtimes themselves.
```bash 
    kubectl get ClusterServingRuntimes
```
In this example, however, we extend the KServe installation with a custom ServingRuntime, defined in `custom_runtime_manifest`. We use a prebuilt serving image, `quay.io/aipipeline/peft-model-server:latest`; if you would like to build it yourself, take a look at [peft_model_server.py](https://github.com/kubeflow/kfp-tekton/blob/master/samples/peft-modelmesh-pipeline/peft_model_server.py) and the [Dockerfile](https://github.com/kubeflow/kfp-tekton/blob/master/samples/peft-modelmesh-pipeline/Dockerfile).
Once this step has run, you can verify that the ServingRuntime was created:
```bash 
kubectl get  servingruntime -n modelmesh-serving
```


In [5]:
@component(
    packages_to_install=["kubernetes"],
    base_image='python:3.10'
)
def deploy_modelmesh_custom_runtime(huggingface_name: str, peft_model_publish_id: str, model_name_or_path: str, server_name: str, namespace: str, image: str):
    import kubernetes.config as k8s_config
    import kubernetes.client as k8s_client
    from kubernetes.client.exceptions import ApiException

    def create_custom_object(group, version, namespace, plural, manifest):
        cfg = k8s_client.Configuration()
        cfg.verify_ssl=False
        cfg.host = "https://kubernetes.default.svc"
        cfg.api_key_prefix['authorization'] = 'Bearer'
        cfg.ssl_ca_cert = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
        with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f:
            lines = f.readlines()
            for l in lines:
                cfg.api_key['authorization'] = "{}".format(l)
                break
        with k8s_client.ApiClient(cfg) as api_client:
            capi = k8s_client.CustomObjectsApi(api_client)
            try:
                res = capi.create_namespaced_custom_object(group=group,
                                                           version=version, namespace=namespace,
                                                           plural=plural, body=manifest)
            except ApiException as e:
                # object already exists
                if e.status != 409:
                    raise
    custom_runtime_manifest = {
        "apiVersion": "serving.kserve.io/v1alpha1",
        "kind": "ServingRuntime",
        "metadata": {
            "name": "{}-server".format(server_name), #vml-demo-server
            "namespace": namespace
        },
        "spec": {
            "supportedModelFormats": [
            {
                "name": "peft-model",
                "version": "1",
                "autoSelect": True
            }
            ],
            "multiModel": True,
            "grpcDataEndpoint": "port:8001",
            "grpcEndpoint": "port:8085",
            "containers": [
            {
                "name": "mlserver",
                "image": image, # quay.io/aipipeline/peft-model-server:latest
                "env": [
                {
                    "name": "MLSERVER_MODELS_DIR",
                    "value": "/models/_mlserver_models/"
                },
                {
                    "name": "MLSERVER_GRPC_PORT",
                    "value": "8001"
                },
                {
                    "name": "MLSERVER_HTTP_PORT",
                    "value": "8002"
                },
                {
                    "name": "MLSERVER_LOAD_MODELS_AT_STARTUP",
                    "value": "true"
                },
                {
                    "name": "MLSERVER_MODEL_NAME",
                    "value": "peft-model"
                },
                {
                    "name": "MLSERVER_HOST",
                    "value": "127.0.0.1"
                },
                {
                    "name": "MLSERVER_GRPC_MAX_MESSAGE_LENGTH",
                    "value": "-1"
                },
                {
                    "name": "PRETRAINED_MODEL_PATH",
                    "value": model_name_or_path
                },
                {
                    "name": "PEFT_MODEL_ID",
                    "value": "{}/{}".format(huggingface_name, peft_model_publish_id),
                }
                ],
                "resources": {
                "requests": {
                    "cpu": "500m",
                    "memory": "4Gi"
                },
                "limits": {
                    "cpu": "5",
                    "memory": "5Gi"
                }
                }
            }
            ],
            "builtInAdapter": {
            "serverType": "mlserver",
            "runtimeManagementPort": 8001,
            "memBufferBytes": 134217728,
            "modelLoadingTimeoutMillis": 90000
            }
        }
    }
    create_custom_object(group="serving.kserve.io", version="v1alpha1",
                         namespace=namespace, plural="servingruntimes",
                         manifest=custom_runtime_manifest)

### Deploy the Model

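The `inference_svc` component renders the KServe InferenceService manifest as a YAML string. The manifest is not applied here; the KServe launcher component applies it in a later pipeline step.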
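
As a sketch of what this component produces, the snippet below renders a trimmed-down version of the manifest with named placeholders, which can be easier to read than positional `{}` fields when the same value appears twice. The helper name `render_inference_service` is hypothetical, and the `storage` section is left out for brevity.

```python
INFERENCE_SERVICE_TEMPLATE = """\
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: {model_name}
  namespace: {namespace}
  annotations:
    serving.kserve.io/deploymentMode: ModelMesh
spec:
  predictor:
    model:
      modelFormat:
        name: peft-model
      runtime: {model_name}-server
"""


def render_inference_service(model_name: str, namespace: str) -> str:
    # Hypothetical helper: it only fills in the template and does not talk to the cluster.
    return INFERENCE_SERVICE_TEMPLATE.format(model_name=model_name, namespace=namespace)


manifest = render_inference_service("vml-demo", "modelmesh-serving")
print(manifest)
```

The `runtime` field has to match the name of the ServingRuntime created in the previous step, which is why both are derived from the same `model_name`.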


In [6]:
@component(
    base_image='python:3.10'
)
def inference_svc(model_name: str, namespace: str) -> str :

    inference_service = '''
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: {}
  namespace: {}
  annotations:
    serving.kserve.io/deploymentMode: ModelMesh
spec:
  predictor:
    model:
      modelFormat:
        name: peft-model
      runtime: {}-server
      storage:
        key: localMinIO
        path: sklearn/mnist-svm.joblib
'''.format(model_name, namespace, model_name)

    return inference_service

### Test the deployed model 

The final pipeline step, `test_modelmesh_model`, sends a REST request to the inference service and prints the result.

To test the model further, you can send gRPC or REST requests yourself, but you first need to port-forward `service/modelmesh-serving`:

```bash
kubectl port-forward --address 0.0.0.0 service/modelmesh-serving 8008 -n modelmesh-serving

curl -X POST -k http://localhost:8008/v2/models/vml-demo/infer -d '{"inputs": [{ "name": "content", "shape": [1], "datatype": "BYTES", "data": ["@nationalgridus I have no water and the bill is current and paid. Can you do something about this?"]}]}' | jq -r '.outputs[0].data[0]' | base64 --decode
```
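
The request and response handling can be rehearsed offline before touching the cluster. The sketch below builds the same request body as the `curl` call and decodes a response of the shape the test component expects, where `outputs[0].data[0]` is a base64 string. The response is fabricated locally for illustration (including the text `complaint`), and the helper names are hypothetical.

```python
import base64
import json


def build_infer_request(tweet: str) -> dict:
    # Request body with a single BYTES input named "content".
    return {"inputs": [{"name": "content", "shape": [1], "datatype": "BYTES", "data": [tweet]}]}


def decode_infer_response(response_text: str) -> str:
    # The answer is expected base64-encoded in outputs[0].data[0].
    payload = json.loads(response_text)
    return base64.b64decode(payload["outputs"][0]["data"][0]).decode("utf-8")


request_body = build_infer_request("@nationalgridus I have no water and the bill is current and paid.")
print(json.dumps(request_body, indent=2))

# Fabricated response, built locally so the example runs without a cluster.
fake_response = json.dumps(
    {"outputs": [{"data": [base64.b64encode(b"complaint").decode("ascii")]}]}
)
print(decode_infer_response(fake_response))
```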

In [7]:
@component(
    packages_to_install=["transformers", "peft", "torch", "requests"],
    base_image='python:3.10'
)
def test_modelmesh_model(service: str,  namespace: str, model_name: str, input_tweet: str):
    import requests
    import base64
    import json

    url = "http://%s.%s:8008/v2/models/%s/infer" % (service, namespace, model_name)
    input_json = {
        "inputs": [
            {
            "name": "content",
            "shape": [1],
            "datatype": "BYTES",
            "data": [input_tweet]
            }
        ]
    }

    x = requests.post(url, json = input_json)

    print(x.text)
    respond_dict = json.loads(x.text)
    inference_result = respond_dict["outputs"][0]["data"][0]
    base64_bytes = inference_result.encode("ascii")
  
    string_bytes = base64.b64decode(base64_bytes)
    inference_result = string_bytes.decode("utf-8")  # utf-8 also handles non-ASCII characters in the model output
    print("inference_result: %s " % inference_result)

### Define your pipeline function 
#### Pipeline Input Parameters

- `peft_model_server_image` - the prebuilt custom runtime image used to serve the Hugging Face LLM with the prompt tuning configuration. To build your own KServe ModelMesh custom runtime image, see this [Dockerfile](https://github.com/kubeflow/kfp-tekton/blob/master/samples/peft-modelmesh-pipeline/Dockerfile)
- `modelmesh_namespace` - the namespace in which ModelMesh is deployed
- `modelmesh_servicename` - the name of the ModelMesh Kubernetes Service that receives inference requests
- `pipeline_out_file` - the name of the YAML file produced by compiling the pipeline
- `kserv_component` - the URL of the KServe component definition

In [8]:
peft_model_server_image="quay.io/aipipeline/peft-model-server:latest"
modelmesh_namespace="modelmesh-serving"
modelmesh_servicename="modelmesh-serving"
pipeline_out_file="llm-prompt_tuning_pipeline.yaml"
kserv_component="https://raw.githubusercontent.com/kubeflow/pipelines/release-2.0.1/components/kserve/component.yaml"

In [9]:
# Define your pipeline function
@dsl.pipeline(
    name="Serving LLM with Prompt tuning",
    description="A Pipeline for Serving Prompt Tuning LLMs on Modelmesh"
)
def prompt_tuning_pipeline(
    huggingface_name: str = "difince",
    peft_model_publish_id: str = "bloomz-560m_PROMPT_TUNING_CAUSAL_LM",
    model_name_or_path: str = "bigscience/bloomz-560m",
    model_name: str = "vml-demo",
    input_tweet: str = "@nationalgridus I have no water and the bill is current and paid. Can you do something about this?",
    test_served_llm_model: str ="true",
    num_epochs: int = 50
):
    hf_token_task = get_hf_token()
    prompt_tuning_llm = prompt_tuning_bloom( huggingface_name=huggingface_name, 
                                             peft_model_publish_id=peft_model_publish_id, 
                                             model_name_or_path=model_name_or_path,
                                             num_epochs=num_epochs,
                                             hf_token=hf_token_task.output)
    deploy_modelmesh_custom_runtime_task = deploy_modelmesh_custom_runtime(huggingface_name=huggingface_name,
                                                                           peft_model_publish_id=peft_model_publish_id, 
                                                                           model_name_or_path=model_name_or_path,
                                                                           server_name=model_name, namespace=modelmesh_namespace,
                                                                           image=peft_model_server_image)
    deploy_modelmesh_custom_runtime_task.after(prompt_tuning_llm)

    inference_svc_task = inference_svc(model_name=model_name, namespace=modelmesh_namespace)
    inference_svc_task.after(deploy_modelmesh_custom_runtime_task)
    inference_svc_task.set_caching_options(False)
    
    kserve_launcher_op = comp.load_component_from_url(kserv_component)
    serve_llm_with_peft_task = kserve_launcher_op(action="apply", inferenceservice_yaml=inference_svc_task.output)
    serve_llm_with_peft_task.after(inference_svc_task)
    serve_llm_with_peft_task.set_caching_options(False)

    with dsl.If(test_served_llm_model == 'true'):
        test_modelmesh_model_task = test_modelmesh_model(service=modelmesh_servicename, namespace=modelmesh_namespace, 
                                                         model_name=model_name, input_tweet=input_tweet).after(serve_llm_with_peft_task)
        test_modelmesh_model_task.set_caching_options(False)

### Pipeline Compilation & Run 
To submit the pipeline for execution, there are two options:
1. Compile it to intermediate representation (IR) YAML with the KFP SDK compiler, then upload and run it later. `Compiler().compile` writes a file named `llm-prompt_tuning_pipeline.yaml` to the current directory.
2. Call `kfp.Client` directly to create a run from the pipeline function.

The cell below does both.

In [10]:
Compiler().compile(
    pipeline_func=prompt_tuning_pipeline,
    package_path=pipeline_out_file
)

kfp_client=kfp.Client()

run = kfp_client.create_run_from_pipeline_func(
    prompt_tuning_pipeline,
    arguments={}
)

run_id = run.run_id
print("Run ID: ", run_id)



Run ID:  495dd212-105a-425a-bc8b-051841f810ea
