# SageMaker Pipeline for End-to-End MLOps

This notebook creates a SageMaker Pipeline that automates the entire ML workflow:
1. Training with QWEN3-0.6B LoRA
2. Model evaluation with Processing Job
3. Conditional model registration based on performance
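4. Automatic deployment to endpoint (note: the pipeline defined in this notebook stops at conditional model registration; no deployment step is created here)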

## 1. Setup and Import Libraries

In [None]:
import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import (
    TrainingStep,
    ProcessingStep,
    CreateModelStep,
    CacheConfig
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterFloat,
    ParameterString
)
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.pytorch import PyTorch, PyTorchProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from datetime import datetime
import json

## 2. Configure SageMaker Session and Parameters

In [None]:
# Shared SageMaker session; the region is read from the session's boto configuration
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# IAM role used throughout this notebook -- replace with your own role ARN
role = "arn:aws:iam::637423390840:role/WSParticipantRole"

# S3 location: the session's default bucket plus a project-specific key prefix
bucket = sagemaker_session.default_bucket()
prefix = "qwen3-0-6-lora-pipeline"

# Names of the pipeline and of the model package group used for registration
pipeline_name = "qwen3-lora-mlops-pipeline"
model_package_group_name = "qwen3-0-6b-lora-models"

# One line per setting, same output as three separate print calls
print(
    f"Using bucket: {bucket}",
    f"Using prefix: {prefix}",
    f"Pipeline name: {pipeline_name}",
    sep="\n",
)

## 3. Define Pipeline Parameters

In [None]:
# Pipeline parameters: every value below can be overridden when an execution starts.

# --- Compute ---
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.g5.2xlarge")
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")

# --- Training hyperparameters ---
num_train_epochs = ParameterInteger(name="NumTrainEpochs", default_value=3)
learning_rate = ParameterFloat(name="LearningRate", default_value=2e-4)

# --- Thresholds for the CPU-based evaluation metrics ---
# NOTE(review): the loss and BLEU thresholds are declared here, but no condition
# in this notebook appears to reference them -- confirm whether that is intended.
# Loss derived from text similarity
eval_loss_threshold = ParameterFloat(name="EvalLossThreshold", default_value=0.5)
# BLEU score derived from text similarity
bleu_score_threshold = ParameterFloat(name="BleuScoreThreshold", default_value=0.25)
# Minimum text similarity score (primary metric for CPU evaluation)
text_similarity_threshold = ParameterFloat(name="TextSimilarityThreshold", default_value=0.5)

# --- Model registry ---
# "PendingManualApproval" by default; set to "Approved" for auto-approval
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

print("Pipeline parameters defined")

## 4. Upload Data to S3

In [None]:
# Push the local sample files to S3 under <prefix>/data/{train,test}
train_s3_uri = sagemaker_session.upload_data(
    'samples/train.jsonl', bucket=bucket, key_prefix=f'{prefix}/data/train'
)
test_s3_uri = sagemaker_session.upload_data(
    'samples/test.jsonl', bucket=bucket, key_prefix=f'{prefix}/data/test'
)

# Report the resulting S3 URIs, one per line
print(f"Training data: {train_s3_uri}", f"Test data: {test_s3_uri}", sep="\n")

## 5. Step 1: Training Step

In [None]:
# Hyperparameters handed to train.py, assembled from per-topic groups.
# The groups are merged in the order model -> training -> LoRA -> dataset.
model_hparams = {
    "model_name_or_path": "Qwen/Qwen3-0.6B",
}

trainer_hparams = {
    "output_dir": "/opt/ml/model",
    "num_train_epochs": num_train_epochs,  # pipeline parameter
    "per_device_train_batch_size": 1,
    "per_device_eval_batch_size": 1,
    "gradient_accumulation_steps": 64,
    "learning_rate": learning_rate,  # pipeline parameter
    "weight_decay": 0.01,
    "warmup_ratio": 0.03,
    "lr_scheduler_type": "cosine",
    "logging_steps": 1,
    "save_steps": 50,
    "save_strategy": "steps",
    "save_total_limit": 3,
    "do_eval": True,
    "eval_strategy": "steps",
    "eval_steps": 50,
    "metric_for_best_model": "eval_loss",
    "greater_is_better": False,
    "load_best_model_at_end": False,
    "report_to": "none",
    "bf16": True,
    "gradient_checkpointing": True,
}

lora_hparams = {
    "lora_r": 4,
    "lora_alpha": 32,
    "lora_dropout": 0.1,
    "lora_target_modules": "q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj",
}

dataset_hparams = {
    "train_file": "/opt/ml/input/data/train/train.jsonl",
    "validation_split_percentage": 20,
    "block_size": 256,
}

train_hyperparameters = {
    **model_hparams,
    **trainer_hparams,
    **lora_hparams,
    **dataset_hparams,
}

# PyTorch estimator pinned to fixed framework/Python versions for compatibility
pytorch_estimator = PyTorch(
    # Training script and its source directory
    source_dir="src",
    entry_point="train.py",
    hyperparameters=train_hyperparameters,
    # Container versions (at this time, 2.7.1 for TrainingStep is not supported)
    framework_version="2.6.0",
    py_version="py312",
    # Compute
    role=role,
    instance_type=training_instance_type,
    instance_count=1,
    volume_size=450,
    use_spot_instances=False,
    max_run=24 * 60 * 60,  # 24 hours, in seconds
    keep_alive_period_in_seconds=1800,  # 30 minutes
    # Artifact and checkpoint locations
    output_path=f"s3://{bucket}/{prefix}/output",
    checkpoint_s3_uri=f"s3://{bucket}/{prefix}/checkpoints",
    environment={"PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True"},
    sagemaker_session=sagemaker_session,
)

# Single "train" channel fed from the uploaded training data
train_channel = TrainingInput(s3_data=train_s3_uri, content_type="application/jsonl")

# Training step; results are cached for 30 days
step_train = TrainingStep(
    name="TrainQwen3LoRA",
    estimator=pytorch_estimator,
    inputs={"train": train_channel},
    cache_config=CacheConfig(enable_caching=True, expire_after="30d"),
)

print("Training step created")

## 6. Step 2: Evaluation Step

In [None]:
# Pipeline session used by the processor so that run() produces step arguments
pipeline_session = PipelineSession()

# Processor that runs the evaluation script on the processing instance type
eval_processor = PyTorchProcessor(
    role=role,
    framework_version="2.6.0",
    py_version="py312",
    instance_count=1,
    instance_type=processing_instance_type,
    base_job_name="qwen3-eval",
    sagemaker_session=pipeline_session,
)

# Property file exposing evaluation_metrics.json from the "evaluation" output
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation_metrics.json",
)

# Inputs: the trained model artifacts and the uploaded test data
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=test_s3_uri,
        destination="/opt/ml/processing/input/test",
    ),
]

# Output: everything written to /opt/ml/processing/output goes to S3
eval_outputs = [
    ProcessingOutput(
        output_name="evaluation",
        source="/opt/ml/processing/output",
        destination=f"s3://{bucket}/{prefix}/evaluation",
    ),
]

# processor.run() is used to build step_args so that source_dir is handled
step_args = eval_processor.run(
    code="evaluation/evaluate.py",
    source_dir="src",
    inputs=eval_inputs,
    outputs=eval_outputs,
)

# Evaluation step built from step_args; results are cached for 30 days
step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=step_args,
    property_files=[evaluation_report],
    cache_config=CacheConfig(enable_caching=True, expire_after="30d"),
)

print("Evaluation step created")

## 7. Step 3: Model Registration Step (Conditional)

In [None]:
# Imported here because this cell is the only place it is used
from sagemaker import image_uris

# Model metrics: point the registry entry at the evaluation report written by
# the evaluation step (<evaluation output S3 URI>/evaluation_metrics.json)
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                "evaluation_metrics.json"
            ]
        ),
        content_type="application/json"
    )
)

# Resolve the PyTorch *inference* container that matches the training
# framework and Python versions
inference_image = image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="2.6.0",
    py_version="py312",
    instance_type="ml.g5.2xlarge",
    image_scope="inference"
)

# For pipeline, we'll use RegisterModel without custom inference script
# The model will use the default PyTorch serving handler
step_register = RegisterModel(
    name="RegisterQwen3Model",
    estimator=pytorch_estimator,  # Use the training estimator
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    # Pass the resolved inference image explicitly. Previously it was retrieved
    # but never used, so the image had to be derived from the estimator, whose
    # instance type is a pipeline parameter rather than a concrete value.
    image_uri=inference_image,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.g5.xlarge", "ml.g5.2xlarge"],
    transform_instances=["ml.g5.xlarge", "ml.g5.2xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    description="QWEN3-0.6B LoRA fine-tuned model from pipeline"
)

print("Model registration step created")

## 8. Step 4: Conditional Registration Based on Metrics

In [None]:
# Gate model registration on the CPU evaluation metrics.
# CPU evaluation produces: text_similarity, derived eval_loss, derived bleu_score

# Read the primary metric (text_similarity) from the evaluation report
similarity_score = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="text_similarity",
)

# Condition 1: text similarity must be at least the configured threshold
cond_similarity = ConditionGreaterThanOrEqualTo(
    left=similarity_score,
    right=text_similarity_threshold,
)

# Condition step: register the model when the condition holds, otherwise do nothing
step_cond = ConditionStep(
    name="CheckModelPerformance",
    conditions=[cond_similarity],  # text similarity is the only condition
    if_steps=[step_register],
    else_steps=[],
)

## 9. Create Pipeline

In [None]:
# Parameters exposed by the pipeline (overridable per execution)
pipeline_parameters = [
    training_instance_type,
    processing_instance_type,
    num_train_epochs,
    learning_rate,
    eval_loss_threshold,
    bleu_score_threshold,
    text_similarity_threshold,
    model_approval_status,
]

# Steps: train -> evaluate -> conditional registration
pipeline_steps = [step_train, step_eval, step_cond]

# Assemble the pipeline definition
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

## 10. Submit Pipeline Definition

In [None]:
# Create the pipeline, or update its definition if it already exists
pipeline_response = pipeline.upsert(role_arn=role)

print(f"Pipeline ARN: {pipeline_response['PipelineArn']}")
# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print(f"\n✓ Pipeline '{pipeline_name}' created/updated successfully!")

## 11. Start Pipeline Execution

In [None]:
# Start a pipeline execution with a timestamped display name
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
execution_name = f"{pipeline_name}-{timestamp}"

# Pipeline parameters can be overridden per execution via `parameters`
execution = pipeline.start(
    execution_display_name=execution_name,
    parameters={
        # Override parameters if needed
        # "NumTrainEpochs": 5,
        # "LearningRate": 1e-4,
    }
)

print(f"Pipeline execution started: {execution_name}")
print(f"Execution ARN: {execution.arn}")
# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\nYou can monitor the pipeline execution in the SageMaker Studio or console.")

## 12. Monitor Pipeline Execution (you can also follow the progress in SageMaker Studio)

In [None]:
# Wait for execution to complete (optional)
import time

print("Monitoring pipeline execution...")
# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("This will take approximately 30-45 minutes...\n")

# Poll the execution status until it reaches one of the statuses that end the loop
while True:
    status = execution.describe()['PipelineExecutionStatus']
    print(f"Status: {status}")

    if status in ['Succeeded', 'Failed', 'Stopped']:
        break

    time.sleep(60)  # Check every minute

# `status` holds the last value read by the loop above
if status == 'Succeeded':
    print("\n✓ Pipeline execution completed successfully!")
else:
    print(f"\n✗ Pipeline execution ended with status: {status}")

## 13. List Pipeline Executions

In [None]:
# List the five most recent executions of this pipeline, newest first
sm_client = boto3.client('sagemaker', region_name=region)

response = sm_client.list_pipeline_executions(
    PipelineName=pipeline_name,
    SortOrder='Descending',
    MaxResults=5
)

print(f"=== Recent Executions for {pipeline_name} ===")
for idx, exec_summary in enumerate(response['PipelineExecutionSummaries'], 1):
    # "\n" (not "\\n") so entries are separated by a blank line rather than
    # prefixed with a literal backslash-n
    print(f"\n{idx}. Execution:")
    print(f"   ARN: {exec_summary['PipelineExecutionArn']}")
    print(f"   Status: {exec_summary['PipelineExecutionStatus']}")
    print(f"   Start Time: {exec_summary['StartTime']}")

## 14. Configure Pipeline Schedule (Optional)

In [None]:
# Optional: EventBridge rule that triggers the pipeline on a schedule.
# This example fires daily at 2 AM UTC and is created in the DISABLED state.

import boto3

events_client = boto3.client('events', region_name=region)

# Name of the schedule rule, derived from the pipeline name
rule_name = f"{pipeline_name}-daily-schedule"

try:
    # Create (or update) the rule; it starts disabled and is enabled separately
    response = events_client.put_rule(
        Name=rule_name,
        Description=f'Daily trigger for {pipeline_name}',
        ScheduleExpression='cron(0 2 * * ? *)',  # Daily at 2 AM UTC
        State='DISABLED',
    )

    # Target definition: the SageMaker pipeline, started with the given role
    pipeline_target = {
        'Id': '1',
        'Arn': pipeline_response['PipelineArn'],
        'RoleArn': role,
        'SageMakerPipelineParameters': {
            'PipelineParameterList': [
                # Add any parameter overrides here
            ]
        },
    }
    response = events_client.put_targets(Rule=rule_name, Targets=[pipeline_target])

    print(f"✓ Schedule rule '{rule_name}' created (currently DISABLED)")
    print("To enable the schedule, uncomment and run the next cell")

except Exception as e:
    # Best-effort cell: report the failure and let the notebook continue
    print(f"Schedule creation failed: {e}")

In [None]:
# Enable the schedule (uncomment to activate)
# response = events_client.enable_rule(Name=rule_name)
# print(f"✓ Schedule '{rule_name}' enabled")

## 15. Pipeline Cleanup (Optional)

In [None]:
# Uncomment to delete the pipeline when no longer needed
# response = sm_client.delete_pipeline(
#     PipelineName=pipeline_name
# )
# print(f"Pipeline '{pipeline_name}' deleted")

## Summary

This notebook created an end-to-end MLOps pipeline that integrates the following stages:

1. **Training Step**: Fine-tunes QWEN3-0.6B with LoRA on training data (GPU: ml.g5.2xlarge)
2. **Evaluation Step**: Evaluates model performance on test data using CPU instances (ml.m5.xlarge)
3. **Conditional Registration**: Registers model based on text similarity scores from CPU evaluation
4. **Automated Workflow**: All steps are orchestrated automatically with cost-effective evaluation