
# Work‑stream 3 — Impairment Detection Agent (Notebook Version)

This notebook mirrors the `detect.py` Lambda we drafted in the sprint planning session.
It shows, step‑by‑step, how the Strands agent ingests XML feeds, identifies impairments,
pulls scoring‑factor names from the Knowledge‑Base markdown, and emits the JSON payload
expected by Work‑stream 4.

> **Why a notebook?**  
> • Easier to demo interactively.  
> • Lets you tweak the prompt or tools and run sample XML without redeploying the Lambda.  
> • Serves as executable documentation for future hand‑offs.


In [16]:
pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://plugin.us-east-1.prod.workshops.aws
Note: you may need to restart the kernel to use updated packages.


In [17]:

import os, json, boto3
import numpy as np
from collections import defaultdict
from lxml import etree
from strands import Agent, tool
from strands_tools import retrieve

# ---- Set these before running locally ----
# Knowledge base configuration - uncomment the line below to use Bedrock Knowledge Base instead of local files
# kb_id = 'YSWIGPQHRJ'
kb_id = None  # Reset to None to ensure clean state

# Model handed to the detection Agent (see the `detector` cell).
model_id = 'us.anthropic.claude-3-7-sonnet-20250219-v1:0'
# Model invoked by create_embedding() to embed knowledge-base documents.
embedding_model_id = 'amazon.titan-embed-text-v2:0'

# Mock data configuration
# Directory that load_mock_data() scans for *.xml feeds; swap in the
# commented-out path below to run a different scenario.
# mock_data_path = "../mock_data/hypertension"
mock_data_path = "../mock_data/diabetes_cardiovascular"

# Local knowledge base path
# Directory that load_local_knowledge_base() scans for *.md files.
local_kb_path = "../underwriting_manual"


In [18]:
# Local Knowledge Base Setup
# In-memory document store; stays None until load_local_knowledge_base() fills it.
local_kb_store = None
# Bedrock runtime client used by create_embedding(). No region or credentials
# are passed explicitly, so boto3's default session settings apply.
bedrock_runtime = boto3.client('bedrock-runtime')
# NOTE(review): this hard-coded id is exported even though kb_id is None
# ("local" mode). Presumably it is what the strands_tools `retrieve` tool
# reads to pick a Bedrock Knowledge Base - confirm, and consider deriving it
# from kb_id instead.
os.environ["KNOWLEDGE_BASE_ID"] =  "YSWIGPQHRJ"

