# Step 4: Assessment (Granular)

This notebook demonstrates the **granular assessment** approach for evaluating extraction confidence using AWS Bedrock.

**Key Features:**
- Multiple focused inferences instead of single large inference
- Prompt caching for cost optimization
- Parallel processing for reduced latency
- Better handling of complex documents with many attributes

**Inputs:**
- Document object with extraction results from Step 3
- Granular assessment configuration
- Document classes with confidence thresholds

**Outputs:**
- Document with enhanced assessment results
- Detailed confidence scores and reasoning for each attribute
- Performance metrics showing granular processing benefits

## 1. Load Previous Step Data

In [None]:
import json
import logging
import os
import time
import traceback
from pathlib import Path

import boto3
import yaml

# Import IDP libraries
from idp_common.models import Document, Status
from idp_common import assessment

# Configure logging: WARNING by default, INFO for the two loggers named below
logging.basicConfig(level=logging.WARNING)
for verbose_logger_name in (
    'idp_common.assessment.granular_service',
    'idp_common.bedrock.client',
):
    logging.getLogger(verbose_logger_name).setLevel(logging.INFO)

for startup_message in (
    "Libraries imported successfully",
    "Granular assessment logging enabled",
):
    print(startup_message)

In [None]:
# Load document from previous step
extraction_data_dir = Path(".data/step3_extraction")

# Load document object from JSON
document_path = extraction_data_dir / "document.json"
with open(document_path, 'r') as f:
    document = Document.from_json(f.read())

# Load configuration directly from config files
config_dir = Path("config")
CONFIG = {}

# Load each configuration file; later files override earlier ones on key clashes
config_files = [
    "assessment_granular.yaml",  # Use granular config
    "classes.yaml"
]

for config_file in config_files:
    config_path = config_dir / config_file
    if not config_path.exists():
        print(f"Warning: {config_file} not found")
        continue

    with open(config_path, 'r') as f:
        # safe_load returns None for an empty document; treat that as "no settings"
        file_config = yaml.safe_load(f) or {}

    if not isinstance(file_config, dict):
        # dict.update() would raise on a list/scalar root, so skip with a clear message
        print(f"Warning: {config_file} does not contain a mapping at the top level; skipped")
        continue

    CONFIG.update(file_config)
    print(f"Loaded {config_file}")

# Load environment info
env_path = extraction_data_dir / "environment.json"
with open(env_path, 'r') as f:
    env_info = json.load(f)

# Set environment variables
os.environ['AWS_REGION'] = env_info['region']
os.environ['METRIC_NAMESPACE'] = 'IDP-Modular-Pipeline'

print(f"Loaded document: {document.id}")
print(f"Document status: {document.status.value}")
print(f"Number of sections: {len(document.sections) if document.sections else 0}")
print(f"Loaded configuration sections: {list(CONFIG.keys())}")

## 2. Configure Granular Assessment Service

In [None]:
# Extract assessment configuration.
# `or {}` also covers keys that exist in the YAML with an empty (null) value,
# which .get(key, {}) alone would return as None.
assessment_config = CONFIG.get('assessment') or {}
granular_config = assessment_config.get('granular') or {}

print("=== Assessment Configuration ===")
print(f"Model: {assessment_config.get('model')}")
print(f"Temperature: {assessment_config.get('temperature')}")
print(f"Max Tokens: {assessment_config.get('max_tokens')}")
print(f"Default Confidence Threshold: {assessment_config.get('default_confidence_threshold')}")

print("\n=== Granular Configuration ===")
print(f"Enabled: {granular_config.get('enabled', False)}")
print(f"Max Workers: {granular_config.get('max_workers', 4)}")
print(f"Simple Batch Size: {granular_config.get('simple_batch_size', 3)}")
print(f"List Batch Size: {granular_config.get('list_batch_size', 1)}")
print(f"Enable Caching: {granular_config.get('enable_caching', True)}")
print(f"Enable Parallel: {granular_config.get('enable_parallel', True)}")

# Preview of the task prompt: only mark it as truncated when it really was
task_prompt = assessment_config.get('task_prompt') or ''
task_prompt_preview = task_prompt[:500] + ('...' if len(task_prompt) > 500 else '')

print("\n" + "*"*50)
print(f"System Prompt:\n{assessment_config.get('system_prompt')}")
print("*"*50)
print(f"Task Prompt (first 500 chars):\n{task_prompt_preview}")
print("*"*50)

