# 04 - Amazon SageMaker jobs and pipeline for document processing

In [None]:
import sys
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Sessions and identity shared by the rest of the notebook.
# `sagemaker_session` is a regular interactive session (used here for its region);
# `pipeline_session` is handed to the processors so that their `.run()` calls
# produce pipeline step arguments rather than launching jobs directly.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()

In [None]:
import json

# boto3 clients used to resolve deployment settings.
ssm = boto3.client(service_name="ssm")
# NOTE(review): the Secrets Manager client is not referenced in the cells shown here.
secretsmanager = boto3.client(service_name="secretsmanager")
# Region re-derived from a fresh default boto3 session.
region = boto3.session.Session().region_name

In [None]:
# Names of the SSM Parameter Store entries holding the deployment settings
# (presumably written by the infrastructure deployment -- confirm in the stack).
security_group_parameter = "/AgenticLLMAssistant/SMProcessingJobSecurityGroupId"
dbsecret_arn_parameter = "/AgenticLLMAssistant/DBSecretARN"
subnet_ids_parameter = "/AgenticLLMAssistant/SubnetIds"
s3_bucket_name_parameter = "/AgenticLLMAssistant/AgentDataBucketParameter"

# Plain string parameters: take the stored value straight out of the response.
security_group = ssm.get_parameter(Name=security_group_parameter)["Parameter"]["Value"]
db_secret_arn = ssm.get_parameter(Name=dbsecret_arn_parameter)["Parameter"]["Value"]

# The subnet parameter stores JSON; the raw response is kept in `subnet_ids` and
# the decoded list becomes the subnet list for the processing jobs' network config.
subnet_ids = ssm.get_parameter(Name=subnet_ids_parameter)
public_subnets_ids = json.loads(subnet_ids["Parameter"]["Value"])

s3_bucket_name = ssm.get_parameter(Name=s3_bucket_name_parameter)["Parameter"]["Value"]

In [None]:
# Object key of the processed-documents JSON and the key prefix ("folder") under
# which the SQL table CSV files are stored, both inside the agent data bucket.
processed_documents_s3_key, sql_tables_s3_key = (
    "documents_processed.json",
    "structured_metadata",
)

In [None]:
import boto3
import botocore

s3 = boto3.client('s3')

# Error codes S3 may report from HeadObject for a missing key. HEAD responses have
# no body, so botocore typically surfaces the bare HTTP status "404"; the other two
# are accepted for S3-compatible endpoints that report a named code.
_S3_NOT_FOUND_CODES = frozenset({"404", "NoSuchKey", "NotFound"})


def upload_file_to_s3_if_doesnt_exist(file_path, bucket_name, s3_file_key, s3_client=None):
    """Upload a local file to S3 unless an object already exists under the key.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Destination S3 bucket.
        s3_file_key: Destination object key.
        s3_client: Optional boto3 S3 client; defaults to the module-level ``s3``.

    Returns:
        True if the file was uploaded, False if an object already existed at the key.

    Raises:
        botocore.exceptions.ClientError: if the existence check fails for any reason
            other than "not found" (e.g. 403 AccessDenied). Without s3:ListBucket,
            S3 answers 403 even for a missing key, so existence is unknown.
        boto3.exceptions.S3UploadFailedError: if the upload itself fails.
    """
    client = s3 if s3_client is None else s3_client
    try:
        client.head_object(Bucket=bucket_name, Key=s3_file_key)
    except botocore.exceptions.ClientError as e:
        error_code = e.response.get("Error", {}).get("Code")
        if error_code not in _S3_NOT_FOUND_CODES:
            # Previously this was only printed, which silently skipped the upload
            # and left later pipeline steps to fail on missing input.
            raise
    else:
        print(f"File '{s3_file_key}' already exists in bucket '{bucket_name}'")
        return False

    # upload_file reports transfer failures as S3UploadFailedError (not
    # ClientError), so they propagate to the caller instead of being swallowed.
    client.upload_file(file_path, bucket_name, s3_file_key)
    print(f"File '{s3_file_key}' uploaded to bucket '{bucket_name}'")
    return True

In [None]:
# Seed the data bucket with the processed-documents JSON that the embeddings
# step reads; the helper skips the upload when the object already exists.
s3_file_key = processed_documents_s3_key
file_path = "data/documents_processed.json"
upload_file_to_s3_if_doesnt_exist(
    file_path=file_path,
    bucket_name=s3_bucket_name,
    s3_file_key=s3_file_key,
)

