# Notebook to build workflow into a SageMaker Pipeline

In this notebook, we show how to create a SageMaker Pipeline in two ways:
1. Creating a single pipeline step with the entire RAG creation and testing workflow
2. Creating a multi-step pipeline with each step of the process as a separate job.

To familiarize yourself with the RAG building process, run through the [Experimentation Notebook](./sagemaker-mlflow-experiment-agenticrag.ipynb) before running this notebook.

## Option 1: Create a single step with the entire end-to-end workflow

In [None]:
import boto3
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
import time
import os


In [None]:

# SageMaker sessions: `session` is used for region/bucket lookups in this cell,
# while `pipeline_session` is the one handed to the processor further down.
session = sagemaker.Session()
pipeline_session = PipelineSession()

# Identity and location details resolved from the environment.
role = sagemaker.get_execution_role()
region = session.boto_region_name
default_bucket = session.default_bucket()

print(f"SageMaker Role ARN: {role}")
print(f"SageMaker Session Region: {region}")
print(f"SageMaker Session Default Bucket: {default_bucket}")

# Each run writes under its own UTC-timestamped S3 prefix.
utc_now = time.gmtime()
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", utc_now)
output_s3_uri = f"s3://{default_bucket}/rag-pipeline/output/{timestamp}/"

### Arguments for Processing Job (Replace with your own)

In [None]:
# --- Values you must fill in before running --------------------------------
mlflow_tracking_uri = ""  # REPLACE WITH YOUR OWN
domain_name = ""  # REPLACE WITH YOUR OWN (example: "search-ragops-testing-xxxxxxx.us-west-2.es.amazonaws.com")

# --- Endpoints and index ---------------------------------------------------
embedding_endpoint_name = "rag-embeddings-endpoint"
text_endpoint_name = "rag-generation-endpoint"
index_name = "ragops-exp-index-1"
model_dimensions = "384"

# --- Experiment naming -----------------------------------------------------
prefix = "rag-pipeline"
experiment_name = f"rag-pipeline-{timestamp}"
parent_run_name = "rag-experiment-test"

# --- Models ----------------------------------------------------------------
embedding_model_id = "huggingface-textembedding-all-MiniLM-L6-v2"
text_model_id = "deepseek-llm-r1-distill-qwen-7b"

# --- Chunking / retrieval settings -----------------------------------------
# Kept as strings because they are forwarded as command-line arguments.
chunking_strategy = "RecursiveChunker"
chunk_size = "500"
chunk_overlap = "200"

context_retrieval_size = "3"

# Fail fast: the two placeholders above are passed to the processing job as
# CLI arguments, so catch a forgotten value here instead of inside the job.
_unset = [
    name
    for name, value in (
        ("mlflow_tracking_uri", mlflow_tracking_uri),
        ("domain_name", domain_name),
    )
    if not value.strip()
]
if _unset:
    raise ValueError(
        "Set the following before building the pipeline: " + ", ".join(_unset)
    )

In [None]:
# Location of the single-step pipeline code: `source_dir` is the folder given to
# the processor and `script_path` is the script passed as `code=` alongside it.
source_dir = "../sagemaker_pipeline/single_step_pipeline"
script_path = "single-step-pipeline.py"

In [None]:
# Processor used for the end-to-end RAG script. It is bound to
# `pipeline_session`, and its `.run()` result is used as `step_args` below.
processor = PyTorchProcessor(
    role=role,
    framework_version="2.5",
    py_version="py311",
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    base_job_name="rag-pipeline",
    sagemaker_session=pipeline_session,
)

# NOTE(review): `job_name` is not referenced by any call in this cell.
job_name = f"rag-pipeline-{timestamp}"

# Everything the script writes to the container output folder is copied to S3.
rag_output = ProcessingOutput(
    output_name="rag_output",
    source="/opt/ml/processing/output/",
    destination=output_s3_uri,
)

