# Supervised Fine-Tuning (SFT) of Amazon Nova using Amazon SageMaker Training Job

In this notebook, we fine-tune an Amazon Nova LLM on Amazon SageMaker AI, using a training recipe and the SageMaker `PyTorch` estimator to run the training job.

In [None]:
! pip install -r ./requirements.txt --upgrade
! pip install seaborn

***

## Prerequisites

If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker. You can find more about it [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).

In [None]:
import sagemaker
import boto3

# Bootstrap session, used here only to look up the default bucket.
sess = sagemaker.Session()
# Set this to a bucket name to override the session's default bucket.
sagemaker_session_bucket = None

if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    # Fallback when get_execution_role() raises ValueError: look the role up
    # by its hard-coded name. Adjust RoleName if your role is named differently.
    iam = boto3.client("iam")
    role = iam.get_role(RoleName="sagemaker_execution_role")["Role"]["Arn"]

# Final session bound to the chosen bucket; later cells reuse `sess`,
# `bucket_name`, `default_prefix` and `role`.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
bucket_name = sess.default_bucket()
default_prefix = sess.default_bucket_prefix

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

***

## Prepare the dataset

In this example, we are going to load [glaiveai/glaive-function-calling-v2](https://huggingface.co/datasets/glaiveai/glaive-function-calling-v2) dataset, an open-source dataset and model suite focused on enabling and improving function calling capabilities for large language models (LLMs)

In [None]:
from datasets import load_dataset

# Only the first 10,000 rows of the train split are requested ("train[:10000]").
dataset = load_dataset("glaiveai/glaive-function-calling-v2", split="train[:10000]")

# Bare expression: the notebook renders the dataset summary.
dataset

In [None]:
from utils.preprocessing import glaive_to_standard_format

# Convert the raw rows with the project helper from utils.preprocessing.
# NOTE(review): later cells read "messages" and "tools" fields from each row -
# presumably produced here; confirm against the helper.
processed_dataset = glaive_to_standard_format(dataset)

In [None]:
import pandas as pd

# Materialize the processed rows as a DataFrame so they can be split below.
df = pd.DataFrame(processed_dataset)

# Bare expression: the notebook renders the first rows.
df.head()

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the rows as the test split, then carve 1% of the remaining
# training rows off as the validation split. The fixed random_state keeps
# both splits reproducible.
train, test = train_test_split(df, test_size=0.1, random_state=42)
train, val = train_test_split(train, test_size=0.01, random_state=42)

print("Number of train elements: ", len(train))
print("Number of test elements: ", len(test))
print("Number of val elements: ", len(val))

Let's format the dataset by using the prompt style for Amazon Nova:

```
{
    "system": [{"text": Content of the System prompt}],
    "messages": [
        {
            "role": "user",
            "content": [{"text": Content of the user prompt}]
        },
        {
            "role": "assistant",
            "content": [{"text": Content of the answer}]
        },
        ...
    ]
}
```

The notebook defines utility functions to clean the dataset content by removing prefixes and handling special cases:

```python
def clean_prefix(content):
    # Removes prefixes like "USER:", "ASSISTANT:", etc.
    ...

def clean_message_list(message_list):
    # Cleans message lists from None values and converts to proper format
    ...

def clean_numbered_conversation(message_list):
    # Removes "User:" / "Assistant:" role prefixes from numbered lines such as "1. User: ..."
    ...
```

In [None]:
import json
import re


def clean_prefix(content):
    """Strip leading role markers from every line of ``content``.

    A role marker is one of "SYSTEM:", "System:", "USER:", "User:",
    "ASSISTANT:", "Assistant:", "Bot:" or "BOT:" at the start of a
    (whitespace-stripped) line. At most one marker is removed per line.

    Args:
        content: Either a string, or a non-string iterable of content items.
            For an iterable, every dict item holding a string under "text"
            is cleaned in place; all other items are left untouched.

    Returns:
        The cleaned string for string input; the same (mutated) iterable for
        iterable input; any other value unchanged.
    """
    role_markers = (
        "SYSTEM:",
        "System:",
        "USER:",
        "User:",
        "ASSISTANT:",
        "Assistant:",
        "Bot:",
        "BOT:",
    )

    def _scrub(text):
        # Every line is stripped; the first matching marker (if any) is cut
        # off and the remainder stripped again.
        scrubbed = []
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            marker = next((m for m in role_markers if line.startswith(m)), None)
            if marker is not None:
                line = line[len(marker) :].strip()
            scrubbed.append(line)
        return "\n".join(scrubbed)

    if isinstance(content, str):
        return _scrub(content)

    if hasattr(content, "__iter__"):
        for entry in content:
            if not (isinstance(entry, dict) and "text" in entry):
                continue
            if isinstance(entry["text"], str):
                entry["text"] = _scrub(entry["text"])
        return content

    return content


def clean_message_list(message_list):
    """Drop messages without usable content and clean the text of the rest.

    Args:
        message_list: Iterable of message dicts with "role" and "content"
            keys, or a JSON string encoding such a list. "content" is expected
            to be an iterable of content items.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts. For every dict
        content item holding "text", the text is passed through
        ``clean_numbered_conversation`` and then ``clean_prefix`` (the content
        items are updated in place). Messages whose content is missing,
        ``None`` or the string "none" (any casing) are skipped.
    """
    if isinstance(message_list, str):
        message_list = json.loads(message_list)

    cleaned = []
    for msg in message_list:
        content = msg.get("content")
        # The original compared the lowercased value against "None", which can
        # never match, and then failed with KeyError on messages whose content
        # had been dropped. Such messages are now skipped as a whole.
        if content is None or str(content).lower() == "none":
            continue

        for content_item in content:
            if isinstance(content_item, dict) and "text" in content_item:
                text = clean_numbered_conversation(content_item["text"])
                content_item["text"] = clean_prefix(text)
        cleaned.append({"role": msg["role"], "content": content})

    return cleaned


# Dedicated cleaner for transcripts written as numbered turns
def clean_numbered_conversation(text):
    """Remove "User:" / "Assistant:" right after a list number such as "1. ".

    The number (and the whitespace following it) is kept; only the role
    prefix and the whitespace after it are removed. Non-string input is
    returned unchanged.
    """
    if isinstance(text, str):
        # Group 1 is the "<digits>." marker plus trailing whitespace; it is
        # written back, while the role prefix (group 2) is discarded.
        return re.sub(r"(\d+\.\s*)(User:|Assistant:)\s*", r"\1", text)
    return text

These functions transform the dataset into the format required by Nova models, handling tool calls and formatting:

```python

def transform_tool_format(tool):
    # Transforms tool format to Nova's expected format
    ...

def prepare_dataset(sample):
    # Prepares dataset in the required format for Nova models
    ...

def prepare_dataset_validation(sample):
    # Formats validation dataset for evaluation
    ...
```


In [None]:
import json


def transform_tool_format(tool):
    """Convert a ``{"function": {...}}`` tool entry into a ``toolSpec`` entry.

    Args:
        tool: Tool description. When it has a "function" key, that value must
            provide "name", "description" and "parameters".

    Returns:
        ``{"toolSpec": {"name", "description", "inputSchema": {"json": ...}}}``
        built from the function entry, or ``tool`` itself when it has no
        "function" key.
    """
    if "function" in tool:
        spec = tool["function"]
        name, description, parameters = (
            spec["name"],
            spec["description"],
            spec["parameters"],
        )
        return {
            "toolSpec": {
                "name": name,
                "description": description,
                "inputSchema": {"json": parameters},
            }
        }
    return tool


def prepare_dataset(sample):
    """Convert one sample into the Nova ``{"system": [...], "messages": [...]}`` layout.

    Args:
        sample: Mapping with a "messages" list of ``{"role", "content"}`` dicts
            (assistant messages may carry "tool_calls") and an optional "tools"
            JSON string describing the callable functions.

    Returns:
        A dict with:
          * "system": a single ``{"text": ...}`` entry built from the system
            message (extended with the tool catalogue when tools exist), or an
            empty list when there is no system text.
          * "messages": user/assistant turns. Tool results are emitted as user
            turns and assistant tool calls as concatenated JSON objects. All
            text is lowercased. A trailing non-assistant turn is removed.
    """
    messages = {"system": [], "messages": []}

    # Process tools upfront if they exist
    tools = json.loads(sample["tools"]) if sample.get("tools") else []
    transformed_tools = [transform_tool_format(tool) for tool in tools]

    for message in sample["messages"]:
        role = message["role"]

        if role == "system":
            if tools:
                # Build system message with tools
                system_text = (
                    f"{message['content']}\n"
                    "You may call one or more functions to assist with the user query.\n\n"
                    "You are provided with function signatures within <tools></tools> XML tags:\n"
                    "<tools>\n"
                    f"{json.dumps({'tools': transformed_tools})}\n"
                    "</tools>\n\n"
                    "For each function call, return a json object with function name and parameters:\n"
                    '{"name": function name, "parameters": dictionary of argument name and its value}'
                )
            else:
                # Previously the system prompt was silently dropped for samples
                # without tools, while prepare_dataset_validation keeps it.
                system_text = message["content"]
            if system_text:
                messages["system"] = [{"text": system_text.lower()}]

        elif role == "user":
            messages["messages"].append(
                {"role": "user", "content": [{"text": message["content"].lower()}]}
            )

        elif role == "tool":
            # Each tool result is its own turn. The original appended to a
            # shared buffer, so every later tool turn repeated all earlier
            # tool results.
            messages["messages"].append(
                {"role": "user", "content": [{"text": message["content"].lower()}]}
            )

        elif role == "assistant":
            if message.get("tool_calls"):
                # Serialize every call as {"name": ..., "parameters": ...}
                tool_calls_text = []
                for tool_call in message["tool_calls"]:
                    function_data = tool_call["function"]
                    arguments = function_data["arguments"]
                    if isinstance(arguments, str):
                        # Arguments may arrive JSON-encoded; decode them so they
                        # are embedded as an object rather than a quoted string.
                        arguments = json.loads(arguments)
                    tool_calls_text.append(
                        json.dumps(
                            {"name": function_data["name"], "parameters": arguments}
                        )
                    )
                assistant_text = "".join(tool_calls_text)
            else:
                assistant_text = message["content"]

            messages["messages"].append(
                {"role": "assistant", "content": [{"text": assistant_text.lower()}]}
            )

    # Remove the last message if it's not from assistant
    if messages["messages"] and messages["messages"][-1]["role"] != "assistant":
        messages["messages"].pop()

    return messages

In [None]:
def prepare_dataset_validation(sample):
    """Turn one conversation into (system, query, response) evaluation records.

    One record is produced per assistant message: "query" is the transcript
    of all non-system turns that precede it, "response" is the assistant's
    reply (or its serialized tool calls). All three fields are lowercased.

    Args:
        sample: Mapping with a "messages" list of ``{"role", "content"}`` dicts
            (assistant messages may carry "tool_calls") and an optional "tools"
            JSON string.

    Returns:
        ``{"messages": [{"system": ..., "query": ..., "response": ...}, ...]}``
    """
    # Process tools
    tools = json.loads(sample["tools"]) if sample.get("tools") else []
    transformed_tools = [transform_tool_format(tool) for tool in tools]

    # Only the first system message is used.
    system_content = ""
    for message in sample["messages"]:
        if message["role"] == "system":
            system_content = message["content"]
            if tools:
                system_content += (
                    "\nYou may call one or more functions to assist with the user query.\n\n"
                    "You are provided with function signatures within <tools></tools> XML tags:\n"
                    "<tools>\n"
                    f"{json.dumps({'tools': transformed_tools})}\n"
                    "</tools>\n\n"
                    "For each function call, return a json object with function name and parameters:\n"
                    '{"name": function name, "parameters": dictionary of argument name and its value}'
                )
            break

    result = []
    conversation_history = []

    for message in sample["messages"]:
        role = message["role"]

        if role == "system":
            continue

        if role == "user":
            conversation_history.append(f"##User: {message['content']}")
        elif role == "tool":
            conversation_history.append(f"## Function: {message['content']}")
        elif role == "assistant":
            # The reply text is computed once and used both as the target and
            # as the transcript entry for later turns.
            if message.get("tool_calls"):
                target = _serialize_tool_calls(message["tool_calls"])
            else:
                target = message["content"]

            # The query is everything said before this assistant turn, so it
            # is captured before the reply is appended to the history.
            input_text = "\n".join(conversation_history)
            conversation_history.append(f"##Assistant: {target}")

            result.append(
                {
                    "system": system_content.lower(),
                    "query": input_text.lower(),
                    "response": target.lower(),
                }
            )

    return {"messages": result}


def _serialize_tool_calls(tool_calls):
    """Return the tool calls as concatenated ``{"name", "parameters"}`` JSON objects."""
    parts = []
    for tool_call in tool_calls:
        function_data = tool_call["function"]
        arguments = function_data["arguments"]
        if isinstance(arguments, str):
            # Arguments may arrive JSON-encoded; decode before re-embedding.
            arguments = json.loads(arguments)
        parts.append(
            json.dumps({"name": function_data["name"], "parameters": arguments})
        )
    return "".join(parts)

In [None]:
from datasets import Dataset, DatasetDict
from random import randint

train_dataset = Dataset.from_pandas(train)
test_dataset = Dataset.from_pandas(test)
val_dataset = Dataset.from_pandas(val)

dataset = DatasetDict(
    {"train": train_dataset, "test": test_dataset, "val": val_dataset}
)

# Train and test splits use the Nova conversation layout (prepare_dataset)
# and are post-processed with clean_message_list on the pandas side.
train_dataset = dataset["train"].map(
    prepare_dataset, remove_columns=train_dataset.features
)

train_dataset = train_dataset.to_pandas()

train_dataset["messages"] = train_dataset["messages"].apply(clean_message_list)

# randint is inclusive on both ends, so the upper bound must be len - 1;
# the previous bound could index one past the last row.
print(train_dataset.iloc[randint(0, len(train_dataset) - 1)].to_json())

test_dataset = dataset["test"].map(
    prepare_dataset, remove_columns=test_dataset.features
)

test_dataset = test_dataset.to_pandas()

test_dataset["messages"] = test_dataset["messages"].apply(clean_message_list)

# The validation split uses the query/response layout instead and is
# flattened in a later cell.
val_dataset = dataset["val"].map(
    prepare_dataset_validation, remove_columns=val_dataset.features
)

The validation dataset will be formatted with the structure below:

```
{
    "system": "Optional - String containing the system prompt, that sets the behavior, role, or personality of the model",
    "query": "String containing the input prompt",
    "response": "String containing the expected model output"
}
```

In [None]:
from datasets import Dataset

# Flatten the dataset: every row holds a list of records in its first column;
# collect all of them into one flat list.
column_name = val_dataset.column_names[0]
all_examples = []
for examples_list in val_dataset:
    examples = examples_list[column_name]
    all_examples.extend(examples)

# Create a new dataset with the desired structure
val_dataset = Dataset.from_dict(
    {
        "system": [example["system"] for example in all_examples],
        "query": [example["query"] for example in all_examples],
        "response": [example["response"] for example in all_examples],
    }
)

# randint is inclusive on both ends, so the upper bound must be len - 1;
# the previous bound could index one past the last row.
print(val_dataset[randint(0, len(val_dataset) - 1)])

### Upload to Amazon S3

In [None]:
import boto3
import shutil

In [None]:
s3_client = boto3.client('s3')

# Key prefix for the uploaded datasets; it is nested under the session's
# default bucket prefix when one is configured.
if default_prefix:
    input_path = f"{default_prefix}/datasets/nova-sft"
else:
    input_path = f"datasets/nova-sft"

# Full S3 URIs of the three files; they are reused as training and
# evaluation inputs in later cells.
train_dataset_s3_path = f"s3://{bucket_name}/{input_path}/train/dataset.jsonl"
test_dataset_s3_path = f"s3://{bucket_name}/{input_path}/test/dataset.jsonl"
val_dataset_s3_path = f"s3://{bucket_name}/{input_path}/val/gen_qa.jsonl"

In [None]:
import os

# Write each split to a local staging directory, upload it to S3, then
# remove the staging directory.
os.makedirs("./data/train", exist_ok=True)
os.makedirs("./data/test", exist_ok=True)
# Create the validation directory explicitly as well, instead of relying on
# the writer to create missing parent directories.
os.makedirs("./data/val", exist_ok=True)

# train/test are pandas DataFrames at this point; val is a datasets.Dataset.
train_dataset.to_json("./data/train/dataset.jsonl", orient="records", lines=True)
test_dataset.to_json("./data/test/dataset.jsonl", orient="records", lines=True)
val_dataset.to_json("./data/val/gen_qa.jsonl")

s3_client.upload_file(
    "./data/train/dataset.jsonl", bucket_name, f"{input_path}/train/dataset.jsonl"
)

s3_client.upload_file(
    "./data/test/dataset.jsonl", bucket_name, f"{input_path}/test/dataset.jsonl"
)

s3_client.upload_file(
    "./data/val/gen_qa.jsonl", bucket_name, f"{input_path}/val/gen_qa.jsonl"
)

# The local copies are no longer needed once uploaded.
shutil.rmtree("./data")

print("Training data uploaded to:")
print(train_dataset_s3_path)
print(test_dataset_s3_path)
print(val_dataset_s3_path)

***

## Model fine-tuning

We now define the PyTorch estimator to run a supervised fine-tuning (SFT) workload on the formatted tool-calling dataset for Amazon Nova models.

In [None]:
# Training cluster shape. instance_count is also passed to the recipe as
# run.replicas when the estimator is built.
instance_type = "ml.p5.48xlarge"
instance_count = 4

instance_type

Let's define the container to execute the SFT workload for Amazon Nova models

In [None]:
# SFT container image, resolved in the same region as the SageMaker session.
image_uri = f"708977205387.dkr.ecr.{sess.boto_region_name}.amazonaws.com/nova-fine-tune-repo:SM-TJ-SFT-latest"

image_uri

In [None]:
# model_id is only used to derive the job name; recipe is handed to the
# estimator as training_recipe.
model_id = "nova-lite/prod"
recipe = "fine-tuning/nova/nova_lite_p5_gpu_sft"

In [None]:
from sagemaker.pytorch import PyTorch

# define Training Job Name: "train-<model family>-sft", with dots in the
# model family replaced by dashes
job_name = f"train-{model_id.split('/')[0].replace('.', '-')}-sft"

# define OutputDataConfig path (nested under the default bucket prefix when set)
if default_prefix:
    output_path = f"s3://{bucket_name}/{default_prefix}/{job_name}"
else:
    output_path = f"s3://{bucket_name}/{job_name}"

# Keep the recipe's replica count in sync with the number of instances.
recipe_overrides = {
    "run": {
        "replicas": instance_count,
    },
}

estimator = PyTorch(
    output_path=output_path,
    base_job_name=job_name,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    training_recipe=recipe,
    recipe_overrides=recipe_overrides,
    max_run=432000,  # seconds (= 5 days)
    sagemaker_session=sess,
    image_uri=image_uri,
    disable_profiler=True,
    debugger_hook_config=False,
)

In [None]:
from sagemaker.inputs import TrainingInput

# Both channels point at the JSONL files uploaded earlier and are declared
# with the "Converse" data type.
train_input = TrainingInput(
    s3_data=train_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="Converse",
)

# The test split is used as the job's validation channel (see the fit call).
test_input = TrainingInput(
    s3_data=test_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="Converse",
)

In [None]:
# starting the train job with our uploaded datasets as input
# wait=False returns immediately; the cells below must only be run once the
# job has finished.
estimator.fit(inputs={"train": train_input, "validation": test_input}, wait=False)

#### Downloading and Extracting the Artifacts

After the training job is in status "Completed", we can access the output information.

In [None]:
model_s3_uri = estimator.model_data
print(model_s3_uri)

# Swap the last path component of the model URI for "output.tar.gz";
# that archive is what the following cells download and extract.
output_s3_uri = "/".join(model_s3_uri.split("/")[:-1]) + "/output.tar.gz"

In [None]:
!mkdir -p ./tmp/train_output/

In [None]:
!aws s3 cp $output_s3_uri ./tmp/train_output/output.tar.gz

In [None]:
!tar -xvzf ./tmp/train_output/output.tar.gz -C ./tmp/train_output/

In [None]:
import json

# Read the checkpoint location from the extracted manifest. The context
# manager closes the file handle, which the previous bare open() leaked.
with open("./tmp/train_output/manifest.json", encoding="utf-8") as manifest_file:
    escrow_model_uri = json.load(manifest_file)["checkpoint_s3_bucket"]

In [None]:
escrow_model_uri

***

## Model evaluation

Create minimal recipe for `gen_qa` evaluation. With `gen_qa` evaluation, we bring our own dataset for evaluation, and measure the following metrics:

* rouge1
* rouge2
* rougeL
* exact_match
* quasi_exact_match
* f1_score
* f1_score_quasi
* bleu

Your fine-tuned model checkpoints are accessible through the `manifest.json` in the output.tar.gz

In [None]:
# Evaluation recipe rendered from the fine-tuned checkpoint URI and the
# uploaded validation file. The run name below is also part of the local
# results path used after the evaluation output is extracted.
recipe_content = f"""
run:
  name: nova-lite-gen_qa-eval-job
  model_type: amazon.nova-lite-v1:0:300k
  model_name_or_path: {escrow_model_uri}
  data_s3_path: {val_dataset_s3_path} # Required, input data s3 location

evaluation:
  task: gen_qa
  strategy: gen_qa
  metric: all

inference:
  max_new_tokens: 4096
  top_p: 0.9
  temperature: 0.1
"""

# Persist the recipe so it can be passed to the estimator by path.
with open("eval-recipe.yaml", "w") as f:
  f.write(recipe_content)

Let's define our PyTorch estimator

In [None]:
# NOTE: this rebinds instance_type / instance_count from the training section.
instance_type = "ml.g5.12xlarge" # Override the instance type if you want to get a different container version
instance_count = 1

instance_type

Let's define the container to execute the Evaluation workload for Amazon Nova models

In [None]:
# Evaluation container image; rebinds image_uri from the training section.
image_uri = f"708977205387.dkr.ecr.{sess.boto_region_name}.amazonaws.com/nova-evaluation-repo:SM-TJ-Eval-latest"

image_uri

In [None]:
# recipe now points at the local evaluation recipe file written above.
model_id = "nova-lite/prod"
recipe = "./eval-recipe.yaml"

In [None]:
from sagemaker.pytorch import PyTorch

# define the evaluation job name: "train-<model family>-sft-eval"
job_name = f"train-{model_id.split('/')[0].replace('.', '-')}-sft-eval"

# define OutputDataConfig path (nested under the default bucket prefix when set)
if default_prefix:
    output_path = f"s3://{bucket_name}/{default_prefix}/{job_name}"
else:
    output_path = f"s3://{bucket_name}/{job_name}"

# Keep the recipe's replica count in sync with the number of instances.
recipe_overrides = {
    "run": {
        "replicas": instance_count,
    },
}

# NOTE: this rebinds `estimator`; the training estimator is no longer
# reachable through that name after this cell.
estimator = PyTorch(
    output_path=output_path,
    base_job_name=job_name,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    training_recipe=recipe,
    recipe_overrides=recipe_overrides,
    max_run=432000,  # seconds (= 5 days)
    sagemaker_session=sess,
    image_uri=image_uri,
    disable_profiler=True,
    debugger_hook_config=False,
)

In [None]:
from sagemaker.inputs import TrainingInput

# The evaluation job reads the flattened query/response validation file.
eval_input = TrainingInput(
    s3_data=val_dataset_s3_path,
    distribution="FullyReplicated",
    s3_data_type="S3Prefix",
)

In [None]:
# starting the evaluation job with the uploaded validation dataset as input
# wait=False returns immediately; run the cells below once the job has finished.
estimator.fit(inputs={"train": eval_input}, wait=False)

#### Downloading and Extracting the Artifacts

After the evaluation job is in status "Completed", we can access the output information.

In [None]:
model_s3_uri = estimator.model_data
print(model_s3_uri)

# Swap the last path component of the model URI for "output.tar.gz";
# that archive is what the following cells download and extract.
output_s3_uri = "/".join(model_s3_uri.split("/")[:-1]) + "/output.tar.gz"

In [None]:
!mkdir -p ./tmp/eval_output/

In [None]:
!aws s3 cp $output_s3_uri ./tmp/eval_output/output.tar.gz

In [None]:
!tar -xvzf ./tmp/eval_output/output.tar.gz -C ./tmp/eval_output/

In [None]:
# The directory name matches the run name set in the evaluation recipe.
results_path = "./tmp/eval_output/nova-lite-gen_qa-eval-job/eval_results"

### Visualize results

After the job is complete, we can visualize our results by using the following utility function

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os


def plot_metrics(results):
    """Draw the evaluation metrics as a bar chart with error bars.

    Args:
        results: Mapping of metric name to numeric score. Entries named
            "<metric>_stderr" are used as the error bar of "<metric>" and are
            not plotted themselves; a missing stderr defaults to 0.

    Returns:
        The matplotlib figure. Bars are ordered by descending plotted height.
        The "bleu" metric is divided by 100 for plotting so it shares the 0-1
        axis with the other metrics, while its label and the footnote show
        the original value.
    """
    # Pair every metric with its standard error.
    metrics = {
        key: {"value": value, "stderr": results.get(f"{key}_stderr", 0)}
        for key, value in results.items()
        if not key.endswith("_stderr")
    }

    def _scale(name):
        # BLEU is reported on a 0-100 scale; everything else on 0-1.
        return 100 if name == "bleu" else 1

    # Sort by the height that is actually drawn. Sorting the raw values (as
    # before) always put BLEU first regardless of its normalized height.
    labels = sorted(
        metrics,
        key=lambda name: metrics[name]["value"] / _scale(name),
        reverse=True,
    )
    raw_values = [metrics[label]["value"] for label in labels]
    values = [metrics[label]["value"] / _scale(label) for label in labels]
    errors = [metrics[label]["stderr"] / _scale(label) for label in labels]
    has_bleu = "bleu" in metrics

    # Create figure
    fig, ax = plt.subplots(figsize=(12, 8))

    # Create bar chart
    x = np.arange(len(labels))
    bars = ax.bar(
        x,
        values,
        yerr=errors,
        align="center",
        alpha=0.7,
        capsize=5,
        color="skyblue",
        ecolor="black",
    )

    # Add labels and title
    ax.set_ylabel("Score")
    ax.set_title("Evaluation Metrics")
    ax.set_xticks(x)
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_ylim(0, 1.0)

    # Add value labels on top of bars (BLEU is labelled with its original value)
    for bar, raw_value in zip(bars, raw_values):
        ax.text(
            bar.get_x() + bar.get_width() / 2.0,
            bar.get_height() + 0.01,
            f"{raw_value:.2f}",
            ha="center",
            va="bottom",
        )

    # Add a note about BLEU
    if has_bleu:
        ax.text(
            0.5,
            -0.15,
            "Note: BLEU score shown as percentage (original: {:.2f})".format(
                metrics["bleu"]["value"]
            ),
            transform=ax.transAxes,
            ha="center",
            fontsize=9,
        )

    plt.tight_layout()
    return fig

In [None]:
import glob
import os

def find_json_files(path):
    """Return the ``*.json`` files directly inside ``path``, sorted by name.

    glob yields matches in arbitrary order; sorting makes the result (and in
    particular its first element, which the caller picks) deterministic.
    An empty list is returned when nothing matches.
    """
    return sorted(glob.glob(os.path.join(path, "*.json")))

In [None]:
# Take the first JSON file found; this raises IndexError when the results
# directory contains no JSON file (e.g. the archive was not extracted).
evaluation_results_path = find_json_files(results_path)[0]

In [None]:
import json

with open(evaluation_results_path, "r") as f:
    data = json.load(f)

# The metrics are read from the "all" entry under "results".
fig = plot_metrics(data["results"]["all"])

# Save the chart next to the notebook.
output_file = os.path.join("./", 'evaluation_metrics.png')
fig.savefig(output_file, bbox_inches='tight')

***

## Model deployment and inference

After training and evaluating our model, we want to make it available for inference. Amazon Bedrock provides a serverless endpoint for model deployment, allowing us to serve the model without managing infrastructure.

The Bedrock Custom Model feature of Amazon Bedrock lets us import our fine-tuned model and access it through the same API as other foundation models.

In [None]:
import boto3

# Initialize the Bedrock client
bedrock = boto3.client("bedrock", region_name=sess.boto_region_name)

# Placeholder: replace with the S3 path of the fine-tuned checkpoints.
# NOTE(review): escrow_model_uri read from manifest.json earlier looks like
# the intended value - confirm.
model_path = "<ESCROW_S3_PATH_MODEL_CHECKPOINTS>"

# Define name for imported model
imported_model_name = "nova-lite-sagemaker-sft"

### Creating the Bedrock Custom Model

In [None]:
request_params = {
    "modelName": imported_model_name,
    "modelSourceConfig": {"s3DataSource": {"s3Uri": model_path}},
    "roleArn": role,
    # Fixed token value - presumably used for request idempotency; confirm
    # against the Bedrock API reference before re-running with new settings.
    "clientRequestToken": "NovaRecipeSageMaker",
}

# Create the model import job
response = bedrock.create_custom_model(**request_params)

model_arn = response["modelArn"]

# Output the model ARN
print(f"Model import job created with ARN: {model_arn}")

### Monitoring the Model status

After initiating the model import, we need to monitor its progress. The status goes through several states:

* CREATING: Model is being imported
* ACTIVE: Import successful
* FAILED: Import encountered errors

This cell polls the Bedrock API every 10 seconds to check the status of the model import, continuing until it reaches a terminal state (ACTIVE or FAILED). Once the import completes successfully, we'll have the model ARN which we can use for inference.

In [None]:
from IPython.display import clear_output
import time

# Poll until the custom model named imported_model_name reaches ACTIVE or
# FAILED. The loop keeps polling (forever) while no model with that name is
# listed, because status then stays "".
while True:
    # Newest models first.
    response = bedrock.list_custom_models(sortBy='CreationTime',sortOrder='Descending')
    model_summaries = response["modelSummaries"]
    status = ""
    for model in model_summaries:
        if model["modelName"] == imported_model_name:
            status = model["modelStatus"].upper()
            model_arn = model["modelArn"]
            print(f'{model["modelStatus"].upper()} {model["modelArn"]} ...')
            if status in ["ACTIVE", "FAILED"]:
                break
    if status in ["ACTIVE", "FAILED"]:
        break
    # Replace the previous status line on the next print, then wait 10 s.
    clear_output(wait=True)
    time.sleep(10)

model_arn

##### ⚠️ After the model is ACTIVE, deploy a custom model for on-demand inference!

Please refer to the official [AWS Documentation](https://docs.aws.amazon.com/nova/latest/userguide/deploy-custom-model.html)

In [None]:
# The deployment is named "<imported_model_name>-odi"; the polling cell
# below looks it up by that name.
request_params = {
    "clientRequestToken": "NovaRecipeSageMakerODI",
    "modelDeploymentName": f"{imported_model_name}-odi",
    "modelArn": model_arn,
}

response = bedrock.create_custom_model_deployment(**request_params)

response

In [None]:
from IPython.display import clear_output
import time

# Poll every 10 seconds until the "<imported_model_name>-odi" deployment
# reaches ACTIVE or FAILED.
# Initialized so the name exists (as None) even if no deployment is found;
# a later cell checks it against None.
custom_model_arn = None

while True:
    response = bedrock.list_custom_model_deployments(
        sortBy="CreationTime", sortOrder="Descending"
    )
    model_summaries = response["modelDeploymentSummaries"]
    status = ""
    for model in model_summaries:
        if model["customModelDeploymentName"] == f"{imported_model_name}-odi":
            status = model["status"].upper()
            custom_model_arn = model["customModelDeploymentArn"]
            print(f'{model["status"].upper()} {model["customModelDeploymentArn"]} ...')
            # Results are sorted newest-first, so the first match is the
            # deployment just created. The previous code only stopped
            # scanning on CREATING, letting an older entry with the same
            # name overwrite a terminal status and ARN.
            break
    if status in ["ACTIVE", "FAILED"]:
        break
    clear_output(wait=True)
    time.sleep(10)

custom_model_arn

### Testing the Deployed Model

Now that our model is deployed to Amazon Bedrock, we can invoke it for inference. We'll set up the necessary clients and functions to interact with our model through the Bedrock Runtime API.

Inference Setup Components:
* Bedrock Runtime Client: AWS SDK client for making inference calls
* Helper Function: To handle retry logic and properly format requests

The `generate` function we're defining:
* Sends the messages, plus the optional system prompt and tool configuration, through the Converse API
* Handles retry logic for robustness
* Sets appropriate generation parameters like temperature and top-p

This setup allows us to easily test how well our training worked by sending queries to the model and evaluating its responses.

In [None]:
import boto3
from botocore.config import Config


# Initialize Bedrock Runtime client
# `client` is the module-level client that generate() calls.
session = boto3.Session()
client = session.client(
    service_name="bedrock-runtime",
    region_name=sess.boto_region_name,
    config=Config(
        connect_timeout=300,  # 5 minutes
        read_timeout=300,  # 5 minutes
        retries={"max_attempts": 3},
    ),
)

In [None]:
import time

def generate(
    model_id,
    messages,
    system_prompt=None,
    tools=None,
    temperature=0.3,
    max_tokens=4096,
    top_p=0.9,
    max_retries=10,
):
    """
    Call the Converse API through the module-level ``client``, retrying on failure.

    Parameters:
        model_id (str): Identifier passed as ``modelId``
        messages (list): Conversation messages passed as ``messages``
        system_prompt (str, optional): Sent as a single system text block when truthy
        tools (dict, optional): Sent as ``toolConfig`` when truthy
        temperature (float): Sampling temperature for ``inferenceConfig``
        max_tokens (int): ``maxTokens`` for ``inferenceConfig``
        top_p (float): ``topP`` for ``inferenceConfig``
        max_retries (int): Total number of attempts before giving up

    Returns:
        The response of the first successful call, or None when every attempt
        raised (any exception is caught, printed and retried after 30 seconds).
    """
    # Assemble the complete request once; it is identical for every attempt.
    request = {
        "modelId": model_id,
        "messages": messages,
        "inferenceConfig": {
            "temperature": temperature,
            "maxTokens": max_tokens,
            "topP": top_p,
        },
    }
    if tools:
        request["toolConfig"] = tools
    if system_prompt:
        request["system"] = [{"text": system_prompt}]

    attempts_made = 0
    while attempts_made < max_retries:
        attempts_made += 1
        try:
            return client.converse(**request)
        except Exception as e:
            print(f"Attempt {attempts_made} failed: {str(e)}")

        if attempts_made == max_retries:
            print("Max retries reached. Unable to get response.")
            return None
        # Back off before the next attempt.
        time.sleep(30)

    return None

Use the custom model deployment ARN created with the Bedrock Custom Model

In [None]:
import json

# Fall back to a placeholder when no custom model deployment ARN is available
model_arn = (
    "<CUSTOM_MODEL_DEPLOYMENT_ARN>" if custom_model_arn is None else custom_model_arn
)

# Template later filled with str.format(): {tools} is the placeholder, while the
# doubled braces around the JSON example are escapes that render as single braces.
system_prompt = """
You are a helpful AI assistant that can answer questions and provide information.
You can use tools to help you with your tasks.

You have access to the following tools:

<tools>
{tools}
</tools>
For each function call, return a json object with function name and parameters:

{{"name": "function name", "parameters": "dictionary of argument name and its value"}}
"""

# Tool specifications in the Bedrock Converse "toolSpec" layout. In this cell they
# are serialized to JSON and embedded in the system prompt.
# Each inputSchema["json"] is a JSON Schema object: "properties" maps argument
# names directly to their schemas and "required" sits next to "properties".
tools = [
    {
        "toolSpec": {
            "name": "calculate_bmi",
            "description": "Calculate BMI given weight in kg and height in meters",
            "inputSchema": {
                "json": {
                    "type": "object",
                    "properties": {
                        "weight_kg": {
                            "type": "number",
                            "description": "Property weight_kg",
                        },
                        "height_m": {
                            "type": "number",
                            "description": "Property height_m",
                        },
                    },
                    "required": ["weight_kg", "height_m"],
                },
            },
        }
    },
    {
        "toolSpec": {
            "name": "fetch_weather",
            "description": 'Fetch weather information\n\nArgs:\nquery: The weather query (e.g., "weather in New York")\nnum_results: Number of results to return (default: 1)\n\nReturns:\nJSON string containing weather information',
            "inputSchema": {
                "json": {
                    "type": "object",
                    # Arguments are declared directly under "properties"; the
                    # previous extra nesting level made this schema malformed.
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Property query",
                        },
                        "num_results": {
                            "type": "integer",
                            "description": "Property num_results",
                        },
                    },
                    "required": ["query"],
                },
            },
        }
    },
]

