In [None]:
# Install or upgrade the third-party packages this notebook uses (quiet output).
%pip install transformers  sagemaker seaborn sentence-transformers nltk scikit-learn "huggingface_hub[cli]" --upgrade --quiet

# Llama3 finetuning for Bedrock
## Architecture
This diagram illustrates an end-to-end ML workflow where a SageMaker Pipeline processes, trains, and evaluates a model using HuggingFace containers, then registers it before deploying to Amazon Bedrock through a Lambda function for inference, with model artifacts stored in S3 throughout the process.

![Architecture Diagram](Llama3_finetuning_bedrock.png)

Note: The next cells use an API token to log in to your Hugging Face account so that the model weights can be downloaded. You must have been granted access to "meta-llama/Llama-3.2-3B-Instruct" to use the Meta Llama 3.2 3B model.
- [Hugging Face Access Tokens Documentation](https://huggingface.co/docs/hub/en/security-tokens).
- [Getting access to the model](https://huggingface.co/meta-llama/Meta-Llama-3-8B/discussions/172)
- [Meta Llama 3.2 3B Instruct](https://huggingface.co/meta-llama/Llama-3.2-3B-Instruct)

In [None]:
import os
from getpass import getpass

# Do not hard-code the access token in the notebook (it would be saved and
# shared with the file). Take it from the HF_TOKEN environment variable when
# set; otherwise prompt for it without echoing the input.
huggingface_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face access token: ")

In [None]:
# Log in to the Hugging Face Hub from the shell, passing the value of
# `huggingface_token` on the command line.
!huggingface-cli login --token {huggingface_token}

In [None]:
import sagemaker
import boto3
import os

# Initial session, used to look up the default bucket and the region.
sagemaker_session = sagemaker.Session()

# Bucket for uploaded data, models and logs. Leave it as None to fall back to
# the session's default bucket.
sagemaker_session_bucket = None
if sagemaker_session is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sagemaker_session.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    # No execution role could be resolved (e.g. when running locally): look the
    # role up by name instead. Replace the placeholder with your role's name.
    sagemaker_execution_role = "Update-with-your-current-AmazonSageMaker-ExecutionRole"
    iam = boto3.client('iam')
    role = iam.get_role(RoleName=sagemaker_execution_role)['Role']['Arn']

# Session bound to the selected bucket, plus a SageMaker client in its region.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
sm_client = boto3.client('sagemaker', region_name=sess.boto_region_name)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

# Create Dataset

In [None]:
from sagemaker.s3 import S3Downloader
from sagemaker.s3 import S3Uploader
import json

In [None]:
# S3 prefix of the training dataset that the next cell downloads.
dataset_S3Uri="s3://jumpstart-cache-prod-us-west-2/training-datasets/oasst_top/train/"

In [None]:
# Download everything under the dataset prefix into the local "dataset/" folder.
train_dataset_path = S3Downloader.download(s3_uri=dataset_S3Uri, local_path="dataset/")
# This cell fetches the training dataset (not a config), so say so in the log.
print("Training dataset downloaded to:")
print(train_dataset_path)

In [None]:
from sagemaker.s3 import S3Uploader
# Common S3 prefix for the inputs this notebook uploads.
input_path = f's3://{sess.default_bucket()}/datasets/llama3'

# Upload the local training file to the "dataset" sub-prefix.
train_dataset_path = "dataset/train.jsonl"
train_s3_path = S3Uploader.upload(
    local_path=train_dataset_path,
    desired_s3_uri=f"{input_path}/dataset",
)

print("Training dataset uploaded to:")
print(train_s3_path)

# Sagemaker Pipeline

In [None]:
import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.huggingface import HuggingFace
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.huggingface import HuggingFaceProcessor
from sagemaker.workflow.pipeline_context import PipelineSession



# Session object shared by the processor, estimator and pipeline defined below.
pipeline_session = PipelineSession()

# Pipeline-wide settings.
region = sagemaker_session.boto_region_name
model_name = "llama3-qa-model"
instance_type_preprocessing = "ml.m5.large"
instance_count = 1

# Cache configuration to improve pipeline execution time.
cache_config = CacheConfig(enable_caching=True, expire_after="30d")

## Preprocessing Step

In [None]:
# Scikit-learn processor used by the preprocessing step.
preprocessing_processor = SKLearnProcessor(
    role=role,
    framework_version="1.0-1",
    instance_count=instance_count,
    instance_type=instance_type_preprocessing,
    max_runtime_in_seconds=60 * 60,  # stop the job after one hour at most
    base_job_name="llama3-qa-preprocessing",
    sagemaker_session=pipeline_session,
)


In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
# The uploaded training data is mounted under /opt/ml/processing/input.
inputs = [ProcessingInput(source=train_s3_path, destination="/opt/ml/processing/input")]

# One output per split; the names "train" and "test" are looked up again when
# the training step wires its input channels.
train_output = ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train")
test_output = ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test")
outputs = [train_output, test_output]



In [None]:
# Pipeline step that runs scripts/preprocessing/preprocess.py with the
# processor, inputs and outputs defined in the previous cells.
preprocessing_step = ProcessingStep(
    name="PreprocessQADataset",
    code="scripts/preprocessing/preprocess.py",
    processor=preprocessing_processor,
    inputs=inputs,
    outputs=outputs,
)

# Training step

In [None]:
%%writefile llama_3_2_3B_fsdp_lora.yaml
# script parameters
model_id: "meta-llama/Llama-3.2-3B-Instruct"  # Hugging Face model id
max_seq_length: 512                    # max sequence length for model and packing of the dataset (2048 is the larger alternative)
# sagemaker specific parameters
train_dataset_path: "/opt/ml/input/data/train" # path to where SageMaker saves train dataset
test_dataset_path: "/opt/ml/input/data/test"   # path to where SageMaker saves test dataset
# output_dir is a scratch directory under /tmp for the training run's outputs;
# "/opt/ml/model" is the alternative that was previously used here
output_dir: "/tmp/llama3"
# training parameters
report_to: "tensorboard"               # report metrics to tensorboard
learning_rate: 0.0002                  # learning rate 2e-4
lr_scheduler_type: "constant"          # learning rate scheduler
num_train_epochs: 10                   # number of training epochs
per_device_train_batch_size: 16        # batch size per device during training
per_device_eval_batch_size: 16         # batch size for evaluation
gradient_accumulation_steps: 2         # number of steps before performing a backward/update pass
optim: adamw_torch                     # use torch adamw optimizer
logging_steps: 10                      # log every 10 steps
save_strategy: epoch                   # save checkpoint every epoch
evaluation_strategy: epoch             # evaluate every epoch
max_grad_norm: 0.3                     # max gradient norm
warmup_ratio: 0.03                     # warmup ratio
bf16: true                             # use bfloat16 precision
tf32: false                            # tf32 precision is disabled
gradient_checkpointing: true           # use gradient checkpointing to save memory
# FSDP parameters: https://huggingface.co/docs/transformers/main/en/fsdp
fsdp: "full_shard auto_wrap offload"   # remove offload if enough GPU memory
fsdp_config:
  backward_prefetch: "backward_pre"
  forward_prefetch: "false"
  use_orig_params: "false"

In [None]:
from sagemaker.s3 import S3Uploader


# Upload the training configuration YAML to the "config" sub-prefix on S3.
model_yaml = "llama_3_2_3B_fsdp_lora.yaml"
train_config_s3_path = S3Uploader.upload(
    local_path=model_yaml,
    desired_s3_uri=f"{input_path}/config",
)

print("Training config uploaded to:")
print(train_config_s3_path)

In [None]:
from sagemaker.huggingface import HuggingFace
from huggingface_hub import HfFolder
import time

# Training job name with a timestamp suffix. The label says 3B to match the
# model configured in llama_3_2_3B_fsdp_lora.yaml (Llama-3.2-3B-Instruct).
timestamp = time.strftime('%Y%m%d-%H%M%S')
job_name = f'llama3-3B-exp1-{timestamp}'

# The training containers need a Hugging Face token to pull the gated model.
# Fail here with a clear message instead of passing an empty HF_TOKEN along.
hf_token = HfFolder.get_token()
if not hf_token:
    raise RuntimeError(
        "No Hugging Face token found. Run the Hugging Face login cell before "
        "defining the training job."
    )

# create the Estimator
huggingface_estimator = HuggingFace(
    entry_point          = 'training/train_fsdp_lora.py',      # train script
    model_dir            = '/opt/ml/model',
    source_dir           = 'scripts/',  # directory which includes all the files needed for training
    instance_type        = 'ml.g5.12xlarge',  # instances type used for the training job
    instance_count       = 2,                 # the number of instances used for training
    max_run              = 2*24*60*60,        # maximum runtime in seconds (days * hours * minutes * seconds)
    base_job_name        = job_name,          # the name of the training job
    role                 = role,              # IAM role used in training job to access AWS resources, e.g. S3
    volume_size          = 500,               # the size of the EBS volume in GB
    transformers_version = '4.36.0',          # the transformers version used in the training job
    pytorch_version      = '2.1.0',           # the pytorch version used in the training job
    py_version           = 'py310',           # the python version used in the training job
    hyperparameters      =  {
        "config": "/opt/ml/input/data/config/llama_3_2_3B_fsdp_lora.yaml" # path to TRL config which was uploaded to s3
    },
    sagemaker_session=pipeline_session,
    disable_output_compression = True,        # not compress output to save training time and cost
    distribution={"torch_distributed": {"enabled": True}},   # enables torchrun
    environment  = {
        "HUGGINGFACE_HUB_CACHE": "/tmp/.cache", # set env variable to cache models in /tmp
        "HF_TOKEN": hf_token,                   # huggingface token to access gated models, e.g. llama 3
        "ACCELERATE_USE_FSDP": "1",             # enable FSDP
        "FSDP_CPU_RAM_EFFICIENT_LOADING": "1"   # enable CPU RAM efficient loading
    },
)

# Outputs of the preprocessing step, addressed by their output names.
processed_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs

# Training step with three input channels: the processed train/test splits and
# the uploaded YAML configuration.
training_step = TrainingStep(
    name=job_name,
    estimator=huggingface_estimator,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=processed_outputs["train"].S3Output.S3Uri,
        ),
        "config": sagemaker.inputs.TrainingInput(s3_data=train_config_s3_path),
        "test": sagemaker.inputs.TrainingInput(
            s3_data=processed_outputs["test"].S3Output.S3Uri,
        ),
    },
)

# Model Register Step

In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.model import Model
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
import sagemaker
from sagemaker.huggingface import HuggingFaceModel, get_huggingface_llm_image_uri
import time

In [None]:
# Look up the Hugging Face LLM container image (version 2.0) for `region`.
image_uri = get_huggingface_llm_image_uri(backend="huggingface", version="2.0", region=region)

In [None]:
from sagemaker.huggingface import HuggingFaceModel
# Model object that the create-model and register-model steps below refer to.
# NOTE(review): an explicit image_uri is supplied, so the framework version
# arguments presumably do not decide which container is used - confirm.
llm_model = HuggingFaceModel(
    image_uri=image_uri,
    role=role,
    transformers_version="4.37.0",
    pytorch_version="1.10.2",
    py_version="py310",
)

In [None]:
# Create model step.
# `inputs` takes a CreateModelInput describing the hosting instance; the S3
# location of the trained artifacts is not a valid value for it.
# NOTE(review): this step is not listed in the Pipeline's `steps` later in the
# notebook, and `llm_model` is built without model_data - confirm whether the
# step is still needed before wiring it in.
llama_model_step = CreateModelStep(
    name="CreateLlama3ModelStep",
    model=llm_model,
    inputs=CreateModelInput(instance_type="ml.g5.12xlarge"),
    depends_on=[training_step],
)
    
# Create a RegisterModel step, which registers the trained model with the
# SageMaker Model Registry once the training step has finished.
model_package_group_name = "Llama3Models"
step_register_model = RegisterModel(
    name="RegisterModel",
    model=llm_model,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.g5.12xlarge"],
    transform_instances=["ml.g5.12xlarge"],
    depends_on=[training_step],
)

# Bedrock Deployment step

### Create Lambda layer

In [None]:
import os
import subprocess
import shutil
import boto3

import sys

layer_dir = 'boto3-layer'
layer_zip = 'boto3-layer.zip'

try:
    # Lambda layers expect Python packages under a top-level "python/" folder.
    os.makedirs(os.path.join(layer_dir, 'python'), exist_ok=True)

    # Install boto3 into the layer directory. Running pip through the kernel's
    # own interpreter avoids picking up an unrelated `pip` from PATH.
    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install', 'boto3==1.35.16',
        '-t', 'boto3-layer/python', '--upgrade', '--no-cache-dir'
    ])

    # Create zip file
    shutil.make_archive('boto3-layer', 'zip', 'boto3-layer')

    # Upload to AWS as a Lambda layer
    lambda_client = boto3.client('lambda')

    with open(layer_zip, 'rb') as zip_file:
        response = lambda_client.publish_layer_version(
            LayerName='boto3-latest',
            Description='Latest Boto3 layer',
            Content={
                'ZipFile': zip_file.read()
            },
            # python3.12 is included because the Lambda function that uses
            # this layer is created with runtime='python3.12'.
            CompatibleRuntimes=['python3.10', 'python3.11', 'python3.12']
        )