# Command-line arguments forwarded to the script, as flag/value pairs.
script_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--embedding-endpoint-name", embedding_endpoint_name,
    "--text-endpoint-name", text_endpoint_name,
    "--domain-name", domain_name,
    "--index-name", index_name,
    "--chunking-strategy", chunking_strategy,
    "--chunk-size", chunk_size,
    "--chunk-overlap", chunk_overlap,
    "--context-retrieval-size", context_retrieval_size,
    "--embedding-model-id", embedding_model_id,
    "--text-model-id", text_model_id,
    "--output-data-path", "/opt/ml/processing/output",
    "--role-arn", role,
]

processing_step_args = processor.run(
    code=script_path,
    source_dir=source_dir,
    outputs=[rag_output],
    arguments=script_arguments,
    logs=True,
)

# Wrap the run arguments in the pipeline's only step.
processing_step = ProcessingStep(
    name="PreprocessingStep",
    step_args=processing_step_args,
)


In [None]:
# Assemble the single-step pipeline.
# Passing `sagemaker_session=pipeline_session` keeps the pipeline on the same
# session the processor was built with, matching how the Option 2 pipeline
# later in this notebook is constructed.
# NOTE(review): "evalution" in the name looks like a typo for "evaluation"; it
# is left unchanged because the name identifies the pipeline resource.
pipeline = Pipeline(
    name="rag-evalution-test",
    steps=[processing_step],
    sagemaker_session=pipeline_session,
)

In [None]:
# Create the pipeline definition, or update it if it already exists, using the execution role.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution of the pipeline defined above and keep a handle to it.
pipeline_execution = pipeline.start()
# Bare last expression: the describe() result is shown as this cell's output.
pipeline_execution.describe()

## Option 2: Create a separate pipeline step for each stage of the workflow

In [None]:
import os
import boto3
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
from datetime import datetime
import uuid

# Setup: create the sessions first, then resolve role and region.
sagemaker_session = sagemaker.Session()
pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession()

role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# NOTE(review): the next cell reassigns `timestamp` using a different format,
# so this value is replaced before the S3 paths are built.
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")





In [None]:
# Cell-local import: the Option 2 import cell does not import `time`, so without
# this the cell only works if the Option 1 cells were run first in the same kernel.
import time

# `prefix` is assigned again in the arguments cell below with the same value;
# it is defined here as well because `base_s3_uri` needs it first.
prefix = "rag-pipeline"

default_bucket = sagemaker_session.default_bucket()

# Output S3 location (UTC timestamp keeps each run under its own prefix)
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
output_s3_uri = f"s3://{default_bucket}/rag-pipeline/output/{timestamp}/"


# S3 paths: one sub-folder per pipeline step under a shared, timestamped base
base_s3_uri = f"s3://{default_bucket}/{prefix}/{timestamp}"
data_prep_output_s3_uri = f"{base_s3_uri}/data-prep"
chunking_output_s3_uri = f"{base_s3_uri}/chunking"
ingestion_output_s3_uri = f"{base_s3_uri}/ingestion"
retrieval_output_s3_uri = f"{base_s3_uri}/retrieval"
evaluation_output_s3_uri = f"{base_s3_uri}/evaluation"


### Arguments for Processing Job (Replace with your own)

In [None]:
# --- Values you must fill in before running --------------------------------
mlflow_tracking_uri = ""  # REPLACE WITH YOUR OWN
domain_name = ""  # REPLACE WITH YOUR OWN (example: "search-ragops-testing-xxxxxxx.us-west-2.es.amazonaws.com")

# --- Endpoints and index ---------------------------------------------------
embedding_endpoint_name = "rag-embeddings-endpoint"
text_endpoint_name = "rag-generation-endpoint"
index_name = "ragops-exp-index-1"
model_dimensions = "384"

# --- Experiment naming -----------------------------------------------------
prefix = "rag-pipeline"
experiment_name = f"rag-pipeline-{timestamp}"
parent_run_name = "rag-experiment-test"

# --- Models ----------------------------------------------------------------
embedding_model_id = "huggingface-textembedding-all-MiniLM-L6-v2"
text_model_id = "deepseek-llm-r1-distill-qwen-7b"

