# Audit Logging — Full Decision Trail & Queries

Every boundary decision SafeAI makes (input scan, output guard, action intercept) is recorded as an **audit event**.
Events are append-only, tamper-evident (each carries a `context_hash`), and queryable with rich filters.

This notebook walks through generating events and querying them with `ai.query_audit()`.

In [1]:
import tempfile
from pathlib import Path
from click.testing import CliRunner
from safeai.cli.init import init_command
from safeai import SafeAI

tmp = tempfile.TemporaryDirectory()
work = Path(tmp.name)
CliRunner().invoke(init_command, ["--path", str(work)])
ai = SafeAI.from_config(work / "safeai.yaml")
print(f"SafeAI initialised  |  audit file: {ai.audit.file_path}")

SafeAI initialised  |  audit file: logs/audit.log


## 1. Generate some audit events

We run four operations that cross different boundaries and trigger different policy actions:

| # | Operation | Text | Expected action |
|---|-----------|-------|-----------------|
| 1 | `scan_input` | clean text | **allow** |
| 2 | `scan_input` | contains AWS key | **block** |
| 3 | `guard_output` | contains email (PII) | **redact** |
| 4 | `guard_output` | clean text | **allow** |

In [2]:
# 1. Clean input — should be allowed
r1 = ai.scan_input("Hello, how can I help you today?")
print(f"scan_input (clean)   => {r1.decision.action}")

# 2. Input with an AWS secret key — should be blocked
r2 = ai.scan_input("My key is AKIAIOSFODNN7EXAMPLE")
print(f"scan_input (secret)  => {r2.decision.action}")

# 3. Output containing PII (email) — should be redacted
r3 = ai.guard_output("Send the report to alice@example.com please.")
print(f"guard_output (PII)   => {r3.decision.action}")

# 4. Clean output — should be allowed
r4 = ai.guard_output("The quarterly report is ready.")
print(f"guard_output (clean) => {r4.decision.action}")

print("\nDone — 4 operations executed, audit events written.")

scan_input (clean)   => allow
scan_input (secret)  => block
guard_output (PII)   => redact
guard_output (clean) => allow

Done — 4 operations executed, audit events written.


## 2. Query all events

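`ai.query_audit()` returns a list of dicts, newest first by default. The query covers the whole audit trail, so it can include events recorded before the four operations above, as the eight-event output below shows. Each dict contains: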

```
event_id, boundary, action, policy_name, reason,
data_tags, agent_id, tool_name, session_id,
source_agent_id, destination_agent_id,
context_hash, metadata, timestamp
```

In [3]:
events = ai.query_audit(limit=10)
print(f"Total events returned: {len(events)}\n")

for evt in events:
    print(
        f"  boundary={evt['boundary']:<8}  "
        f"action={evt['action']:<8}  "
        f"policy={evt['policy_name'] or '-':<32}  "
        f"ts={evt['timestamp']}"
    )

Total events returned: 8

  boundary=output    action=allow     policy=allow-output-by-default           ts=2026-02-21T11:15:14.454013+00:00
  boundary=output    action=redact    policy=redact-personal-data-in-output    ts=2026-02-21T11:15:14.453896+00:00
  boundary=input     action=block     policy=block-secrets-everywhere          ts=2026-02-21T11:15:14.453732+00:00
  boundary=input     action=allow     policy=allow-input-by-default            ts=2026-02-21T11:15:14.453353+00:00
  boundary=memory    action=block     policy=memory-handle                     ts=2026-02-21T11:15:12.603000+00:00
  boundary=memory    action=allow     policy=allow-action-by-default           ts=2026-02-21T11:15:12.599578+00:00
  boundary=input     action=block     policy=block-everything-on-input         ts=2026-02-21T11:15:11.630878+00:00
  boundary=input     action=allow     policy=allow-input-by-default            ts=2026-02-21T11:15:11.573904+00:00
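Because the results are plain dicts, they can be summarised with the standard library alone. The sketch below is a minimal illustration, not part of SafeAI: it hard-codes the `boundary` and `action` values from the output above so that it runs without a SafeAI instance. In a live session the list would come from `ai.query_audit(limit=10)` instead.

```python
from collections import Counter

# Sample events copied from the output above, reduced to the two fields used here.
# In a live session this list would come from ai.query_audit(limit=10).
events = [
    {"boundary": "output", "action": "allow"},
    {"boundary": "output", "action": "redact"},
    {"boundary": "input", "action": "block"},
    {"boundary": "input", "action": "allow"},
    {"boundary": "memory", "action": "block"},
    {"boundary": "memory", "action": "allow"},
    {"boundary": "input", "action": "block"},
    {"boundary": "input", "action": "allow"},
]

# Count decisions per (boundary, action) pair.
summary = Counter((evt["boundary"], evt["action"]) for evt in events)

for (boundary, action), count in sorted(summary.items()):
    print(f"{boundary:<8} {action:<8} {count}")
```

A summary like this is a quick sanity check that each boundary produces the mix of decisions you expect.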


## 3. Filter by action — find all blocks

Pass `action="block"` to retrieve only the events where the policy engine blocked the request.

In [4]:
blocked = ai.query_audit(action="block")
print(f"Blocked events: {len(blocked)}\n")

for evt in blocked:
    print(
        f"  [{evt['event_id']}]  "
        f"boundary={evt['boundary']}  "
        f"policy={evt['policy_name']}  "
        f"reason={evt['reason']}"
    )

Blocked events: 3

  [evt_6979571f28e2]  boundary=input  policy=block-secrets-everywhere  reason=Secrets must never cross any boundary.
  [evt_87b9c9ab8f0a]  boundary=memory  policy=memory-handle  reason=memory handle agent binding mismatch
  [evt_cc734d4b5fcf]  boundary=input  policy=block-everything-on-input  reason=Emergency lockdown — all input blocked.


## 4. Filter by boundary

Retrieve only output-guard events by passing `boundary="output"`.

In [5]:
output_events = ai.query_audit(boundary="output")
print(f"Output-boundary events: {len(output_events)}\n")

for evt in output_events:
    print(
        f"  action={evt['action']:<8}  "
        f"policy={evt['policy_name']}  "
        f"tags={evt['data_tags']}"
    )

Output-boundary events: 2

  action=allow     policy=allow-output-by-default  tags=[]
  action=redact    policy=redact-personal-data-in-output  tags=['personal.pii']


## 5. Filter by agent_id

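In multi-agent systems, each agent has its own identity. We can generate events from
different agents and then filter the audit trail per agent. The results can also include
earlier events already recorded for the same agent IDs, such as the `memory`-boundary
events in the output below.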

In [6]:
# Generate events from two different agents
ai.scan_input("Agent A says hello.", agent_id="agent-a")
ai.scan_input("Agent B has key AKIAIOSFODNN7EXAMPLE", agent_id="agent-b")
ai.guard_output("Agent A output with alice@example.com", agent_id="agent-a")

# Query events for agent-a only
agent_a_events = ai.query_audit(agent_id="agent-a")
print(f"Events for agent-a: {len(agent_a_events)}")
for evt in agent_a_events:
    print(f"  boundary={evt['boundary']:<8}  action={evt['action']}")

print()

# Query events for agent-b only
agent_b_events = ai.query_audit(agent_id="agent-b")
print(f"Events for agent-b: {len(agent_b_events)}")
for evt in agent_b_events:
    print(f"  boundary={evt['boundary']:<8}  action={evt['action']}")

Events for agent-a: 3
  boundary=output    action=redact
  boundary=input     action=allow
  boundary=memory    action=allow

Events for agent-b: 2
  boundary=input     action=block
  boundary=memory    action=block
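When many agents are involved, one query per agent becomes tedious. An alternative is to fetch the events once and group them client-side by `agent_id`. The sketch below assumes nothing beyond the dict structure shown in this notebook and uses the values from the output above as sample data.

```python
from collections import defaultdict

# Sample data mirroring the per-agent output above.
# In a live session this list would come from a single ai.query_audit() call.
events = [
    {"agent_id": "agent-a", "boundary": "output", "action": "redact"},
    {"agent_id": "agent-a", "boundary": "input", "action": "allow"},
    {"agent_id": "agent-a", "boundary": "memory", "action": "allow"},
    {"agent_id": "agent-b", "boundary": "input", "action": "block"},
    {"agent_id": "agent-b", "boundary": "memory", "action": "block"},
]

# Build one audit trail per agent in a single pass.
by_agent = defaultdict(list)
for evt in events:
    by_agent[evt["agent_id"]].append(evt)

for agent_id, trail in sorted(by_agent.items()):
    blocked = sum(1 for evt in trail if evt["action"] == "block")
    print(f"{agent_id}: {len(trail)} events, {blocked} blocked")
```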


## 6. Filter by data_tag

The `data_tag` filter supports hierarchical matching. Querying for `"secret.credential"`
returns events where the detected tags include `secret.credential` or any child tag.

In [7]:
secret_events = ai.query_audit(data_tag="secret.credential")
print(f"Events matching data_tag='secret.credential': {len(secret_events)}\n")

for evt in secret_events:
    print(
        f"  [{evt['event_id']}]  "
        f"boundary={evt['boundary']}  "
        f"action={evt['action']}  "
        f"tags={evt['data_tags']}"
    )

Events matching data_tag='secret.credential': 2

  [evt_4f7f910b963b]  boundary=input  action=block  tags=['secret.credential']
  [evt_6979571f28e2]  boundary=input  action=block  tags=['secret.credential']
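The hierarchical matching described in this section can be sketched as a dotted-prefix check. This is an assumption about the semantics, not SafeAI's actual implementation; the helper names `tag_matches` and `event_matches_tag` and the child tag `secret.credential.aws` are hypothetical.

```python
def tag_matches(query: str, tag: str) -> bool:
    """Return True if `tag` equals `query` or is a dotted descendant of it."""
    return tag == query or tag.startswith(query + ".")


def event_matches_tag(event: dict, query: str) -> bool:
    """Return True if any of the event's data_tags matches `query`."""
    return any(tag_matches(query, tag) for tag in event.get("data_tags", []))


# Hypothetical events; "secret.credential.aws" is an invented child tag.
events = [
    {"event_id": "evt_a", "data_tags": ["secret.credential"]},
    {"event_id": "evt_b", "data_tags": ["secret.credential.aws"]},
    {"event_id": "evt_c", "data_tags": ["personal.pii"]},
    {"event_id": "evt_d", "data_tags": ["secret.credentials_other"]},
]

matched = [e["event_id"] for e in events if event_matches_tag(e, "secret.credential")]
print(matched)
```

Appending the `.` separator before the prefix check is what keeps a sibling tag such as `secret.credentials_other` from matching by accident.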


## 7. Time-based queries

Three time-range options are available:

| Parameter | Format | Example |
|-----------|--------|---------|
| `last` | compact duration | `"5m"`, `"2h"`, `"7d"`, `"1w"` |
| `since` | ISO-8601 or datetime | `"2025-01-01T00:00:00Z"` |
| `until` | ISO-8601 or datetime | `"2025-12-31T23:59:59Z"` |

The `last` shorthand is the most convenient for interactive use.

In [8]:
from datetime import datetime, timedelta, timezone

# Events from the last 5 minutes
recent = ai.query_audit(last="5m")
print(f"Events in the last 5 minutes: {len(recent)}")

# Demonstrate since/until with ISO-8601 strings (UTC)
now = datetime.now(timezone.utc)
one_hour_ago = (now - timedelta(hours=1)).isoformat()
now_iso = now.isoformat()

ranged = ai.query_audit(since=one_hour_ago, until=now_iso)
# Trim to whole seconds for display; the "Z" suffix is valid because both values are UTC
print(f"Events between since={one_hour_ago[:19]}Z and until={now_iso[:19]}Z: {len(ranged)}")

# Events from the last week (should include everything)
weekly = ai.query_audit(last="1w")
print(f"Events in the last 1 week: {len(weekly)}")

Events in the last 5 minutes: 11
Events between since=2026-02-21T10:15:14Z and until=2026-02-21T11:15:14Z: 11
Events in the last 1 week: 11
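To make the relationship between `last` and `since` concrete, the sketch below parses a compact duration and derives the equivalent `since` timestamp. It is an illustration only: `parse_duration` and `since_from_last` are hypothetical helpers, and the set of unit letters (`m`, `h`, `d`, `w`) is inferred from the examples in this notebook rather than from SafeAI's parser.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumed unit letters, based only on the examples shown in this notebook.
_UNITS = {"m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_duration(text: str) -> timedelta:
    """Parse a compact duration such as '5m' or '1w' into a timedelta."""
    match = re.fullmatch(r"(\d+)([mhdw])", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def since_from_last(last: str, now: datetime) -> datetime:
    """Return the start of the window that ends at `now` and spans `last`."""
    return now - parse_duration(last)


# A fixed reference time keeps the example deterministic.
now = datetime(2026, 2, 21, 11, 15, 14, tzinfo=timezone.utc)
since = since_from_last("5m", now)
print(since.isoformat())  # 2026-02-21T11:10:14+00:00
```

Under these assumptions, `last="5m"` selects the same window as passing `since` set to five minutes before the current time.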


## 8. Event details — context hash & metadata

Each audit event includes:

- **`context_hash`**: a SHA-256 hash over the event's core fields, stored with a `sha256:` prefix. Changing any hashed field invalidates the stored hash, which makes tampering detectable.
- **`metadata`**: a free-form dict with extra context (for example, `phase` or `detection_count`).

In [9]:
import json

# Pick the oldest event (newest_first=False) to inspect in full
oldest = ai.query_audit(limit=1, newest_first=False)
sample = oldest[0]

print("Full audit event structure:\n")
print(json.dumps(sample, indent=2, default=str))

print("\n--- Key fields ---")
print(f"event_id      : {sample['event_id']}")
print(f"context_hash  : {sample['context_hash']}")
print(f"boundary      : {sample['boundary']}")
print(f"action        : {sample['action']}")
print(f"policy_name   : {sample['policy_name']}")
print(f"data_tags     : {sample['data_tags']}")
print(f"metadata      : {sample['metadata']}")
print(f"timestamp     : {sample['timestamp']}")

Full audit event structure:

{
  "event_id": "evt_036eee61cc72",
  "boundary": "input",
  "action": "allow",
  "policy_name": "allow-input-by-default",
  "reason": "Allow when no restrictive policy matched.",
  "data_tags": [],
  "agent_id": "unknown",
  "tool_name": null,
  "session_id": null,
  "source_agent_id": null,
  "destination_agent_id": null,
  "context_hash": "sha256:94c1ead34313078f1f2d80ba06fc850ccde986e8390f36b9708ed334606141f9",
  "metadata": {},
  "timestamp": "2026-02-21T11:15:11.573904+00:00"
}

--- Key fields ---
event_id      : evt_036eee61cc72
context_hash  : sha256:94c1ead34313078f1f2d80ba06fc850ccde986e8390f36b9708ed334606141f9
boundary      : input
action        : allow
policy_name   : allow-input-by-default
data_tags     : []
metadata      : {}
timestamp     : 2026-02-21T11:15:11.573904+00:00
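The idea behind a tamper-evident hash can be sketched as follows. This is a generic illustration under stated assumptions: the choice of core fields and the canonical JSON serialization are guesses, so `compute_hash` (a hypothetical helper) will not reproduce the `context_hash` values SafeAI writes. It only shows why changing a hashed field makes a stored hash stop matching.

```python
import hashlib
import json

# Assumed set of "core fields"; the fields SafeAI actually hashes are not listed here.
CORE_FIELDS = ("event_id", "boundary", "action", "policy_name", "timestamp")


def compute_hash(event: dict) -> str:
    """Hash the core fields using a canonical (sorted, compact) JSON encoding."""
    core = {name: event.get(name) for name in CORE_FIELDS}
    canonical = json.dumps(core, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"sha256:{digest}"


def is_intact(event: dict) -> bool:
    """Return True if the stored hash still matches the event's core fields."""
    return event.get("context_hash") == compute_hash(event)


# Field values taken from the sample event above; the hash is recomputed
# with this sketch's own scheme rather than copied from the output.
event = {
    "event_id": "evt_036eee61cc72",
    "boundary": "input",
    "action": "allow",
    "policy_name": "allow-input-by-default",
    "timestamp": "2026-02-21T11:15:11.573904+00:00",
}
event["context_hash"] = compute_hash(event)
print(is_intact(event))  # True

# Changing a hashed field without updating the hash is detected.
tampered = dict(event, action="block")
print(is_intact(tampered))  # False
```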


In [10]:
# Cleanup
tmp.cleanup()
print("Temporary directory cleaned up.")

Temporary directory cleaned up.


---

## Summary

Every boundary decision SafeAI makes is persisted as a structured, hash-verified audit event.
The `query_audit()` method supports rich filtering:

| Filter | Purpose |
|--------|---------|
| `boundary` | Boundary type: `"input"`, `"output"`, `"action"`, `"memory"` |
| `action` | Policy action: `"allow"`, `"block"`, `"redact"`, `"require_approval"` |
| `policy_name` | Match a specific policy rule |
| `agent_id` | Per-agent audit trail |
| `tool_name` | Filter by tool |
| `data_tag` | Hierarchical tag matching |
| `session_id` | Scope to a session |
| `event_id` | Look up a single event |
| `source_agent_id` / `destination_agent_id` | Agent-to-agent flow tracking |
| `phase` | Metadata phase filter |
| `metadata_key` / `metadata_value` | Arbitrary metadata search |
| `since` / `until` | Time range (ISO-8601 string or datetime) |
| `last` | Compact duration (`"15m"`, `"2h"`, `"7d"`, `"1w"`) |
| `limit` | Max results (default 100) |
| `newest_first` | Sort order (default `True`) |

Use these filters to build compliance dashboards, investigate incidents, or verify that
your boundary policies are working as intended.
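As one possible starting point for a dashboard, queried events can be flattened to CSV with the standard library. The sketch below is an assumption-laden example, not a SafeAI feature: `events_to_csv` and the column selection are hypothetical, and the sample rows reuse values from the outputs earlier in this notebook.

```python
import csv
import io

# Hypothetical column selection for a dashboard export.
COLUMNS = ["timestamp", "boundary", "action", "policy_name", "data_tags"]


def events_to_csv(events: list[dict]) -> str:
    """Serialise audit events to CSV text, joining data_tags with ';'."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=COLUMNS, extrasaction="ignore", lineterminator="\n"
    )
    writer.writeheader()
    for evt in events:
        row = dict(evt)
        row["data_tags"] = ";".join(evt.get("data_tags", []))
        writer.writerow(row)
    return buffer.getvalue()


# Sample rows built from values shown earlier in this notebook.
# In a live session the list would come from ai.query_audit(...).
events = [
    {
        "timestamp": "2026-02-21T11:15:14.453896+00:00",
        "boundary": "output",
        "action": "redact",
        "policy_name": "redact-personal-data-in-output",
        "data_tags": ["personal.pii"],
    },
    {
        "timestamp": "2026-02-21T11:15:14.453732+00:00",
        "boundary": "input",
        "action": "block",
        "policy_name": "block-secrets-everywhere",
        "data_tags": ["secret.credential"],
    },
]

csv_text = events_to_csv(events)
print(csv_text)
```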