# 06 - Amazon SageMaker jobs and pipeline for documents processing

Run this notebook to orchestrate the jobs that create and index embeddings for semantic search and that load a set of structured entities as a SQL table into the PostgreSQL database. The orchestration is done using a SageMaker pipeline.

In [None]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Regular SDK session; its region is reused when configuring the processing jobs.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# Session object from sagemaker.workflow.pipeline_context, kept for the
# pipeline-related cells further down.
pipeline_session = PipelineSession()

# Execution role of the environment running this notebook; used when the
# pipeline is upserted.
role = sagemaker.get_execution_role()

In [3]:
import json

# Region of the default boto3 session. This rebinds `region`, which an earlier
# cell already set from the SageMaker session.
region = boto3.Session().region_name

# Clients for SSM Parameter Store and Secrets Manager.
ssm = boto3.client("ssm")
secretsmanager = boto3.client("secretsmanager")

In [4]:
# Names of the SSM parameters read in this cell.
security_group_parameter = "/AgenticLLMAssistantWorkshop/SMProcessingJobSecurityGroupId"
dbsecret_arn_parameter = "/AgenticLLMAssistantWorkshop/DBSecretARN"
subnet_ids_parameter = "/AgenticLLMAssistantWorkshop/SubnetIds"
s3_bucket_name_parameter = "/AgenticLLMAssistantWorkshop/AgentDataBucketParameter"


def _parameter_value(response):
    """Return the value string held in an `ssm.get_parameter` response."""
    return response["Parameter"]["Value"]


security_group = _parameter_value(ssm.get_parameter(Name=security_group_parameter))

db_secret_arn = _parameter_value(ssm.get_parameter(Name=dbsecret_arn_parameter))

# `subnet_ids` keeps the raw response; its value is JSON text that is decoded
# into the Python object passed to the network configuration later on.
subnet_ids = ssm.get_parameter(Name=subnet_ids_parameter)
private_subnets_with_egress_ids = json.loads(_parameter_value(subnet_ids))

s3_bucket_name = _parameter_value(ssm.get_parameter(Name=s3_bucket_name_parameter))


In [5]:
# S3 key of the processed-documents JSON file inside the data bucket.
processed_documents_s3_key = "documents_processed.json"
# S3 key prefix under which the CSV files for the SQL tables are stored.
sql_tables_s3_key = "structured_metadata"

In [None]:
# Shell call: list the processed-documents object in the bucket, if present.
# The {...} placeholders are filled from the Python variables set above.
!aws s3 ls {s3_bucket_name}/{processed_documents_s3_key}

In [None]:
# Shell call: list what is stored under the SQL-tables prefix in the bucket.
!aws s3 ls {s3_bucket_name}/{sql_tables_s3_key}/

## Upload pre-created data if no data exists

In [18]:
import boto3
import botocore

# Create an S3 client
s3 = boto3.client('s3')


def upload_file_to_s3_if_doesnt_exist(file_path, bucket_name, s3_file_key):
    """Upload a local file to S3 unless an object already exists at the key.

    The outcome is reported with ``print`` rather than raised, so the cell that
    calls this keeps running whether the object was found, uploaded, or an
    S3 error occurred.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        s3_file_key: Object key to check for and, if absent, upload to.

    Returns:
        None.
    """
    try:
        # HEAD request: succeeds only when the object is already there.
        s3.head_object(Bucket=bucket_name, Key=s3_file_key)
    except botocore.exceptions.ClientError as e:
        # Anything other than 404 (Not Found) is a real problem, e.g. access
        # denied or a missing bucket; report it and do not attempt an upload.
        if e.response['Error']['Code'] != '404':
            print(f"Error occurred: {e}")
            return
    else:
        print(f"File '{s3_file_key}' already exists in bucket '{bucket_name}'")
        return

    # Reaching this point means the HEAD request answered 404.
    try:
        s3.upload_file(file_path, bucket_name, s3_file_key)
    except (
        botocore.exceptions.ClientError,
        # upload_file reports failed transfers with this boto3 exception, so
        # catching ClientError alone would let them escape unreported.
        boto3.exceptions.S3UploadFailedError,
    ) as e:
        print(f"Error uploading file: {e}")
    else:
        print(f"File '{s3_file_key}' uploaded to bucket '{bucket_name}'")

Upload the Amazon financial reports that were pre-extracted with Amazon Textract using the preceding notebooks (1 to 5).

In [None]:
# Local copy of the processed documents and the key it is stored under in S3.
file_path = "data/documents_processed.json"
s3_file_key = processed_documents_s3_key

upload_file_to_s3_if_doesnt_exist(
    file_path=file_path,
    bucket_name=s3_bucket_name,
    s3_file_key=s3_file_key,
)

Upload the entities of interest that were extracted from the Amazon financial reports using the preceding notebooks (1 to 5).

In [None]:
# Local CSV of extracted entities; it is stored under the SQL-tables prefix.
file_path = "data/extracted_entities.csv"
s3_file_key = f"{sql_tables_s3_key}/extracted_entities.csv"

upload_file_to_s3_if_doesnt_exist(
    file_path=file_path,
    bucket_name=s3_bucket_name,
    s3_file_key=s3_file_key,
)