# Fill the {tools} placeholder of the prompt template with the JSON-serialized tool list
system_prompt = system_prompt.format(tools=json.dumps({"tools": tools}))

messages = [
    {"role": "user", "content": [{"text": "What is the weather in Rome, Italy?"}]},
]

response = generate(
    model_id=model_arn,
    system_prompt=system_prompt,
    messages=messages,
    temperature=0.1,
    top_p=0.9,
)

# generate() returns None once all of its retries have failed; surface that as a
# clear error instead of an opaque "'NoneType' object is not subscriptable".
if response is None:
    raise RuntimeError(
        "No response from the model: every attempt failed (see the messages printed above)."
    )

response["output"]

***

## LLM as a judge

We are now going to evaluate the model with the LLM as a Judge evaluation task. First, let's define the Bedrock client

In [None]:
import boto3
from botocore.config import Config

# Bedrock Runtime client in the SageMaker session's region, with 5-minute
# connect/read timeouts and a botocore retry setting of max_attempts=3
session = boto3.Session()
client = session.client(
    "bedrock-runtime",
    region_name=sess.boto_region_name,
    config=Config(
        retries={"max_attempts": 3},
        read_timeout=300,  # 5 minutes
        connect_timeout=300,  # 5 minutes
    ),
)

To invoke the fine-tuned model for inference, we'll set up the necessary clients and functions to interact with our model through the Bedrock Runtime API.