In [None]:
# List each document class with the confidence threshold of its attributes.
# Output is capped at 5 attributes per class and 3 nested attributes per group/list.
classes = CONFIG.get('classes', [])
print("\nDocument Classes with Confidence Thresholds:")
for doc_class in classes:
    print(f"\n{doc_class['name']}:")
    class_attributes = doc_class.get('attributes', [])

    for attribute in class_attributes[:5]:  # Show first 5 attributes
        threshold = attribute.get('confidence_threshold', 'default')
        attr_type = attribute.get('attributeType', 'simple')
        print(f"  - {attribute['name']} ({attr_type}): threshold = {threshold}")

        # Pick the nested attribute definitions for groups and lists
        if attr_type == 'group':
            nested_attributes = attribute.get('groupAttributes', [])
        elif attr_type == 'list':
            nested_attributes = attribute.get('listItemTemplate', {}).get('itemAttributes', [])
        else:
            nested_attributes = []

        for nested_attribute in nested_attributes[:3]:
            nested_threshold = nested_attribute.get('confidence_threshold', 'default')
            print(f"    • {nested_attribute['name']}: {nested_threshold}")

    hidden_count = len(doc_class.get('attributes', [])) - 5
    if hidden_count > 0:
        print(f"  ... and {hidden_count} more")

In [None]:
# Create assessment service - will automatically use granular if enabled
assessment_service = assessment.AssessmentService(config=CONFIG)

# Class name of the implementation object held by the service wrapper
service_class_name = type(assessment_service._service).__name__
service_kind = 'Granular' if 'Granular' in service_class_name else 'Original'

print(f"Assessment service initialized: {service_class_name}")
print(f"Service type: {service_kind}")

## 3. Assess Extraction Results with Granular Approach

In [None]:
# Helper function to parse S3 URIs and load JSON
def parse_s3_uri(uri):
    """
    Split an S3 URI into its bucket name and object key.

    Only a leading "s3://" scheme is stripped, so a key that happens to
    contain the text "s3://" is returned unchanged. A string without the
    scheme is still accepted and treated as "bucket/key".

    Args:
        uri: String such as "s3://bucket/path/to/object.json"

    Returns:
        Tuple (bucket, key); key is "" when the URI names only a bucket
    """
    scheme = "s3://"
    location = uri[len(scheme):] if uri.startswith(scheme) else uri
    # Everything up to the first "/" is the bucket; the remainder is the key
    bucket, _, key = location.partition("/")
    return bucket, key

def load_json_from_s3(uri):
    """
    Download the object addressed by an S3 URI and parse it as JSON.

    Args:
        uri: S3 URI of the object, split into bucket and key by parse_s3_uri

    Returns:
        The decoded JSON content (object body read as UTF-8)
    """
    client = boto3.client('s3')
    bucket_name, object_key = parse_s3_uri(uri)
    raw_body = client.get_object(Bucket=bucket_name, Key=object_key)['Body'].read()
    return json.loads(raw_body.decode('utf-8'))

print("Helper functions defined")

In [None]:
print("Assessing extraction confidence using granular approach...")

# Defined before the branch so later cells can always read it,
# even when the document has no sections to assess.
assessment_results = []

if not document.sections:
    print("No sections found in document. Cannot proceed with assessment.")
