# `.log` — Universal Drill-Down

One accessor at every level. The chain is always `.log.steps[i].log.steps[j]...`

| Object | `.log` returns | Type |
|--------|---------------|------|
| `RunResult` | trace of this run | `RunLog` |
| `MapResult` | batch overview | `MapLog` |
| `NodeRecord` (leaf) | — | `None` |
| `NodeRecord` (1 inner) | inner trace | `RunLog` |
| `NodeRecord` (N inner) | batch of inner traces | `MapLog` |

In [None]:
from hypergraph import SyncRunner, Graph, node, ifelse, END


@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2


@node(output_name="tripled")
def triple(doubled: int) -> int:
    return doubled * 3


runner = SyncRunner()
graph = Graph([double, triple], name="pipeline")

## 1. Single run → `RunLog`

`result.log` — just type it, the table shows automatically.

In [None]:
result = runner.run(graph, {"x": 5})
result.log

In [None]:
result.log.summary()

In [None]:
result.log.timing

In [None]:
result.log.node_stats

Leaf nodes have `.log = None` — nothing nested:

In [None]:
result.log.steps[0].log is None

## 2. Mapped run → `MapLog`

`results.log` gives you a `MapLog` — batch overview with per-item drill-down.

In [None]:
results = runner.map(graph, {"x": [1, 2, 3, 4, 5]}, map_over="x")
results.log

In [None]:
results.log.summary()

Drill into one item — **same `RunLog` as Step 1**:

In [None]:
results.log[0]

In [None]:
# Aggregate stats across ALL items (cross-item bottleneck analysis)
results.log.node_stats

## 3. Nested graph → `.log` on steps

When a graph runs as a node inside another graph, `step.log` reveals the inner trace.

In [None]:
inner = Graph([double, triple], name="pipeline")
outer = Graph([inner.as_node()])
result = runner.run(outer, {"x": 5})
result.log

The footer tells you where to drill. Single nested → `step.log` is a `RunLog`:

In [None]:
result.log.steps[0].log

### map_over → `step.log` is a `MapLog`

When the inner graph runs N times, `step.log` returns a `MapLog` instead:

In [None]:
outer_mapped = Graph([inner.as_node().map_over("x")])
result = runner.run(outer_mapped, {"x": [1, 2, 3, 4, 5]})
result.log

In [None]:
step = result.log.steps[0]
step.log  # MapLog — same as results.log in Step 2

In [None]:
step.log[0]  # drill into first item → RunLog

## 4. Deep nesting — the chain keeps going

Three levels deep: outer → middle → innermost. The `.log` chain works at every level.

In [None]:
@node(output_name="incremented")
def increment(doubled: int) -> int:
    return doubled + 1


innermost = Graph([double], name="innermost")
middle = Graph([innermost.as_node(), increment], name="middle")
outer = Graph([middle.as_node()])

result = runner.run(outer, {"x": 5})
result.log

In [None]:
# outer → middle (RunLog)
middle_log = result.log.steps[0].log
middle_log

In [None]:
# middle → innermost (RunLog)
innermost_step = next(s for s in middle_log.steps if s.node_name == "innermost")
innermost_step.log

## 5. Errors — find failures across items

`MapLog.errors` aggregates all failed `NodeRecord`s across all items.

In [None]:
@node(output_name="result")
def maybe_fail(x: int) -> int:
    if x % 2 == 0:
        raise ValueError(f"even: {x}")
    return x * 10


fail_graph = Graph([maybe_fail], name="checker")
results = runner.map(fail_graph, {"x": [1, 2, 3, 4, 5]}, map_over="x", error_handling="continue")
results.log

In [None]:
results.log.errors

In [None]:
# Which items failed?
[(i, log.errors[0].error) for i, log in enumerate(results.log) if log.errors]

## 6. Serialization

Both `RunLog` and `MapLog` serialize to JSON via `.to_dict()`.

In [None]:
import json

result = runner.run(graph, {"x": 5})
print(json.dumps(result.log.to_dict(), indent=2))

In [None]:
results = runner.map(graph, {"x": [1, 2]}, map_over="x")
print(json.dumps(results.log.to_dict(), indent=2))

## 7. CLI — Inspect Runs from the Terminal

Runs persisted with a checkpointer can be inspected post-hoc via `hypergraph runs`.

The flow: **run with a checkpointer → inspect with CLI**.