Inference Setup Components:
* Bedrock Runtime Client: AWS SDK client for making inference calls
* Helper Function: To handle retry logic and properly format requests

The `generate` function we're defining:
* Applies the proper chat template to user messages (handled by the Bedrock Converse API it calls)
* Handles retry logic for robustness
* Sets appropriate generation parameters like temperature and top-p

This setup allows us to easily test how well our training worked by sending queries to the model and evaluating its responses.

In [None]:
import time

def generate(
    model_id,
    messages,
    system_prompt=None,
    tools=None,
    temperature=0.3,
    max_tokens=4096,
    top_p=0.9,
    max_retries=10,
):
    """
    Send a conversation to the Bedrock Converse API, retrying when the call fails

    The request is issued through the module-level ``client``. Any exception
    raised by the call is printed; unless it was the last attempt, the function
    then pauses for 30 seconds before trying again.

    Parameters:
        model_id (str): Forwarded to Converse as ``modelId``
        messages (list): Forwarded to Converse as ``messages``
        system_prompt (str, optional): When non-empty, sent as a single system text block
        tools (optional): When truthy, forwarded unchanged as ``toolConfig``
        temperature (float): Sent as ``inferenceConfig.temperature``
        max_tokens (int): Sent as ``inferenceConfig.maxTokens``
        top_p (float): Sent as ``inferenceConfig.topP``
        max_retries (int): Total number of attempts made before giving up

    Returns:
        The value returned by ``client.converse``, or None when every attempt
        failed (or when ``max_retries`` is not positive)
    """
    # Everything except modelId/messages is collected here so optional fields
    # are only present in the request when they were actually supplied.
    request_options = {
        "inferenceConfig": {
            "temperature": temperature,
            "maxTokens": max_tokens,
            "topP": top_p,
        },
    }
    if tools:
        request_options["toolConfig"] = tools
    if system_prompt:
        request_options["system"] = [{"text": system_prompt}]

    for attempt_number in range(1, max_retries + 1):
        try:
            return client.converse(
                modelId=model_id, messages=messages, **request_options
            )
        except Exception as error:
            print(f"Attempt {attempt_number} failed: {error!s}")
            if attempt_number == max_retries:
                print("Max retries reached. Unable to get response.")
                return None
            # Fixed pause between attempts
            time.sleep(30)

