# Amazon Bedrock AgentCore Policy - Lambda Target Policy Enforcement

## Overview

This notebook demonstrates **Cedar policy enforcement** on Lambda targets using JWT claims.

### Prerequisites

Run **`01-Setup-Gateway-Lambda.ipynb`** first to create the Gateway and Cognito resources.

### Learning Objectives

- Create Policy Engine and attach to Gateway
- Add custom claims to JWT tokens using Cognito Lambda triggers
- Implement Cedar policies for attribute-based access control (ABAC)
- Test department-based and group-based access control patterns

### Architecture

```
                                ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
                                ‚îÇ  Policy Engine        ‚îÇ
                                ‚îÇ  (Cedar Policies)     ‚îÇ
                                ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
                                            ‚îÇ Connected
                                            ‚ñº
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê             ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê             ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ   Amazon        ‚îÇ  JWT Token  ‚îÇ  AgentCore Gateway    ‚îÇ             ‚îÇ   Lambda    ‚îÇ
‚îÇ   Cognito       ‚îÇ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ>‚îÇ  + Policy Evaluation  ‚îÇ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ>‚îÇ   Target    ‚îÇ
‚îÇ   + Lambda      ‚îÇ  + Claims   ‚îÇ                       ‚îÇ  If Allowed ‚îÇ   (Tool)    ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò             ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò             ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
```

---

## Part 1: Environment Setup

In [None]:
import json
import sys
import time
from pathlib import Path

import boto3

# Add parent directory to path for common imports
sys.path.insert(0, str(Path.cwd().parent))

# Import utility functions from common folder
from common.auth_utils import (
    get_bearer_token,
    decode_token,
    make_gateway_request,
    analyze_response,
    display_test_result,
)
from common.gateway_utils import (
    attach_policy_engine_to_gateway,
)
from common.policy_utils import (
    get_policy_engine,
    create_cedar_policy,
    wait_for_policy_active,
    delete_policy,
    cleanup_existing_policies,
    ensure_policy_engine,
)
from common.cognito_utils import (
    create_lambda_function,
    configure_cognito_trigger,
)

print("‚úì Libraries loaded")

### Load Configuration

In [None]:
# Load configuration file
config_path = Path.cwd() / "gateway_config.json"

if not config_path.exists():
    raise FileNotFoundError(
        "gateway_config.json not found. Run 01-Setup-Gateway-Lambda.ipynb first."
    )

with open(config_path, "r") as f:
    CONFIG = json.load(f)

# Extract key values
REGION = CONFIG["region"]
GATEWAY_URL = CONFIG["gateway_url"]
GATEWAY_ID = CONFIG["gateway_id"]
GATEWAY_ARN = CONFIG["gateway_arn"]
USER_POOL_ID = CONFIG["client_info"]["user_pool_id"]
CLIENT_ID = CONFIG["client_info"]["client_id"]
CLIENT_SECRET = CONFIG["client_info"]["client_secret"]
TOKEN_ENDPOINT = CONFIG["client_info"]["token_endpoint"]
POLICY_ENGINE_ID = CONFIG.get("policy_engine_id")

print("‚úì Configuration loaded")
print(f"  GATEWAY_ID: {GATEWAY_ID}")
print(f"  POLICY_ENGINE_ID: {POLICY_ENGINE_ID or 'Not set'}")

In [None]:
# Initialize AWS clients
session = boto3.Session(region_name=REGION)

lambda_client = session.client("lambda")
cognito_client = session.client("cognito-idp")
iam_client = session.client("iam")
sts_client = session.client("sts")
policy_client = session.client("bedrock-agentcore-control", region_name=REGION)
gateway_control_client = session.client("bedrock-agentcore-control", region_name=REGION)

ACCOUNT_ID = sts_client.get_caller_identity()["Account"]

print("‚úì AWS clients initialized")
print(f"  Account ID: {ACCOUNT_ID}")

---

## Part 2: Setup Policy Engine

Create a Policy Engine and attach it to the Gateway.

In [None]:
# Ensure Policy Engine exists (create if missing)
POLICY_ENGINE_ID = ensure_policy_engine(
    policy_client=policy_client,
    policy_engine_id=POLICY_ENGINE_ID,
    create_if_missing=True
)

if not POLICY_ENGINE_ID:
    raise RuntimeError("Failed to create or find Policy Engine")

# Save Policy Engine ID to config
CONFIG["policy_engine_id"] = POLICY_ENGINE_ID
with open(config_path, "w") as f:
    json.dump(CONFIG, f, indent=2)
