## LLMOps Pipeline with SageMaker Pipelines, JumpStart, and FMEval

In this example we build an LLMOps pipeline using SageMaker Pipelines, JumpStart, and FMEval. We use JumpStart to fine-tune a Llama 2 7B model, the FMEval package to evaluate the fine-tuned model, and SageMaker Pipelines to orchestrate the MLOps portion of the example.

### Credits/Reference
- <b>[SageMaker JumpStart Llama 2 Fine-Tuning Guide](https://github.com/aws/amazon-sagemaker-examples/blob/main/introduction_to_amazon_algorithms/jumpstart-foundation-models/llama-2-finetuning.ipynb)</b>: We'll use this example as a base for the first step of fine-tuning our LLM.

## Setup

We download the public [dolly dataset](https://huggingface.co/datasets/databricks/databricks-dolly-15k) and use it for a summarization use case: we keep only the summarization samples, split them into a training set and a held-out test set, and upload both to S3 for training and for inference/evaluation respectively.

In [None]:
#!pip install -r requirements.txt

In [None]:
import datasets
import sagemaker

In [None]:
import datasets
import sagemaker
from datasets import load_dataset

model_id, model_version = "meta-textgeneration-llama-2-7b", "2.*"

# Fixed seed so the train/test split -- and therefore the evaluation set -- is
# reproducible across notebook and pipeline re-runs; without it every run
# evaluates on a different random 10% of the data.
SPLIT_SEED = 42

# dolly dataset
dolly_dataset = load_dataset("databricks/databricks-dolly-15k", split="train")

# Keep only the summarization samples. To train for question answering /
# information extraction instead, change the filter predicate below to
# example["category"] == "closed_qa" / "information_extraction".
summarization_dataset = dolly_dataset.filter(lambda example: example["category"] == "summarization")
# The category is constant after filtering, so the column carries no information.
summarization_dataset = summarization_dataset.remove_columns("category")

# Hold out 10% of the samples as the test set used for evaluation at the end.
train_and_test_dataset = summarization_dataset.train_test_split(test_size=0.1, seed=SPLIT_SEED)

# Write both splits to local .jsonl files; a later cell uploads them to S3.
train_and_test_dataset["train"].to_json("train.jsonl")
train_and_test_dataset["test"].to_json("test.jsonl")

In [None]:
import json

# Prompt/completion template for the fine-tuning data. It is written to
# template.json, which the upload cell places next to train.jsonl
# (presumably so the JumpStart trainer can format each record -- confirm).
template = dict(
    prompt=(
        "Below is an instruction that describes a task, paired with an input that provides further context. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Input:\n{context}\n\n"
    ),
    completion=" {response}",
)
serialized_template = json.dumps(template)
with open("template.json", "w") as template_fh:
    template_fh.write(serialized_template)

In [None]:
from sagemaker.s3 import S3Uploader
import sagemaker
import random

# Resolve the session's default bucket and the S3 prefixes for each split.
output_bucket = sagemaker.Session().default_bucket()
local_data_file, test_data_file = "train.jsonl", "test.jsonl"
train_data_location = f"s3://{output_bucket}/dolly_dataset"
test_data_location = f"s3://{output_bucket}/test_dataset"

# The prompt template is uploaded alongside the training data; the test split
# gets its own prefix. Uploads happen in this order.
uploads = [
    (local_data_file, train_data_location),
    ("template.json", train_data_location),
    (test_data_file, test_data_location),
]
for local_path, s3_prefix in uploads:
    S3Uploader.upload(local_path, s3_prefix)

print(f"Training data: {train_data_location}")
print(f"Test data: {test_data_location}")
print(f"Output bucket: {output_bucket}")

In [None]:
# S3 prefix (with trailing slash) under which test.jsonl was uploaded;
# the `aws s3 ls` check in the next cell lists it.
test_data_path = f"{test_data_location}/"
print(test_data_path)

In [None]:
# IPython shell escape: list the objects under the test-data prefix to confirm the upload.
!aws s3 ls {test_data_path}

## Pipelines Setup

This pipeline has two main steps: training (including deployment) and inference/evaluation.

1. <b>Training</b>: We pull the dataset from S3, fine-tune Llama 2 with SageMaker JumpStart, and deploy the fine-tuned model to an endpoint.
2. <b>Inference/Evaluation</b>: We use the FMEval library (SageMaker Clarify) to evaluate the model on the summarization use case. Because FMEval scores existing model outputs here, we first run inference against the endpoint on the test dataset and then run the evaluation on those outputs.

In [None]:
import sagemaker
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString

# Shared SageMaker context used when defining and running the pipeline.
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket, region = sagemaker_session.default_bucket(), sagemaker_session.boto_region_name

# Pipeline parameter for the instance type the @step functions run on.
# Despite its name, both the training and the evaluation steps use it.
DEFAULT_STEP_INSTANCE = "ml.c5.18xlarge"
instance_type = ParameterString(name="TrainInstanceType", default_value=DEFAULT_STEP_INSTANCE)

In [None]:
import os

# Point the SageMaker Python SDK at the current directory for its user config
# file (presumably a config.yaml with @step defaults -- confirm it exists here).
os.environ.update({"SAGEMAKER_USER_CONFIG_OVERRIDE": os.getcwd()})

### Training Step

In [None]:
# step one
@step(
    name="train-deploy",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def train_deploy(
    train_data_path: str,
    model_id: str = "meta-textgeneration-llama-2-7b",
    model_version: str = "2.*",
    epoch: str = "1",
    max_input_length: str = "1024",
) -> str:
    """Fine-tune a JumpStart model on ``train_data_path`` and deploy it.

    Runs as the "train-deploy" pipeline step.

    Args:
        train_data_path: S3 prefix holding the training data (the notebook
            uploads train.jsonl and template.json there).
        model_id: JumpStart model id to fine-tune.
        model_version: JumpStart model version specifier.
        epoch: value for the ``epoch`` hyperparameter. It defaults to "1" to
            keep the example short; raise it for a real fine-tune.
        max_input_length: value for the ``max_input_length`` hyperparameter.

    Returns:
        Name of the endpoint hosting the fine-tuned model. This function does
        not delete the endpoint; the caller is responsible for cleaning it up.
    """
    from sagemaker.jumpstart.estimator import JumpStartEstimator

    estimator = JumpStartEstimator(
        model_id=model_id,
        model_version=model_version,
        environment={"accept_eula": "true"},
        disable_output_compression=True,
    )

    print("--------------")
    print("Starting training")
    print("--------------")
    estimator.set_hyperparameters(
        instruction_tuned="True", epoch=epoch, max_input_length=max_input_length
    )
    estimator.fit({"training": train_data_path})

    print("--------------")
    print("Starting deployment")
    print("--------------")
    finetuned_predictor = estimator.deploy()
    return finetuned_predictor.endpoint_name

### Inference & Evaluation Step

In [None]:
# util function to prepare datapoint for inference
def prepare_payload(datapoint: dict, max_new_tokens: int = 100) -> dict:
    """Build the endpoint request body for one dolly datapoint.

    The prompt text is the same as the "prompt" entry written to
    template.json, followed by a "### Response:" marker after which the
    model's generation is expected.

    Args:
        datapoint: record with at least "instruction" and "context" keys.
        max_new_tokens: generation cap forwarded under "parameters".

    Returns:
        dict with "inputs" (the formatted prompt) and "parameters".
    """
    prompt_template = (
        "Below is an instruction that describes a task, paired with an input that provides further context. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Input:\n{context}\n\n"
    )
    # Separates the prompt from the model's answer.
    input_output_demarcation_key = "\n\n### Response:\n"
    prompt = prompt_template.format(
        instruction=datapoint["instruction"], context=datapoint["context"]
    )
    return {
        "inputs": prompt + input_output_demarcation_key,
        "parameters": {"max_new_tokens": max_new_tokens},
    }

In [None]:
# step two
@step(
    name="evaluate-infer",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def evaluate(
    endpoint_name: str,
    output_bucket: str = output_bucket,
    test_data_file: str = "test.jsonl",
    key_path: str = "test_dataset/test.jsonl",
    max_samples: int = 20,
) -> str:
    """Run inference on the test split against ``endpoint_name``, then score it with FMEval.

    Runs as the "evaluate-infer" pipeline step.

    Args:
        endpoint_name: endpoint returned by the training/deployment step.
        output_bucket: bucket holding the uploaded test split.
        test_data_file: local path the test split is downloaded to and read from.
        key_path: S3 key of the test split inside ``output_bucket``.
        max_samples: number of leading test records sent to the endpoint. It
            defaults to 20 to keep the example fast.

    Returns:
        JSON string (indent=4) of the FMEval SummarizationAccuracy outputs.
    """
    import itertools
    import json
    import os

    import boto3
    import jsonlines
    from fmeval.constants import MIME_TYPE_JSONLINES
    from fmeval.data_loaders.data_config import DataConfig
    from fmeval.eval_algorithms.summarization_accuracy import SummarizationAccuracy

    # Presumably limits fmeval's internal parallelism on the step instance -- confirm in fmeval docs.
    os.environ["PARALLELIZATION_FACTOR"] = "1"
    s3 = boto3.client("s3")
    runtime = boto3.client("sagemaker-runtime")

    # download test dataset for inference
    s3.download_file(output_bucket, key_path, test_data_file)
    print("--------------")
    print("Downloaded test dataset file")
    print("--------------")
    output_file = "results.jsonl"
    content_type = "application/json"

    print("--------------")
    print("Starting Inference")
    print("--------------")
    # Read back the file we just downloaded (not a hard-coded name), so
    # overriding test_data_file keeps the step working.
    with jsonlines.open(test_data_file) as input_fh, jsonlines.open(output_file, "w") as output_fh:
        for datapoint in itertools.islice(input_fh, max_samples):
            instruction = datapoint["instruction"]
            context = datapoint["context"]
            summary = datapoint["response"]
            payload = prepare_payload(datapoint)
            response = runtime.invoke_endpoint(
                EndpointName=endpoint_name,
                Body=json.dumps(payload),
                ContentType=content_type,
                CustomAttributes="accept_eula=true",
            )
            result = json.loads(response["Body"].read().decode())[0]["generation"]
            output_fh.write(
                {"instruction": instruction, "context": context, "summary": summary, "model_output": result}
            )

    print("--------------")
    print("Starting Evaluation")
    print("--------------")
    config = DataConfig(
        dataset_name="dolly_summary_model_outputs",
        dataset_uri=output_file,
        dataset_mime_type=MIME_TYPE_JSONLINES,
        model_input_location="instruction",
        target_output_location="summary",
        model_output_location="model_output",
    )
    eval_algo = SummarizationAccuracy()
    eval_output = eval_algo.evaluate(dataset_config=config, save=True)
    # default=vars turns the result objects into plain dicts for serialization.
    res = json.dumps(eval_output, default=vars, indent=4)
    # print metrics to CW logs, realistically push to somewhere to visualize
    for item in json.loads(res):
        for key, value in item.items():
            print(f"Key: {key}, Value: {value}")
    return res

## Pipeline Execution

In [None]:
# stitch together pipeline
from sagemaker.workflow.pipeline import Pipeline

# Calling the @step functions wires their outputs together: evaluate() takes
# train_deploy()'s returned endpoint name as its input.
endpoint_name = train_deploy(train_data_location)
eval_metrics = evaluate(endpoint_name)

pipeline_kwargs = dict(
    name="llm-train-eval-pipeline",
    parameters=[instance_type],
    # Only the final step is listed; the training step is presumably included
    # through the endpoint_name dependency (SageMaker @step behavior -- confirm).
    steps=[eval_metrics],
)
pipeline = Pipeline(**pipeline_kwargs)

In [None]:
# Create or update the pipeline definition, then start a run.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
# Only a cell's last expression is auto-displayed, so print the description
# explicitly; otherwise the describe() result is silently discarded.
print(execution.describe())
# Block until the run finishes.
execution.wait()

In [None]:
# Per-step details of the run started above, shown as the cell's output.
step_summaries = execution.list_steps()
step_summaries