# Fine-tuning Multi-Modal Language Models with Amazon SageMaker AI

This notebook demonstrates how to fine-tune multi-modal large language models using Amazon SageMaker AI
and the ModelScope SWIFT (ms-swift) framework. The process includes:

1. Setting up model and training configurations
2. Configuring SageMaker resources
3. Fine-tuning the model
4. Evaluating the fine-tuning run
5. Downloading and analyzing the fine-tuned model

## Key Components

- Model Configuration: Select and configure the model to be fine-tuned
- SageMaker Setup: Configure AWS resources and training environment
- Training Process: Fine-tune the model using the SWIFT framework
- Evaluation: Analyze training metrics and model performance
- Model Export: Save and prepare the model for deployment

## Requirements

- Amazon SageMaker AI access with appropriate permissions
- Training data in the correct format
- Sufficient GPU resources for training

The training runs inside a container image. We decided to use a SageMaker Distribution image because it already contains many useful dependencies, for example a PyTorch installation.


You can learn more about SageMaker Distribution images in the [SageMaker Distribution documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-available-images.html#notebooks-available-images-arn).

Here are a few example distributions from the link above:
* us-east-1: 885854791233.dkr.ecr.us-east-1.amazonaws.com/sagemaker-distribution-prod:2.1.0-gpu
* us-west-2: 542918446943.dkr.ecr.us-west-2.amazonaws.com/sagemaker-distribution-prod:2.1.0-gpu
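
The example URIs above follow a regular pattern, so an image URI can be assembled from an account ID, a Region, and an image version. The following is a minimal sketch; `build_distribution_image_uri` is a hypothetical helper name used for illustration and is not part of the SageMaker SDK or this repository's `utils` package.

```python
def build_distribution_image_uri(
    account_id: str, region: str, image_version: str, processor: str = "gpu"
) -> str:
    """Assemble a SageMaker Distribution image URI from its parts (illustrative helper)."""
    registry = f"{account_id}.dkr.ecr.{region}.amazonaws.com"
    return f"{registry}/sagemaker-distribution-prod:{image_version}-{processor}"


# Reproduces the us-east-1 example listed above.
example_uri = build_distribution_image_uri("885854791233", "us-east-1", "2.1.0")
print(example_uri)
```

The account ID differs per Region, so it should always be taken from the linked documentation rather than guessed.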

Later, for the training, you will get to know the `@remote` decorator. The `@remote` decorator requires your local Python version to match the one in the training image. Let's find a suitable SageMaker Distribution image for your Python version.
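
The version requirement can be pictured as a simple comparison. The sketch below assumes that matching the major and minor version is what matters; that granularity, the helper name `same_python_minor`, and the image version `(3, 11)` are assumptions made for illustration only.

```python
import sys
from typing import Sequence


def same_python_minor(local: Sequence[int], image: Sequence[int]) -> bool:
    """Return True when the major and minor version numbers are identical."""
    return tuple(local[:2]) == tuple(image[:2])


local_version = (sys.version_info.major, sys.version_info.minor)
# Hypothetical Python version of a training image, used only for illustration.
image_python_version = (3, 11)
print(
    f"Local {local_version} matches image {image_python_version}: "
    f"{same_python_minor(local_version, image_python_version)}"
)
```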

In [None]:
import sys
from utils.training_image import (
    get_sagemaker_distribution, 
    SageMakerDistribution, 
    get_python_version, 
    get_aws_account_id_for_region,
    is_docker_installed,
    is_docker_compose_installed,
    check_and_enable_docker_access_sagemaker_studio
)

In [None]:
py_version = sys.version_info
print(f"Your Python version: {str(get_python_version(*py_version))}")

In [None]:
sm_distro_version = get_sagemaker_distribution(py_version)
print(f"Using SageMaker distribution v{sm_distro_version.image_version} as training image.")

### Install Dependencies

In [None]:
sagemaker_sdk_version = sm_distro_version.sagemaker_python_sdk # local SageMaker version must be same as in training job with remote decorator

In [None]:
%pip install -U --quiet requests beautifulsoup4 dataclasses
%pip install --quiet sagemaker=={sagemaker_sdk_version} 

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
import json
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any, Union, List
import time

In [None]:
import sagemaker
from sagemaker.remote_function import remote, CheckpointLocation

In [None]:
from utils.config import ModelConfig
from utils.finetuning import (
    check_checkpoints_directory,
    find_latest_version_dir,
    find_latest_checkpoint,
    find_latest_checkpoint_path,
    get_latest_sagemaker_training_job,
    get_s3_suffix,
    find_best_model_checkpoint
)
from utils.model_manager import list_available_models

### Model Configuration

In [None]:
# Using Qwen2.5-VL-3B-Instruct by default for its strong performance on vision-language tasks
# Can be any vision model that ms-swift supports:
# https://github.com/modelscope/ms-swift/blob/main/docs/source_en/Instruction/Supported-models-and-datasets.md
model_config = ModelConfig(
    model_type="qwen2_5_vl",
    model_id="Qwen/Qwen2.5-VL-3B-Instruct"
    # Other models are
    # model_type = "deepseek_janus_pro",
    # model_id = "deepseek-ai/Janus-Pro-7B"
    
    # model_type = "qwen2_vl",
    # model_id = "Qwen/Qwen2-VL-2B-Instruct"
    
    # model_type = "qwen2_5_vl",
    # model_id = "Qwen/Qwen2.5-VL-7B-Instruct"
    
    # model_type = "llama3_2_vision",
    # model_id = "meta-llama/Llama-3.2-11B-Vision-Instruct"
)

print("✅ Configured model id.")

### SageMaker Setup

In [None]:
# Initialize session and configure AWS resources for training
try:
    role = sagemaker.get_execution_role()
    session = sagemaker.Session()
    region = session.boto_region_name
    
    # Configure S3 paths for data and artifacts
    # CHANGE if your dataset is in a different S3 bucket
    default_bucket_name = session.default_bucket()
    dataset_s3_prefix = "fatura2-train-data"
    s3_root_uri = f"s3://{default_bucket_name}"
    dataset_s3_uri = f"{s3_root_uri}/{dataset_s3_prefix}"
    
    
except Exception as e:
    # Chain the original exception so the full traceback is preserved
    raise RuntimeError(f"Error setting up SageMaker session: {e}") from e
print("✅ Initialized SageMaker session...")
print(f"💾 Using dataset: {dataset_s3_uri}")

### Training Configuration

Next, you will configure the training job.
First, choose which container image to use during training and which dependencies to install into the container image.

In [None]:
sagemaker_distr_account_id = get_aws_account_id_for_region(region)
if not sagemaker_distr_account_id:
    raise ValueError(
        f"Please make sure to manually set the `sagemaker_distr_account_id` account id for your specific AWS region ({region}) from the AWS documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-available-images.html#notebooks-available-images-arn"
    )

In [None]:
# Let's define the SageMaker Distribution image to use
sagemaker_dist_uri = f"{sagemaker_distr_account_id}.dkr.ecr.{region}.amazonaws.com/sagemaker-distribution-prod:{sm_distro_version.image_version}-gpu"

In [None]:
dependencies = f"""git+https://github.com/huggingface/accelerate.git@v1.7.0
ms-swift@git+https://github.com/modelscope/ms-swift.git@v3.5.3
git+https://github.com/huggingface/transformers@v4.52.4
av
qwen_vl_utils==0.0.11
decord
optimum
huggingface_hub[hf_transfer]
tensorboardX
tensorboard
sagemaker=={sagemaker_sdk_version}
"""

In [None]:
%store dependencies >requirements.txt

The instance type required will depend on which model you are using, the hyperparameters, and the dataset size. If you encounter an out-of-memory error, use a larger instance type or change the training configuration. Here are the instance types we have been using:

| Instance Type  | Model | Optimized | Note |
|----------------|-------|-----------|------|
| ml.g6.8xlarge | Qwen/Qwen2.5-VL-3B-Instruct | Optimized | This is with 300 training samples. If you have a larger dataset you might need a bigger instance, for example ml.g6e.8xlarge or ml.g6.12xlarge |
| ml.g6e.48xlarge | meta-llama/Llama-3.2-11B-Vision-Instruct | Not optimized | You can probably use a smaller instance type |

In [None]:
use_local_mode = True # Set to true to run on local instance
instance_type = "local_gpu" if use_local_mode else "ml.g6.8xlarge" # "ml.g6.12xlarge" 

<div style="border: 2px solid #006CE0; 
    padding: 10px; 
    border-radius: 5px; 
    max-width: 100%;
    background: #f0fbff;">
    <b>Note:</b> If you run into out-of-memory errors during training, for example when training a larger model or using more data, use a larger instance type. If you are using local mode, you need a GPU on your local machine, for example by running inside a SageMaker Studio JupyterLab space on an ml.g6.8xlarge instance.
</div>

We can use Spot Instances for training. Spot capacity depends on availability, and a Spot Instance can be interrupted when the capacity is needed elsewhere. In return, Spot Instances can cost up to 90% less than the On-Demand price. SageMaker restarts the training job once capacity is available again, so checkpointing pairs well with Spot Instances: an interrupted job can resume from the latest checkpoint instead of starting over.
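
With managed Spot training, the maximum wait time covers both the time spent waiting for capacity and the training itself, so it has to be at least as large as the maximum runtime. The remote job later in this notebook sets both values to 172800 seconds. A minimal sketch of that consistency check follows; `check_spot_settings` is a hypothetical helper and not an SDK function.

```python
def check_spot_settings(use_spot: bool, max_runtime_s: int, max_wait_s: int) -> None:
    """Raise ValueError if the Spot wait time is shorter than the runtime limit."""
    if use_spot and max_wait_s < max_runtime_s:
        raise ValueError(
            f"max wait time ({max_wait_s}s) must be >= max runtime ({max_runtime_s}s) "
            "when Spot Instances are enabled"
        )


# Values used for the remote job in this notebook.
check_spot_settings(use_spot=True, max_runtime_s=172800, max_wait_s=172800)
print("Spot settings are consistent.")
```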

In [None]:
use_spot = True

In [None]:
# Setup training job parameters and checkpoint management
training_job_name_prefix = model_config.training_job_prefix(dataset_s3_prefix)
print(f"Training job name prefix: {training_job_name_prefix}")

In [None]:
# Build the S3 URI with string formatting; os.path.join would insert backslashes on Windows
checkpoint_s3_uri = f"{s3_root_uri}/{training_job_name_prefix}/checkpoints"
checkpoint_loc = CheckpointLocation(s3_uri=checkpoint_s3_uri)
print(f"Checkpoint S3 location: {checkpoint_loc._s3_uri}")
latest_checkpoint = find_latest_checkpoint_path(checkpoint_loc._s3_uri)
print(f"Training will use checkpoint: {latest_checkpoint}")

<div style="border: 2px solid #006CE0; 
    padding: 10px; 
    border-radius: 5px; 
    max-width: 100%;
    background: #f0fbff;">
    <b>Note:</b> Please check the checkpoint configuration above. Checkpointing is useful if you want to continue training from a previous training run's checkpoint, or if you are using Spot Instances and need to recover from interruptions. If you do not want to use checkpointing, set `checkpoint_loc` to None or delete the content at the S3 location with the command in the cells below.
</div>
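
Resuming from a checkpoint comes down to locating the most recent checkpoint directory. The sketch below assumes checkpoints are stored in directories named `checkpoint-<step>`, possibly nested inside a version directory such as `v0`. It is an illustration of the idea, not the implementation of `find_latest_checkpoint_path` in `utils.finetuning`; the name `latest_checkpoint_dir` is hypothetical.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional

# Assumed naming scheme: "checkpoint-<global step>"
CHECKPOINT_PATTERN = re.compile(r"^checkpoint-(\d+)$")


def latest_checkpoint_dir(root: str) -> Optional[str]:
    """Return the checkpoint directory with the highest step number, or None."""
    best_step, best_path = -1, None
    for path in Path(root).rglob("checkpoint-*"):
        match = CHECKPOINT_PATTERN.match(path.name)
        if path.is_dir() and match and int(match.group(1)) > best_step:
            best_step, best_path = int(match.group(1)), str(path)
    return best_path


# Demo on a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    for step in (50, 100, 150):
        (Path(tmp) / "v0" / f"checkpoint-{step}").mkdir(parents=True)
    latest_demo = latest_checkpoint_dir(tmp)
    print(Path(latest_demo).name)  # checkpoint-150
```

Comparing the step numbers as integers matters here: a plain string sort would rank `checkpoint-50` above `checkpoint-150`.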

In [None]:
checkpoint_loc = None  # disables checkpointing; skip this cell to keep the checkpoint location configured above

In [None]:
# Uses checkpoint_s3_uri because checkpoint_loc may have been set to None above
#!aws s3 rm --recursive --quiet {checkpoint_s3_uri}

By default, the [Amazon SageMaker Python SDK reads configuration](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk) values from an admin-defined or user-specific configuration file. This configuration allows all kinds of customizations to be made. Setting the `SAGEMAKER_USER_CONFIG_OVERRIDE` environment variable below makes the SDK read the user configuration from the current working directory instead, where the `config.yaml` written below is stored. The main settings you will configure below are:

* The container image URI that should run the remote function code.
* Python dependencies to install for the remote training.
* Which files from the local working directory not to upload to the remote code.
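
To give an intuition for the last point, the sketch below filters file and directory names with shell-style patterns like the ones used in the `IgnoreNamePatterns` list further down. It relies on Python's `fnmatch` module and is only an approximation: the exact matching rules applied by the SageMaker SDK may differ, and `filter_names` is a hypothetical helper.

```python
from fnmatch import fnmatch
from typing import Iterable, List

# A subset of the ignore patterns configured in this notebook.
IGNORE_NAME_PATTERNS = ("*.ipynb", "__pycache__", "data", ".git")


def filter_names(
    names: Iterable[str], patterns: Iterable[str] = IGNORE_NAME_PATTERNS
) -> List[str]:
    """Keep only the names that match none of the ignore patterns."""
    pattern_list = list(patterns)
    return [name for name in names if not any(fnmatch(name, p) for p in pattern_list)]


kept = filter_names(["train.ipynb", "utils", "data", "requirements.txt", ".git"])
print(kept)  # ['utils', 'requirements.txt']
```

Excluding notebooks, datasets, and downloaded models keeps the uploaded working directory small, which shortens job startup.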

In [None]:
# Override user config to ensure consistent environment setup
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

In [None]:
# defines the environment variables for the training
env_variables = {
    "SIZE_FACTOR": json.dumps(8), # can be increased but requires more GPU memory
    "MAX_PIXELS": json.dumps(602112), # can be increased but requires more GPU memory
    "CUDA_VISIBLE_DEVICES": "0,1,2,3", # must match the GPUs of the instance type: four here, as on ml.g6.12xlarge; use "0" on single-GPU types such as ml.g6.8xlarge
    "NPROC_PER_NODE": "4", # one process per GPU: "4" on ml.g6.12xlarge, "1" on single-GPU types such as ml.g6.8xlarge
    "USE_HF_TRANSFER": json.dumps(1),
    # "HF_TOKEN": "xxxxxxxxxxx",
}

In [None]:
config_yaml = f"""
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        S3RootUri: {s3_root_uri}
        ImageUri: {sagemaker_dist_uri}        
        InstanceType: {instance_type} # default instance type to use
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        PreExecutionCommands:
        - "pip install packaging"
        - "sudo mkdir -p /opt/ml/cache"
        - "sudo chmod -R 777 /opt/ml/cache"
        - "sudo mkdir -p /opt/ml/checkpoints"
        - "sudo chmod -R 777 /opt/ml/checkpoints"
        - "sudo mkdir -p /opt/ml/model"
        - "sudo chmod -R 777 /opt/ml/model"
        - "sudo chown sagemaker-user:sagemaker-user /opt/ml/model"
        - "echo 'Granted checkpoints directory permissions'"
        CustomFileFilter:
          IgnoreNamePatterns:
          - "*.ipynb"
          - "__pycache__"
          - "data"
          - "images"
          - "bin"
          - "models"
          - "results"
          - ".git"
        EnvironmentVariables: {json.dumps(env_variables)}
        Tags:
          - Key: 'purpose'
            Value: 'fine-tuning'
          - Key: 'model_id'
            Value: {model_config.model_id}
          - Key: 'dataset'
            Value: {dataset_s3_uri}
"""

# Write the configuration file and make sure the file handle is closed
with open("config.yaml", "w") as config_file:
    config_file.write(config_yaml)
print(config_yaml)

For the training, you will need to set hyperparameters. We have already set sensible defaults for the parameters. You can override any of them:

In [None]:
fine_tuning_kwargs = {
    "training_data_s3":dataset_s3_uri,
    "checkpoint_loc":checkpoint_loc,
    "model_type":model_config.model_type,
    "model_id":model_config.model_id,
    "train_data_path":"conversations_train_swift_format.json", 
    "validation_data_path":"conversations_dev_swift_format.json"
}

### Defining fine-tuning function
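
The training arguments in the function below (`--per_device_train_batch_size 4`, `--gradient_accumulation_steps 1`, `--num_train_epochs 4`, `--save_steps 50`) determine how many optimizer steps a run takes and therefore when checkpoints are written. The following back-of-the-envelope sketch assumes one data-parallel process per GPU, as suggested by `NPROC_PER_NODE`, and that the last partial batch of an epoch is kept. `training_steps` is a hypothetical helper.

```python
import math
from typing import Tuple


def training_steps(
    num_samples: int,
    per_device_batch_size: int,
    num_processes: int,
    grad_accum_steps: int,
    num_epochs: int,
) -> Tuple[int, int, int]:
    """Return (effective_batch_size, steps_per_epoch, total_steps)."""
    effective_batch = per_device_batch_size * num_processes * grad_accum_steps
    steps_per_epoch = math.ceil(num_samples / effective_batch)
    return effective_batch, steps_per_epoch, steps_per_epoch * num_epochs


# 300 samples (the dataset size mentioned in the instance table), batch size 4,
# 4 processes, no gradient accumulation, 4 epochs.
effective_batch, steps_per_epoch, total_steps = training_steps(300, 4, 4, 1, 4)
print(effective_batch, steps_per_epoch, total_steps)  # 16 19 76
```

Under these assumptions a run has 76 steps, so with `--save_steps 50` an intermediate checkpoint would only be written at step 50. A smaller `--save_steps` value gives Spot interruptions less work to repeat.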

In [None]:
# @remote(instance_type="ml.g6e.48xlarge", volume_size=200, use_spot_instances=True,job_name_prefix=training_job_name_prefix, max_wait_time_in_seconds=172800,max_runtime_in_seconds=172800)
def fine_tune_documents(
    model_type: str, model_id: str, checkpoint_loc: Optional[CheckpointLocation], 
    training_data_s3: str, train_data_path="train.jsonl", validation_data_path="validation.jsonl"
) -> str:
    """Fine-tune model with checkpoint recovery support for cost-efficient spot training.
    
    The fine-tuning is a 3 step process:
    1. Download the training data.
    2. Configure the fine-tuning
    3. Run the fine-tuning
            
    """
    import os
    import shutil
    import subprocess  # imported here so the function is self-contained when it runs remotely
    from swift.llm import sft_main, TrainArguments
    from utils.finetuning import find_latest_checkpoint_path, setup_directories


    output_dir = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    checkpoint_dir = checkpoint_loc._local_path if checkpoint_loc else output_dir # directory for checkpoint artifacts (for spot training or to continue previous training)
    dataset_dir = "."
    

   
    setup_directories(output_dir, checkpoint_dir, dataset_dir)

    
    # 1. Copy training data into the training container
    subprocess.run(
        ["aws", "s3", "cp", training_data_s3, dataset_dir, "--recursive", "--quiet"],
        check=True,
        shell=False
    )
    
    train_data_local_path = os.path.join(dataset_dir, train_data_path)
    validation_data_local_path = os.path.join(dataset_dir, validation_data_path)


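    # Workaround: make sure ALL_PARALLEL_STYLES is defined, because the attribute
    # can be missing or None in the installed transformers version
    from transformers import modeling_utils
    if not hasattr(modeling_utils, "ALL_PARALLEL_STYLES") or modeling_utils.ALL_PARALLEL_STYLES is None:
        modeling_utils.ALL_PARALLEL_STYLES = ["tp", "none", "colwise", "rowwise"]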
    
    # 2. Define training parameters
    # swift sft ...
    argv = [
        "--model_type", model_type,
        "--model", model_id,
        "--model_revision", "main", # We recommend that you pin to a specific commit from HuggingFace Hub
        "--train_type", "lora",
        "--use_dora", "true",
        "--output_dir", checkpoint_dir,
        "--max_length", "4096",
        "--dataset", train_data_local_path,
        "--val_dataset", validation_data_local_path,
        "--save_steps", "50",
        "--logging_steps","5",
        "--num_train_epochs", "4",
        "--lora_dtype", "bfloat16",
        "--per_device_train_batch_size", "4",
        "--per_device_eval_batch_size", "1",
        "--learning_rate", "1e-4", # "4.0e-5", #  "2e-4"
        "--target_modules", "all-linear",
        "--use_hf", "true",
        "--warmup_ratio","0.05",
        "--save_total_limit","3",
        "--gradient_accumulation_steps","1",
        "--freeze_vit", "true", # default: true
        "--freeze_llm", "false", # default: false
        "--freeze_aligner", "true" # default: true
    ]

    # Find latest checkpoint for training recovery
    full_checkpoint_path = find_latest_checkpoint_path(checkpoint_dir)
    if full_checkpoint_path:
        argv.append("--resume_from_checkpoint")
        argv.append(full_checkpoint_path)

    # 3. Execute training
    result = sft_main(argv)
    best_checkpoint = result["best_model_checkpoint"]

    if checkpoint_loc:
        # Copy training artifacts to SageMaker output directory
        shutil.copytree(checkpoint_dir, output_dir, dirs_exist_ok=True)
        
    return best_checkpoint


## Define SageMaker Pipeline for Local Mode training

This approach lets us run the SageMaker training job in local mode, without having to wait for remote instances or resources.
Make sure you have a JupyterLab space with a GPU.

In [None]:
import urllib
import boto3
from sagemaker.session import Session
from sagemaker import get_execution_role
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline

# import mlflow
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.pipeline_context import LocalPipelineSession

def run_pipeline(local_mode=True):
    train_result = step(fine_tune_documents, name="finetune")(**fine_tuning_kwargs)
    
    steps = [train_result]
    
    role = get_execution_role()
    local_pipeline_session = LocalPipelineSession()
    more_params = {}
    if local_mode:
        more_params["sagemaker_session"] = local_pipeline_session 
    
    pipeline = Pipeline(
        name=training_job_name_prefix,
        parameters=[],
        steps=steps,
        pipeline_definition_config=PipelineDefinitionConfig(use_custom_job_prefix=True),
        **more_params
    )

    pipeline.upsert(role_arn=role)
    pipeline.start()

## Run Fine-Tuning Remote or Local Mode

Now we can run the fine-tuning with the `RemoteExecutor` as a SageMaker training job or we can run the fine-tuning locally with the pipeline in local mode. 

Note: We could also run the Pipeline on Amazon SageMaker. We decided not to because pipelines usually have multiple steps but our pipeline only has one training step. 

In [None]:
if use_local_mode and (not is_docker_installed() or not is_docker_compose_installed()): 
    # we need docker and docker-compose for LocalMode execution
    !bash docker-artifacts/01_docker_install.sh

In [None]:
# If running in SageMaker Studio, check whether Docker access is enabled and enable it if it is not
check_and_enable_docker_access_sagemaker_studio(use_local_mode, session)

In [None]:
if use_local_mode:
    print("\nStarting fine-tuning locally...")
    run_pipeline(local_mode=True)
else:
    # run remotely
    from sagemaker.remote_function import RemoteExecutor
    
    with RemoteExecutor(
        instance_type=instance_type,
        volume_size=200,
        use_spot_instances=use_spot,
        job_name_prefix=training_job_name_prefix,
        max_wait_time_in_seconds=172800,
        max_runtime_in_seconds=172800,
    ) as job:
        print("\nStarting fine-tuning process remotely...")
        print(
            f"View your job here: https://{region}.console.aws.amazon.com/sagemaker/home?region={region}#/jobs/"
        )
        future = job.submit(fine_tune_documents, **fine_tuning_kwargs)
        result = future.result()
        print(f"Fine-tuning remote completed: {result}")

## Download Model

In [None]:
df = list_available_models(default_bucket_name, training_job_name_prefix)
df 

In [None]:
which_model_to_pick = 0 # use first model from list by default

In [None]:
# Set up the S3 URI from which we will download the model
model_key=df['Key'].iloc[which_model_to_pick]
model_output_url = f"s3://{default_bucket_name}/{model_key}"
print(f"Selected model for download: {model_key}")
print(f"S3 Model URI: {model_output_url}")

We copy the model from S3 to our local directory.

In [None]:
from utils.helpers import get_s3_suffix

model_suffix_s3 = get_s3_suffix(model_output_url)
model_weights_dir = "./models"
model_destination = f"{model_weights_dir}/{model_suffix_s3}"
model_dest_dir = str(Path(model_destination).parent)

In [None]:
!aws s3 cp {model_output_url} {model_destination}
!tar --warning=no-unknown-keyword  -xzvf {model_destination} --directory {model_dest_dir} > /dev/null

Let's have a look at what is inside `model.tar.gz`:

* The checkpoint directory contains the actual adapter
* adapter_model.safetensors - contains the actual weights of the adapter
  
For inference, you can either use the adapter together with the original model or merge the adapter into the original model.
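
Conceptually, merging a plain LoRA adapter adds a scaled low-rank update to each adapted weight matrix: `W_merged = W + (alpha / rank) * B @ A`. The toy example below shows this in pure Python on a 2x2 matrix. It is a simplification: this notebook trains with `--use_dora true`, and DoRA additionally learns a magnitude component that the sketch does not model. In practice the merge is done by the training or adapter library rather than by hand.

```python
from typing import List

Matrix = List[List[float]]


def matmul(a: Matrix, b: Matrix) -> Matrix:
    """Multiply two matrices given as nested lists."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def merge_lora(weight: Matrix, lora_b: Matrix, lora_a: Matrix, alpha: float, rank: int) -> Matrix:
    """Return W + (alpha / rank) * B @ A."""
    scale = alpha / rank
    delta = matmul(lora_b, lora_a)
    return [
        [w + scale * d for w, d in zip(w_row, d_row)]
        for w_row, d_row in zip(weight, delta)
    ]


# Toy 2x2 weight with a rank-1 adapter.
W = [[1.0, 0.0], [0.0, 1.0]]
B = [[1.0], [2.0]]  # shape (2, rank)
A = [[0.5, 0.5]]    # shape (rank, 2)
merged = merge_lora(W, B, A, alpha=2.0, rank=1)
print(merged)  # [[2.0, 1.0], [2.0, 3.0]]
```

Keeping the adapter separate keeps the artifact small and lets several adapters share one base model, while a merged model avoids the extra adapter computation at inference time.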

In [None]:
! cd {model_dest_dir} && du -ah --max-depth=5

In [None]:
model_dir = model_dest_dir

In [None]:
from utils.helpers import find_latest_version_directory, find_best_model_checkpoint

latest_version = find_latest_version_directory(model_dir)
latest_model_dir = os.path.join(model_dir, latest_version)
logging_file = os.path.join(os.getcwd(), model_dir, latest_version, "logging.jsonl")
best_model_checkpoint = find_best_model_checkpoint(logging_file)
if best_model_checkpoint:
    best_model_checkpoint = best_model_checkpoint.replace("/opt/ml/model/", "")
    print(f"best model checkpoint: {best_model_checkpoint}")
else:
    print(
        "Best model checkpoint not found. Please search the logs manually to find the path that stores the best model checkpoint."
    )

## View Evaluation Metrics from fine-tuning run

Next, you can look at the training and evaluation accuracy and loss.
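
Besides the plots, the same metrics can be read directly from a JSON-lines log such as `logging.jsonl`. The sketch below scans such a file for the lowest evaluation loss. The field name `eval_loss` and the log layout are assumptions made for illustration, and `best_eval_loss` is a hypothetical helper, not the `find_best_model_checkpoint` function used above.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def best_eval_loss(log_path: str, key: str = "eval_loss") -> Optional[Tuple[int, float]]:
    """Return (line_index, value) of the lowest `key` found in a JSON-lines log."""
    best = None
    with open(log_path, encoding="utf-8") as handle:
        for index, line in enumerate(handle):
            line = line.strip()
            if not line:
                continue  # skip empty lines
            value = json.loads(line).get(key)
            if isinstance(value, (int, float)) and (best is None or value < best[1]):
                best = (index, float(value))
    return best


# Demo with a synthetic log; the records are made up for illustration.
with tempfile.TemporaryDirectory() as tmp:
    demo_log = Path(tmp) / "logging.jsonl"
    records = [{"loss": 1.2}, {"eval_loss": 0.9}, {"loss": 0.8}, {"eval_loss": 0.7}]
    demo_log.write_text("\n".join(json.dumps(r) for r in records), encoding="utf-8")
    demo_best = best_eval_loss(str(demo_log))
    print(demo_best)  # (3, 0.7)
```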

In [None]:
images_dir = os.path.join(latest_model_dir, "images")

In [None]:
from IPython.display import Image
from IPython.display import display


def display_image(images_dir, image):
    image = Image(os.path.join(images_dir, image))
    display(image)

In [None]:
display_image(images_dir, "train_token_acc.png")
display_image(images_dir, "train_loss.png")

In [None]:
display_image(images_dir, "eval_token_acc.png")
display_image(images_dir, "eval_loss.png")

## Next Steps

1. Run inference on unseen data to evaluate the model's real-world performance: [04_run_batch_inference.ipynb](04_run_batch_inference.ipynb) and then [05_evaluate_model.ipynb](05_evaluate_model.ipynb).
2. Deploy the model: [06_deploy_model_endpoint.ipynb](06_deploy_model_endpoint.ipynb)