## Add pipeline step to prepare and load embeddings

In [27]:
from sagemaker.network import NetworkConfig

# Network settings shared by both processing jobs: the subnets and the
# security group read from SSM earlier.
# Network isolation is deliberately left off: with
# enable_network_isolation=True the pip installation of the dependencies
# under scripts/requirements.txt won't work.
current_network_config = NetworkConfig(
    subnets=private_subnets_with_egress_ids,
    security_group_ids=[security_group],
)

In [None]:
%%time
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.workflow.steps import ProcessingStep

# NOTE(review): `script_processor_container_uri` is not defined in any cell
# visible in this notebook; it must be set before this cell runs - TODO confirm
# where it is expected to come from.

# Initialize the ScriptProcessor.
# It is bound to `pipeline_session` so that `.run()` below records the job
# arguments for the pipeline step instead of being tied to the regular session.
embeddings_and_index_job = ScriptProcessor(
    image_uri=script_processor_container_uri,
    role=get_execution_role(),
    instance_type="ml.t3.large",
    instance_count=1,
    base_job_name="EmbeddingCreationAndIndexing",
    # The script receives the DB secret ARN and the region via the environment.
    env={"SQL_DB_SECRET_ID": db_secret_arn, "AWS_DEFAULT_REGION": region},
    network_config=current_network_config,
    command=["python3"],
    sagemaker_session=pipeline_session,
)


# Collect the step arguments for the processing job.
# NOTE(review): `source_dir` is documented for FrameworkProcessor.run();
# confirm that ScriptProcessor.run() in the installed SDK accepts it,
# otherwise this call raises TypeError.
embeddings_creation_and_indexing_args = embeddings_and_index_job.run(
    code="prepare_and_load_embeddings.py",
    source_dir="scripts",
    inputs=[
        # Processed documents from S3, made available at this container path.
        ProcessingInput(
            input_name="processed_documents",
            source=f"s3://{s3_bucket_name}/{processed_documents_s3_key}",
            destination="/opt/ml/processing/input/processed_documents",
        )
    ],
)

# Pipeline step wrapping the arguments captured above.
embeddings_creation_and_indexing_step = ProcessingStep(
    name="EmbeddingCreationAndIndexing",
    step_args=embeddings_creation_and_indexing_args,
)

## Add pipeline step to load extracted entities into a SQL table

In [None]:
%%time
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.workflow.steps import ProcessingStep

# NOTE(review): `script_processor_container_uri` is not defined in any cell
# visible in this notebook; it must be set before this cell runs - TODO confirm
# where it is expected to come from.

# Initialize the ScriptProcessor.
# It is bound to `pipeline_session` so that `.run()` below records the job
# arguments for the pipeline step instead of being tied to the regular session.
entities_to_sql = ScriptProcessor(
    image_uri=script_processor_container_uri,
    role=get_execution_role(),
    instance_type="ml.t3.large",
    instance_count=1,
    base_job_name="LoadEntitiesToSQLTable",
    # The script receives the DB secret ARN and the region via the environment.
    env={"SQL_DB_SECRET_ID": db_secret_arn, "AWS_DEFAULT_REGION": region},
    network_config=current_network_config,
    command=["python3"],
    sagemaker_session=pipeline_session,
)

# Collect the step arguments for the processing job.
# NOTE(review): `source_dir` is documented for FrameworkProcessor.run();
# confirm that ScriptProcessor.run() in the installed SDK accepts it,
# otherwise this call raises TypeError.
load_entities_to_sql_table_args = entities_to_sql.run(
    code="load_sql_tables.py",
    source_dir="scripts",
    inputs=[
        # Everything under the SQL-tables prefix, made available at this
        # container path.
        ProcessingInput(
            input_name="sqltables",
            source=f"s3://{s3_bucket_name}/{sql_tables_s3_key}",
            destination="/opt/ml/processing/input/sqltables",
        )
    ],
)

# Pipeline step wrapping the arguments captured above.
load_entities_to_sql_table_step = ProcessingStep(
    name="LoadEntitiesToSQLTable",
    step_args=load_entities_to_sql_table_args,
)

## Define the SageMaker Pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline


# Plain string literal: the name has no placeholders, so no f-string is needed.
pipeline_name = "agentic-assistant-data-processing"

# The pipeline has no parameters and consists of the two processing steps
# built in the cells above.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[],
    steps=[
        embeddings_creation_and_indexing_step,
        load_entities_to_sql_table_step,
    ],
)

Print the pipeline definition as JSON.

In [None]:
import json


# The definition is parsed with json.loads so the notebook displays it as a
# nested Python structure rather than as one long string.
definition_text = pipeline.definition()
definition = json.loads(definition_text)
definition

Update or insert the pipeline

In [None]:
# Upsert the pipeline, passing the execution role resolved at the top of the
# notebook.
pipeline.upsert(role_arn=role)

Start the pipeline execution

In [33]:
# Keep the returned execution handle; the monitoring cells below call
# describe() and list_steps() on it.
execution = pipeline.start()

Monitor the pipeline execution.

In [None]:
# Last expression in the cell, so the returned description is shown as output.
execution.describe()

In [None]:
# Last expression in the cell, so the returned list of steps is shown as output.
execution.list_steps()