#### Generate model inference on the created validation dataset

In the following cell, we are going to invoke the model to create the dataset for LLM as a judge.

The required dataset structure for LLM as a judge task is:

```
{
    "prompt": "String containing the input prompt and instructions.",
    "response_A": "String containing the ground truth output",
    "response_B": "String containing the customized model output."
}
```

In [None]:
# Candidate models: index 0 is the fine-tuned deployment (or a placeholder when
# no ARN is available), index 1 is the base model.
model_ids = [
    (
        custom_model_arn
        if custom_model_arn is not None
        else "<CUSTOM_MODEL_DEPLOYMENT_ARN>"
    ),
    "us.amazon.nova-micro-v1:0",
]

# Change model ID to run LLM as a judge evaluation on the fine-tuned model or base model
model_id = model_ids[0]

# Rows of [prompt, ground-truth response, model response]
llm_val_dataset = []

for index, el in enumerate(val_dataset, start=1):
    print("Processing row ", index)
    messages = [
        {"role": "user", "content": [{"text": el["query"]}]},
    ]

    response = generate(
        model_id=model_id,
        system_prompt=el["system"],
        messages=messages,
        temperature=0.1,
        top_p=0.9,
    )

    # generate() returns None once its retries are exhausted; skip the row
    # instead of crashing on None["output"] and aborting the whole loop.
    if response is None:
        print(f"Skipping row {index}: no response from the model")
        continue

    # The judge prompt is the system prompt (when there is one) followed by the query
    question = el["system"] + "\n\n" + el["query"] if el["system"] else el["query"]

    llm_val_dataset.append(
        [
            question,
            el["response"],
            response["output"]["message"]["content"][0]["text"],
        ]
    )

