# 🛡️ Policy Engine Deep Dive

> **Master kernel policies, signals, and enforcement.**

## Learning Objectives

By the end of this notebook, you will:
1. Understand how the policy engine works
2. Create custom policies with rules and patterns
3. Configure signal handlers for violations
4. Use policy templates for common scenarios
5. Debug and audit policy decisions

---

## The Core Idea: Kernel vs. Prompt Safety

```
Prompt-Based Safety:          Kernel-Based Safety:

  "Please don't do X"           Every action is checked
         ↓                             ↓
  LLM decides to comply         Kernel decides to allow
         ↓                             ↓
  Maybe it doesn't 🤷           No choice - blocked or allowed
```

**The kernel doesn't ask the agent to behave; it enforces behavior.**
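
To make the contrast concrete, here is a minimal sketch of kernel-style enforcement in plain Python. `MiniKernel` and its `execute` method are hypothetical names used only for illustration; this is not how `agent_os` is implemented, just the general shape of "check first, then run".

```python
from __future__ import annotations

from typing import Any, Callable


class MiniKernel:
    """Toy stand-in for a policy kernel (hypothetical, not the agent_os API)."""

    def __init__(self, blocked_actions: set[str]) -> None:
        self.blocked_actions = blocked_actions

    def execute(self, action: str, fn: Callable[..., Any], *args: Any) -> Any:
        # The check runs before the action and the caller cannot skip it.
        if action in self.blocked_actions:
            raise PermissionError(f"action '{action}' blocked by policy")
        return fn(*args)


kernel_sketch = MiniKernel(blocked_actions={"delete_file"})

# An action that is not on the blocked list runs normally.
allowed_result = kernel_sketch.execute(
    "read_file", lambda path: f"contents of {path}", "report.txt"
)

# A blocked action never reaches the callable.
try:
    kernel_sketch.execute("delete_file", lambda path: None, "report.txt")
    blocked = False
except PermissionError:
    blocked = True

print(allowed_result, blocked)
```

The point of the sketch is that the decision lives outside the agent: the callable is never invoked when the action is blocked, regardless of what the agent "intends".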

---

## Step 1: Install Dependencies

In [None]:
!pip install agent-os --quiet

## Step 2: Policy Templates

In [None]:
from agent_os import KernelSpace
from agent_os.policies import PolicyTemplate

# List available templates
templates = PolicyTemplate.list_available()

print("📋 Available Policy Templates")
print("=" * 60)
for name, description in templates.items():
    print(f"\n  📁 {name}")
    print(f"     {description}")

In [None]:
# Create kernel with different templates
strict_kernel = KernelSpace(policy="strict")
permissive_kernel = KernelSpace(policy="permissive")
audit_kernel = KernelSpace(policy="audit")

print("🛡️ Policy Comparison")
print("=" * 60)

for name, kernel in [("strict", strict_kernel), ("permissive", permissive_kernel), ("audit", audit_kernel)]:
    print(f"\n{name.upper()}:")
    print(f"  Mode: {kernel.policy.mode}")
    print(f"  Blocked actions: {len(kernel.policy.blocked_actions)}")
    print(f"  On violation: {kernel.policy.on_violation}")

## Step 3: Create Custom Policies

In [None]:
from agent_os.policies import Policy, Rule, Pattern

# Create a custom policy for a data analyst agent
analyst_policy = Policy(
    name="data-analyst",
    description="Policy for data analysis agents",
    
    # Explicitly allowed actions
    allowed_actions=[
        "read_file",
        "query_database",
        "generate_chart",
        "create_report",
    ],
    
    # Explicitly blocked actions
    blocked_actions=[
        "write_file",
        "delete_file",
        "send_email",
        "execute_shell",
        "network_request",
    ],
    
    # Pattern-based blocking (regex)
    blocked_patterns=[
        Pattern(r"\bpassword\b", "PII: password"),
        Pattern(r"\bssn\b", "PII: social security number"),
        Pattern(r"\bcredit.?card\b", "PII: credit card"),
        Pattern(r"DROP\s+TABLE", "SQL injection attempt"),
    ],
    
    # What to do on violation
    on_violation="SIGKILL",
)

print("✅ Custom Policy Created")
print(f"   Name: {analyst_policy.name}")
print(f"   Allowed: {analyst_policy.allowed_actions}")
print(f"   Blocked: {analyst_policy.blocked_actions}")
print(f"   Patterns: {len(analyst_policy.blocked_patterns)}")

In [None]:
# Use the custom policy
kernel = KernelSpace(policy=analyst_policy)

@kernel.register
async def data_agent(task: str):
    return f"Analyzing: {task}"

# This should work
result = await kernel.execute(data_agent, "Show Q4 revenue trends")
print(f"✅ Allowed: {result}")
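
The regular expressions in `blocked_patterns` can be checked on their own with Python's standard `re` module. The sketch below reuses the same four expressions and shows what they match with and without `re.IGNORECASE`. Whether `Pattern` applies any flags internally is not stated in this notebook, so treat this as the behavior of plain `re`, not of `agent_os`. The helper name `first_match` is made up for the example.

```python
import re

# Same expressions as the blocked_patterns in the policy above.
patterns = [
    (r"\bpassword\b", "PII: password"),
    (r"\bssn\b", "PII: social security number"),
    (r"\bcredit.?card\b", "PII: credit card"),
    (r"DROP\s+TABLE", "SQL injection attempt"),
]


def first_match(text, flags=0):
    """Return the reason for the first pattern found in text, or None."""
    for regex, reason in patterns:
        if re.search(regex, text, flags):
            return reason
    return None


samples = [
    "SELECT ssn FROM users",
    "Reset my Password",
    "drop table users",
    "Show Q4 revenue trends",
]
for sample in samples:
    default = first_match(sample)
    relaxed = first_match(sample, re.IGNORECASE)
    print(f"{sample!r}: default={default!r}, ignorecase={relaxed!r}")
```

With plain `re`, the patterns are case-sensitive: `Password` and `drop table` only match once `re.IGNORECASE` is passed. If case-insensitive matching matters for your policy, it is worth confirming how the engine compiles these patterns.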

## Step 4: Rules and Conditions

In [None]:
from agent_os.policies import Rule, Condition, Action

# Create rules with conditions
rules = [
    # Rule 1: Block large file reads
    Rule(
        name="large-file-limit",
        condition=Condition(
            action="read_file",
            check=lambda ctx: ctx.get("file_size", 0) > 100_000_000  # 100MB
        ),
        action=Action.BLOCK,
        message="File too large (>100MB)"
    ),
    
    # Rule 2: Rate limit API calls
    Rule(
        name="api-rate-limit",
        condition=Condition(
            action="api_call",
            check=lambda ctx: ctx.get("calls_per_minute", 0) > 60
        ),
        action=Action.BLOCK,
        message="Rate limit exceeded (60/min)"
    ),
    
    # Rule 3: Require approval for external network
    Rule(
        name="external-network-approval",
        condition=Condition(
            action="network_request",
            check=lambda ctx: not ctx.get("is_internal", False)
        ),
        action=Action.REQUIRE_APPROVAL,
        message="External network access requires approval"
    ),
]

# Add rules to policy
advanced_policy = Policy(
    name="advanced-analyst",
    rules=rules,
    on_violation="SIGSTOP"  # Pause instead of kill
)

print("📋 Advanced Policy with Rules")
print("=" * 60)
for rule in advanced_policy.rules:
    print(f"\n  📌 {rule.name}")
    print(f"     Action: {rule.action}")
    print(f"     Message: {rule.message}")
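
The `check` lambdas above are ordinary functions over a context dictionary, so their logic can be exercised without a kernel. The sketch below is a simplified stand-in: `SketchRule` and `evaluate_rules` are hypothetical names, and first-match-wins ordering is an assumption rather than documented `agent_os` behavior.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SketchRule:
    name: str
    action: str                    # action name the rule applies to
    check: Callable[[dict], bool]  # returns True when the rule fires
    outcome: str                   # "BLOCK" or "REQUIRE_APPROVAL"


sketch_rules = [
    SketchRule(
        "large-file-limit",
        "read_file",
        lambda ctx: ctx.get("file_size", 0) > 100_000_000,
        "BLOCK",
    ),
    SketchRule(
        "external-network-approval",
        "network_request",
        lambda ctx: not ctx.get("is_internal", False),
        "REQUIRE_APPROVAL",
    ),
]


def evaluate_rules(action: str, ctx: dict) -> str:
    """Return the outcome of the first rule that fires, or ALLOW."""
    for rule in sketch_rules:
        if rule.action == action and rule.check(ctx):
            return f"{rule.outcome} ({rule.name})"
    return "ALLOW"


print(evaluate_rules("read_file", {"file_size": 5_000}))
print(evaluate_rules("read_file", {"file_size": 200_000_000}))
print(evaluate_rules("network_request", {}))
print(evaluate_rules("network_request", {"is_internal": True}))
```

Note how the defaults behave when a key is missing: `is_internal` defaults to `False`, so an unlabeled network request requires approval, while `file_size` defaults to `0`, so a read with no size information is allowed. Choosing defaults that fail closed is usually the safer design.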

## Step 5: Signal Handlers

In [None]:
from agent_os import KernelSpace, AgentSignal

kernel = KernelSpace(policy="strict")

# Register signal handlers
@kernel.on_signal(AgentSignal.SIGKILL)
async def handle_kill(agent_id: str, context: dict):
    """Called when an agent is terminated."""
    print(f"🚨 SIGKILL received for {agent_id}")
    print(f"   Reason: {context.get('reason', 'Unknown')}")
    print(f"   Action: {context.get('action', 'Unknown')}")
    # Log to audit system, send alerts, etc.

@kernel.on_signal(AgentSignal.SIGSTOP)
async def handle_stop(agent_id: str, context: dict):
    """Called when an agent is paused."""
    print(f"⏸️  SIGSTOP received for {agent_id}")
    print(f"   Reason: {context.get('reason', 'Unknown')}")
    # Queue for human review

@kernel.on_signal(AgentSignal.SIGCONT)
async def handle_continue(agent_id: str, context: dict):
    """Called when an agent is resumed."""
    print(f"▶️  SIGCONT received for {agent_id}")
    print(f"   Approved by: {context.get('approved_by', 'System')}")

print("✅ Signal handlers registered")
print("   SIGKILL: handle_kill")
print("   SIGSTOP: handle_stop")
print("   SIGCONT: handle_continue")

In [None]:
# Trigger a violation to see the handler in action
@kernel.register
async def bad_agent(task: str):
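    import os
    # Deliberately destructive: the strict policy is expected to block this
    # call. Run this cell only in a disposable sandbox.
    os.system("rm -rf /")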
    return "Done"

try:
    await kernel.execute(bad_agent, "Clean up files")
except Exception as e:
    print(f"\n❌ Agent terminated: {e}")
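
The decorator-based registration used by `@kernel.on_signal(...)` follows a common pattern: a registry maps each signal to a list of async handlers, and the kernel awaits them when the signal fires. The sketch below shows that pattern in isolation. `SketchSignal`, `SignalBus`, and `emit` are hypothetical names, not part of `agent_os`.

```python
import asyncio
from collections import defaultdict
from enum import Enum


class SketchSignal(Enum):
    SIGKILL = "SIGKILL"
    SIGSTOP = "SIGSTOP"


class SignalBus:
    """Toy handler registry (hypothetical, not the agent_os API)."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on_signal(self, signal):
        # Decorator factory: registers the function and returns it unchanged.
        def decorator(fn):
            self._handlers[signal].append(fn)
            return fn
        return decorator

    async def emit(self, signal, agent_id, context):
        # Await each registered handler in registration order.
        for handler in self._handlers[signal]:
            await handler(agent_id, context)


bus = SignalBus()
received = []


@bus.on_signal(SketchSignal.SIGKILL)
async def record_kill(agent_id, context):
    received.append((agent_id, context["reason"]))


# In a notebook cell, use `await bus.emit(...)` instead of asyncio.run(...),
# because the notebook already runs an event loop.
asyncio.run(bus.emit(SketchSignal.SIGKILL, "agent-1", {"reason": "blocked action"}))
print(received)
```

Emitting a signal with no registered handlers is a no-op here; a production system might prefer to log that case so unhandled violations do not go unnoticed.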

## Step 6: Policy Files (YAML)

In [None]:
# Create a policy file
policy_yaml = """
kernel:
  version: "1.0"
  mode: strict

policies:
  - name: read_only
    description: "Prevents all write operations"
    blocked_actions:
      - file_write
      - file_delete
      - database_write
      - database_delete
  
  - name: no_pii
    description: "Blocks PII patterns"
    blocked_patterns:
      # Single-quoted: YAML double-quoted strings treat backslash sequences as escapes
      - pattern: '\\bssn\\b'
        reason: "Social Security Number detected"
      - pattern: '\\b\\d{16}\\b'
        reason: "Possible credit card number"
      - pattern: 'password\\s*[:=]'
        reason: "Password in plaintext"

  - name: rate_limits
    description: "Enforces rate limits"
    rules:
      - action: api_call
        max_per_minute: 100
      - action: database_query
        max_per_minute: 1000

signals:
  on_violation: SIGKILL
  on_warning: SIGSTOP
  on_rate_limit: SIGSTOP

audit:
  enabled: true
  log_level: INFO
  destination: ./audit.log
"""

# Save to file
with open("security.yaml", "w") as f:
    f.write(policy_yaml)

print("📝 Policy file created: security.yaml")

In [None]:
# Load policy from file
kernel = KernelSpace(policy_file="security.yaml")

print("✅ Policy loaded from security.yaml")
print(f"   Mode: {kernel.policy.mode}")
print(f"   Policies: {len(kernel.policy.policies)}")
print(f"   Audit: {kernel.policy.audit_enabled}")
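
Because a policy file mixes regular expressions and signal names, a quick sanity check before loading can catch typos early. The sketch below works on a plain dictionary shaped like the parsed `security.yaml`. `lint_policy_config` is a hypothetical helper, and the set of valid signals is assumed from the three signals used in this notebook.

```python
from __future__ import annotations

import re

# Assumption: only the signals that appear in this notebook are valid.
VALID_SIGNALS = {"SIGKILL", "SIGSTOP", "SIGCONT"}


def lint_policy_config(config: dict) -> list[str]:
    """Return a list of human-readable problems found in the config."""
    problems = []
    for policy in config.get("policies", []):
        for entry in policy.get("blocked_patterns", []):
            try:
                re.compile(entry["pattern"])
            except re.error as exc:
                problems.append(
                    f"{policy['name']}: bad regex {entry['pattern']!r} ({exc})"
                )
    for key, value in config.get("signals", {}).items():
        if value not in VALID_SIGNALS:
            problems.append(f"signals.{key}: unknown signal {value!r}")
    return problems


# Dictionary shaped like the parsed policy file, with two deliberate mistakes.
sample_config = {
    "policies": [
        {
            "name": "no_pii",
            "blocked_patterns": [
                {"pattern": r"\bssn\b", "reason": "Social Security Number detected"},
                {"pattern": r"password\s*[:=", "reason": "Password in plaintext"},
            ],
        },
    ],
    "signals": {"on_violation": "SIGKILL", "on_warning": "SIGPAUSE"},
}

lint_problems = lint_policy_config(sample_config)
for problem in lint_problems:
    print(problem)
```

The first mistake is an unterminated character set in the regex; the second is a signal name that does not exist in this notebook.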

## Step 7: Policy Debugging

In [None]:
from agent_os.policies import PolicyDebugger

debugger = PolicyDebugger(kernel.policy)

# Test what would happen with a specific action
test_cases = [
    {"action": "read_file", "path": "/data/sales.csv"},
    {"action": "write_file", "path": "/data/output.csv"},
    {"action": "query_database", "query": "SELECT * FROM users"},
    {"action": "query_database", "query": "SELECT ssn FROM users"},
]

print("🔍 Policy Debugging")
print("=" * 70)

for test in test_cases:
    result = debugger.evaluate(test)
    status = "✅ ALLOW" if result.allowed else "❌ BLOCK"
    print(f"\n{status}: {test['action']}")
    if not result.allowed:
        print(f"   Rule: {result.triggered_rule}")
        print(f"   Reason: {result.reason}")
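
A dry-run evaluator of this kind can be approximated in a few lines, which helps when reasoning about why a request was blocked. In the sketch below, `Decision`, `dry_run`, and the two constant tables are hypothetical; the field names mirror the `allowed`, `triggered_rule`, and `reason` attributes used above, but the evaluation order is an assumption.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Decision:
    allowed: bool
    triggered_rule: Optional[str] = None
    reason: Optional[str] = None


BLOCKED_ACTIONS = {"write_file", "delete_file"}
BLOCKED_PATTERNS = [(re.compile(r"\bssn\b"), "Social Security Number detected")]


def dry_run(request: dict) -> Decision:
    """Evaluate a request against the tables above without executing it."""
    if request["action"] in BLOCKED_ACTIONS:
        return Decision(False, "blocked_actions", f"{request['action']} is not permitted")
    # Scan every string argument, not only the action name.
    for key, value in request.items():
        if key == "action" or not isinstance(value, str):
            continue
        for regex, reason in BLOCKED_PATTERNS:
            if regex.search(value):
                return Decision(False, f"blocked_patterns:{regex.pattern}", reason)
    return Decision(True)


sketch_cases = [
    {"action": "read_file", "path": "/data/sales.csv"},
    {"action": "write_file", "path": "/data/output.csv"},
    {"action": "query_database", "query": "SELECT * FROM users"},
    {"action": "query_database", "query": "SELECT ssn FROM users"},
]
sketch_results = [dry_run(case) for case in sketch_cases]
for case, decision in zip(sketch_cases, sketch_results):
    print(case["action"], decision)
```

Returning the triggering rule alongside the verdict is what makes the output useful for debugging: a bare allow/block answer does not say which part of the policy to adjust.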

## Step 8: Audit Trail

In [None]:
from agent_os import KernelSpace
from agent_os.audit import AuditLog

# Create kernel with auditing
kernel = KernelSpace(policy="strict", audit=True)

@kernel.register
async def audited_agent(task: str):
    return f"Completed: {task}"

# Execute some tasks
await kernel.execute(audited_agent, "Task 1")
await kernel.execute(audited_agent, "Task 2")
await kernel.execute(audited_agent, "Task 3")

# View audit log
audit = kernel.get_audit_log()

print("📜 Audit Trail")
print("=" * 70)
for entry in audit.entries:
    print(f"\n  {entry.timestamp}")
    print(f"  Agent: {entry.agent_id}")
    print(f"  Action: {entry.action}")
    print(f"  Decision: {entry.decision}")
    print(f"  Policies checked: {entry.policies_checked}")
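
Audit trails are often stored as one JSON object per line so they can be appended cheaply and filtered later. The sketch below illustrates that layout with the standard library only. The on-disk format of `agent_os` audit logs is not described in this notebook, so `write_audit_entry` and the field set here are assumptions modeled on the attributes printed above.

```python
import io
import json
from datetime import datetime, timezone


def write_audit_entry(stream, agent_id, action, decision):
    """Append one JSON-lines audit record to a text stream."""
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "action": action,
        "decision": decision,
    }
    stream.write(json.dumps(entry) + "\n")
    return entry


# An in-memory buffer stands in for a log file such as ./audit.log.
audit_buffer = io.StringIO()
write_audit_entry(audit_buffer, "audited_agent", "execute", "ALLOW")
write_audit_entry(audit_buffer, "audited_agent", "write_file", "BLOCK")

# Reading the log back: parse each line and filter for blocked decisions.
parsed_entries = [json.loads(line) for line in audit_buffer.getvalue().splitlines()]
blocked_entries = [e for e in parsed_entries if e["decision"] == "BLOCK"]
print(len(parsed_entries), blocked_entries[0]["action"])
```

Timestamps are recorded in UTC with an explicit offset so that entries from different hosts sort consistently.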

## Cleanup

In [None]:
import os

for f in ["security.yaml", "audit.log"]:
    if os.path.exists(f):
        os.remove(f)
        print(f"🗑️  Removed {f}")

---

## Summary

| Feature | What It Does |
|---------|-------------|
| `Policy` | Defines allowed/blocked actions |
| `Rule` | Conditional logic for enforcement |
| `Pattern` | Regex-based content blocking |
| `@kernel.on_signal()` | Custom handlers for violations |
| `PolicyDebugger` | Test policies without running agents |
| `AuditLog` | Track all policy decisions |

### Quick Reference

```python
from agent_os import KernelSpace, AgentSignal
from agent_os.policies import Policy, Rule, Pattern

# Template policies
kernel = KernelSpace(policy="strict")  # or "permissive", "audit"

# Custom policy
policy = Policy(
    name="custom",
    allowed_actions=["read_file"],
    blocked_actions=["write_file"],
    blocked_patterns=[Pattern(r"\bpassword\b", "PII")],
    on_violation="SIGKILL"
)
kernel = KernelSpace(policy=policy)

# Signal handlers
@kernel.on_signal(AgentSignal.SIGKILL)
async def handle_kill(agent_id, ctx): ...

# From file
kernel = KernelSpace(policy_file="security.yaml")
```

---

## Next Steps

- [Production Examples](../examples/) - See policies in real-world scenarios
- [Security Specification](../docs/security-spec.md) - Full security model
- [RFC-004: Agent Primitives](../docs/rfcs/RFC-004-Agent-Primitives.md) - Core primitives