# 04 - Amazon SageMaker pipeline for document processing

Steps of the pipeline: 

1. Download documents
2. Parse out the text
3. Create embedding and semantic search index
4. Extract entities
5. Load embeddings into PostgreSQL
6. Load entities into a SQL table


Run this notebook after running notebooks 01 to 05 in the `notebooks` folder. It creates and executes a SageMaker pipeline that prepares the embeddings and SQL tables and loads them into the PostgreSQL database.

In [None]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Sessions and identity shared by the cells below.
# PipelineSession makes processor.run() calls return step arguments for a
# pipeline instead of launching jobs immediately.
pipeline_session = PipelineSession()
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name  # region the SageMaker session is bound to
role = sagemaker.get_execution_role()  # IAM role ARN used for the jobs and the pipeline

In [None]:
import json

# Region from boto3's default configuration chain. This rebinds `region`,
# which an earlier cell already derived from the SageMaker session.
region = boto3.session.Session().region_name

# AWS service clients. `ssm` is used below to read the stack's settings;
# NOTE(review): `secretsmanager` is not referenced in the cells shown — confirm it is needed.
ssm, secretsmanager = boto3.client("ssm"), boto3.client("secretsmanager")

In [None]:
# SSM Parameter Store names from which this notebook reads its
# infrastructure settings.
security_group_parameter = "/AgenticLLMAssistant/SMProcessingJobSecurityGroupId"
dbsecret_arn_parameter = "/AgenticLLMAssistant/DBSecretARN"
subnet_ids_parameter = "/AgenticLLMAssistant/SubnetIds"
s3_bucket_name_parameter = "/AgenticLLMAssistant/AgentDataBucketParameter"

# Each lookup keeps only the parameter's string value.
security_group = ssm.get_parameter(Name=security_group_parameter)["Parameter"]["Value"]
db_secret_arn = ssm.get_parameter(Name=dbsecret_arn_parameter)["Parameter"]["Value"]

# The subnet parameter holds JSON; it is decoded into the subnet IDs that are
# later passed to NetworkConfig. `subnet_ids` keeps the raw get_parameter response.
subnet_ids = ssm.get_parameter(Name=subnet_ids_parameter)
private_subnets_with_egress_ids = json.loads(subnet_ids["Parameter"]["Value"])

s3_bucket_name = ssm.get_parameter(Name=s3_bucket_name_parameter)["Parameter"]["Value"]


In [None]:
# Locations inside s3_bucket_name that the pipeline steps read from:
# the processed-documents JSON object and the prefix holding the SQL table data.
processed_documents_s3_key, sql_tables_s3_key = (
    "documents_processed.json",
    "structured_metadata",
)

In [None]:
# Sanity check: list the processed-documents object that the embeddings step
# reads as its input. (IPython shell escape; {...} is filled from Python variables.)
!aws s3 ls {s3_bucket_name}/{processed_documents_s3_key}

In [None]:
# List the top-level contents of the agent data bucket (IPython shell escape).
!aws s3 ls {s3_bucket_name}/

## Add pipeline step to prepare and load embeddings

In [None]:
from sagemaker.network import NetworkConfig

# Attach the processing jobs to the VPC subnets and security group read from SSM.
# Note: network isolation (enable_network_isolation=True) is not enabled here,
# because it would break the pip install of the dependencies listed in
# scripts/requirements.txt.
current_network_config = NetworkConfig(
    security_group_ids=[security_group],
    subnets=private_subnets_with_egress_ids,
)

In [None]:
%%time
from sagemaker.xgboost import XGBoostProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.workflow.steps import ProcessingStep

# Initialize the XGBoostProcessor
embeddings_and_index_job = XGBoostProcessor(
    framework_version="1.7-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="PrepareAndLoadEmbeddingsJob",
    env={
        "SQL_DB_SECRET_ID": db_secret_arn,
        # region used by botocore.
        "AWS_DEFAULT_REGION": region,
    },
    network_config=current_network_config,
    sagemaker_session=pipeline_session
)

# Run the processing job
embeddings_creation_and_indexing_args = embeddings_and_index_job.run(
    code="prepare_and_load_embeddings.py",
    source_dir="scripts",
    inputs=[
        ProcessingInput(
            input_name="processed_documents",
            source=f"s3://{s3_bucket_name}/{processed_documents_s3_key}",
            destination="/opt/ml/processing/input/processed_documents",
        )
    ],
)

embeddings_creation_and_indexing_step = ProcessingStep(
    name="EmbeddingCreationAndIndexing",
    step_args=embeddings_creation_and_indexing_args
)

## Add pipeline step to load extracted entities into a SQL table

In [None]:
%%time
from sagemaker.xgboost import XGBoostProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

# Initialize the XGBoostProcessor
entities_to_sql = XGBoostProcessor(
    framework_version="1.7-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="LoadEntitiesToSQLTableJob",
    env={"SQL_DB_SECRET_ID": db_secret_arn, "AWS_DEFAULT_REGION": region},
    network_config=current_network_config,
    sagemaker_session=pipeline_session,
)

# Run the processing job
load_entities_to_sql_table_args = entities_to_sql.run(
    code="load_sql_tables.py",
    source_dir="scripts",
    inputs=[
        ProcessingInput(
            input_name="sqltables",
            source=f"s3://{s3_bucket_name}/{sql_tables_s3_key}",
            destination="/opt/ml/processing/input/sqltables",
        )
    ],
)


load_entities_to_sql_table_step = ProcessingStep(
    name="LoadEntitiesToSQLTable",
    step_args=load_entities_to_sql_table_args
)

## Define the SageMaker Pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline


# Name under which pipeline.upsert() creates or updates the pipeline.
pipeline_name = "agentic-assistant-data-processing"

# NOTE(review): neither step declares depends_on nor consumes the other's
# outputs, so SageMaker Pipelines may run them concurrently. If the entity
# load must follow the embeddings load, pass
# depends_on=[embeddings_creation_and_indexing_step] to that ProcessingStep.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[],  # no runtime pipeline parameters; all inputs are fixed above
    steps=[
        embeddings_creation_and_indexing_step,
        load_entities_to_sql_table_step,
    ],
)

In [None]:
import json


# Decode the pipeline's JSON definition so the notebook displays it as a dict.
pipeline_definition_json = pipeline.definition()
definition = json.loads(pipeline_definition_json)
definition

Create the pipeline, or update it if it already exists (upsert).

In [None]:
# Register the pipeline definition under the execution role (create or update),
# then display the service response.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

Start the pipeline execution

In [None]:
# Launch a new execution of the registered pipeline; the returned handle is
# used by the cells below to monitor it.
execution = pipeline.start()

Monitor the pipeline execution.

In [None]:
# Show the execution's current status and metadata.
execution.describe()

In [None]:
# List the execution's steps together with their individual statuses.
execution.list_steps()