else:
    # Process each section that has extraction results (limit to first 2 to save time)
    sections_with_extractions = [s for s in document.sections if hasattr(s, 'extraction_result_uri') and s.extraction_result_uri]
    n = min(2, len(sections_with_extractions))
    
    print(f"Found {len(sections_with_extractions)} sections with extraction results")
    print(f"Processing first {n} sections for granular assessment...")
    
    for i, section in enumerate(sections_with_extractions[:n]):
        print(f"\n--- Granular Assessment: Section {i+1}/{n} ---")
        print(f"Section ID: {section.section_id}")
        print(f"Classification: {section.classification}")
        print(f"Extraction Result URI: {section.extraction_result_uri}")
        
        # Load extraction results to show what will be assessed.
        # The preview is best-effort: a failure here must not block the assessment.
        try:
            extraction_data = load_json_from_s3(section.extraction_result_uri)
            extraction_results = extraction_data.get('inference_result', {})
            print(f"Attributes to assess: {list(extraction_results.keys())}")
            
            # Show list attribute sizes
            for attr_name, attr_value in extraction_results.items():
                if isinstance(attr_value, list):
                    print(f"  - {attr_name}: {len(attr_value)} items")
                elif isinstance(attr_value, dict):
                    print(f"  - {attr_name}: {len(attr_value)} sub-attributes")
                else:
                    print(f"  - {attr_name}: simple value")
        except Exception as e:
            print(f"Could not preview extraction results: {e}")
        
        # Process section assessment (wall-clock time measured around the call)
        start_time = time.time()
        document = assessment_service.process_document_section(
            document=document,
            section_id=section.section_id
        )
        assessment_time = time.time() - start_time
        
        print(f"Granular assessment completed in {assessment_time:.2f} seconds")
        
        # Load updated results to show granular metadata
        try:
            updated_extraction_data = load_json_from_s3(section.extraction_result_uri)
            metadata = updated_extraction_data.get('metadata', {})
            
            if metadata.get('granular_assessment_used'):
                # Only apply the float format to a real number; formatting the
                # 'N/A' placeholder with :.2f would raise ValueError.
                reported_seconds = metadata.get('assessment_time_seconds')
                if isinstance(reported_seconds, (int, float)):
                    reported_time = f"{reported_seconds:.2f}s"
                else:
                    reported_time = "N/A"

                print(f"✅ Granular assessment confirmed")
                print(f"📊 Tasks created: {metadata.get('assessment_tasks_total', 'N/A')}")
                print(f"✅ Tasks successful: {metadata.get('assessment_tasks_successful', 'N/A')}")
                print(f"❌ Tasks failed: {metadata.get('assessment_tasks_failed', 'N/A')}")
                print(f"⏱️  Assessment time: {reported_time}")
            else:
                print("⚠️  Original assessment used (granular not enabled)")
        except Exception as e:
            print(f"Could not load metadata: {e}")
        
        # Record results
        assessment_results.append({
            'section_id': section.section_id,
            'classification': section.classification,
            'processing_time': assessment_time,
            'extraction_result_uri': section.extraction_result_uri
        })
    
    print(f"\nGranular assessment complete for {n} sections.")

## 4. Display Granular Assessment Results

In [None]:
def display_assessment_data(data, attr_name="", indent="  "):
    """
    Recursively print assessment data for simple, group, and list attributes.

    A dict containing a 'confidence' key is printed as a single assessment
    (confidence, threshold, optional reason). Any other dict is treated as a
    group and each of its values is displayed recursively. A list is treated
    as a list attribute; only its first 3 items are shown.

    Args:
        data: Assessment data (dict with confidence, dict with nested attrs, or list)
        attr_name: Name of the current attribute for display
        indent: Current indentation prefix

    Returns:
        None. Output is written with print().
    """
    if isinstance(data, dict):
        # Check if this is a confidence assessment (has 'confidence' key)
        if 'confidence' in data:
            confidence = data.get('confidence', 'N/A')
            threshold = data.get('confidence_threshold', 'N/A')
            reason = data.get('confidence_reason')

            confidence_is_number = isinstance(confidence, (int, float))
            meets_threshold = (
                confidence_is_number
                and isinstance(threshold, (int, float))
                and confidence >= threshold
            )
            status = "✅" if meets_threshold else "⚠️"

            # A missing or non-numeric confidence (None, a string) cannot take
            # the :.3f format, so show it verbatim instead of raising.
            confidence_text = f"{confidence:.3f}" if confidence_is_number else str(confidence)

            print(f"{indent}{status} {attr_name}: {confidence_text} (threshold: {threshold})")
            if reason:
                print(f"{indent}   Reason: {reason}")
        else:
            # This is a group attribute - iterate through sub-attributes
            print(f"{indent}{attr_name} (Group):")
            for sub_attr_name, sub_data in data.items():
                display_assessment_data(sub_data, sub_attr_name, indent + "  ")

    elif isinstance(data, list):
        # This is a list attribute - display each item
        print(f"{indent}{attr_name} (List - {len(data)} items):")
        for i, item_data in enumerate(data[:3]):  # Show first 3 items
            print(f"{indent}  📄 Item {i+1}:")
            if isinstance(item_data, dict):
                for item_attr_name, item_assessment in item_data.items():
                    display_assessment_data(item_assessment, item_attr_name, indent + "    ")
            else:
                print(f"{indent}    Unexpected item format: {type(item_data)}")

        if len(data) > 3:
            print(f"{indent}  ... and {len(data) - 3} more items")
    else:
        print(f"{indent}{attr_name}: Unexpected data type {type(data)}")

