In [1]:
# Clean up mem0
!rm -rf /home/sagemaker-user/.mem0/
!rm -rf /tmp/mem0_*_faiss/

!pip install --upgrade strands-agents strands-agents-tools mem0ai opensearch-py ddgs faiss-cpu
!pip install --upgrade boto3
!playwright install
!playwright install-deps

from IPython import get_ipython
#get_ipython().kernel.do_shutdown(True)

Collecting strands-agents
  Downloading strands_agents-1.0.1-py3-none-any.whl.metadata (12 kB)
Collecting opensearch-py
  Using cached opensearch_py-3.0.0-py3-none-any.whl.metadata (7.2 kB)
Collecting ddgs
  Downloading ddgs-9.4.1-py3-none-any.whl.metadata (17 kB)
Collecting faiss-cpu
  Using cached faiss_cpu-1.11.0.post1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.0 kB)
Collecting Events (from opensearch-py)
  Using cached Events-0.5-py3-none-any.whl.metadata (3.9 kB)
Downloading strands_agents-1.0.1-py3-none-any.whl (162 kB)
Using cached opensearch_py-3.0.0-py3-none-any.whl (371 kB)
Downloading ddgs-9.4.1-py3-none-any.whl (33 kB)
Using cached faiss_cpu-1.11.0.post1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.3 MB)
Using cached Events-0.5-py3-none-any.whl (6.8 kB)
Installing collected packages: Events, faiss-cpu, ddgs, opensearch-py, strands-agents
[2K  Attempting uninstall: ddgs0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1/5[0m [f

In [2]:
import os
import logging

os.environ["BYPASS_TOOL_CONSENT"] = "true"

CLAIM_ID = "dev-use-case-01"

CONFIG = {
  "inputBucket": "insuranceclaim-input-670934798598-us-west-2",
  "outputBucket": "insuranceclaim-output-670934798598-us-west-2",
  "bdaProjectArn": "arn:aws:bedrock:us-west-2:670934798598:data-automation-project/37a1c412b824",
}

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Agents

### 1. Document handler

In [3]:
SAMPLE_SFN_PAYLOAD = {
    "inputBucket": "insuranceclaim-input-670934798598-us-west-2",
    "outputBucket": "insuranceclaim-output-670934798598-us-west-2",
    "key": "dev-use-case-01/Description of Damage.pdf",
    "bdaProjectArn": "arn:aws:bedrock:us-west-2:670934798598:data-automation-project/37a1c412b824",
    "claimId": "dev-use-case-01"
}

In [15]:
# Define a tool to invoke the ML model
from strands import Agent, tool
from strands_tools import use_aws, mem0_memory
from tampered_image_detection import check_image
from IPython.display import display, Markdown
import json
import time
import boto3
import os
from strands.models.bedrock import BedrockModel

os.environ["BYPASS_TOOL_CONSENT"] = "true"

@tool
def invoke_bda_sfn(document_s3_key: str) -> str:
    """
    Execute BDA to extract or infer information from given files.
    Args:
    document_s3_key(str): A key path to the file, excluding s3 bucket name.
    """
    sfn = boto3.client('stepfunctions')
    response = sfn.start_execution(
        stateMachineArn=f"arn:aws:states:us-west-2:670934798598:stateMachine:insuranceclaim-Ingestion",
        input=json.dumps({**SAMPLE_SFN_PAYLOAD, "key": document_s3_key})
    )

    # Get the execution ARN from the response
    execution_arn = response['executionArn']

    while True:
        # Get execution status
        execution_response = sfn.describe_execution(
            executionArn=execution_arn
        )

        status = execution_response['status']

        if status in ['SUCCEEDED', 'FAILED', 'TIMED_OUT', 'ABORTED']:
            # Return the output from the state machine
            return str(execution_response["output"])
        # Wait for 5 seconds before checking again
        time.sleep(3)
    return "FAILED"

# Testing
#invoke_bda_sfn("dev-use-case-01/Description of Damage.pdf")

model = BedrockModel(model_id="us.amazon.nova-pro-v1:0")

document_processor_agent = Agent(
    name="document_processor",
#    model = model,
    system_prompt=f"You are a document processor handling multi-modal documents including text, PDFs, images, audio and video files."
    f"You will be given with a CLAIM_ID."
    f"Firstly, scan a prefix in a S3 bucket {CONFIG['inputBucket']} and find keys for each document."
    f"Then run `invoke_bda_sfn` tool parallely to process the documents."
    f"Create a the list of documents processed along with their input and output file locations in JSON format."
    f"Return the JSON part only.",
    #f"Store the JSON produced into a DynamoDB table `insuranceclaims-documents` in us-west-2, using claim_id as a partition key.",
    #f"Use memory to store outputs from the previous step.",
    tools=[use_aws, invoke_bda_sfn]
    )

@tool
def document_processor(claim_id:str):
    """ A document processor extracting data from multi-modal documents including text, PDFs, images, audio and video files.
        Args:
            claim_id(str)
    """
    response = document_processor_agent(claim_id)
    return response.message["content"][0]["text"]


# Testing
#document_processor(CLAIM_ID)

### 2. Assessor (TBD)

### 3. Investigator

In [24]:
# Define a tool to invoke the ML model
from strands import Agent, tool
from tampered_image_detection import check_image
from IPython.display import display, Markdown


@tool
def detect_tampered_image(s3_uri: str) -> str:
    """
    Detect if the input image is a forged one or not.
    Args:
    s3_uri(str): A S3 URI to the image file.

    Return (str): "tampered" if the image is a forged one, "original" if not.
    """
    #logger.info(s3_uri)
    response = check_image(s3_uri, "document-tampering-detection-v-DEMO")
    
    return "tampered" if response else "original"
    
#detect_tampered_image( "s3://insuranceclaim-input-670934798598-us-west-2/dev-use-case-01/Proof-of-damage.png")
#tampered_image_detector_agent = Agent(system_prompt="You are a fraud detector for insurance claims document processing."
#              "Detect if given image files were forged one or not. Be concise about your answer, mentioning only if the image seems tampered one or not.",
#              tools = [detect_tampered_image])

#Testing
response = detect_tampered_image(
   "s3://insuranceclaim-input-670934798598-us-west-2/test/Noisy Image (Random)_screenshot_23.07.2025.png"
)
response




'tampered'

In [6]:
# Web search tool
from strands import tool, Agent
from ddgs import DDGS

@tool
def web_search(
    keywords: str,
    region: str = "us-en",
    max_results: int = 7,
) -> str:
    """Search the web to get updated information.
    Args:
        keywords (str): The search query keywords.
        region (str): The search region: wt-wt, us-en, uk-en, ru-ru, etc..
        max_results (int | None): The maximum number of results to return.
    Returns:
        List of dictionaries with search results.
    """
    try:
        results = DDGS().text(keywords, region=region, max_results=max_results)
        return results if results else "No results found."
    except Exception as e:
        return f"Exception: {e}"

# Test
#web_search("How much does it cost to fix a car")

#### Investigator

In [7]:
from strands_tools import think
from strands_tools import mem0_memory, use_aws, current_time, use_aws, python_repl


#model = BedrockModel(model_id="us.amazon.nova-pro-v1:0")

investigator_agent = Agent(
    name="investigator",
    #model=model,
    system_prompt="You are an assistant for the inspector."
          "Find the extracted data from the claim, then check if the documents are truthful using tools provided."
          "You can find list of the documents in a DynamoDB table `insuranceclaims-documents` in us-west-2, using claim_id as a key"
          "For example, use detect_tampered_image tool to find if .png files are forged ones."
          "For stated facts, try your best to validate them by getting helps from the provided tool"
          "Use your memory to find the location of the extracted document and remember the findings into memory, using the claim_id as agent_id",
    tools = [use_aws, detect_tampered_image, web_search, mem0_memory, current_time], )

#investigator_agent(CLAIM_ID)

#### Adjustor

In [8]:

from strands_tools import mem0_memory, use_aws, current_time, use_aws, python_repl, think


#model = BedrockModel(model_id="us.amazon.nova-pro-v1:0")

adjustor_agent = Agent(
    name="adjustor",
    #model=model,
    system_prompt="You are an claim adjustor. Find polici document for the insurance case, and find out the fair and accurate payout for the claim, and suggest adjusted payout."
    #"You can find list of the documents in a DynamoDB table `insuranceclaims-documents` in us-west-2, using claim_id as a key. Find out the policy document from there."
    "Use memory to check what investigator has found regarding the claim. Use claim_id as agent_id."
    "Generate your suggestion for adjusting the claim case",
    tools = [use_aws, mem0_memory], )


#Testing
#adjustor(adjustor_agent)

### Reviewer

In [9]:

from strands_tools import mem0_memory, use_aws, current_time, use_aws, python_repl, think


reviewer_agent = Agent(
    name="reviwer",
    system_prompt="You are a reviwer of insurnace claim cases. "
    #"You can find list of the documents in a DynamoDB table `insuranceclaims-documents` in us-west-2, using claim_id as a key."
    "Use memory to check what investigator has found regarding the claim. Use claim_id as agent_id."
    "Review the documents and find out if anything is missing. If so, send it back.",
    tools = [use_aws, mem0_memory], )

#Testing
#reviewer_agent(CLAIM_ID)

### Report Generator

In [10]:
from strands_tools import think
from strands_tools import mem0_memory, use_aws, current_time, use_aws


model = BedrockModel(model_id="us.amazon.nova-pro-v1:0")

report_agent = Agent(
    model=model,
    name="report",
    system_prompt="You are a insurnace claim agent. You evaluate collected information on a claim case and create a report."
    #"Use all the information collected. Use your memory using the given claim_id as an agent_id."
    #"You can find the location of JSON files from document processing as well."
    "Generate the final report in Markdown format",
    tools = [use_aws, mem0_memory])

#Testing
#report_agent(CLAIM_ID)

### Claim Adjustor (TBD)

### Report Generator

# Put Everything Together

In [11]:
from strands import Agent
from strands.multiagent import GraphBuilder, Swarm

builder = GraphBuilder()

builder.add_node(document_processor_agent, "document_processor")

assessor_swarm = Swarm(
    [investigator_agent, adjustor_agent, reviewer_agent],
    max_handoffs=4,
    max_iterations=2,
    execution_timeout=300.0,
    node_timeout=60
)
builder.add_node(assessor_swarm, "accessor")
builder.add_node(report_agent, "report")

builder.add_edge("document_processor", "accessor")
builder.add_edge("accessor", "report")

# Set entry points (optional - will be auto-detected if not specified)
builder.set_entry_point("document_processor")

# Build the graph
graph = builder.build()

In [12]:
response = graph(CLAIM_ID)

INFO:strands.telemetry.metrics:Creating Strands MetricsClient


I'll help you process documents for claim ID "dev-use-case-01". Let me start by scanning the S3 bucket to find all documents under this prefix.
Tool #1: use_aws


Now I'll process all the documents in parallel using the BDA step function. I found 7 documents (excluding the folder entry):
Tool #2: invoke_bda_sfn

Tool #3: invoke_bda_sfn

Tool #4: invoke_bda_sfn

Tool #5: invoke_bda_sfn

Tool #6: invoke_bda_sfn

Tool #7: invoke_bda_sfn

Tool #8: invoke_bda_sfn
```json
{
  "claim_id": "dev-use-case-01",
  "processed_documents": [
    {
      "document_name": "Customer-call.m4a",
      "document_type": "audio",
      "input_location": "s3://insuranceclaim-input-670934798598-us-west-2/dev-use-case-01/Customer-call.m4a",
      "output_location": "s3://insuranceclaim-output-670934798598-us-west-2/dev-use-case-01/Customer-call.m4a/005c6046-c4a1-4f81-9ee3-1020995af442/0/standard_output/0/result.json",
      "status": "success"
    },
    {
      "document_name": "Description of Damage.pdf",
      "document_type": "pdf",
      "input_location": "s3://insuranceclaim-input-670934798598-us-west-2/dev-use-case-01/Description of Damage.pdf",
      "output_loca

  response_received_timestamp = datetime.datetime.utcnow()
  local_timestamp = datetime.datetime.utcnow()
ERROR:strands.multiagent.swarm:node=<investigator>, timeout=<60>s | node execution timed out after timeout
Traceback (most recent call last):
  File "/opt/conda/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/strands/multiagent/swarm.py", line 590, in _execute_node
    result = await node.executor.invoke_async(node_input)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/strands/agent/agent.py", line 398, in invoke_async
    async for event in events:
  File "/opt/conda/lib/python3.12/site-packages/strands/agent/agent.py", line 508, in stream_async
    async for event in events:
  File "/opt/conda/lib/python3.12/site-packages/strands/agent/agent.py", line 544, in _run_loop
    async for event in events:
  File "/opt/conda/lib/pyth

<thinking> Based on the inputs from the previous nodes, I need to generate a final report in Markdown format. However, the specific details of the claim case are not provided in the input. To proceed, I need to gather the necessary information about the claim case. I will use the `mem0_

INFO:faiss.loader:Loading faiss with AVX512 support.
INFO:faiss.loader:Successfully loaded faiss with AVX512 support.
INFO:faiss:Failed to load GPU Faiss: name 'GpuIndexIVFFlat' is not defined. Will not load constructor refs for GPU indexes. This is only an error if you're trying to use GPU Faiss.
INFO:mem0.vector_stores.faiss:Inserted 1 vectors into collection mem0migrations


memory` tool to retrieve relevant memories related to the claim case. </thinking>

Tool #1: mem0_memory


  datetime_now = datetime.datetime.utcnow()


<thinking> The `mem0_memory` tool requires either a `user_id` or `agent_id` to perform the retrieval. Since I don't have this information, I need to ask the user to provide either the `user_id` or `agent_id` to proceed with retrieving the claim case details. </thinking> 

Please provide either the `user_id` or `agent_id` associated with the claim case so I can retrieve the necessary details.

  response_received_timestamp = datetime.datetime.utcnow()
  local_timestamp = datetime.datetime.utcnow()


In [13]:
from IPython.display import display, Markdown
response.results["report"].result.message["content"][0]["text"]
display(Markdown(response.results["report"].result.message["content"][0]["text"]))

<thinking> The `mem0_memory` tool requires either a `user_id` or `agent_id` to perform the retrieval. Since I don't have this information, I need to ask the user to provide either the `user_id` or `agent_id` to proceed with retrieving the claim case details. </thinking> 

Please provide either the `user_id` or `agent_id` associated with the claim case so I can retrieve the necessary details.