finally:
    # Clean up the local build artifacts even when the install or upload fails.
    shutil.rmtree(layer_dir, ignore_errors=True)
    if os.path.exists(layer_zip):
        os.remove(layer_zip)

print(f"Layer ARN: {response['LayerArn']}")
print(f"Layer Version ARN: {response['LayerVersionArn']}")
lambda_layer_arn = response['LayerVersionArn']

### Create Role and policies

In [None]:
def create_lambda_execution_role(role_name, training_bucket, account_id, region):
    """Create the IAM role for the Lambda function, or refresh it if it exists.

    The role trusts ``lambda.amazonaws.com`` and, limited to model-import jobs
    of ``account_id`` in ``region``, ``bedrock.amazonaws.com``. Three policies
    are applied to it: an inline Bedrock policy, the managed
    ``AWSLambdaBasicExecutionRole`` policy and an inline S3 read policy for
    ``training_bucket``.

    When the role already exists its trust policy is overwritten and the
    policies are applied again.

    Args:
        role_name: Name of the IAM role to create or update.
        training_bucket: Name of the S3 bucket the role may read from.
        account_id: AWS account id used in the policy ARNs and conditions.
        region: AWS region used in the Bedrock model-import-job ARN.

    Returns:
        The ARN of the created or pre-existing role.
    """
    iam_client = boto3.client('iam')

    import_job_arn = f"arn:aws:bedrock:{region}:{account_id}:model-import-job/*"
    bucket_arn = f"arn:aws:s3:::{training_bucket}"

    # Trust policy: who is allowed to assume the role.
    lambda_trust = {
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }
    bedrock_trust = {
        "Effect": "Allow",
        "Principal": {"Service": "bedrock.amazonaws.com"},
        "Action": "sts:AssumeRole",
        "Condition": {
            "StringEquals": {"aws:SourceAccount": account_id},
            "ArnEquals": {"aws:SourceArn": import_job_arn},
        },
    }
    assume_role_document = {
        "Version": "2012-10-17",
        "Statement": [lambda_trust, bedrock_trust],
    }

    # Inline policy: start/inspect Bedrock import jobs and pass roles to Bedrock.
    bedrock_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "bedrock:CreateModelImportJob",
                    "bedrock:GetModelImportJob",
                    "bedrock:ListModelImportJobs",
                ],
                "Resource": "*",
            },
            {
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": f"arn:aws:iam::{account_id}:role/*",
                "Condition": {
                    "StringEquals": {"iam:PassedToService": "bedrock.amazonaws.com"}
                },
            },
        ],
    }

    # Inline policy: read access to the training bucket and its objects.
    s3_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            }
        ],
    }

    def apply_policies():
        """Apply the Bedrock, Lambda basic-execution and S3 policies, in that order."""
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName='BedrockAccessPolicy',
            PolicyDocument=json.dumps(bedrock_document),
        )
        print("Attached Bedrock permissions policy")

        try:
            iam_client.attach_role_policy(
                RoleName=role_name,
                PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
            )
        except iam_client.exceptions.EntityAlreadyExistsException:
            print("Lambda basic execution policy already attached")
        else:
            print("Attached Lambda basic execution policy")

        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName='S3AccessPolicy',
            PolicyDocument=json.dumps(s3_document),
        )
        print("Attached S3 access policy")

    try:
        created = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_document),
            Description="Execution role for Lambda function and Bedrock model import jobs",
        )
        new_role_arn = created['Role']['Arn']
        print(f"Created IAM role: {new_role_arn}")

        apply_policies()
        return new_role_arn

    except iam_client.exceptions.EntityAlreadyExistsException:
        print(f"IAM role {role_name} already exists. Retrieving its ARN.")
        existing = iam_client.get_role(RoleName=role_name)

        # Bring the existing role's trust policy in line with the one above.
        iam_client.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=json.dumps(assume_role_document),
        )

        apply_policies()
        return existing['Role']['Arn']

# Usage
role_name = "LambdaBedrockExecutionRole"
training_bucket = sagemaker_session_bucket
account_id = boto3.client('sts').get_caller_identity()['Account']
# Use the region of the active SageMaker session instead of a hard-coded one,
# so the model-import-job ARN in the trust policy matches the region the rest
# of the notebook works in.
region = sess.boto_region_name
execution_role_arn = create_lambda_execution_role(role_name, training_bucket, account_id, region)
print(f"Execution Role ARN: {execution_role_arn}")

In [None]:
import boto3
import json
from botocore.exceptions import ClientError

def handle_client_error(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and map a 'NoSuchEntity' error to ``None``.

    Any other ``ClientError`` is re-raised unchanged.

    Returns:
        Whatever ``func`` returns, or ``None`` when it raised a ``ClientError``
        whose error code is ``'NoSuchEntity'``.
    """
    try:
        result = func(*args, **kwargs)
    except ClientError as err:
        if err.response['Error']['Code'] != 'NoSuchEntity':
            raise
        return None
    return result

def _delete_non_default_policy_versions(iam, policy_arn):
    """Delete every version of the managed policy except its default version."""
    for version in iam.list_policy_versions(PolicyArn=policy_arn)['Versions']:
        if not version['IsDefaultVersion']:
            iam.delete_policy_version(
                PolicyArn=policy_arn,
                VersionId=version['VersionId']
            )


def create_or_update_role(role_name, trust_relationship, permission_policy, iam_client=None, account_id=None):
    """Create or update an IAM role together with its customer-managed policy.

    The role's trust policy is set to ``trust_relationship``. A managed policy
    named ``"<role_name>Policy"`` is created, or given a new default version,
    from ``permission_policy`` and attached to the role.

    Args:
        role_name: Name of the IAM role.
        trust_relationship: Trust policy document (dict) for the role.
        permission_policy: Permission policy document (dict).
        iam_client: Optional IAM client; a boto3 client is created when omitted.
        account_id: Optional AWS account id; looked up through STS when omitted.

    Returns:
        The ARN of the role.

    Raises:
        botocore.exceptions.ClientError: For IAM errors other than the
            'NoSuchEntity' lookups that handle_client_error turns into None.
    """
    iam = iam_client or boto3.client('iam')
    account_id = account_id or boto3.client('sts').get_caller_identity()['Account']

    # Check and update/create role
    role = handle_client_error(iam.get_role, RoleName=role_name)
    if role:
        iam.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=json.dumps(trust_relationship)
        )
        print(f"Updated existing role: {role_name}")
    else:
        iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_relationship)
        )
        print(f"Created new role: {role_name}")

    # Handle policy
    policy_name = f"{role_name}Policy"
    policy_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"

    policy = handle_client_error(iam.get_policy, PolicyArn=policy_arn)
    if policy:
        # IAM keeps at most five versions per managed policy, so make room
        # BEFORE creating the new version; otherwise the update fails once the
        # policy has accumulated five versions.
        _delete_non_default_policy_versions(iam, policy_arn)
        iam.create_policy_version(
            PolicyArn=policy_arn,
            PolicyDocument=json.dumps(permission_policy),
            SetAsDefault=True
        )
        # The previous default is now stale; remove it as well so only the new
        # default version remains.
        _delete_non_default_policy_versions(iam, policy_arn)
        print(f"Updated existing policy: {policy_name}")
    else:
        iam.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(permission_policy)
        )
        print(f"Created new policy: {policy_name}")

    # Attach the policy to the role.
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )
    print(f"Attached policy to role: {role_name}")

    return iam.get_role(RoleName=role_name)['Role']['Arn']



# Set up variables
account_id = boto3.client('sts').get_caller_identity()['Account']
# Use the region of the active SageMaker session instead of a hard-coded one,
# so the model-import-job ARN in the trust policy matches the region the rest
# of the notebook works in.
region = sess.boto_region_name
training_bucket = sagemaker_session_bucket
role_name = "Sagemaker_Bedrock_import_role"

# Trust policy: Bedrock may assume the role, but only for model-import jobs of
# this account in this region.
trust_relationship = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "bedrock.amazonaws.com"},
        "Action": "sts:AssumeRole",
        "Condition": {
            "StringEquals": {"aws:SourceAccount": account_id},
            "ArnEquals": {"aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:model-import-job/*"}
        }
    }]
}

# Permission policy: read access to the training bucket, restricted to
# resources owned by this account.
permission_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{training_bucket}", f"arn:aws:s3:::{training_bucket}/*"],
        "Condition": {"StringEquals": {"aws:ResourceAccount": account_id}}
    }]
}

# Create or update the role
role_arn = create_or_update_role(role_name, trust_relationship, permission_policy)
print(f"Role ARN: {role_arn}")

### Create Lambda Step

In [None]:
from sagemaker.lambda_helper import Lambda
# Lambda helper for scripts/lambda/bedrock_model_import.py, run with the
# execution role and the boto3 layer created in the earlier cells.
lambda_func = Lambda(
    function_name="bedrock-model-import",
    script="scripts/lambda/bedrock_model_import.py",
    handler='bedrock_model_import.lambda_handler',
    runtime='python3.12',
    execution_role_arn=execution_role_arn,
    layers=[lambda_layer_arn],
    timeout=15 * 60,  # seconds; adjust as needed
    memory_size=128,
)

In [None]:
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
# The Lambda step declares a single string output named "model_arn".
model_arn_output = LambdaOutput(output_name="model_arn", output_type=LambdaOutputTypeEnum.String)
lambda_outputs = [model_arn_output]

In [None]:
# Display the ModelPackageArn property of the register-model step (inspection only).
step_register_model.properties.ModelPackageArn

In [None]:

# Create the Lambda step
lambda_step = LambdaStep(
    name="BedrockModelImport",
    lambda_func=lambda_func,
    inputs={
        # S3 location of the artifacts produced by the training step
        "model_uri": training_step.properties.ModelArtifacts.S3ModelArtifacts,
        # Pass the import role created above (`role_arn`, which trusts
        # bedrock.amazonaws.com for model-import jobs) rather than the
        # SageMaker execution role held in `role`.
        "role_arn": role_arn,
        "model_name": model_name
    },
    outputs=lambda_outputs,
    cache_config=CacheConfig(enable_caching=True, expire_after="1d"),
    depends_on=[step_register_model]
)

# Pipeline creation

In [None]:
import logging

logging.basicConfig(level=logging.INFO)

# `parameters` is meant for pipeline Parameter objects (e.g. ParameterString).
# The role ARN and model name are plain strings that are already baked into
# the steps, so they are not passed as pipeline parameters.
pipeline = Pipeline(
    name="Llama3-QAPipeline",
    steps=[preprocessing_step, training_step, step_register_model, lambda_step],
    sagemaker_session=pipeline_session,
)
logging.info("Pipeline created successfully")

try:
    pipeline.upsert(role_arn=role)
    logging.info("Pipeline upserted successfully")

    execution = pipeline.start()
    logging.info("Pipeline started successfully")
except Exception:
    # Log the full traceback and re-raise. Swallowing the error would leave
    # `execution` undefined and make the monitoring cell fail with an
    # unrelated NameError.
    logging.exception("Failed to upsert or start the pipeline")
    raise

In [None]:
import time
from botocore.exceptions import ClientError

def get_pipeline_status(execution):
    """Return the 'PipelineExecutionStatus' reported by ``execution.describe()``.

    If ``describe()`` raises a ``ClientError``, the error is printed and
    ``None`` is returned.
    """
    try:
        description = execution.describe()
    except ClientError as e:
        print(f"Error getting pipeline status: {e}")
        return None
    return description['PipelineExecutionStatus']

def get_step_statuses(execution):
    """Map each step name to its status, based on ``execution.list_steps()``.

    If ``list_steps()`` raises a ``ClientError``, the error is printed and an
    empty dict is returned.
    """
    try:
        listed_steps = execution.list_steps()
    except ClientError as e:
        print(f"Error getting step statuses: {e}")
        return {}

    statuses = {}
    for entry in listed_steps:
        statuses[entry['StepName']] = entry['StepStatus']
    return statuses

def is_pipeline_finished(status):
    """Return True when ``status`` is one of the states treated as final."""
    final_states = ('Succeeded', 'Completed', 'Failed', 'Stopped')
    return any(status == state for state in final_states)

def print_progress(status, step_statuses):
    """Print the pipeline status followed by one indented line per step."""
    report = [f"\nPipeline status: {status}", "Step statuses:"]
    report.extend(f"  {name}: {state}" for name, state in step_statuses.items())
    print("\n".join(report))

def monitor_pipeline_execution(execution, check_interval=60):
    """Poll a pipeline execution until it reaches a final state.

    The full progress report is printed whenever the step statuses change; a
    dot is printed for polls without changes. When the status cannot be
    fetched, the poll is retried after ``check_interval`` seconds.

    Args:
        execution: Pipeline execution handle, as accepted by
            get_pipeline_status and get_step_statuses.
        check_interval: Seconds to wait between two polls.
    """
    print("Pipeline execution started.")
    # Report the actual polling interval rather than a fixed "every minute".
    print(f"Status updates (checking every {check_interval} seconds):")

    previous_step_statuses = {}
    while True:
        status = get_pipeline_status(execution)
        if status is None:
            print("Failed to get pipeline status. Retrying...")
            time.sleep(check_interval)
            continue

        step_statuses = get_step_statuses(execution)

        if step_statuses != previous_step_statuses:
            print_progress(status, step_statuses)
            previous_step_statuses = step_statuses
        else:
            print(".", end='', flush=True)

        if is_pipeline_finished(status):
            break

        time.sleep(check_interval)

    print("\nPipeline execution finished.")
    print_progress(status, step_statuses)

# Usage example: follow the pipeline execution started in the previous cell
# until it finishes, polling at the default interval.
monitor_pipeline_execution(execution)