def cosine_similarity(a, b):
    """Return the cosine similarity between two vectors.

    Args:
        a: First vector (any array-like accepted by NumPy).
        b: Second vector, same length as ``a``.

    Returns:
        The cosine of the angle between ``a`` and ``b``. If either vector has
        zero magnitude the similarity is undefined; ``0.0`` is returned in
        that case instead of ``nan`` so callers can compare scores safely.
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    # Guard the 0/0 case: a zero vector would otherwise yield nan plus a
    # RuntimeWarning, and nan never compares greater than any best score.
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator

def create_embedding(text):
    """Embed ``text`` with the configured Bedrock embedding model.

    Sends ``text`` as the ``inputText`` field of a JSON request to
    ``bedrock_runtime.invoke_model`` using ``embedding_model_id``.

    Args:
        text: The string to embed.

    Returns:
        A NumPy array built from the ``embedding`` field of the response.
    """
    request_body = json.dumps({"inputText": text})
    raw_response = bedrock_runtime.invoke_model(
        body=request_body,
        modelId=embedding_model_id,
    )
    # The response body is a stream; read it once and decode the JSON payload.
    payload = json.loads(raw_response['body'].read())
    return np.array(payload['embedding'])

def load_local_knowledge_base():
    """Embed every markdown file under ``local_kb_path`` into ``local_kb_store``.

    Does nothing (beyond printing a notice) when ``kb_id`` is set, or when
    ``local_kb_path`` does not exist. Otherwise each ``*.md`` file (extension
    matched case-insensitively) is read as UTF-8, embedded with
    ``create_embedding`` and stored as a dict with ``filename``, ``content``
    and ``embedding`` keys. A file that fails to load or embed is reported
    and skipped; the remaining files are still processed.

    Side effects:
        Rebinds the module-level ``local_kb_store`` to the list of loaded
        documents and prints progress messages.
    """
    global local_kb_store

    # A configured Bedrock KB takes precedence over the local files.
    bedrock_configured = 'kb_id' in globals() and kb_id is not None
    if bedrock_configured:
        print("Bedrock KB configured, skipping local KB loading...")
        return

    print(f"Loading local knowledge base from {local_kb_path}...")

    if not os.path.exists(local_kb_path):
        print(f"Warning: Local KB path {local_kb_path} does not exist")
        return

    documents = []
    for filename in os.listdir(local_kb_path):
        # Only markdown files belong to the underwriting manual.
        if not filename.lower().endswith('.md'):
            continue

        try:
            with open(os.path.join(local_kb_path, filename), 'r', encoding='utf-8') as handle:
                content = handle.read()

            print(f"✓ Loading {filename} ({len(content)} chars)")

            # One embedding per document; nothing is appended if this raises.
            documents.append({
                'filename': filename,
                'content': content,
                'embedding': create_embedding(content),
            })
        except Exception as e:
            print(f"✗ Error loading {filename}: {e}")

    local_kb_store = documents
    print(f"Local knowledge base loaded with {len(documents)} documents")

# Load the local knowledge base if kb_id is not defined or None
# Runs at cell execution time. In local mode this reads every markdown file
# and calls create_embedding() (a Bedrock request) once per document.
if 'kb_id' not in globals() or kb_id is None:
    load_local_knowledge_base()
else:
    print("Using Bedrock Knowledge Base")


Loading local knowledge base from ../underwriting_manual...
✓ Loading hypertension.md (7391 chars)
✓ Loading type1_diabetes.md (9446 chars)
✓ Loading type2_diabetes.md (9187 chars)
✓ Loading lab_values.md (18969 chars)
Local knowledge base loaded with 4 documents


In [19]:
# Corrected tools that follow Strands documentation patterns

@tool
def scratch_fixed(action: str, key: str, value=None, agent=None):
    """Tool for temporary storage during agent execution - uses agent.state properly"""
    # NOTE: the docstring above is left untouched on purpose - presumably the
    # @tool decorator exposes it to the model as the tool description.
    #
    # action: 'append' (push value onto the list at key), 'set' (store value
    #         at key) or 'get' (return the value at key, or None if absent).
    # Any other action changes nothing in the pad but still returns 'ok'.
    pad = agent.state.get('scratch_pad') or {}

    # Reads return immediately and never write the pad back.
    if action == 'get':
        return pad.get(key)

    if action == 'append':
        pad.setdefault(key, []).append(value)
    elif action == 'set':
        pad[key] = value

    # Persist the (possibly updated) pad so later tool calls can see it.
    agent.state.set('scratch_pad', pad)
    return 'ok'


In [20]:

# kb_rt = boto3.client('bedrock-agent-runtime')

# @tool
# def kb_search(canonical_term: str):
#     """Return markdown for the top KB hit from either local or Bedrock knowledge base."""
    
#     if ('kb_id' not in globals() or kb_id is None) and local_kb_store:
#         # Use local knowledge base
#         print(f"Searching local KB for: {canonical_term}")
        
#         # Create embedding for the search query
#         query_embedding = create_embedding(canonical_term)
        
#         # Find the most similar document
#         best_match = None
#         best_similarity = -1
        
#         for doc in local_kb_store:
#             similarity = cosine_similarity(query_embedding, doc['embedding'])
#             if similarity > best_similarity:
#                 best_similarity = similarity
#                 best_match = doc
        
#         if best_match:
#             print(f"Best match: {best_match['filename']} (similarity: {best_similarity:.3f})")
#             return best_match['content']
#         else:
#             return "No matching documents found in local knowledge base."
    
#     else:
#         # Use Bedrock Knowledge Base
#         print(f"Searching Bedrock KB for: {canonical_term}")
#         # resp = kb_rt.retrieve(
#         #     knowledgeBaseId=kb_id,
#         #     retrievalQuery={'text': canonical_term},
#         #     retrievalConfiguration={'vectorSearchConfiguration': {'numberOfResults': 1}}
#         # )
#         print(resp)
#         # According to official AWS documentation, the field is 'text', not 'text_markdown'
#         return resp['retrievalResults'][0]['content']['text']


In [None]:

# System prompt for the detection agent.
# The JSON example at the end must itself be valid JSON: run_detection()
# extracts the agent's fenced json block and passes it to json.loads(), so a
# malformed template teaches the model to emit output that cannot be parsed.
# The search step is worded without a tool name so it matches whichever
# knowledge-base tool the agent is given.
PROMPT = """You are a senior life insurance underwriter. Your job is to analyze the data stream for an application and identify impairments,
scoring factors (based on the knowledge base), and evidences for those impairments.
1. Scan the XML feeds (application, Rx, labs, MIB) for impairment evidence and write out an initial list of impairments.
Then for each impairment in your scratch pad, do the following:
2. Call the knowledge base search tool once and treat the markdown returned as authoritative.
3. Use the ratings tables in the returned markdown to determine the list of "scoring factors" that are required to completely score that impairment and write them out.
4. Search through the XML feeds to consolidate the values for each scoring factor, and the list of evidence for that impairment.
5. Write out the scoring factors and evidence for that impairment.