llm_judge_df = pd.DataFrame(
    llm_val_dataset, columns=["prompt", "response_A", "response_B"]
)

In [None]:
llm_judge_df.head()

### Upload to Amazon S3

In [None]:
import boto3
import shutil

In [None]:
s3_client = boto3.client("s3")

# Key prefix of the dataset inside the bucket, nested under the session's
# default prefix when one is configured
input_path = (
    f"{default_prefix}/datasets/nova-dpo" if default_prefix else "datasets/nova-dpo"
)

# Full S3 URI of the LLM-as-a-judge dataset file
llm_judge_dataset_s3_path = (
    f"s3://{bucket_name}/{input_path}/llm-judge/llm_judge.jsonl"
)

In [None]:
import os

# Write the LLM-as-a-judge dataset locally as JSON Lines, then upload it to S3
local_llm_judge_path = "./data/llm-judge/llm_judge.jsonl"
os.makedirs(os.path.dirname(local_llm_judge_path), exist_ok=True)

llm_judge_df.to_json(local_llm_judge_path, orient="records", lines=True)

s3_client.upload_file(
    local_llm_judge_path,
    bucket_name,
    f"{input_path}/llm-judge/llm_judge.jsonl",
)

# Optional cleanup of the local copy:
# shutil.rmtree("./data")