print(f"‚úì Policy Engine ID saved to config")

In [None]:
# Attach Policy Engine to Gateway
engine = get_policy_engine(policy_client, POLICY_ENGINE_ID)
policy_engine_arn = engine.get("policyEngineArn")

attach_policy_engine_to_gateway(
    gateway_control_client=gateway_control_client,
    gateway_id=GATEWAY_ID,
    policy_engine_arn=policy_engine_arn,
    mode="ENFORCE"
)

#### Policy Engine Connected to Gateway
![policy_engine.png](img/policy_engine.png)

In [None]:
# Clean up existing policies for fresh start
cleanup_existing_policies(
    policy_client=policy_client,
    policy_engine_id=POLICY_ENGINE_ID,
    require_confirmation=False
)

# Track created policies for cleanup
CREATED_POLICIES = []

---

## Part 3: Test Scenario 1 - Department-Based Access Control

Allow only users from the **finance** department to use the refund tool.

### Cedar Policy Pattern

```cedar
permit(principal, action, resource)
when {
    principal.hasTag("department_name") &&
    principal.getTag("department_name") == "finance"
};
```

> **Note**: `principal.hasTag()` and `principal.getTag()` access JWT token claims.

### Step 3.1: Configure Lambda for Finance Department Claims

In [None]:
print("=" * 70)
print("Test Scenario 1: Department-Based Access Control")
print("=" * 70)

# Configure Lambda to add finance department claims
claims_finance = {
    "department_name": "finance",
    "employee_level": "senior",
    "cost_center": "CC-1001",
}

lambda_arn = create_lambda_function(
    lambda_client=lambda_client,
    iam_client=iam_client,
    claims=claims_finance,
    account_id=ACCOUNT_ID,
    user_pool_id=USER_POOL_ID
)

configure_cognito_trigger(
    cognito_client=cognito_client,
    lambda_client=lambda_client,
    user_pool_id=USER_POOL_ID,
    lambda_arn=lambda_arn,
    region=REGION,
    account_id=ACCOUNT_ID
)

print("\n‚úì Lambda configured with claims:")
print(json.dumps(claims_finance, indent=2))

### Step 3.2: Verify Token Contains Custom Claims

In [None]:
token = get_bearer_token(
    token_endpoint=TOKEN_ENDPOINT,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET
)

claims = decode_token(token)

print("üîç Token Claims:")
print(f"  department_name: {claims.get('department_name', 'N/A')}")
print(f"  employee_level: {claims.get('employee_level', 'N/A')}")
print(f"  cost_center: {claims.get('cost_center', 'N/A')}")

if claims.get("department_name") == "finance":
    print("\n‚úì Custom claims are in the token")
else:
    print("\n‚ö†Ô∏è  Custom claims not found - check Lambda trigger configuration")

### Step 3.3: Create Department Validation Policy

In [None]:
policy_name = f"dept_policy_{int(time.time())}"

cedar_statement = f'''permit(principal,
    action == AgentCore::Action::"RefundToolTarget___refund",
    resource == AgentCore::Gateway::"{GATEWAY_ARN}")
when {{
    principal.hasTag("department_name") &&
    principal.getTag("department_name") == "finance"
}};'''

policy_id = create_cedar_policy(
    policy_client=policy_client,
    policy_engine_id=POLICY_ENGINE_ID,
    policy_name=policy_name,
    cedar_statement=cedar_statement,
    description="Allow only finance department"
)

if policy_id:
    CREATED_POLICIES.append(policy_id)
    print("\n‚è≥ Waiting for policy to become ACTIVE...")
    wait_for_policy_active(policy_client, POLICY_ENGINE_ID, policy_id)

### Step 3.4: Test - Finance Department (Expected: ALLOWED)

In [None]:
print("\n" + "=" * 70)
print("Test 1.1: department_name='finance'")
print("=" * 70)

token = get_bearer_token(
    token_endpoint=TOKEN_ENDPOINT,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET
)

result = make_gateway_request(
    gateway_url=GATEWAY_URL,
    bearer_token=token,
    tool_name="RefundToolTarget___refund",
    arguments={"amount": 500, "orderId": "test-dept-finance"}
)

print("\nResponse:")
print(json.dumps(result, indent=2, ensure_ascii=False))

outcome = analyze_response(result)
display_test_result("ALLOWED", outcome, "Finance department should be allowed")