Guidelines:
- If age of onset of impairment is needed, it can be inferred as the difference between patient's birth date and
   the first prescription date that addresses the impairment.

Repeat this process for each impairment you find. Deduplicate any impairment that is found in multiple XML feeds into one listing.

Once you have completed this process for all impairments, return the following JSON (the "discrepancies" field is optional):
```json
{
  "impairments": [
    {
      "impairment_id": "diabetes",
      "scoring_factors": {"A1C": 8.2, "Neuropathy": true},
      "evidence": ["Rx: insulin …", "Lab: A1C 8.2 %"],
      "discrepancies": ["answered no to Diabetes Questionnaire but evidence of diabetes"]
    },
    {
      "impairment_id": "hypertension",
      "scoring_factors": {"Blood Pressure": "128/92", "Age": 41, "Medication": "Lisinopril 10mg", "Duration": "At least since 2022-04-18", "Compliance": "Good - regular refills", "Target Organ Damage": "None evident", "Comorbidities": "None evident", "Family History": "Father had heart attack at age 58"},
      "evidence": ["Rx: Lisinopril 10mg for hypertension, filled 2024-01-10 (90 tablets)", "Rx: Lisinopril 10mg for hypertension, filled 2023-10-12 (90 tablets)", "MIB: Code 311C 'CARDIOVASCULAR - HYPERTENSION TREATED' from 2022-04-18", "Application: Self-reported Lisinopril 10mg for blood pressure", "Application: Blood pressure reading 128/92 mmHg"]
    }
  ],
  "narrative": "The applicant has a history of hypertension and diabetes. The hypertension is well controlled with Lisinopril 10mg, and the diabetes is well controlled with insulin. The applicant has a family history of heart attack in the father."
}
```
"""


In [22]:
# Test local knowledge base functionality
def test_local_kb():
    """Smoke-test the local knowledge base by listing it and running one search.

    Prints the filenames held in ``local_kb_store`` and then, if a
    ``kb_search`` function is defined in the notebook namespace, searches for
    ``'diabetes'`` and prints the length and first 200 characters of the hit.
    Prints a notice and returns when the local KB is unavailable (Bedrock KB
    configured, or nothing loaded).
    """
    local_mode = 'kb_id' not in globals() or kb_id is None
    if not (local_mode and local_kb_store):
        print("Local KB not available or Bedrock KB is configured")
        return

    print(f"Local KB contains {len(local_kb_store)} documents:")
    for doc in local_kb_store:
        print(f"  - {doc['filename']}")

    # kb_search lives in a cell that is currently commented out; look it up
    # dynamically so this helper reports that instead of raising NameError.
    search = globals().get('kb_search')
    if search is None:
        print("\nkb_search is not defined; skipping search test")
        return

    # Test a search
    print("\nTesting search for 'diabetes':")
    result = search("diabetes")
    print(f"Result length: {len(result)} characters")
    print("First 200 characters:", result[:200] + "..." if len(result) > 200 else result)

# Uncomment to test the local knowledge base
# test_local_kb()


In [23]:
# Load mock data from ../mock_data directory
def load_mock_data(data_dir=None):
    """Read every XML feed in a directory into a dict keyed by file stem.

    Args:
        data_dir: Directory to scan. Defaults to the module-level
            ``mock_data_path`` when omitted.

    Returns:
        A dict mapping each file name without its ``.xml`` extension to the
        file's text (decoded as UTF-8). A file that disappears between the
        listing and the read is recorded as an empty string.

    Raises:
        FileNotFoundError: If ``data_dir`` itself does not exist.
    """
    if data_dir is None:
        data_dir = mock_data_path

    feeds = {}
    # Sort the listing: os.listdir() order is arbitrary, and the order here
    # determines the order in which feeds are later presented to the agent.
    for filename in sorted(os.listdir(data_dir)):
        # splitext strips only the final extension and keeps its case, so the
        # case-insensitive filter and the key derivation always agree.
        stem, extension = os.path.splitext(filename)
        if extension.lower() != '.xml':
            continue

        file_path = os.path.join(data_dir, filename)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                feeds[stem] = f.read()
            print(f"✓ Loaded {filename}")
        except FileNotFoundError:
            print(f"✗ Could not find {filename}")
            feeds[stem] = ''

    return feeds

# Load the mock data
# Module-level cache of the feeds; run_detection() copies it when no XML is
# passed explicitly.
mock_data = load_mock_data()
# Count only non-empty entries: load_mock_data() stores '' for a file it
# could not read.
print(f"\nLoaded {len([v for v in mock_data.values() if v])} XML files successfully")


✓ Loaded mock_lab_results.xml
✓ Loaded mock_intelliscript_rx.xml
✓ Loaded mock_mib_response.xml
✓ Loaded mock_application.xml

Loaded 4 XML files successfully


In [24]:
# Updated detector with corrected tools and message handling
# Detection agent: model from `model_id`, instructions from `PROMPT`.
# NOTE(review): the only tool registered is strands_tools' `retrieve`, so the
# locally embedded knowledge base (local_kb_store) is not reachable from the
# agent - confirm this is intended.
detector = Agent(
    model=model_id,
    system_prompt=PROMPT,
    tools=[retrieve],
)


In [25]:

def parse_xml(blob: str):
    """Parse an XML document held in a string and return its root element.

    Args:
        blob: The XML document text.

    Returns:
        The root element produced by ``etree.fromstring``.
    """
    # The text is encoded to UTF-8 bytes before being handed to the parser.
    raw_bytes = blob.encode('utf-8')
    return etree.fromstring(raw_bytes)

def xml_to_text(elem):
    """Flatten an element tree into a single string of its text content.

    Args:
        elem: An element exposing ``itertext()``.

    Returns:
        All text fragments yielded by ``elem.itertext()``, in order, joined
        with single spaces. Tag names and attributes are not included, and
        whitespace inside fragments is kept as-is.
    """
    fragments = list(elem.itertext())
    return ' '.join(fragments)


In [26]:

    

def _extract_json_payload(reply_text):
    """Parse the JSON payload out of the agent's reply text.

    Prefers the contents of a fenced json block. If that (or, when no fence
    is present, the whole reply) is not valid JSON, falls back to the span
    between the first '{' and the last '}' so that a bare JSON object
    surrounded by prose still parses.

    Raises:
        json.JSONDecodeError: If no parseable JSON can be found.
    """
    import re

    fenced = re.search(r"```json\s*(.*?)\s*```", reply_text, re.DOTALL)
    candidate = fenced.group(1) if fenced else reply_text
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        start, end = candidate.find('{'), candidate.rfind('}')
        if start == -1 or end < start:
            raise
        return json.loads(candidate[start:end + 1])


def run_detection(application_xml='', rx_xml='', lab_xml='', mib_xml='', use_mock_data=True, max_chars=4000):
    """Utility to run the agent in‑notebook.

    Args:
        application_xml: Application feed as an XML string.
        rx_xml: Prescription feed as an XML string.
        lab_xml: Lab results feed as an XML string.
        mib_xml: MIB feed as an XML string.
        use_mock_data: When true and none of the four feeds is supplied, the
            module-level ``mock_data`` is used instead.
        max_chars: Maximum number of characters of flattened text kept per
            feed (default 4000, the previous fixed limit). ``None`` disables
            truncation.

    Returns:
        The JSON payload parsed from the agent's reply, or ``[]`` when no
        feed could be processed (the agent is not called in that case).

    Raises:
        json.JSONDecodeError: If the agent's reply contains no parseable JSON.
    """
    # Use mock data by default if no specific XML is provided
    if use_mock_data and not any([application_xml, rx_xml, lab_xml, mib_xml]):
        feeds = mock_data.copy()
        print("Using mock data from ../mock_data directory")
    else:
        feeds = {
            'application_xml': application_xml,
            'rx_xml': rx_xml,
            'lab_xml': lab_xml,
            'mib_xml': mib_xml,
        }

    xml_sections = []
    for k, xml in feeds.items():
        if not xml:
            continue
        try:
            # Flatten the XML to text and cap its length to bound prompt size.
            xml_text = xml_to_text(parse_xml(xml))[:max_chars]
            xml_sections.append(f"{k}:\n{xml_text}")
            print(f"✓ Processed {k} ({len(xml_text)} chars)")
        except Exception as e:
            # Best effort: one unparseable feed must not block the others.
            print(f"✗ Error processing {k}: {e}")

    if not xml_sections:
        print("No valid XML data to process")
        return []

    # Create a simple string message with all XML data
    message = "Here are the XML feeds to analyze for impairments:\n\n" + "\n\n---\n\n".join(xml_sections)

    print(f"\nSending {len(xml_sections)} XML feeds to the detection agent...")

    # Call the agent with a simple string message (correct way according to Strands docs)
    res = detector(message)
    print("Agent response:")
    print(res)

    return _extract_json_payload(str(res))

# Now you can run detection with mock data easily:
# sample_output = run_detection()  # Uses mock data automatically
# print(json.dumps(sample_output, indent=2))


In [27]:
# Run the impairment detection using mock data
print("=== Running Impairment Detection with Mock Data ===")
# NOTE(review): this label reflects only kb_id. The detector agent is built
# with the `retrieve` tool alone, so "Local" here does not by itself mean the
# agent searches the locally embedded documents - confirm.
print(f"Knowledge Base Mode: {'Local' if 'kb_id' not in globals() or kb_id is None else 'Bedrock'}")
print()

try:
    # This will automatically use the mock data from ../mock_data
    results = run_detection()
    
    print("\n=== Detection Results ===")
    print(json.dumps(results, indent=2))
    
# Top-level notebook boundary: any failure (AWS call, XML handling, JSON
# parsing of the reply) is reported with troubleshooting hints rather than a
# traceback.
except Exception as e:
    print(f"\n\nError running detection: {e}")
    print("\nMake sure:")
    print("\n1. Your AWS credentials are configured")
    # The hints differ by knowledge-base mode, using the same kb_id test as above.
    if 'kb_id' not in globals() or kb_id is None:
        print("\n2. The local underwriting manual files are accessible")
        print("\n3. You have access to the Bedrock embedding model")
    else:
        print("\n2. The KB_ID is set correctly")
        print("\n3. You have access to the specified Bedrock model and knowledge base")



=== Running Impairment Detection with Mock Data ===
Knowledge Base Mode: Local

Using mock data from ../mock_data directory
✓ Processed mock_lab_results (1127 chars)
✓ Processed mock_intelliscript_rx (1544 chars)
✓ Processed mock_mib_response (1002 chars)
✓ Processed mock_application (1982 chars)

Sending 4 XML feeds to the detection agent...
I'll analyze these XML feeds for impairments, identify scoring factors, and provide evidence for each impairment. Let me begin by scanning the feeds for potential health issues.

Based on an initial review, I can see evidence of several impairments:
1. Diabetes/Type 2 Diabetes
2. Hypertension 
3. Hyperlipidemia

Let me analyze each impairment systematically.

## Impairment 1: Diabetes

Let me search the knowledge base for diabetes rating information:
Tool #1: retrieve
I apologize for the error. Let me try a more general search:
Tool #2: retrieve
I apologize, but it seems we're having issues accessing the knowledge base. Let me work with the standa