print("LLM-as-a-judge dataset uploaded to:")
print(llm_judge_dataset_s3_path)

***

### Run LLM as a Judge evaluation job

Create a minimal recipe for the `LLM-as-Judge` evaluation. With the `llm_judge` evaluation, we bring our own dataset and measure the following metrics:

* a_scores
* b_scores
* ties
* inference_error
* score
* winrate
* lower_rate
* upper_rate

Your fine-tuned model checkpoints are accessible through the `manifest.json` in the output.tar.gz

In [None]:
# Evaluation recipe (YAML). Only the dataset location is interpolated; the rest is literal.
recipe_content = f"""
run:
  name: nova-micro-llm-judge-eval-job
  model_type: amazon.nova-micro-v1:0:128k
  model_name_or_path: "nova-micro/prod"
  replicas: 1 # unmodifiable
  data_s3_path: {llm_judge_dataset_s3_path} # Required, input data s3 location

evaluation:
  task: llm_judge # not modifiable
  strategy: judge # not modifiable
  metric: all # not modifiable

inference:
  max_new_tokens: 4096 # modifiable
  top_p: 0.9 # modifiable
  temperature: 0.1 # modifiable
"""

# Persist the recipe in the working directory so it can be referenced by path
with open("llm-judge-recipe.yaml", "w") as recipe_file:
    recipe_file.write(recipe_content)