In [None]:
# Place the extracted-entities CSV under the SQL tables prefix, which is the S3
# input of the SQL-loading step; skipped when the object already exists.
s3_file_key = "/".join([sql_tables_s3_key, "extracted_entities.csv"])
file_path = "data/extracted_entities.csv"
upload_file_to_s3_if_doesnt_exist(
    file_path=file_path,
    bucket_name=s3_bucket_name,
    s3_file_key=s3_file_key,
)

In [None]:
from sagemaker.network import NetworkConfig

# VPC placement for the processing jobs: the subnets and security group read
# from SSM (presumably so the jobs can reach the SQL database -- confirm).
current_network_config = NetworkConfig(
    security_group_ids=[security_group],
    subnets=public_subnets_ids,
)

In [None]:
from sagemaker.xgboost import XGBoostProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.workflow.steps import ProcessingStep

# Processing step that runs scripts/prepare_and_load_embeddings.py on the
# processed-documents JSON uploaded earlier. The job uses the XGBoost framework
# container, runs inside the VPC described by `current_network_config`, and
# receives the database secret ARN and region through environment variables.
#
# Because the processor is bound to `pipeline_session`, `.run()` does not launch
# a job here; it returns step arguments that are wrapped in a ProcessingStep.
embeddings_and_index_job = XGBoostProcessor(
    framework_version="1.7-1",
    # Reuse the execution role resolved once at notebook start (the same role
    # the pipeline is upserted with) instead of querying it again.
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="PrepareAndLoadEmbeddingsJob",
    env={
        "SQL_DB_SECRET_ID": db_secret_arn,
        "AWS_DEFAULT_REGION": region,
    },
    network_config=current_network_config,
    sagemaker_session=pipeline_session,
)

embeddings_creation_and_indexing_args = embeddings_and_index_job.run(
    code="prepare_and_load_embeddings.py",
    source_dir="scripts",
    inputs=[
        ProcessingInput(
            input_name="processed_documents",
            source=f"s3://{s3_bucket_name}/{processed_documents_s3_key}",
            # SageMaker stages the S3 object at this path inside the container.
            destination="/opt/ml/processing/input/processed_documents",
        )
    ],
)

embeddings_creation_and_indexing_step = ProcessingStep(
    name="EmbeddingCreationAndIndexing",
    step_args=embeddings_creation_and_indexing_args,
)

In [None]:
# Processing step that runs scripts/load_sql_tables.py on the objects under the
# `sql_tables_s3_key` prefix (where the extracted-entities CSV was uploaded). It
# runs inside the VPC described by `current_network_config` and receives the
# database secret ARN and region through environment variables.
#
# Bound to `pipeline_session`, so `.run()` returns step arguments for the
# ProcessingStep below instead of starting a job.
entities_to_sql = XGBoostProcessor(
    framework_version="1.7-1",
    # Reuse the execution role resolved once at notebook start (the same role
    # the pipeline is upserted with) instead of querying it again.
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="LoadEntitiesToSQLTableJob",
    env={"SQL_DB_SECRET_ID": db_secret_arn, "AWS_DEFAULT_REGION": region},
    network_config=current_network_config,
    sagemaker_session=pipeline_session,
)

load_entities_to_sql_table_args = entities_to_sql.run(
    code="load_sql_tables.py",
    source_dir="scripts",
    inputs=[
        ProcessingInput(
            input_name="sqltables",
            source=f"s3://{s3_bucket_name}/{sql_tables_s3_key}",
            # SageMaker stages the S3 prefix contents at this path in the container.
            destination="/opt/ml/processing/input/sqltables",
        )
    ],
)

load_entities_to_sql_table_step = ProcessingStep(
    name="LoadEntitiesToSQLTable",
    step_args=load_entities_to_sql_table_args,
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Pipeline made of the two processing steps. Neither step references the other's
# outputs and no `depends_on` is declared, so SageMaker may run them in parallel.
# (Plain string: the previous f-string had no placeholders.)
pipeline_name = "agentic-assistant-data-processing"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[],
    steps=[
        embeddings_creation_and_indexing_step,
        load_entities_to_sql_table_step,
    ],
)

In [None]:
# Create the pipeline definition, or update it if a pipeline with this name
# already exists, using the notebook's execution role.
pipeline.upsert(role_arn=role)

In [None]:
# Start a new pipeline execution; the handle is kept for inspection below.
execution = pipeline.start()

In [None]:
# Display the execution's current status and metadata.
execution.describe()

In [None]:
# Display the steps of this execution and their current statuses.
execution.list_steps()