# --- Chunking / retrieval settings -----------------------------------------
# Kept as strings because they are forwarded as command-line arguments.
chunking_strategy = "RecursiveChunker"
chunk_size = "500"
chunk_overlap = "200"

context_retrieval_size = "3"

# Fail fast: the two placeholders above are passed to the processing jobs as
# CLI arguments, so catch a forgotten value here instead of inside a job.
_unset = [
    name
    for name, value in (
        ("mlflow_tracking_uri", mlflow_tracking_uri),
        ("domain_name", domain_name),
    )
    if not value.strip()
]
if _unset:
    raise ValueError(
        "Set the following before building the pipeline: " + ", ".join(_unset)
    )


In [None]:
# Path to the folder of scripts.
# Unlike in Option 1, `script_path` here holds a directory: it is passed as
# `source_dir=` to every step, and each step names its own script via `code=`.
script_path = "../sagemaker_pipeline/multi_step_pipeline"

In [None]:
# One processor configuration shared by all five steps below; it is bound to
# `pipeline_session`, and each `.run()` result is used as a step's `step_args`.
processor = PyTorchProcessor(
    role=role,
    framework_version="2.5",
    py_version="py311",
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    base_job_name="rag-pipeline",
    sagemaker_session=pipeline_session,
)


# Step 1: Data Preparation
# No inputs; the script's output folder is uploaded to the data-prep S3 prefix.
data_prep_outputs = [
    ProcessingOutput(
        output_name="data_prep_output",
        source="/opt/ml/processing/output/",
        destination=data_prep_output_s3_uri,
    ),
]

data_prep_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--output-data-path", "/opt/ml/processing/output",
    "--parent-run-name", parent_run_name,
]

data_prep_step_args = processor.run(
    code="data_preparation.py",
    source_dir=script_path,
    outputs=data_prep_outputs,
    arguments=data_prep_arguments,
)

data_prep_step = ProcessingStep(name="DataPrepStep", step_args=data_prep_step_args)


# Step 2: Data Chunking
# Input is the S3 URI of DataPrepStep's `data_prep_output`, read from that
# step's properties.
data_prep_result_uri = (
    data_prep_step.properties.ProcessingOutputConfig
    .Outputs["data_prep_output"].S3Output.S3Uri
)

chunking_inputs = [
    ProcessingInput(
        source=data_prep_result_uri,
        destination="/opt/ml/processing/input",
    ),
]

chunking_outputs = [
    ProcessingOutput(
        output_name="chunking_output",
        source="/opt/ml/processing/output/",
        destination=chunking_output_s3_uri,
    ),
]

chunking_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--input-data-path", "/opt/ml/processing/input",
    "--output-data-path", "/opt/ml/processing/output",
    "--chunking-strategy", chunking_strategy,
    "--chunk-size", chunk_size,
    "--chunk-overlap", chunk_overlap,
]

chunking_step_args = processor.run(
    code="data_chunking.py",
    source_dir=script_path,
    inputs=chunking_inputs,
    outputs=chunking_outputs,
    arguments=chunking_arguments,
)

data_chunking_step = ProcessingStep(name="DataChunkingStep", step_args=chunking_step_args)


# Step 3: Data Ingestion
# Input is the S3 URI of DataChunkingStep's `chunking_output`, read from that
# step's properties.
chunking_result_uri = (
    data_chunking_step.properties.ProcessingOutputConfig
    .Outputs["chunking_output"].S3Output.S3Uri
)

ingestion_inputs = [
    ProcessingInput(
        source=chunking_result_uri,
        destination="/opt/ml/processing/input",
    ),
]

ingestion_outputs = [
    ProcessingOutput(
        output_name="ingestion_output",
        source="/opt/ml/processing/output/",
        destination=ingestion_output_s3_uri,
    ),
]

ingestion_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--input-data-path", "/opt/ml/processing/input",
    "--output-data-path", "/opt/ml/processing/output",
    "--embedding-endpoint-name", embedding_endpoint_name,
    "--domain-name", domain_name,
    "--index-name", index_name,
    "--model-dimensions", model_dimensions,
    "--embedding-model-id", embedding_model_id,
]