Let's define our PyTorch estimator, pointing it to the evaluation recipe we just created

In [None]:
# Compute resources requested for the evaluation job
instance_count = 1
instance_type = "ml.g5.12xlarge"  # Override the instance type if you want to get a different container version

instance_type

Let's define the container to execute the Evaluation workload for Amazon Nova models

In [None]:
# Evaluation container image, resolved in the same region as the SageMaker session
image_uri = (
    f"708977205387.dkr.ecr.{sess.boto_region_name}.amazonaws.com"
    "/nova-evaluation-repo:SM-TJ-Eval-latest"
)

image_uri

In [None]:
# Model identifier; in this section it is only used to derive the job name.
# Kept consistent with the recipe's model_name_or_path ("nova-micro/prod").
model_id = "nova-micro/prod"
# Path of the evaluation recipe file handed to the estimator
recipe = "./llm-judge-recipe.yaml"

In [None]:
from sagemaker.pytorch import PyTorch

# Base name of the SageMaker job, built from the part of model_id before "/"
# (dots replaced by dashes)
job_name = "train-{}-dpo-peft-llm-judge".format(
    model_id.split("/")[0].replace(".", "-")
)

# S3 location for the job output, nested under the session's default prefix when one is set
output_path = (
    f"s3://{bucket_name}/{default_prefix}/{job_name}"
    if default_prefix
    else f"s3://{bucket_name}/{job_name}"
)