First, create some runs to inspect:

In [None]:
import asyncio, tempfile, os
from hypergraph import AsyncRunner
from hypergraph.checkpointers import SqliteCheckpointer

# Temp DB for this demo (cleaned up when notebook restarts)
_tmp = tempfile.mkdtemp()
DB = os.path.join(_tmp, "playground.db")

cp = SqliteCheckpointer(DB, durability="sync")
async_runner = AsyncRunner(checkpointer=cp)

# A successful run
await async_runner.run(graph, {"x": 5}, workflow_id="demo-pipeline")

# A cyclic workflow
@ifelse(when_true=END, when_false="count_up")
def stop(count: int) -> bool:
    return count >= 3

@node(output_name="count")
def count_up(count: int) -> int:
    return count + 1

cycle_graph = Graph([count_up, stop])
await async_runner.run(cycle_graph, {"count": 0}, workflow_id="demo-cycle")

# A partially failed run
@node(output_name="ok")
def succeed(x: int) -> int:
    return x + 1

@node(output_name="boom")
def fail_node(x: int) -> int:
    raise RuntimeError("intentional failure")

fail_graph = Graph([succeed, fail_node])
await async_runner.run(fail_graph, {"x": 1}, workflow_id="demo-failure", error_handling="continue")

print(f"DB: {DB}")
print(f"Runs created: {len(cp.runs())}")

### `runs` — Quick dashboard

Shows active and recent runs at a glance.

In [None]:
!hypergraph runs --db {DB}

### `runs ls` — List and filter

Filter by status, graph name, or time window.

In [None]:
!hypergraph runs ls --db {DB}

In [None]:
!hypergraph runs ls --status failed --db {DB}

### `runs show` — Inspect a single run

Step-by-step execution trace. Use `--errors` to focus on failures, `--step N` to zoom in.

In [None]:
!hypergraph runs show demo-pipeline --db {DB}

In [None]:
# Cyclic workflow — shows re-executions across supersteps
!hypergraph runs show demo-cycle --db {DB}

In [None]:
# Focus on errors only
!hypergraph runs show demo-failure --errors --db {DB}

### `runs values` — Output values

What did the run produce? Use `--key` to extract a single value.

In [None]:
!hypergraph runs values demo-pipeline --db {DB}

In [None]:
!hypergraph runs values demo-pipeline --key tripled --db {DB}

### `runs steps` — Step-level detail

Full step records with timing and values. Use `--values` to see outputs, `--node` to filter.

In [None]:
!hypergraph runs steps demo-cycle --values --db {DB}

### `runs search` — Full-text search

Search across node names and error messages using FTS5.

In [None]:
!hypergraph runs search "failure" --db {DB}

### `runs stats` — Performance profiling

Per-node execution count, duration, and error rates.

In [None]:
# Stats shine for cyclic workflows — shows how many times each node ran
!hypergraph runs stats demo-cycle --db {DB}

### JSON output — Machine-readable

Every command supports `--json` for agent/script consumption.

In [None]:
!hypergraph runs show demo-pipeline --json --db {DB}

## 8. Python API — Same queries, no CLI

Every CLI command has a sync Python equivalent on `SqliteCheckpointer`. No `await` needed.

| CLI | Python |
|-----|--------|
| `runs ls` | `cp.runs()` |
| `runs ls --status failed` | `cp.runs(status=WorkflowStatus.FAILED)` |
| `runs show <id>` | `cp.run(id)` + `cp.steps(id)` |
| `runs values <id>` | `cp.values(id)` |
| `runs values <id> --key x` | `cp.values(id, key="x")` |
| `runs steps <id>` | `cp.steps(id)` |
| `runs search "query"` | `cp.search("query")` |
| `runs stats <id>` | `cp.stats(id)` |

In [None]:
# List all runs
cp.runs()

In [None]:
# Get a single run's metadata
cp.run("demo-pipeline")

In [None]:
# Output values — what did the run produce?
cp.values("demo-pipeline")

In [None]:
# Extract a single key
cp.values("demo-pipeline", key="tripled")

In [None]:
# Step records — full execution trace
cp.steps("demo-cycle")

In [None]:
# Full-text search across all runs
cp.search("failure")

In [None]:
# Per-node performance stats
cp.stats("demo-cycle")

In [None]:
# Checkpoint — snapshot for forking/retrying
cp.checkpoint("demo-pipeline")