### Step 3.5: Test - Engineering Department (Expected: DENIED)

In [None]:
print("\n" + "=" * 70)
print("Test 1.2: department_name='engineering'")
print("=" * 70)

# Update Lambda to engineering department
claims_engineering = {
    "department_name": "engineering",
    "employee_level": "senior",
    "cost_center": "CC-2001",
}

lambda_arn = create_lambda_function(
    lambda_client=lambda_client,
    iam_client=iam_client,
    claims=claims_engineering,
    account_id=ACCOUNT_ID,
    user_pool_id=USER_POOL_ID
)
print("\n‚úì Lambda updated to department_name='engineering'")

print("\n‚è≥ Waiting for Lambda change propagation...")
time.sleep(5)

token = get_bearer_token(
    token_endpoint=TOKEN_ENDPOINT,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET
)
claims = decode_token(token)
print(f"\nToken department_name: {claims.get('department_name')}")

result = make_gateway_request(
    gateway_url=GATEWAY_URL,
    bearer_token=token,
    tool_name="RefundToolTarget___refund",
    arguments={"amount": 500, "orderId": "test-dept-engineering"}
)

print("\nResponse:")
print(json.dumps(result, indent=2, ensure_ascii=False))

outcome = analyze_response(result)
display_test_result("DENIED", outcome, "Engineering department should be denied")

---

## Part 4: Test Scenario 2 - Group-Based Access Control

Allow only users in the **admins** group to use the refund tool.

### Cedar Policy Pattern

Groups are serialized as strings in JWT, so use the `like` operator:

```cedar
permit(principal, action, resource)
when {
    principal.hasTag("groups") &&
    principal.getTag("groups") like "*admins*"
};
```

In [None]:
# Clean up previous policies
print("=" * 70)
print("Test Scenario 2: Group-Based Access Control")
print("=" * 70)

print("\nCleaning up previous policies...")
for pid in CREATED_POLICIES:
    delete_policy(policy_client, POLICY_ENGINE_ID, pid)
CREATED_POLICIES.clear()

### Step 4.1: Configure Lambda with Groups Claim

In [None]:
# Configure Lambda with admins group
claims_with_admins = {
    "groups": ["admins", "developers", "team-alpha"],
    "department_name": "finance",
    "employee_level": "senior",
}

lambda_arn = create_lambda_function(
    lambda_client=lambda_client,
    iam_client=iam_client,
    claims=claims_with_admins,
    account_id=ACCOUNT_ID,
    user_pool_id=USER_POOL_ID
)

configure_cognito_trigger(
    cognito_client=cognito_client,
    lambda_client=lambda_client,
    user_pool_id=USER_POOL_ID,
    lambda_arn=lambda_arn,
    region=REGION,
    account_id=ACCOUNT_ID
)

print("\n‚úì Lambda configured with groups:")
print(f"   groups: {claims_with_admins['groups']}")

### Step 4.2: Create Group Validation Policy

In [None]:
policy_name = f"groups_policy_{int(time.time())}"

cedar_statement = f'''permit(principal,
    action == AgentCore::Action::"RefundToolTarget___refund",
    resource == AgentCore::Gateway::"{GATEWAY_ARN}")
when {{
    principal.hasTag("groups") &&
    principal.getTag("groups") like "*admins*"
}};'''

policy_id = create_cedar_policy(
    policy_client=policy_client,
    policy_engine_id=POLICY_ENGINE_ID,
    policy_name=policy_name,
    cedar_statement=cedar_statement,
    description="Allow only admins group"
)

if policy_id:
    CREATED_POLICIES.append(policy_id)
    print("\n‚è≥ Waiting for policy to become ACTIVE...")
    wait_for_policy_active(policy_client, POLICY_ENGINE_ID, policy_id)

### Step 4.3: Test - User in Admins Group (Expected: ALLOWED)

In [None]:
print("\n" + "=" * 70)
print("Test 2.1: groups=['admins', 'developers', 'team-alpha']")
print("=" * 70)

token = get_bearer_token(
    token_endpoint=TOKEN_ENDPOINT,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET
)

result = make_gateway_request(
    gateway_url=GATEWAY_URL,
    bearer_token=token,
    tool_name="RefundToolTarget___refund",
    arguments={"amount": 500, "orderId": "test-groups-admins"}
)

print("\nResponse:")
print(json.dumps(result, indent=2, ensure_ascii=False))