# Override the recipe's replica count with the number of instances requested
recipe_overrides = {"run": {"replicas": instance_count}}

estimator = PyTorch(
    image_uri=image_uri,
    role=role,
    sagemaker_session=sess,
    base_job_name=job_name,
    output_path=output_path,
    instance_type=instance_type,
    instance_count=instance_count,
    training_recipe=recipe,
    recipe_overrides=recipe_overrides,
    max_run=432000,
    disable_profiler=True,
    debugger_hook_config=False,
)

In [None]:
from sagemaker.inputs import TrainingInput

# Input channel pointing at the LLM-as-a-judge dataset on S3
eval_input = TrainingInput(
    llm_judge_dataset_s3_path,
    s3_data_type="S3Prefix",
    distribution="FullyReplicated",
)

eval_input

In [None]:
# Launch the LLM-as-a-judge evaluation job with the uploaded dataset as the "train" channel.
# wait=False: the call does not block until the job finishes.
estimator.fit(inputs={"train": eval_input}, wait=False)

#### Downloading and Extracting the Artifacts

After the evaluation job reaches the "Completed" status, we can access its output

In [None]:
# Create the local folder that will receive the job output (-p: also creates parents, no error if it exists)
!mkdir -p ./tmp/llm_judge_output/

In [None]:
# Download the job's output archive from S3 into the local output folder.
# NOTE(review): output_s3_uri is not assigned in this part of the notebook - confirm it
# points to the output.tar.gz of the LLM-as-a-judge evaluation job before running this cell.
!aws s3 cp $output_s3_uri ./tmp/llm_judge_output/output.tar.gz

In [None]:
# Extract the gzip-compressed archive into the same folder (-C sets the target directory)
!tar -xvzf ./tmp/llm_judge_output/output.tar.gz -C ./tmp/llm_judge_output/

In [None]:
# Folder holding the evaluation result JSON inside the extracted archive.
# The top-level folder must match the recipe's run name
# ("nova-micro-llm-judge-eval-job"); the previous "nova-lite-..." path did not.
results_path = "./tmp/llm_judge_output/nova-micro-llm-judge-eval-job/eval_results"

#### Visualize results

After the job is complete, we can visualize our results by using the following utility function

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns


def plot_llm_judge_results(results):
    """
    Draw a two-panel summary of an LLM-as-a-judge evaluation.

    Left panel: bar chart of the A, B, tie and inference-error counts.
    Right panel: pie chart of A / B / tie preferences, drawn only when their
    sum is greater than zero.

    Args:
        results (dict): Must provide the keys "a_scores", "b_scores", "ties"
            and "inference_error"

    Returns:
        The matplotlib.pyplot module (not a Figure object); the plots live on
        its current figure
    """
    palette = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#FFA07A"]

    # Global look and feel
    plt.style.use("default")
    sns.set_palette("husl")

    plt.figure(figsize=(12, 5))

    # --- Left panel: raw counts -------------------------------------------
    bar_axis = plt.subplot(1, 2, 1)
    counts = {
        label: results[key]
        for label, key in (
            ("A Scores", "a_scores"),
            ("B Scores", "b_scores"),
            ("Ties", "ties"),
            ("Inference Errors", "inference_error"),
        )
    }

    rectangles = bar_axis.bar(counts.keys(), counts.values(), color=palette)
    bar_axis.set_title("Score Distribution", fontsize=14, fontweight="bold")
    bar_axis.set_ylabel("Count")

    # Print each count just above its bar
    for rectangle, count in zip(rectangles, counts.values()):
        bar_top = rectangle.get_height()
        label_x = rectangle.get_x() + rectangle.get_width() / 2.0
        bar_axis.text(
            label_x,
            bar_top + bar_top * 0.01,
            str(int(count)),
            ha="center",
            va="bottom",
            fontweight="bold",
        )

    # pyplot.xticks acts on the current axes, which is still the bar chart here
    plt.xticks(rotation=45, ha="right")

    # --- Right panel: preferences, inference errors excluded --------------
    pie_axis = plt.subplot(1, 2, 2)
    total_valid = results["a_scores"] + results["b_scores"] + results["ties"]

    if total_valid > 0:
        _, _, percent_labels = pie_axis.pie(
            [results["a_scores"], results["b_scores"], results["ties"]],
            labels=["A Preferred", "B Preferred", "Ties"],
            colors=palette[:3],
            autopct="%1.1f%%",
            startangle=90,
        )

        # Percentages are rendered bold and white
        for percent_label in percent_labels:
            percent_label.set_fontweight("bold")
            percent_label.set_color("white")

    pie_axis.set_title(
        "Preference Distribution\n(Valid Judgments Only)",
        fontsize=14,
        fontweight="bold",
    )

    plt.tight_layout()

    return plt

In [None]:
import glob
import os

def find_json_files(path):
    """
    Return the ``*.json`` files located directly inside ``path``.

    Args:
        path (str): Directory to search (sub-directories are not visited)

    Returns:
        list: Matching paths sorted alphabetically; empty when nothing matches.
        glob returns matches in arbitrary order, so sorting makes index-based
        access by callers deterministic.
    """
    return sorted(glob.glob(os.path.join(path, "*.json")))

In [None]:
# Use the first JSON file found in the results folder; fail with an explicit
# message (instead of an IndexError) when the folder has no JSON file, e.g.
# because the archive was not downloaded/extracted or results_path is wrong.
json_result_files = find_json_files(results_path)
if not json_result_files:
    raise FileNotFoundError(f"No JSON result file found in {results_path}")
evaluation_results_path = json_result_files[0]

In [None]:
import json

# Load the evaluation report produced by the job
with open(evaluation_results_path, "r") as results_file:
    data = json.load(results_file)

# plot_llm_judge_results returns the pyplot module, so savefig below acts on
# the current figure
fig = plot_llm_judge_results(data["results"]["all"])

# Save the chart as a PNG in the working directory
output_file = os.path.join("./", "evaluation_metrics_llm_judge.png")
fig.savefig(output_file, bbox_inches="tight")