print("Enhanced assessment display helper function defined")

In [None]:
print("\n=== Granular Assessment Results ===")

if document.sections:
    sections_with_extractions = [s for s in document.sections if hasattr(s, 'extraction_result_uri') and s.extraction_result_uri]
    n = min(2, len(sections_with_extractions))
    
    for i, section in enumerate(sections_with_extractions[:n]):
        print(f"\n--- Section {section.section_id} ({section.classification}) ---")
        
        try:
            # Load the updated extraction results with assessment
            extraction_data = load_json_from_s3(section.extraction_result_uri)
            
            print(f"Extraction Result URI: {section.extraction_result_uri}")
            
            # Show granular assessment metadata
            metadata = extraction_data.get('metadata', {})
            if metadata.get('granular_assessment_used'):
                # Only apply the float format to a real number; formatting the
                # 'N/A' placeholder with :.2f would raise ValueError and hide
                # the assessment results printed below.
                processing_seconds = metadata.get('assessment_time_seconds')
                if isinstance(processing_seconds, (int, float)):
                    processing_time = f"{processing_seconds:.2f}s"
                else:
                    processing_time = "N/A"

                print(f"\n📊 Granular Assessment Metrics:")
                print(f"  • Total tasks: {metadata.get('assessment_tasks_total', 'N/A')}")
                print(f"  • Successful: {metadata.get('assessment_tasks_successful', 'N/A')}")
                print(f"  • Failed: {metadata.get('assessment_tasks_failed', 'N/A')}")
                print(f"  • Processing time: {processing_time}")
                
            # Display the assessment results with support for nested structures
            explainability_info = extraction_data.get('explainability_info', [])
            if explainability_info:
                print("\n🎯 Assessment Results:")
                # The explainability_info is a list, get the first item which contains the assessments
                assessments = explainability_info[0] if isinstance(explainability_info, list) else explainability_info
                
                for attr_name, assessment_data in assessments.items():
                    display_assessment_data(assessment_data, attr_name)
            else:
                print("\nNo assessment results found")
                
        except Exception as e:
            # Keep going with the next section, but show the full traceback for debugging
            print(f"Error loading assessment results: {e}")
            traceback.print_exc()
else:
    print("No sections to display")

## 5. Compare Performance Metrics

In [None]:
print("\n=== Performance Analysis ===")

if not document.sections:
    print("No sections available for performance analysis")
else:
    sections_with_extractions = [
        s for s in document.sections
        if hasattr(s, 'extraction_result_uri') and s.extraction_result_uri
    ]

    # Running totals over the sections whose metadata reports granular assessment
    total_tasks = 0
    total_time = 0
    total_attributes = 0
    total_list_items = 0

    for section in sections_with_extractions[:2]:
        try:
            extraction_data = load_json_from_s3(section.extraction_result_uri)
            metadata = extraction_data.get('metadata', {})

            # Sections assessed without the granular path contribute nothing
            if not metadata.get('granular_assessment_used'):
                continue

            tasks = metadata.get('assessment_tasks_total', 0)
            time_taken = metadata.get('assessment_time_seconds', 0)
            total_tasks += tasks
            total_time += time_taken

            # One count per top-level attribute; list attributes also add their item count
            extraction_results = extraction_data.get('inference_result', {})
            result_entries = list(extraction_results.items())
            total_attributes += len(result_entries)
            total_list_items += sum(
                len(value) for _, value in result_entries if isinstance(value, list)
            )

            print(f"\nSection {section.section_id}:")
            print(f"  • Assessment tasks: {tasks}")
            print(f"  • Processing time: {time_taken:.2f}s")
            if tasks > 0:
                print(f"  • Avg time per task: {time_taken/tasks:.3f}s")
            else:
                print("  • No tasks")
        except Exception as e:
            print(f"Error analyzing section {section.section_id}: {e}")

    if total_tasks > 0:
        print("\n📈 Overall Performance:")
        print(f"  • Total assessment tasks: {total_tasks}")
        print(f"  • Total processing time: {total_time:.2f}s")
        print(f"  • Average time per task: {total_time/total_tasks:.3f}s")
        print(f"  • Total attributes assessed: {total_attributes}")
        print(f"  • Total list items assessed: {total_list_items}")

        print("\n💡 Granular Benefits:")
        print("  • Focused assessments: Each task handles 1-3 attributes")
        print("  • Parallel processing: Multiple tasks run concurrently")
        print("  • Prompt caching: Reduces token costs by 80-90%")
        print("  • Better accuracy: Smaller prompts = better LLM performance")

