## Coordinating FMOps Steps into a Fine-Tuning and Model Evaluation Pipeline

In this notebook, we stitch together the components of FMOps into a full FMOps pipeline on SageMaker AI. The pipeline is a directed acyclic graph (DAG) of steps, orchestrated by SageMaker AI, with experiment tracking in managed MLflow 3.0 on Amazon SageMaker.

Running hundreds of experiments, comparing the results, and keeping track of the ML lifecycle can become very complex. MLflow helps streamline that lifecycle, from data preparation to model deployment. By integrating MLflow into your LLM workflow, you can manage experiment tracking, model versioning, and deployment in one place, which makes each step reproducible. With MLflow, you can track and compare the performance of multiple LLM experiments, identify the best-performing models, and deploy them to production environments with confidence.

You can create workflows with SageMaker Pipelines that enable you to prepare data, fine-tune models, and evaluate model performance with simple Python code for each step. 

Now you can use SageMaker managed MLflow to run LLM fine-tuning and evaluation experiments at scale. Specifically:

- MLflow can manage tracking of fine-tuning experiments, comparing evaluation results of different runs, model versioning, deployment, and configuration (such as data and hyperparameters)
- SageMaker Pipelines can orchestrate multiple experiments based on the experiment configuration 
  

The following figure shows an overview of the solution.

![](./Task5-Pipeline.png)

## Prerequisites 
Before you begin, make sure you have the following prerequisites in place:

- MLflow tracking server: If you're running this lab in a workshop environment, an MLflow tracking server has already been created for you. If you need to create an MLflow tracking server, follow the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/mlflow-create-tracking-server.html).

### 1. Setup and Dependencies
Run the two cells below. The first installs the dependencies, and the second restarts the kernel so that the upgraded packages are loaded.

In [None]:
%pip install -r ./scripts/requirements.txt --upgrade --quiet

In [None]:
from IPython import get_ipython
get_ipython().kernel.do_shutdown(True)

**Importing Libraries and Setting Up Environment**

This part imports all necessary Python modules. It includes SageMaker-specific imports for pipeline creation and execution, which will be used to define the pipeline steps.

In [None]:
import os
import boto3
import sagemaker
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
from botocore.exceptions import ClientError

### 2. SageMaker Session and IAM Role

`get_execution_role()`: Retrieves the IAM role that SageMaker will use to access AWS resources. This role needs appropriate permissions for tasks like accessing S3 buckets and creating SageMaker resources.

In [None]:
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
instance_type = "ml.m5.xlarge"
pipeline_name = "qwen3-finetune-pipeline"
bucket_name = sagemaker_session.default_bucket()
default_prefix = sagemaker_session.default_bucket_prefix
if default_prefix:
    input_path = f'{default_prefix}/datasets/llm-fine-tuning-modeltrainer-sft'
else:
    input_path = 'datasets/llm-fine-tuning-modeltrainer-sft'

model_id = "Qwen/Qwen3-4B-Instruct-2507"
model_id_filesafe = model_id.replace("/","_").replace(".", "_")

### 3. Configuration

MLflow integration is crucial for experiment tracking and management. **Update the ARN for the MLflow tracking server.**

`mlflow_tracking_server_arn`: The ARN of the MLflow tracking server. You can copy this ARN from the SageMaker Studio UI. It allows the pipeline to log metrics, parameters, and artifacts to a central location.

This example requires a SageMaker managed MLflow tracking server to track experiments and manage model artifacts. To create your own tracking server, refer to the [SageMaker documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/mlflow-create-tracking-server.html). Once the tracking server exists, copy its ARN into the `mlflow_tracking_server_arn` variable in the cell below. The cell first tries to look up a server named `genai-mlflow-tracker` and falls back to the value you entered.

In [None]:
mlflow_tracking_server_arn = "<REPLACE WITH YOUR ARN>"

try:
    response = boto3.client('sagemaker').describe_mlflow_tracking_server(
        TrackingServerName='genai-mlflow-tracker'
    )
    mlflow_tracking_server_arn = response['TrackingServerArn']
    print(f"MLflow Tracking Server ARN: {mlflow_tracking_server_arn}")
except ClientError:
    print("No MLflow Tracking Server Found, please input a value for mlflow_tracking_server_arn")

os.environ["mlflow_tracking_server_arn"] = mlflow_tracking_server_arn
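
If the lookup above fails, the placeholder string is still exported to the environment and would only cause an error later, inside a pipeline step. A minimal sketch of an early sanity check follows. The helper name `looks_like_tracking_server_arn` and the regular expression are assumptions for illustration; the pattern follows the usual `arn:<partition>:sagemaker:<region>:<account-id>:mlflow-tracking-server/<name>` layout, so adjust it if your ARN differs.

```python
import re

# Assumed ARN layout:
# arn:<partition>:sagemaker:<region>:<account-id>:mlflow-tracking-server/<name>
_TRACKING_SERVER_ARN = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:mlflow-tracking-server/[A-Za-z0-9-]+$"
)

def looks_like_tracking_server_arn(value: str) -> bool:
    """Return True if value has the shape of an MLflow tracking server ARN."""
    return bool(_TRACKING_SERVER_ARN.match(value))

# Hypothetical usage in this notebook:
# if not looks_like_tracking_server_arn(mlflow_tracking_server_arn):
#     raise ValueError("Set mlflow_tracking_server_arn before continuing")
```

Failing fast here is cheaper than discovering a bad ARN after the preprocessing step has already started.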
os.environ["pipeline_name"] = pipeline_name

The following `config.yaml` sets default values for every remote function the pipeline runs on SageMaker: the instance type, the dependency file, and which local files to package. Defining these defaults once avoids repeating them on each pipeline step.

In [None]:
%%writefile config.yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        InstanceType: ml.m5.xlarge
        Dependencies: ./scripts/requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns: # files or directories to ignore
          - "*.ipynb" # all notebook files

In [None]:
# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

### 4. Download Model Data from Hugging Face

In [None]:
from huggingface_hub import snapshot_download


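# Check whether an object exists in S3; only a "not found" response counts as missing
def s3_file_exists(s3_client, bucket, key):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
            return False
        # Other errors (for example, access denied) should not be mistaken for a missing file
        raise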

# Simple S3 upload function that checks if files exist before uploading
def simple_s3_upload(local_dir, s3_bucket, s3_prefix, skip_existing=True):
    """
    Upload files to S3, skipping files that already exist.
    
    Args:
        local_dir (str): Local directory containing files to upload
        s3_bucket (str): S3 bucket name
        s3_prefix (str): S3 prefix (folder path)
        skip_existing (bool): Whether to skip files that already exist in S3
        
    Returns:
        tuple: (uploaded_files, skipped_files, failed_files)
    """
    s3_client = boto3.client('s3')
    uploaded_files = []
    skipped_files = []
    failed_files = []
    
    # Get all local files
    local_files = []
    for root, _, files in os.walk(local_dir):
        for filename in files:
            local_path = os.path.join(root, filename)
            rel_path = os.path.relpath(local_path, local_dir)
            s3_key = os.path.join(s3_prefix, rel_path).replace('\\', '/')
            local_files.append((local_path, s3_key))
    
    print(f"Found {len(local_files)} files in {local_dir}")
    
    # Process each file sequentially
    for local_path, s3_key in local_files:
        try:
            # Check if file exists in S3
            if skip_existing and s3_file_exists(s3_client, s3_bucket, s3_key):
                print(f"Skipping {s3_key} (file exists in S3)")
                skipped_files.append(s3_key)
                continue
            
            # Upload the file
            print(f"Uploading {local_path} to s3://{s3_bucket}/{s3_key}")
            s3_client.upload_file(
                local_path, 
                s3_bucket, 
                s3_key,
                ExtraArgs={'ACL': 'bucket-owner-full-control'}
            )
            uploaded_files.append(s3_key)
            
        except Exception as e:
            print(f"Failed to upload {local_path}: {str(e)}")
            failed_files.append((s3_key, str(e)))
    
    print(f"\nUpload Summary:")
    print(f"  - Uploaded: {len(uploaded_files)} files")
    print(f"  - Skipped: {len(skipped_files)} files")
    print(f"  - Failed: {len(failed_files)} files")
    
    return uploaded_files, skipped_files, failed_files

# Set local and S3 model paths
model_local_location = f"../models/{model_id_filesafe}"
if default_prefix:
    model_s3_destination = f"s3://{bucket_name}/{default_prefix}/models/{model_id_filesafe}"
    prefix = f"{default_prefix}/models/{model_id_filesafe}"
else:
    model_s3_destination = f"s3://{bucket_name}/models/{model_id_filesafe}"
    prefix = f"models/{model_id_filesafe}"

print("Downloading model ", model_id)
os.makedirs(model_local_location, exist_ok=True)

try:
    snapshot_download(repo_id=model_id, local_dir=model_local_location)
    print(f"Model {model_id} downloaded under {model_local_location}")
    
    print(f"Beginning Model Upload to {model_s3_destination}...")
    
    # Use the simple upload function without threads or batch processing
    uploaded, skipped, failed = simple_s3_upload(
        local_dir=model_local_location,
        s3_bucket=bucket_name,
        s3_prefix=prefix,
        skip_existing=True
    )
 
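    # Report success only if every file was uploaded or skipped
    if failed:
        raise RuntimeError(f"{len(failed)} files failed to upload to {model_s3_destination}")
    print(f"Model successfully uploaded to: \n {model_s3_destination}")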
except Exception as e:
    print(f"Error during model download or upload: {e}")
    raise

os.environ["model_location"] = model_s3_destination
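
The key-building logic inside `simple_s3_upload` can be examined in isolation. The sketch below mirrors it as a pure function (the name `build_s3_key` and the example paths are hypothetical), so the mapping from a local path to an S3 key can be checked without touching S3.

```python
import os

def build_s3_key(local_path: str, local_dir: str, s3_prefix: str) -> str:
    """Map a file under local_dir to an S3 key under s3_prefix."""
    rel_path = os.path.relpath(local_path, local_dir)
    # S3 keys use forward slashes regardless of the local OS separator
    return os.path.join(s3_prefix, rel_path).replace("\\", "/")

# Illustrative paths only; nested folders are preserved below the prefix
print(build_s3_key("/tmp/model/sub/weights.bin", "/tmp/model", "models/demo"))
```

Because the relative path is kept, the directory layout of the downloaded snapshot is reproduced under the S3 prefix.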

### 5. Configure Fine-Tuning Job

This section defines the core components of the SageMaker pipeline.

**Training Configuration**

The training configuration is comprehensive, including:

- Experiment naming for tracking purposes
- Model specifications (ID, version, name)
- Infrastructure details (instance types and counts for fine-tuning and deployment)
- Training hyperparameters (epochs, batch size)

This configuration allows for easy adjustment of the training process without changing the core pipeline code.

**LoRA Parameters**

Low-Rank Adaptation (LoRA) is an efficient fine-tuning technique that reduces the number of trainable parameters by adding low-rank decomposition matrices to existing weights rather than updating all model weights. This significantly reduces memory requirements and training time while maintaining performance comparable to full fine-tuning.
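
To make the saving concrete, the following sketch counts trainable parameters for a single weight matrix. It assumes the standard LoRA formulation, in which a frozen weight of shape `d_out x d_in` receives an update `B @ A`, with `B` of shape `d_out x r` and `A` of shape `r x d_in`. The layer size is illustrative and is not taken from the Qwen3 configuration.

```python
def lora_trainable_params(d_out: int, d_in: int, r: int) -> int:
    """Parameters in the low-rank factors B (d_out x r) and A (r x d_in)."""
    return r * (d_out + d_in)

def full_params(d_out: int, d_in: int) -> int:
    """Parameters updated when the whole weight matrix is fine-tuned."""
    return d_out * d_in

# Illustrative 4096 x 4096 projection with rank 8
d_out, d_in, r = 4096, 4096, 8
lora = lora_trainable_params(d_out, d_in, r)
full = full_params(d_out, d_in)
print(f"LoRA: {lora:,} vs full: {full:,} ({lora / full:.2%} of the matrix)")
# → LoRA: 65,536 vs full: 16,777,216 (0.39% of the matrix)
```

Doubling the rank doubles the adapter size, which is why the rank is usually the first knob to adjust when trading quality against memory.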

In [None]:
%%bash

cat > ./args.yaml <<EOF

# MLflow Config
mlflow_uri: "${mlflow_tracking_server_arn}"                # The URI for the MLflow tracking server 
mlflow_experiment_name: "${pipeline_name}"  # Name of the MLflow experiment for organizing runs


model_id: "${model_location}"              # Hugging Face model id, or S3 location of base model

# SageMaker specific parameters 
output_dir: "/opt/ml/model"                # Path where SageMaker will upload the model 
train_dataset_path: "/opt/ml/input/data/train/"   # Path where SageMaker makes the train channel available
test_dataset_path: "/opt/ml/input/data/test/"     # Path where SageMaker makes the test channel available

# Training parameters
max_seq_length: 1500                       # Maximum sequence length for inputs (affects memory usage)
                                           # Higher values allow for longer context but require more memory
                                           # Range: 512-4096 depending on model architecture and hardware

# LoRA parameters (Low-Rank Adaptation)
lora_r: 8                                  # Rank of the LoRA update matrices
                                           # Lower values (4-16) are more efficient, higher values (32-64) can improve quality
                                           # Recommended range: 8-64 depending on task complexity
lora_alpha: 16                             # Scaling factor for the LoRA update
                                           # Generally set to 2x lora_r for good performance
lora_dropout: 0.1                          # Dropout probability for LoRA layers
                                           # Range: 0.0-0.5, helps prevent overfitting

# Optimizer parameters
learning_rate: 2e-4                        # Learning rate for parameter updates
                                           # Range: 1e-5 to 5e-4 for LoRA fine-tuning
                                           # Too high: training instability, too low: slow convergence

# Training loop parameters
num_train_epochs: 1                        # Number of complete passes through the training dataset
                                           # More epochs can improve performance but risk overfitting
                                           # Range: 1-5 for LoRA fine-tuning
per_device_train_batch_size: 2             # Number of samples per GPU during training
                                           # Larger values improve training speed but require more memory
                                           # Range: 1-8 for large models on common GPUs
per_device_eval_batch_size: 1              # Number of samples per GPU during evaluation
                                           # Can typically be larger than training batch size
gradient_accumulation_steps: 2             # Accumulate gradients over multiple steps
                                           # Effectively increases batch size by this factor
                                           # Useful when limited by GPU memory

# Memory optimization techniques
gradient_checkpointing: true               # Reduces memory usage by recomputing activations during backward pass
                                           # Trades computation for memory, ~20% slower but enables larger models/sequences
fp16: true                                 # Use half-precision floating point (speeds up training, reduces memory)
bf16: false                                # Use bfloat16 precision (better numerical stability than fp16)
                                           # Also enables FlashAttention2 (requires Ampere/Hopper GPU+ eg:A10, A100, H100)
tf32: false                                # Use TensorFloat-32 precision (NVIDIA Ampere+ GPUs only)

#uncomment here for fsdp - start
# fsdp: "full_shard auto_wrap offload"     # Fully Sharded Data Parallel training
                                           # Splits model states across multiple GPUs
# fsdp_config:                             # Configuration for FSDP
#     backward_prefetch: "backward_pre"    # Prefetches parameters before backward pass
#     cpu_ram_efficient_loading: true      # More memory-efficient parameter loading
#     offload_params: true                 # Offloads parameters to CPU when not in use
#     forward_prefetch: false              # Don't prefetch parameters for forward pass
#     use_orig_params: true                # Use original parameter ordering
#uncomment here for fsdp - end

merge_weights: true                        # Merge adapter weights into the base model
                                           # true: produces standalone model, false: keeps adapter separate
EOF
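
The batch-related settings in `args.yaml` combine into an effective batch size per optimizer step. A small sketch of that arithmetic follows; `num_gpus` is an assumption, because the GPU count depends on the training instance chosen in the fine-tuning step.

```python
def effective_batch_size(per_device_batch: int, grad_accum_steps: int, num_gpus: int) -> int:
    """Samples contributing to each optimizer update under data parallelism."""
    return per_device_batch * grad_accum_steps * num_gpus

# per_device_train_batch_size=2 and gradient_accumulation_steps=2 come from args.yaml;
# num_gpus=1 is an assumed single-GPU instance
print(effective_batch_size(per_device_batch=2, grad_accum_steps=2, num_gpus=1))  # → 4
```

When GPU memory forces a smaller per-device batch, raising `gradient_accumulation_steps` by the same factor keeps the effective batch size unchanged.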

In [None]:
from sagemaker.s3 import S3Uploader

if default_prefix:
    input_path = f"s3://{bucket_name}/{default_prefix}/training_config/{model_id_filesafe}"
else:
    input_path = f"s3://{bucket_name}/training_config/{model_id_filesafe}"

# upload the model yaml file to s3
model_yaml = "args.yaml"
train_config_s3_path = S3Uploader.upload(local_path=model_yaml, desired_s3_uri=f"{input_path}/config")

print(f"Training config uploaded to:")
print(train_config_s3_path)

### 6. Pipeline Creation and Execution

This final section brings all the components together into an executable pipeline.

**Creating the Pipeline**

The pipeline object is created with all defined steps.

1. Preprocessing Step -- Reformat all of the fine-tuning data to the prompt format required for the fine-tuning job.
2. Training Step -- Execute the model fine-tuning job using the preprocessed data.
3. Deploy Step -- Deploy the model to a SageMaker AI Managed Endpoint for testing fine-tuning performance.
4. Quantitative Evaluation Step -- Evaluate the model's performance using ROUGE scores.
5. Qualitative Evaluation Step -- Evaluate the model's performance using LLM-as-a-Judge.
6. Conditionally Register Model -- Register the model if the quantitative and qualitative evaluations meet criteria.
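
The dependency structure of these steps can be sketched with the Python standard library (`graphlib`, Python 3.9+). SageMaker derives the real graph from the data passed between steps; the step names below are shorthand for illustration, not SDK identifiers.

```python
from graphlib import TopologicalSorter

# Each key depends on the steps in its value set
dependencies = {
    "preprocess": set(),
    "finetune": {"preprocess"},
    "deploy": {"finetune"},
    "quantitative_eval": {"deploy"},
    "qualitative_eval": {"deploy"},
    "evaluation_gate": {"quantitative_eval", "qualitative_eval"},
    "register_model": {"evaluation_gate"},
}

def execution_waves(deps):
    """Group steps into waves; every step in a wave has all dependencies satisfied."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose inputs are all available
        waves.append(ready)
        sorter.done(*ready)
    return waves

for wave in execution_waves(dependencies):
    print("can run now:", wave)
```

In this sketch the two evaluation steps land in the same wave, since both need only the deployed endpoint, while registration waits for the gate.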

In [None]:
from steps import pipeline_utils
guardrail_id, guardrail_version = pipeline_utils.get_or_create_guardrail()

In [None]:
from steps import (
    preprocess_step,
    finetune_step,
    deploy_step,
    quantitative_eval_step,
    qualitative_eval_step,
    model_registration_step
)
from sagemaker.workflow.step_collections import StepCollection

preprocessing_step = preprocess_step.preprocess(
    tracking_server_arn=mlflow_tracking_server_arn,
    experiment_name=pipeline_name,
    run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    input_path=input_path
)

training_step = finetune_step.train(
    tracking_server_arn=mlflow_tracking_server_arn,
    experiment_name=pipeline_name,
    run_id=preprocessing_step[0],
    train_dataset_s3_path=preprocessing_step[1],
    test_dataset_s3_path=preprocessing_step[2],
    train_config_s3_path=train_config_s3_path,
    role=role,
    model_id=model_s3_destination
)
run_id=training_step[0]
model_artifacts_s3_path=training_step[2]

deploy_step = deploy_step.deploy(
    tracking_server_arn=mlflow_tracking_server_arn,
    model_artifacts_s3_path=model_artifacts_s3_path,
    model_id=model_s3_destination,
    experiment_name=pipeline_name,
    run_id=run_id,
)
endpoint_name=deploy_step

mlflow_trace_attributes = {
    "model_id": model_id,
    "guardrail_id": guardrail_id,
    "guardrail_version": guardrail_version
}
quantitative_eval_step = quantitative_eval_step.quantitative_evaluate(
    tracking_server_arn=mlflow_tracking_server_arn,
    experiment_name=pipeline_name,
    run_id=run_id,
    endpoint_name=endpoint_name,
    mlflow_trace_attributes=mlflow_trace_attributes
)

qualitative_eval_step = qualitative_eval_step.qualitative_evaluate(
    tracking_server_arn=mlflow_tracking_server_arn,
    experiment_name=pipeline_name,
    run_id=run_id,
    endpoint_name=endpoint_name,
    mlflow_trace_attributes=mlflow_trace_attributes
)

evaluation_gate = ConditionStep(
    name="EvaluationGate",
    depends_on=[qualitative_eval_step],
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=quantitative_eval_step["rougeL_f"],
            right=0.2
        ),
        ConditionGreaterThanOrEqualTo(
            left=qualitative_eval_step["avg_medical_accuracy"],
            right=3.0
        )
    ],
    if_steps=[
        model_registration_step.register_model(
            tracking_server_arn=mlflow_tracking_server_arn,
            experiment_name=pipeline_name,
            run_id=run_id,  # First output of training_step (index 0)
            model_artifacts_s3_path=model_artifacts_s3_path,  # Third output of training_step (index 2)
            model_id=model_id,
            model_name="Fine-Tuned-Medical-Qwen3-4B-Instruct-2507",
            endpoint_name=endpoint_name,
            evaluation_score=quantitative_eval_step["rougeL_f"],  # Get the evaluation score
            pipeline_name=pipeline_name,
            model_description="Fine-tuned medical LLM for clinical reasoning and diagnostics"
        )
    ],
    else_steps=[
        FailStep(
            name="EvaluationFailed",
            error_message="Model evaluation failed to meet quality thresholds."
        )
    ]
)

# Combining the steps into the pipeline definition
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
    ],
    steps=[
        preprocessing_step,
        training_step,
        deploy_step,
        quantitative_eval_step,
        evaluation_gate
    ],
)
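
The decision made by `EvaluationGate` can be restated in plain Python to reason about the thresholds before running the pipeline. This is only a sketch: the function and constant names are hypothetical, and the real check is evaluated by SageMaker against the step outputs.

```python
ROUGE_L_F_MIN = 0.2          # same threshold as the first condition in the gate
MEDICAL_ACCURACY_MIN = 3.0   # same threshold as the second condition in the gate

def passes_evaluation_gate(rouge_l_f: float, avg_medical_accuracy: float) -> bool:
    """Both conditions must hold, mirroring how ConditionStep combines its conditions."""
    return rouge_l_f >= ROUGE_L_F_MIN and avg_medical_accuracy >= MEDICAL_ACCURACY_MIN

print(passes_evaluation_gate(0.25, 3.5))  # → True
print(passes_evaluation_gate(0.25, 2.9))  # → False
```

Because the conditions are combined with a logical AND, a strong ROUGE-L score cannot compensate for a weak LLM-as-a-Judge score, and the run ends in the `EvaluationFailed` step instead of registering the model.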

**Upserting the Pipeline**

This step either creates a new pipeline in SageMaker or updates an existing one with the same name. It's a key part of the MLOps process, allowing for iterative refinement of the pipeline.

In [None]:
pipeline.upsert(role)

**Starting the Pipeline Execution**

This command kicks off the actual execution of the pipeline in SageMaker. From this point, SageMaker will orchestrate the execution of each step, managing resources and data flow between steps.

In [None]:
execution = pipeline.start()
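
A pipeline run can take a long time, so it helps to check progress from the notebook. In the SageMaker Python SDK, `execution.list_steps()` returns one dictionary per step. The helper below is a hypothetical sketch that condenses such a list into a name-to-status mapping. It assumes each dictionary carries `StepName` and `StepStatus` keys (and `FailureReason` on failure), as in the `ListPipelineExecutionSteps` API response, and it is demonstrated on hand-written sample data rather than real output.

```python
def summarize_steps(steps):
    """Map each step name to its status, appending the failure reason when present."""
    summary = {}
    for s in steps:
        status = s.get("StepStatus", "Unknown")
        reason = s.get("FailureReason")
        summary[s["StepName"]] = f"{status}: {reason}" if reason else status
    return summary

# Hand-written sample in the assumed response shape (not real pipeline output)
sample = [
    {"StepName": "preprocess", "StepStatus": "Succeeded"},
    {"StepName": "EvaluationFailed", "StepStatus": "Failed", "FailureReason": "thresholds not met"},
]
print(summarize_steps(sample))

# Hypothetical usage against the real execution object:
# print(summarize_steps(execution.list_steps()))
```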

### 7. Clean Up

In [None]:
# Delete the endpoint to avoid incurring charges
import boto3
import time
import botocore

def delete_endpoint_with_retry(endpoint_name, max_retries=3, wait_seconds=10):
    """
    Delete a SageMaker endpoint with retry logic
    
    Args:
        endpoint_name (str): Name of the SageMaker endpoint to delete
        max_retries (int): Maximum number of retry attempts
        wait_seconds (int): Time to wait between retries in seconds
    
    Returns:
        bool: True if deletion was successful, False otherwise
    """
    sm_client = boto3.client('sagemaker')
    
    # First check if the endpoint exists
    try:
        sm_client.describe_endpoint(EndpointName=endpoint_name)
        endpoint_exists = True
    except sm_client.exceptions.ClientError as e:
        if "Could not find endpoint" in str(e):
            print(f"Endpoint {endpoint_name} does not exist, no cleanup needed.")
            return True
        else:
            print(f"Error checking endpoint existence: {e}")
            return False
    
    # If we get here, the endpoint exists and we should delete it
    for attempt in range(max_retries):
        try:
            print(f"Attempting to delete endpoint {endpoint_name} (attempt {attempt + 1}/{max_retries})")
            sm_client.delete_endpoint(EndpointName=endpoint_name)
            sm_client.delete_endpoint_config(EndpointConfigName=endpoint_name)
            print(f"Endpoint {endpoint_name} deletion initiated successfully")
            
            # Wait for endpoint to be fully deleted
            print("Waiting for endpoint to be fully deleted...")
            
            # Poll until endpoint is deleted or max wait time is reached
            total_wait_time = 0
            max_wait_time = 300  # 5 minutes maximum wait
            while total_wait_time < max_wait_time:
                try:
                    sm_client.describe_endpoint(EndpointName=endpoint_name)
                    print(f"Endpoint still exists, waiting {wait_seconds} seconds...")
                    time.sleep(wait_seconds)
                    total_wait_time += wait_seconds
                except sm_client.exceptions.ClientError:
                    print(f"Endpoint {endpoint_name} successfully deleted")
                    return True
            
            # If we get here, the endpoint still exists after max_wait_time
            print(f"Warning: Endpoint deletion initiated but still exists after {max_wait_time} seconds")
            return False
            
        except botocore.exceptions.ClientError as e:
            if "ResourceInUse" in str(e) or "ResourceNotFound" in str(e):
                print(f"Error deleting endpoint: {e}")
                print(f"Retrying in {wait_seconds} seconds...")
                time.sleep(wait_seconds)
            else:
                print(f"Unexpected error deleting endpoint: {e}")
                return False
    
    print(f"Failed to delete endpoint {endpoint_name} after {max_retries} attempts")
    return False

# Clean up endpoint
try:
    endpoint_name = f"{model_id.replace('/', '-').replace('_', '-')}-sft-djl"
    
    print(f"Cleaning up endpoint: {endpoint_name}")
    if delete_endpoint_with_retry(endpoint_name):
        print("Cleanup completed successfully")
    else:
        print("Warning: Endpoint cleanup may have failed, please check the SageMaker console")
        
except Exception as e:
    print(f"Error during endpoint cleanup: {str(e)}")
    print("You may need to manually delete the endpoint from the SageMaker console")