ingestion_step_args = processor.run(
    code="data_ingestion.py",
    source_dir=script_path,
    inputs=ingestion_inputs,
    outputs=ingestion_outputs,
    arguments=ingestion_arguments,
)

data_ingestion_step = ProcessingStep(name="DataIngestionStep", step_args=ingestion_step_args)



# Step 4: RAG Retrieval
# Input is the S3 URI of DataIngestionStep's `ingestion_output`, read from that
# step's properties.
ingestion_result_uri = (
    data_ingestion_step.properties.ProcessingOutputConfig
    .Outputs["ingestion_output"].S3Output.S3Uri
)

retrieval_inputs = [
    ProcessingInput(
        source=ingestion_result_uri,
        destination="/opt/ml/processing/input",
    ),
]

retrieval_outputs = [
    ProcessingOutput(
        output_name="retrieval_output",
        source="/opt/ml/processing/output/",
        destination=retrieval_output_s3_uri,
    ),
]

retrieval_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--input-data-path", "/opt/ml/processing/input",
    "--output-data-path", "/opt/ml/processing/output",
    "--embedding-endpoint-name", embedding_endpoint_name,
    "--text-endpoint-name", text_endpoint_name,
    "--domain-name", domain_name,
    "--index-name", index_name,
    "--context-retrieval-size", context_retrieval_size,
    "--embedding-model-id", embedding_model_id,
    "--text-model-id", text_model_id,
]

retrieval_step_args = processor.run(
    code="rag_retrieval.py",
    source_dir=script_path,
    inputs=retrieval_inputs,
    outputs=retrieval_outputs,
    arguments=retrieval_arguments,
)

rag_retrieval_step = ProcessingStep(name="RAGRetrievalStep", step_args=retrieval_step_args)


# Step 5: RAG Evaluation
# Input is the S3 URI of RAGRetrievalStep's `retrieval_output`, read from that
# step's properties.
retrieval_result_uri = (
    rag_retrieval_step.properties.ProcessingOutputConfig
    .Outputs["retrieval_output"].S3Output.S3Uri
)

evaluation_inputs = [
    ProcessingInput(
        source=retrieval_result_uri,
        destination="/opt/ml/processing/input",
    ),
]

evaluation_outputs = [
    ProcessingOutput(
        output_name="evaluation_output",
        source="/opt/ml/processing/output/",
        destination=evaluation_output_s3_uri,
    ),
]

evaluation_arguments = [
    "--experiment-name", experiment_name,
    "--mlflow-tracking-uri", mlflow_tracking_uri,
    "--input-data-path", "/opt/ml/processing/input",
    "--output-data-path", "/opt/ml/processing/output",
    "--embedding-endpoint-name", embedding_endpoint_name,
    "--text-endpoint-name", text_endpoint_name,
    "--domain-name", domain_name,
    "--embedding-model-id", embedding_model_id,
    "--text-model-id", text_model_id,
    "--chunking-strategy", chunking_strategy,
    "--chunk-size", chunk_size,
    "--chunk-overlap", chunk_overlap,
    "--role-arn", role,
]

evaluation_step_args = processor.run(
    code="rag_evaluation.py",
    source_dir=script_path,
    inputs=evaluation_inputs,
    outputs=evaluation_outputs,
    arguments=evaluation_arguments,
)

rag_evaluation_step = ProcessingStep(name="RAGEvaluationStep", step_args=evaluation_step_args)


In [None]:
# Assemble the five steps, listed in workflow order, into one pipeline.
# The name embeds `timestamp`, so each new timestamp yields a differently named pipeline.
pipeline_steps = [
    data_prep_step,
    data_chunking_step,
    data_ingestion_step,
    rag_retrieval_step,
    rag_evaluation_step,
]

pipeline = Pipeline(
    name=f"RAGPipeline-{timestamp}",
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)

In [None]:
# Create the pipeline definition, or update it if it already exists, using the execution role.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution of the multi-step pipeline and report its ARN.
execution = pipeline.start()
print(f"Pipeline execution started with ARN: {execution.arn}")