# Lambda Error Analysis with Strands Agents

This notebook demonstrates how to deploy and use an AI-powered error analysis system for AWS Lambda functions. The system automatically analyzes Lambda failures using Strands Agents and Amazon Bedrock to provide intelligent root cause analysis and actionable recommendations.

## What This System Does

When a Lambda function fails, instead of getting generic error messages like:
```
AttributeError: 'NoneType' object has no attribute 'lower'
```

You get intelligent analysis like:
```
Root Cause: Missing null check on 'email' field before calling .lower() method.
The user_data dictionary contains email=None, causing AttributeError when attempting
string operations.

Recommendation: Add null validation before processing:
  if user_data.get('email'):
      email = user_data['email'].lower().strip()
  else:
      raise ValueError('Email is required')

Confidence: 0.92 (Very High)
```

## Prerequisites

Before starting, ensure you have:

1. **AWS Account** with appropriate permissions
2. **AWS CLI** installed and configured
3. **Node.js** 18+ and npm
4. **Python** 3.12+
5. **Docker** installed and running (for building Lambda layers)
6. **AWS CDK** installed globally: `npm install -g aws-cdk`
7. **Bedrock Model Access** - Ensure Claude Sonnet models are enabled in your AWS account

## Project Structure

```
lambda-error-analysis/
├── cdk/
│   ├── lambda/
│   │   ├── sample-business-function/    # Demo Lambda with @decorator
│   │   │   ├── lambda_function.py       # User data processing function
│   │   │   ├── decorator.py             # @error_capture decorator
│   │   │   ├── test-events.json         # Test payloads
│   │   │   └── README.md                # Function documentation
│   │   └── error-analyzer-agent/        # AI-powered error analyzer
│   │       ├── lambda_function.py       # Main handler
│   │       └── agent.py                 # Strands Agent with 3 tools
│   ├── layers/
│   │   └── strands-layer/               # Strands SDK dependencies
│   │       └── build.sh                 # Layer build script
│   ├── stacks/
│   │   └── lambda-error-analysis-stack.ts  # CDK infrastructure
│   └── constant.ts                      # Project constants
├── knowledge_base/                      # AI knowledge base content
│   ├── lambda-error-patterns.md         # Common Lambda failures
│   ├── aws-service-errors.md            # AWS error codes reference
│   ├── troubleshooting-guide.md         # Debugging methodology
│   ├── best-practices.md                # Lambda best practices
│   ├── common-errors.md                 # Common error solutions
│   └── README.md                        # Knowledge base docs
├── images/                              # Architecture diagrams
│   ├── generate_diagram.py              # Diagram generation script
│   ├── requirements.txt                 # Diagram dependencies
│   ├── lambda-error-analysis-architecture.png
│   ├── lambda-error-analysis-architecture-transparent.png
│   └── README.md                        # Diagram documentation
├── cdk-app.ts                           # CDK app entry point
├── cdk.json                             # CDK configuration
├── package.json                         # NPM dependencies
├── tsconfig.json                        # TypeScript config
├── deploy-agent.ipynb                   # This deployment notebook
└── README.md                            # Comprehensive documentation
```

## Setup

### Step 1: Install Dependencies

In [None]:
# Install CDK dependencies
!npm install

In [None]:
# Install Python dependencies
!pip install pandas boto3 -q

### Step 2: Verify Docker is Running

Docker is required for:
- Building the Strands Lambda layer
- Knowledge Base custom resource Lambda

Let's check if Docker is running before proceeding:

In [None]:
import subprocess

def check_docker():
    """Check if Docker daemon is running"""
    try:
        result = subprocess.run(
            ['docker', 'info'],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0:
            print("✅ Docker is running")
            # Get Docker version
            version_result = subprocess.run(
                ['docker', '--version'],
                capture_output=True,
                text=True
            )
            print(f"   {version_result.stdout.strip()}")
            return True
        else:
            print("❌ Docker is not running")
            print("\n⚠️  Please start Docker Desktop and wait for it to fully start.")
            print("   Then run this cell again.")
            return False
    except FileNotFoundError:
        print("❌ Docker is not installed")
        print("\n⚠️  Please install Docker Desktop from: https://www.docker.com/products/docker-desktop")
        return False
    except subprocess.TimeoutExpired:
        print("❌ Docker command timed out")
        print("\n⚠️  Docker might be starting. Please wait and try again.")
        return False
    except Exception as e:
        print(f"❌ Error checking Docker: {e}")
        return False

docker_running = check_docker()

if not docker_running:
    print("\n" + "="*60)
    print("⛔ STOP: Docker must be running before continuing")
    print("="*60)

### Step 3: Verify AWS Credentials

Check if AWS credentials are configured and accessible:

In [None]:
import os
import boto3

# Check if AWS credentials and a default region are available
try:
    sts = boto3.client('sts')
    identity = sts.get_caller_identity()
    session = boto3.Session()
    region = session.region_name
    if not region:
        # os.environ values must be strings, so fail early with a clear message
        raise RuntimeError("No default region configured (set AWS_DEFAULT_REGION or run `aws configure`)")
    
    # Export environment variables for CDK
    os.environ['CDK_DEFAULT_ACCOUNT'] = identity['Account']
    os.environ['CDK_DEFAULT_REGION'] = region
    
    print("✅ AWS credentials detected!")
    print(f"   Account: {identity['Account']}")
    print(f"   Region: {region}")
    print("\n✅ Environment variables set for CDK:")
    print(f"   CDK_DEFAULT_ACCOUNT={identity['Account']}")
    print(f"   CDK_DEFAULT_REGION={region}")
    
except Exception as e:
    print(f"❌ AWS credentials or region not configured: {e}")
    print("\n⚠️  Please configure AWS credentials before continuing.")
    print("   See: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html")


### Step 4: Bootstrap CDK

In [None]:
# Bootstrap CDK (if not done already)
!npx cdk bootstrap

## Architecture

![Lambda Error Analysis Architecture](images/lambda-error-analysis-architecture.png)

### Components:

1. **Sample Business Function** - Lambda with @error_capture decorator that captures failures
2. **EventBridge** - Routes failure events to the analyzer
3. **Error Analyzer Agent** - AI-powered analysis using Strands SDK
4. **Bedrock Knowledge Base** - Searchable documentation and error patterns
5. **DynamoDB** - Stores analysis results with confidence scoring
6. **S3** - Source code storage for analysis

### Data Flow:

1. Lambda function fails → @decorator captures exception
2. EventBridge routes failure event to Error Analyzer
3. Agent uses 3 tools to gather context:
   - `fetch_source_code()` - Gets Lambda source from S3
   - `fetch_cloudwatch_logs()` - Retrieves execution logs
   - `search_knowledge_base()` - Queries documentation
4. Claude Sonnet analyzes with confidence scoring
5. Enhanced analysis stored in DynamoDB

## Code Walkthrough

### 1. Sample Business Function

This function simulates a digital banking user registration service that validates customer data, enriches profiles, and assigns membership tiers. It contains 8 intentional bugs (missing fields, null handling, type conversions, edge cases) that represent validation problems developers commonly hit during development.

In [None]:
# View the sample business function
with open('cdk/lambda/sample-business-function/lambda_function.py', 'r') as f:
    print(f.read())

**Key Points:**
- Processes user registration data
- Validates, enriches, and calculates user tier
- Contains realistic bugs: missing null checks, type errors, division by zero
- Uses `@error_capture` decorator to capture failures

### 2. The @error_capture Decorator

This decorator automatically captures exceptions and publishes events to EventBridge:

In [None]:
# View the decorator (key sections)
print("""
The @error_capture decorator:

1. Wraps your Lambda handler
2. Captures exceptions with full stack traces
3. Publishes events to EventBridge:
   - TaskSucceeded (statusCode 200-299)
   - TaskFailed (exceptions or statusCode >= 400)
   - TaskUpdate (statusCode 102)

4. Includes Lambda execution context:
   - Request ID
   - Function name
   - CloudWatch log group/stream

Usage:
  @error_capture(logger, eventbridge, event_bus_name, True, True, True)
  def lambda_handler(event, context):
      # Your business logic
      return {"statusCode": 200}
""")
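
The behavior described above can be sketched in a few lines. This is a simplified, hypothetical stand-in and not the code in `decorator.py`: the names `error_capture_sketch`, `classify`, and the `publish` callable are assumptions made for illustration, and the choice to re-raise after publishing is also an assumption. In a deployment, `publish` could wrap the EventBridge `PutEvents` API; here it appends to a list so the cell runs without AWS access.

```python
import functools
import traceback


def classify(status_code):
    """Map a handler status code to the event types listed above."""
    if status_code == 102:
        return "TaskUpdate"
    if 200 <= status_code <= 299:
        return "TaskSucceeded"
    if status_code >= 400:
        return "TaskFailed"
    return None  # other codes are not covered by the list above


def error_capture_sketch(publish):
    """Hypothetical, simplified stand-in for @error_capture.

    publish(detail_type, detail) is an assumed callable, not part of the project.
    """
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(event, context):
            try:
                result = handler(event, context)
            except Exception as exc:
                publish("TaskFailed", {
                    "error_type": type(exc).__name__,
                    "error_message": str(exc),
                    "stack_trace": traceback.format_exc(),
                    "request_id": getattr(context, "aws_request_id", None),
                    "function_name": getattr(context, "function_name", None),
                })
                raise  # assumption: let Lambda still report the failure
            detail_type = classify(result.get("statusCode", 200))
            if detail_type:
                publish(detail_type, {"response": result})
            return result
        return wrapper
    return decorator


# Usage with an in-memory publisher instead of EventBridge
published = []


@error_capture_sketch(lambda detail_type, detail: published.append((detail_type, detail)))
def handler(event, context):
    return {"statusCode": 200, "email": event["user_data"]["email"].lower()}


try:
    handler({"user_data": {"email": None}}, None)
except AttributeError:
    pass

print(published[0][0])                # TaskFailed
print(published[0][1]["error_type"])  # AttributeError
```

Passing the publisher in as a callable keeps the wrapper testable offline; the project's decorator takes a logger, an EventBridge client, and an event bus name instead, as shown in the usage above.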

### 3. Error Analyzer Agent

The heart of the system - an AI agent with 3 specialized tools:

In [None]:
# View the agent code (key sections)
with open('cdk/lambda/error-analyzer-agent/agent.py', 'r') as f:
    content = f.read()
    # Show the tool definitions
    print("=" * 80)
    print("TOOL 1: fetch_source_code()")
    print("=" * 80)
    start = content.find('@tool\ndef fetch_source_code')
    end = content.find('\n\n@tool', start + 1)
    print(content[start:end])
    print("\n" + "=" * 80)
    print("TOOL 2: fetch_cloudwatch_logs()")
    print("=" * 80)
    start = content.find('@tool\ndef fetch_cloudwatch_logs')
    end = content.find('\n\n@tool', start + 1)
    print(content[start:end])
    print("\n" + "=" * 80)
    print("TOOL 3: search_knowledge_base()")
    print("=" * 80)
    start = content.find('@tool\ndef search_knowledge_base')
    end = content.find('\n\ndef analyze_error', start)
    print(content[start:end])

**Agent Capabilities:**

1. **fetch_source_code()** - Retrieves Lambda source code from S3 for analysis
2. **fetch_cloudwatch_logs()** - Gets execution logs filtered by request ID
3. **search_knowledge_base()** - Queries Bedrock Knowledge Base for error patterns

The agent uses Claude Sonnet with extended thinking to:
- Analyze error context from all three sources
- Identify root cause with evidence
- Provide specific, actionable recommendations
- Calculate confidence score based on available evidence
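
The sample output at the top of this notebook pairs a numeric score with a label ("0.92 (Very High)"). The agent itself produces the score; the sketch below only illustrates one way such a score might be turned into a label. The function name `confidence_label` and every threshold are assumptions, not values taken from `agent.py`.

```python
def confidence_label(score: float) -> str:
    """Map a 0.0-1.0 confidence score to a label (thresholds are assumed)."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be between 0 and 1, got {score}")
    if score >= 0.9:
        return "Very High"
    if score >= 0.75:
        return "High"
    if score >= 0.5:
        return "Medium"
    return "Low"


print(f"Confidence: {0.92:.2f} ({confidence_label(0.92)})")
```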

## Deploy the Stack

In [None]:
# Build the TypeScript CDK code and the Lambda layer, then deploy the stack
!npm run deploy

This will:
1. Build TypeScript CDK code
2. Build Strands Lambda layer using Docker
3. Deploy all AWS resources:
   - 2 Lambda functions
   - EventBridge rule
   - S3 bucket with source code and knowledge base
   - DynamoDB table for analysis results
   - Bedrock Knowledge Base
   - IAM roles and permissions

## Sync Knowledge Base with Latest Documentation

In [None]:
import boto3
import time

# Initialize clients
bedrock_agent = boto3.client('bedrock-agent')
cfn_client = boto3.client('cloudformation')

# Get stack outputs
stack_name = 'LambdaErrorAnalysisStack'
try:
    response = cfn_client.describe_stacks(StackName=stack_name)
    outputs = response['Stacks'][0]['Outputs']
    
    # Extract Knowledge Base ID and Data Source ID from outputs
    kb_id = next(o['OutputValue'] for o in outputs if o['OutputKey'] == 'KnowledgeBaseId')
    ds_id = next(o['OutputValue'] for o in outputs if o['OutputKey'] == 'DataSourceId')
    
    print(f"📚 Knowledge Base ID: {kb_id}")
    print(f"📁 Data Source ID: {ds_id}")
    
except Exception as e:
    print(f"❌ Error getting stack outputs: {e}")
    raise

# Start ingestion job
print("\n🚀 Starting ingestion job...")
try:
    response = bedrock_agent.start_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id,
        description="Sync Lambda source code after deployment"
    )
    
    ingestion_job_id = response['ingestionJob']['ingestionJobId']
    print(f"✅ Job ID: {ingestion_job_id}")
    print(f"   Status: {response['ingestionJob']['status']}")
    
except Exception as e:
    print(f"❌ Error starting ingestion: {e}")
    raise

# Monitor progress
print("\n⏳ Monitoring ingestion progress...")
dots = ""
while True:
    status_response = bedrock_agent.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id,
        ingestionJobId=ingestion_job_id
    )
    
    status = status_response['ingestionJob']['status']
    
    # Progress indicator
    dots += "."
    if len(dots) > 50:
        dots = "."
    print(f"\rStatus: {status} {dots}", end="", flush=True)
    
    if status in ['COMPLETE', 'FAILED', 'STOPPED']:
        print()  # New line
        break
    
    time.sleep(10)

# Print results
print("\n" + "="*70)
if status == 'COMPLETE':
    stats = status_response['ingestionJob'].get('statistics', {})
    print("✅ INGESTION COMPLETED SUCCESSFULLY!")
    print("="*70)
    print(f"📄 Documents Scanned:    {stats.get('numberOfDocumentsScanned', 0)}")
    print(f"✨ Documents Indexed:    {stats.get('numberOfNewDocumentsIndexed', 0)}")
    print(f"🔄 Documents Modified:   {stats.get('numberOfModifiedDocumentsIndexed', 0)}")
    print(f"🗑️  Documents Deleted:    {stats.get('numberOfDocumentsDeleted', 0)}")
    print(f"❌ Documents Failed:     {stats.get('numberOfDocumentsFailed', 0)}")
    print("="*70)
    print("\n🎉 Knowledge Base is ready for error analysis!")
else:
    print(f"❌ INGESTION FAILED: {status}")
    print("="*70)
    if 'failureReasons' in status_response['ingestionJob']:
        print("Failure Reasons:")
        for reason in status_response['ingestionJob']['failureReasons']:
            print(f"  - {reason}")
    print("="*70)
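
The monitoring loop above polls until the job reaches a terminal status and has no upper time limit. A bounded variant might look like the sketch below. The helper name `wait_for_terminal_status`, its parameters, and the 900-second default are assumptions; `get_status` stands in for a call that reads `status` from `get_ingestion_job`, and canned statuses replace the API so the cell runs offline.

```python
import time


def wait_for_terminal_status(get_status, terminal=("COMPLETE", "FAILED", "STOPPED"),
                             timeout=900, interval=10, sleep=time.sleep):
    """Poll get_status() until it returns a terminal value or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status} after {timeout}s")
        sleep(interval)


# Usage with canned statuses standing in for get_ingestion_job responses
statuses = iter(["STARTING", "IN_PROGRESS", "COMPLETE"])
final = wait_for_terminal_status(lambda: next(statuses), sleep=lambda s: None)
print(final)  # COMPLETE
```

Injecting `sleep` keeps the helper fast to test; in the notebook the default `time.sleep` would be used.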


## Discover Deployed Functions

In [None]:
# Get deployed function names
import boto3
import json

lambda_client = boto3.client('lambda')
region = boto3.session.Session().region_name

# List our functions (filter out CDK helpers); paginate because list_functions returns at most 50 per call
paginator = lambda_client.get_paginator('list_functions')
all_functions = [f for page in paginator.paginate() for f in page['Functions']]
helpers = ['LogRetention', 'OpenSearchIndexCRProvider', 'CustomS3AutoDeleteObjects', 'CustomCDKBucketDeployment']
our_functions = [f for f in all_functions
                 if 'LambdaErrorAnalysis' in f['FunctionName']
                 and not any(h in f['FunctionName'] for h in helpers)]

print("✅ Deployed Lambda Functions:")
for func in our_functions:
    print(f"   • {func['FunctionName']}")
    if 'sample-business' in func['FunctionName']:
        business_function_name = func['FunctionName']
    elif 'error-analyzer' in func['FunctionName']:
        analyzer_function_name = func['FunctionName']

## Helper Functions for Testing

In [None]:
import time
from datetime import datetime, timedelta

def extract_execution_logs(all_events: list, request_id: str) -> list:
    """Extract all logs between START and REPORT for a specific Lambda execution"""
    execution_events = []
    start_found = False
    
    for event in all_events:
        message = event['message']
        
        # Look for START marker
        if f"START RequestId: {request_id}" in message:
            start_found = True
            execution_events.append(event)
            continue
        
        # If we found START, collect all logs until REPORT
        if start_found:
            execution_events.append(event)
            
            # Stop at REPORT marker (end of execution)
            if f"REPORT RequestId: {request_id}" in message:
                break
    
    return execution_events

def wait_for_analyzer_logs(analyzer_function_name, request_id, timeout=60, check_interval=5):
    """Wait for analyzer agent to process the error using request ID"""
    logs_client = boto3.client('logs')
    log_group = f"/aws/lambda/{analyzer_function_name}"
    
    print(f"⏳ Agent analyzing error (max {timeout}s)...", end='', flush=True)
    
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            # Get recent log streams
            streams_response = logs_client.describe_log_streams(
                logGroupName=log_group,
                orderBy='LastEventTime',
                descending=True,
                limit=5
            )
            
            # Search in recent streams for our request ID
            for stream in streams_response.get('logStreams', []):
                stream_name = stream['logStreamName']
                
                events_response = logs_client.get_log_events(
                    logGroupName=log_group,
                    logStreamName=stream_name,
                    limit=100,
                    startFromHead=False
                )
                
                # Check if this stream has our request ID with analysis complete
                for event in events_response.get('events', []):
                    if request_id in event['message'] and 'Agent Analysis Result' in event['message']:
                        print(" ✅")
                        return True
        except Exception:
            # The log group or stream may not exist yet; retry on the next poll
            pass
        
        print(".", end='', flush=True)
        time.sleep(check_interval)
    
    print(" ⚠️ Timeout")
    return False

def get_analyzer_logs(analyzer_function_name, business_request_id, minutes=2):
    """Fetch and display analyzer logs for specific business function execution"""
    logs_client = boto3.client('logs')
    log_group = f"/aws/lambda/{analyzer_function_name}"
    
    end_time = datetime.now()
    start_time = end_time - timedelta(minutes=minutes)
    
    print(f"\n📋 Analyzer Logs (last {minutes} min):")
    print("=" * 100)
    
    try:
        # Get recent log streams
        streams_response = logs_client.describe_log_streams(
            logGroupName=log_group,
            orderBy='LastEventTime',
            descending=True,
            limit=10
        )
        
        found_analysis = False
        
        # Search through recent streams
        for stream in streams_response.get('logStreams', []):
            stream_name = stream['logStreamName']
            
            events_response = logs_client.get_log_events(
                logGroupName=log_group,
                logStreamName=stream_name,
                startTime=int(start_time.timestamp() * 1000),
                endTime=int(end_time.timestamp() * 1000),
                limit=300  # Increased limit
            )
            
            all_events = events_response.get('events', [])
            
            # Check if this stream contains our business request ID
            has_our_request = any(business_request_id in e['message'] for e in all_events)
            
            if has_our_request:
                found_analysis = True
                
                # Find the analyzer's request ID (START marker)
                analyzer_request_id = None
                for event in all_events:
                    if 'START RequestId:' in event['message']:
                        analyzer_request_id = event['message'].split('START RequestId: ')[1].split()[0]
                        break
                
                print(f"\n🔍 Found analysis for business request {business_request_id}")
                print(f"   Analyzer execution: {analyzer_request_id}")
                print(f"   Log stream: {stream_name}\n")
                
                # Extract full execution logs using the analyzer's request ID
                if analyzer_request_id:
                    execution_logs = extract_execution_logs(all_events, analyzer_request_id)
                else:
                    # Fallback: use all events if we can't find request ID
                    execution_logs = all_events
                
                # Display ALL logs from this execution
                for event in execution_logs:
                    ts = datetime.fromtimestamp(event['timestamp'] / 1000)
                    msg = event['message'].strip()
                    
                    # Highlight important messages
                    if any(m in msg for m in ['🤖', '📊', 'Root Cause', 'Confidence', 'Agent Analysis Result']):
                        print(f"\n{'='*100}")
                        print(f"[{ts.strftime('%H:%M:%S')}] {msg}")
                        print(f"{'='*100}\n")
                    elif 'Tool #' in msg or 'fetch_' in msg or 'search_' in msg:
                        print(f"  → [{ts.strftime('%H:%M:%S')}] {msg}")
                    else:
                        # START/END/REPORT markers and all other logs print unchanged
                        print(f"[{ts.strftime('%H:%M:%S')}] {msg}")
                
                break  # Found our analysis, stop searching
        
        if not found_analysis:
            print(f"⚠️ No analysis found for request ID {business_request_id}")
            print("   The analyzer may still be processing or the request ID may be incorrect.")
    
    except Exception as e:
        print(f"❌ Error: {e}")
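
To see what `extract_execution_logs` selects, it can be run against synthetic events. The sketch below repeats a compact copy of the helper so it runs on its own; the event contents and the request IDs `aaa` and `bbb` are made up for illustration.

```python
def extract_execution_logs(all_events, request_id):
    """Compact copy of the helper above, repeated so this cell runs on its own."""
    execution_events = []
    start_found = False
    for event in all_events:
        message = event["message"]
        if f"START RequestId: {request_id}" in message:
            start_found = True
        if start_found:
            execution_events.append(event)
            # REPORT marks the end of one Lambda execution
            if f"REPORT RequestId: {request_id}" in message:
                break
    return execution_events


# Synthetic events: two invocations share one log stream
events = [
    {"timestamp": 1, "message": "START RequestId: aaa Version: $LATEST"},
    {"timestamp": 2, "message": "analyzing first error"},
    {"timestamp": 3, "message": "END RequestId: aaa"},
    {"timestamp": 4, "message": "REPORT RequestId: aaa Duration: 10 ms"},
    {"timestamp": 5, "message": "START RequestId: bbb Version: $LATEST"},
    {"timestamp": 6, "message": "analyzing second error"},
    {"timestamp": 7, "message": "REPORT RequestId: bbb Duration: 12 ms"},
]

selected = extract_execution_logs(events, "bbb")
print([e["timestamp"] for e in selected])  # [5, 6, 7]
```

Because a warm Lambda container writes several executions to the same log stream, slicing between the START and REPORT markers keeps output from neighbouring invocations out of the display.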



## Test with Different Scenarios

**💡 Tip:** If log output appears truncated:
- Click on the output area
- Right-click and select "Enable Scrolling for Outputs"
- Or double-click the output to expand it fully

### Test 1: Missing Email Field (KeyError)

In [None]:
# Test 1: Missing Email Field
test_payload_1 = {
    "user_data": {
        "profile": {"name": "John Doe"},
        "age": 30
    }
}

print("🧪 Test 1: Missing Email (KeyError)")
print("=" * 80)

response = lambda_client.invoke(
    FunctionName=business_function_name,
    InvocationType='RequestResponse',
    Payload=json.dumps(test_payload_1)
)

request_id = response['ResponseMetadata']['RequestId']
print(f"✅ Invoked - Request ID: {request_id}")

# Print Lambda response
response_payload = json.loads(response['Payload'].read())
print("\n📤 Lambda Response:")
print(json.dumps(response_payload, indent=2))

wait_for_analyzer_logs(analyzer_function_name, request_id)
get_analyzer_logs(analyzer_function_name, request_id)
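
The tests in this section repeat the same invoke, read, and print sequence. One way to factor it is sketched below. The helper name `invoke_and_report` and the `FakeLambdaClient` class are hypothetical; the fake stands in for `boto3.client('lambda')` so the cell runs offline, and its canned response is made up. The `FunctionError` key is what Lambda's `Invoke` response carries when the handler raised.

```python
import io
import json


def invoke_and_report(client, function_name, payload):
    """Invoke a Lambda synchronously and return (request_id, body, errored)."""
    response = client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )
    request_id = response["ResponseMetadata"]["RequestId"]
    body = json.loads(response["Payload"].read())
    # Lambda sets FunctionError when the handler raised or timed out
    errored = "FunctionError" in response
    return request_id, body, errored


class FakeLambdaClient:
    """Stand-in for boto3.client('lambda') so the sketch runs offline."""

    def invoke(self, **kwargs):
        body = {"errorType": "KeyError", "errorMessage": "'email'"}
        return {
            "ResponseMetadata": {"RequestId": "req-123"},
            "FunctionError": "Unhandled",
            "Payload": io.BytesIO(json.dumps(body).encode()),
        }


req_id, body, errored = invoke_and_report(
    FakeLambdaClient(), "sample-business-function", {"user_data": {"age": 30}}
)
print(req_id, body["errorType"], errored)  # req-123 KeyError True
```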

### Test 2: Null Email (AttributeError)

In [None]:
# Test 2: Null Email Value
test_payload_2 = {
    "user_data": {
        "email": None,
        "profile": {"name": "Jane Smith"},
        "age": 25
    }
}

print("\n\n🧪 Test 2: Null Email (AttributeError)")
print("=" * 80)

response = lambda_client.invoke(
    FunctionName=business_function_name,
    InvocationType='RequestResponse',
    Payload=json.dumps(test_payload_2)
)

request_id = response['ResponseMetadata']['RequestId']
print(f"✅ Invoked - Request ID: {request_id}")

# Print Lambda response
response_payload = json.loads(response['Payload'].read())
print("\n📤 Lambda Response:")
print(json.dumps(response_payload, indent=2))

wait_for_analyzer_logs(analyzer_function_name, request_id)
get_analyzer_logs(analyzer_function_name, request_id)

### Enable Claude Sonnet 4 for Enhanced Analysis

In [None]:
# Enable Claude Sonnet 4 with Interleaved Thinking
print("\n\n🔧 Enabling Claude Sonnet 4")
print("=" * 80)

try:
    config = lambda_client.get_function_configuration(FunctionName=analyzer_function_name)
    env_vars = config.get('Environment', {}).get('Variables', {})
    env_vars['USE_SONNET_4'] = 'true'
    
    lambda_client.update_function_configuration(
        FunctionName=analyzer_function_name,
        Environment={'Variables': env_vars}
    )
    
    print("✅ Updated USE_SONNET_4=true")
    print("⏳ Waiting for update...", end='', flush=True)
    
    waiter = lambda_client.get_waiter('function_updated')
    waiter.wait(FunctionName=analyzer_function_name)
    
    print(" Done!")
except Exception as e:
    print(f"❌ Error: {e}")

### Test 3: Division by Zero

In [None]:
# Test 3: Division by Zero (with Sonnet 4)
test_payload_3 = {
    "user_data": {
        "email": "asif.mithawala@example.com",
        "profile": {"name": "Asif Mithawala"},
        "age": 0,
        "initial_deposit": 1000,
        "registration_date": "2024-01-15",
        "settings": {"preferences": {"notifications": ["email"]}}
    }
}

print("\n\n🧪 Test 3: Zero Age / Division by Zero (Sonnet 4)")
print("=" * 80)

response = lambda_client.invoke(
    FunctionName=business_function_name,
    InvocationType='RequestResponse',
    Payload=json.dumps(test_payload_3)
)

request_id = response['ResponseMetadata']['RequestId']
print(f"✅ Invoked - Request ID: {request_id}")

# Print Lambda response
response_payload = json.loads(response['Payload'].read())
print("\n📤 Lambda Response:")
print(json.dumps(response_payload, indent=2))

wait_for_analyzer_logs(analyzer_function_name, request_id)
get_analyzer_logs(analyzer_function_name, request_id)

## View DynamoDB Analysis Results

Let's query the DynamoDB table to see all the stored analysis results:

In [None]:
import pandas as pd
from decimal import Decimal
from IPython.display import display

dynamodb = boto3.resource('dynamodb')

# Find our analysis table
tables = list(dynamodb.tables.all())
analysis_table = None

for table in tables:
    if 'LambdaErrorAnalysis-error-analysis' in table.name:
        analysis_table = table
        break

if analysis_table:
    # Scan table with pagination
    response = analysis_table.scan()
    items = response['Items']
    
    while 'LastEvaluatedKey' in response:
        response = analysis_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    
    # Convert Decimal to float
    def decimal_to_float(obj):
        if isinstance(obj, list):
            return [decimal_to_float(i) for i in obj]
        elif isinstance(obj, dict):
            return {k: decimal_to_float(v) for k, v in obj.items()}
        elif isinstance(obj, Decimal):
            return float(obj)
        return obj
    
    items = [decimal_to_float(item) for item in items]
    
    if items:
        df = pd.DataFrame(items)
        
        # Sort by timestamp
        if 'timestamp' in df.columns:
            df = df.sort_values('timestamp', ascending=False)
        
        # Select columns
        columns = ['error_id', 'timestamp', 'function_name', 'agent_analysis', 'analysis_duration_mm_ss']
        df = df[[col for col in columns if col in df.columns]]
        
        # Truncate error_id only
        if 'error_id' in df.columns:
            df['error_id'] = df['error_id'].str[:35]
        
        # Set pandas display options to show full text
        pd.set_option('display.max_colwidth', None)
        
        display(df)


## Summary

You've successfully:

1. ✅ Deployed the Lambda Error Analysis system
2. ✅ Tested with 3 different failure scenarios
3. ✅ Viewed AI-powered error analysis in CloudWatch logs
4. ✅ Queried analysis results from DynamoDB

### Key Takeaways:

- The `@error_capture` decorator automatically captures failures
- EventBridge routes errors to the AI analyzer
- The Strands Agent uses 3 tools to gather comprehensive context
- Claude Sonnet provides intelligent analysis with confidence scoring
- All analysis is stored in DynamoDB for historical tracking

### Next Steps:

1. **Add to your Lambda functions** - Copy the decorator to your own functions
2. **Customize the agent** - Modify system prompts for your use case
3. **Expand knowledge base** - Add your organization's error patterns
4. **Monitor trends** - Use DynamoDB data to identify recurring issues
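
As a starting point for trend monitoring, the records scanned from DynamoDB can be grouped by function. The sketch below is a minimal illustration: the helper name `recurring_failures` and the sample records are made up, and only the `error_id` and `function_name` column names come from the DynamoDB cell earlier in this notebook.

```python
from collections import Counter


def recurring_failures(items, min_count=2):
    """Count analysis records per function and keep those seen at least min_count times."""
    counts = Counter(item.get("function_name", "unknown") for item in items)
    return [(name, n) for name, n in counts.most_common() if n >= min_count]


# Made-up records using the column names shown in the DynamoDB cell
sample_items = [
    {"error_id": "e1", "function_name": "sample-business-function"},
    {"error_id": "e2", "function_name": "sample-business-function"},
    {"error_id": "e3", "function_name": "other-function"},
]
print(recurring_failures(sample_items))  # [('sample-business-function', 2)]
```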

## Cleanup

When you're done testing, clean up the resources:

In [None]:
# Destroy the CDK stack
!npx cdk destroy --force