## 6. Save Results for Next Step

In [None]:
# Create data directory for this step
data_dir = Path(".data/step4_assessment_granular")
data_dir.mkdir(parents=True, exist_ok=True)

# Save updated document object as JSON
document_path = data_dir / "document.json"
with open(document_path, 'w') as f:
    f.write(document.to_json())

# Save configuration (pass through)
config_path = data_dir / "config.json"
with open(config_path, 'w') as f:
    json.dump(CONFIG, f, indent=2)

# Save environment info (pass through)
env_path = data_dir / "environment.json"
with open(env_path, 'w') as f:
    json.dump(env_info, f, indent=2)

# assessment_results is created by the assessment cell; fall back to an empty
# list so this cell still works if that cell was not run.
recorded_results = globals().get('assessment_results', [])
all_sections = document.sections or []

# Save granular assessment-specific results summary.
# "Has extraction" means a non-empty extraction_result_uri everywhere below, so
# the per-section flag and the total count cannot disagree.
assessment_summary = {
    'approach': 'granular',
    'model_used': assessment_config.get('model'),
    'default_confidence_threshold': assessment_config.get('default_confidence_threshold'),
    'granular_config': granular_config,
    'sections_assessed': len(recorded_results),
    'total_sections_with_extractions': sum(
        1 for s in all_sections if getattr(s, 'extraction_result_uri', None)
    ),
    'assessment_results': recorded_results,
    'sections_status': [
        {
            'section_id': section.section_id,
            'classification': section.classification,
            'has_extraction': bool(getattr(section, 'extraction_result_uri', None)),
            'extraction_result_uri': getattr(section, 'extraction_result_uri', None)
        } for section in all_sections
    ]
}

assessment_summary_path = data_dir / "assessment_summary.json"
with open(assessment_summary_path, 'w') as f:
    json.dump(assessment_summary, f, indent=2)

print(f"Saved document to: {document_path}")
print(f"Saved configuration to: {config_path}")
print(f"Saved environment info to: {env_path}")
print(f"Saved granular assessment summary to: {assessment_summary_path}")

## 7. Summary

In [None]:
# assessment_results is created by the assessment cell; count 0 if it was not run
sections_assessed = len(globals().get('assessment_results', []))
# Separate name for the count so the list built by earlier cells is not rebound to an int
sections_with_extractions_count = len([s for s in (document.sections or []) if hasattr(s, 'extraction_result_uri') and s.extraction_result_uri])

# Metering entry this notebook looks up for the configured model.
# It can be absent (e.g. nothing was assessed), so read it without raising KeyError.
metering_key = f"GranularAssessment/bedrock/{assessment_config.get('model')}"
token_usage = (getattr(document, 'metering', None) or {}).get(metering_key, 'N/A')

print("=== Step 4: Granular Assessment Complete ===")
print(f"✅ Document processed: {document.id}")
print(f"✅ Sections assessed: {sections_assessed} of {sections_with_extractions_count} with extractions")
print(f"✅ Total sections: {len(document.sections) if document.sections else 0}")
print(f"✅ Model used: {assessment_config.get('model')}")
print(f"✅ Default threshold: {assessment_config.get('default_confidence_threshold')}")
print(f"✅ Granular enabled: {granular_config.get('enabled', False)}")
print(f"✅ Data saved to: .data/step4_assessment_granular/")
print(f"✅ Token usage: {token_usage}")
print("\n📌 Next step: Run step5_summarization.ipynb")
print("\n📋 Granular Assessment Features Demonstrated:")
print("  • Multiple focused inferences instead of single large inference")
print("  • Prompt caching for cost optimization (80-90% savings)")
print("  • Parallel processing for reduced latency")
print("  • Enhanced confidence threshold tracking")
print("  • Detailed performance metrics and task breakdown")
print("  • Better handling of complex documents with many attributes")