outcome = analyze_response(result)
display_test_result("ALLOWED", outcome, "User in 'admins' group should be allowed")

### Step 4.4: Test - User Without Admins Group (Expected: DENIED)

In [None]:
print("\n" + "=" * 70)
print("Test 2.2: groups=['developers', 'team-alpha'] (no admins)")
print("=" * 70)

# Update Lambda without admins group
claims_no_admins = {
    "groups": ["developers", "team-alpha"],
    "department_name": "finance",
    "employee_level": "senior",
}

lambda_arn = create_lambda_function(
    lambda_client=lambda_client,
    iam_client=iam_client,
    claims=claims_no_admins,
    account_id=ACCOUNT_ID,
    user_pool_id=USER_POOL_ID
)

print("\n‚è≥ Waiting for Lambda change propagation...")
time.sleep(5)

token = get_bearer_token(
    token_endpoint=TOKEN_ENDPOINT,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET
)
claims = decode_token(token)
print(f"\nToken groups: {claims.get('groups')}")

result = make_gateway_request(
    gateway_url=GATEWAY_URL,
    bearer_token=token,
    tool_name="RefundToolTarget___refund",
    arguments={"amount": 500, "orderId": "test-groups-no-admins"}
)

print("\nResponse:")
print(json.dumps(result, indent=2, ensure_ascii=False))

outcome = analyze_response(result)
display_test_result("DENIED", outcome, "User without 'admins' group should be denied")

---

## Part 5: Advanced Patterns

### Combined Conditions (Department AND Amount Limit)

In [None]:
print("\nAdvanced Pattern: Combined Conditions")
print("=" * 70)

combined_cedar = f'''permit(principal,
    action == AgentCore::Action::"RefundToolTarget___refund",
    resource == AgentCore::Gateway::"{GATEWAY_ARN}")
when {{
    principal.hasTag("department_name") &&
    principal.getTag("department_name") == "finance" &&
    context.input.amount <= 1000
}};'''

print("Cedar Policy:")
print("-" * 60)
print(combined_cedar)
print("-" * 60)
print("\nThis policy allows when:")
print("  ‚úì User is in finance department")
print("  ‚úì AND refund amount <= $1000")

### Pattern Matching (`like` operator)

| Pattern | Matches |
|---------|--------|
| `"*admin*"` | Contains "admin" |
| `"admin*"` | Starts with "admin" |
| `"*admin"` | Ends with "admin" |
| `"team-*"` | Starts with "team-" |

In [None]:
print("\nAdvanced Pattern: Team-Based Access with Wildcard")
print("=" * 70)

team_cedar = f'''permit(principal,
    action == AgentCore::Action::"RefundToolTarget___refund",
    resource == AgentCore::Gateway::"{GATEWAY_ARN}")
when {{
    principal.hasTag("groups") &&
    principal.getTag("groups") like "*team-finance*"
}};'''

print("Cedar Policy:")
print("-" * 60)
print(team_cedar)
print("-" * 60)
print("\nMatches groups containing 'team-finance':")
print("  ‚úì ['team-finance', 'developers']")
print("  ‚úì ['admins', 'team-finance-leads']")

---

## Part 6: Cleanup

Delete policies created during testing.

In [None]:
print("=" * 70)
print("Cleanup")
print("=" * 70)

print(f"\nDeleting {len(CREATED_POLICIES)} policies...")
for pid in CREATED_POLICIES:
    delete_policy(policy_client, POLICY_ENGINE_ID, pid)

CREATED_POLICIES.clear()
print("\n‚úì Cleanup complete")

---

## Conclusion

This tutorial demonstrated:

‚úÖ Create Policy Engine and attach to Gateway  
‚úÖ Add custom claims to JWT using Cognito Lambda triggers  
‚úÖ Implement Cedar policies for attribute-based access control  
‚úÖ Department-based access control (`principal.getTag("department_name")`)  
‚úÖ Group-based access control with pattern matching (`like "*admins*"`)  
‚úÖ Combined conditions for complex access control scenarios  

### Key Cedar Syntax Patterns

| Claim Type | Cedar Syntax | Example |
|------------|--------------|--------|
| String (exact match) | `principal.getTag("claim") == "value"` | `department_name == "finance"` |
| String/Array (contains) | `principal.getTag("claim") like "*value*"` | `groups like "*admins*"` |
| Input validation | `context.input.field <= value` | `context.input.amount <= 1000` |

> **Note**: JWT array claims are serialized as strings, so use `like` operator for membership checks.