Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ If you are an agent working in this repo: **do not improvise architecture**. Fol

### Current Focus: Phase 4A

**Phase 5.4 is complete** — PRD stress-test web UI: trigger + streaming (#561). Backend: `GET /api/v2/prd/stress-test` SSE endpoint streams `goals_extracted`, `goal_analyzed`, `complete`, and `error` events from `core/prd_stress_test.py:stress_test_prd_stream()`, resolving the LLM provider via the standard chain and applying the standard rate limit. Frontend: `useStressTestStream` hook manages the SSE connection and event accumulation; `StressTestModal` renders the streaming progress and is opened via a "Stress Test" button on the `/prd` page (enabled only when a PRD exists). Results rendering (#562) is out of scope and still pending.

**Phase 5.3 is complete** — Async notifications cover both surfaces:
- **Browser + in-app center (#559)**: `useNotifications` hook with workspace-scoped `localStorage` persistence and browser Notification dispatch (only when tab hidden + permission granted); `NotificationProvider` in root layout; `NotificationCenter` (bell icon + dropdown) mounts in sidebar footer. `BatchExecutionMonitor` dispatches `batch.completed` on terminal status transitions (distinguishing COMPLETED/FAILED/CANCELLED in both the in-app message and the success icon) and `blocker.created` on per-task BLOCKED transitions. `/execution` requests browser permission once on mount when permission is `'default'`. `/proof` dispatches `gate.run.failed` per failed gate when a proof run completes with `passed === false`. Known limitation: notifications only fire while `BatchExecutionMonitor` is mounted (cross-page background poller is out of scope; tracked for future work).
- **Outbound webhook (#560)**: Settings → Notifications tab takes a single URL + enabled toggle, persisted to `.codeframe/notifications_config.json` via `atomic_write_json`. `GET/PUT /api/v2/settings/notifications` and `POST /api/v2/settings/notifications/test` (test fires a sample payload and surfaces status code). `WebhookNotificationService.send_event` is the generic backend; dispatched fire-and-forget (5s timeout) from `core/conductor.py` on `BATCH_COMPLETED` only (not PARTIAL/FAILED/CANCELLED), `core/blockers.py:create()` after `BLOCKER_CREATED`, and `ui/routers/pr_v2.py:merge_pull_request` after successful merge. Failures are logged but never break the triggering operation.
Expand Down
62 changes: 61 additions & 1 deletion codeframe/core/prd_stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
This module is headless — no FastAPI or HTTP dependencies.
"""

import asyncio
import json
import logging
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from typing import AsyncGenerator, Optional

from codeframe.adapters.llm.base import Purpose

Expand Down Expand Up @@ -407,3 +408,62 @@ def stress_test_prd(
tech_spec_markdown=tech_spec,
ambiguity_report=amb_report,
)


async def stress_test_prd_stream(
prd_content: str, provider, max_depth: int = 3
) -> AsyncGenerator[dict, None]:
"""Async streaming variant of :func:`stress_test_prd`.

Yields progress event dicts suitable for SSE delivery as each top-level
goal is decomposed, so a UI can render incremental output:

- ``{"type": "goals_extracted", "goals": [...]}``
- ``{"type": "goal_analyzed", "goal": str, "classification": str,
"ambiguities_so_far": int}`` (once per top-level goal)
- ``{"type": "complete", "ambiguity_count": int,
"tech_spec_markdown": str, "ambiguity_report": str}``
- ``{"type": "error", "message": str}`` if decomposition raises

The underlying ``provider.complete()`` calls are synchronous and blocking,
so each is offloaded via :func:`asyncio.to_thread` to keep the event loop
responsive. This function stays headless (no FastAPI/HTTP imports).
"""
try:
goals = await asyncio.to_thread(extract_goals, prd_content, provider)
yield {"type": "goals_extracted", "goals": goals}

ambiguities: list[Ambiguity] = []
tree: list[DecompositionNode] = []

for goal in goals:
node = await asyncio.to_thread(
recursive_decompose,
goal, # title
goal, # description
[], # lineage
prd_content,
0, # depth
max_depth,
ambiguities,
provider,
)
tree.append(node)
yield {
"type": "goal_analyzed",
"goal": node.title,
"classification": node.classification.value,
"ambiguities_so_far": len(ambiguities),
}

tech_spec = render_tech_spec(tree, ambiguities)
amb_report = render_ambiguity_report(ambiguities)
yield {
"type": "complete",
"ambiguity_count": len(ambiguities),
"tech_spec_markdown": tech_spec,
"ambiguity_report": amb_report,
}
except Exception as exc: # noqa: BLE001 — surface any failure to the client
logger.warning("Stress test stream failed: %s", exc, exc_info=True)
yield {"type": "error", "message": str(exc)}
121 changes: 120 additions & 1 deletion codeframe/ui/routers/prd_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
GET /api/v2/prd/{id}/diff - Diff two versions
"""

import json
import logging
from typing import Optional
import os
from typing import AsyncGenerator, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from codeframe.core.workspace import Workspace
Expand Down Expand Up @@ -186,6 +189,122 @@ async def get_latest_prd(
return _prd_to_response(record)


def _sse(event: dict) -> str:
"""Format a stress-test event dict as an SSE ``data:`` frame."""
return f"data: {json.dumps(event)}\n\n"


async def _stress_test_event_stream(
workspace: Workspace,
max_depth: int,
request: Optional[Request] = None,
) -> AsyncGenerator[str, None]:
"""Yield SSE frames for a PRD stress-test.

Recoverable problems (missing PRD, missing ``ANTHROPIC_API_KEY``) are
surfaced as in-stream ``error`` events rather than HTTP errors, so a
browser ``EventSource`` can display them via its message handler.

Stops early if the client disconnects, so an abandoned stream does not keep
issuing LLM calls — mirroring ``event_stream_generator`` in streaming_v2.
"""
from codeframe.core.prd_stress_test import stress_test_prd_stream

record = prd.get_latest(workspace)
if not record:
yield _sse({
"type": "error",
"message": "No PRD found. Add or generate a PRD first.",
})
return

# Resolve the LLM provider following the documented chain:
# env var → workspace config (.codeframe/config.yaml) → default "anthropic".
# (No CLI flag here — this is the web surface.) Mirrors runtime.py.
from codeframe.adapters.llm import get_provider
from codeframe.core.config import load_environment_config

env_cfg = load_environment_config(workspace.repo_path)
llm_cfg = env_cfg.llm if (env_cfg and env_cfg.llm) else None
provider_type = (
os.getenv("CODEFRAME_LLM_PROVIDER")
or (llm_cfg.provider if llm_cfg else None)
or "anthropic"
)

# Only the Anthropic provider needs an API key up front; local providers
# (ollama/vllm/compatible) do not.
if provider_type == "anthropic" and not os.getenv("ANTHROPIC_API_KEY"):
yield _sse({
"type": "error",
"message": "ANTHROPIC_API_KEY environment variable required.",
})
return

provider_kwargs: dict = {}
model_override = os.getenv("CODEFRAME_LLM_MODEL") or (
llm_cfg.model if llm_cfg else None
)
base_url_override = (llm_cfg.base_url if llm_cfg else None) or os.getenv(
"OPENAI_BASE_URL"
)
if model_override:
provider_kwargs["model"] = model_override
if base_url_override:
provider_kwargs["base_url"] = base_url_override

try:
provider = get_provider(provider_type, **provider_kwargs)
except ValueError as exc:
yield _sse({"type": "error", "message": str(exc)})
return

async for event in stress_test_prd_stream(
record.content, provider, max_depth=max_depth,
):
# If the browser has gone away, stop iterating the core generator so its
# next (blocking, billable) LLM call is never made.
if request is not None and await request.is_disconnected():
logger.info("Client disconnected from stress-test stream; aborting")
break
yield _sse(event)


@router.get("/stress-test")
@rate_limit_standard()
async def stress_test_prd_stream_endpoint(
request: Request,
max_depth: int = Query(3, ge=1, le=10, description="Maximum recursion depth"),
workspace: Workspace = Depends(get_v2_workspace),
) -> StreamingResponse:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Stream a PRD stress-test (recursive decomposition) via SSE.

Runs the headless ``stress_test_prd_stream`` core generator over the
latest PRD and emits its progress events as Server-Sent Events. This is
the web equivalent of ``cf prd stress-test``.

Declared as GET (not POST) so it is reachable from a browser
``EventSource``, matching ``GET /api/v2/tasks/{task_id}/stream``. No custom
auth headers are required (cookie-based auth via ``withCredentials``).

Event payloads (JSON in the SSE ``data:`` field, ``type`` field):
- ``goals_extracted``: high-level goals parsed from the PRD
- ``goal_analyzed``: one per top-level goal (classification + running
ambiguity count)
- ``complete``: ambiguity count + rendered tech spec / ambiguity report
- ``error``: no PRD, missing API key, or decomposition failure
"""
return StreamingResponse(
_stress_test_event_stream(workspace, max_depth, request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)


@router.get("/{prd_id}", response_model=PrdResponse)
@rate_limit_standard()
async def get_prd(
Expand Down
3 changes: 2 additions & 1 deletion docs/PHASE_2_CLI_API_MAPPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ Both end up with PRD records managed by `core.prd`.
| `cf prd export` | `core.prd` | `export_to_file()` | (CLI-only) | - | N/A |
| `cf prd versions` | `core.prd` | `get_versions()` | `/api/v2/prd/{id}/versions` | GET | ✅ Present |
| `cf prd diff` | `core.prd` | `diff_versions()` | `/api/v2/prd/{id}/diff` | GET | ✅ Present |
| `cf prd stress-test` | `core.prd_stress_test` | `stress_test_prd_stream()` | `/api/v2/prd/stress-test` | GET (SSE) | ✅ Present |

**Note:** Both Discovery workflow and PRD CRUD are now complete ✅.
**Note:** Both Discovery workflow and PRD CRUD are now complete ✅. The stress-test SSE endpoint (#561) is present; web UI results rendering (#562) is pending.

### Task Commands

Expand Down
7 changes: 3 additions & 4 deletions docs/PRODUCT_ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,10 @@ Without a settings page, a new user who cannot find the env vars cannot use the

### 4. PRD Stress-Test Web UI

**Current state**: The CLI has `cf prd stress-test` for recursive decomposition — it takes the PRD and surfaces ambiguities the agent cannot resolve without human input. This is described in the vision as a core part of the THINK phase. The web UI has no equivalent; users who work exclusively in the browser never see this step.
**Current state**: Phase 5.4 trigger + streaming shipped (#561). The `/prd` page now has a "Stress Test" button (enabled only when a PRD exists) that opens `StressTestModal`. The modal connects via `useStressTestStream` to `GET /api/v2/prd/stress-test` (SSE), which streams `goals_extracted`, `goal_analyzed`, `complete`, and `error` events from `core/prd_stress_test.py`. Results rendering — displaying the decomposition tree, surfacing ambiguities as answerable questions, feeding answers back to refine the PRD — is tracked in #562 and is not yet built.

**What to build**:
**What remains (#562)**:

- A **[Stress Test]** button on the PRD page that triggers the stress-test process
- A **results view** showing the decomposition tree with ambiguities surfaced as questions, styled similarly to the existing Discovery transcript
- Each ambiguity has an inline answer field — the user's answers are fed back to refine the PRD
- On completion: the refined PRD is saved and the user can proceed to task generation
Expand Down Expand Up @@ -204,7 +203,7 @@ These are items that were considered and excluded because they do not serve the
| 5.1 | Settings page (skeleton + agent config + PROOF9/workspace tabs) | ✅ Complete | #554–556 |
| 5.2 | Cost analytics | ✅ Complete | #557–558 |
| 5.3 | Async notifications | ✅ Complete (browser + in-app center #559, webhook #560) | #559–560 |
| 5.4 | PRD stress-test web UI | ❌ Not started | #561–562 |
| 5.4 | PRD stress-test web UI | ✅ Complete (trigger + streaming #561; results rendering #562 pending) | #561–562 |
| 5.5 | GitHub Issues import | ❌ Not started | #563–565 |

**Current focus**: Phase 4A — PR status tracking + PROOF9 merge gate.
Expand Down
78 changes: 78 additions & 0 deletions tests/core/test_prd_stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,84 @@ def test_max_depth_respected(self, sample_prd, mock_provider):
assert child.children == [] # No grandchildren at depth 1


# --- Streaming Generator Tests ---


class TestStressTestPrdStream:
async def test_emits_event_sequence(self, sample_prd, mock_provider):
from codeframe.core.prd_stress_test import stress_test_prd_stream

events = [
ev async for ev in stress_test_prd_stream(
sample_prd, mock_provider, max_depth=3,
)
]

types = [e["type"] for e in events]
# First event announces extracted goals, last announces completion.
assert types[0] == "goals_extracted"
assert types[-1] == "complete"
# One goal_analyzed per top-level goal (3 in the sample PRD).
assert types.count("goal_analyzed") == 3

async def test_goals_extracted_payload(self, sample_prd, mock_provider):
from codeframe.core.prd_stress_test import stress_test_prd_stream

events = [
ev async for ev in stress_test_prd_stream(sample_prd, mock_provider)
]
goals_event = events[0]
assert goals_event["goals"] == [
"User Authentication",
"Invoice Management",
"PDF Export",
]

async def test_goal_analyzed_carries_classification_and_running_count(
self, sample_prd, mock_provider
):
from codeframe.core.prd_stress_test import stress_test_prd_stream

events = [
ev async for ev in stress_test_prd_stream(sample_prd, mock_provider)
]
analyzed = [e for e in events if e["type"] == "goal_analyzed"]

auth = next(e for e in analyzed if e["goal"] == "User Authentication")
assert auth["classification"] == "ambiguous"
assert auth["ambiguities_so_far"] == 1

invoice = next(e for e in analyzed if e["goal"] == "Invoice Management")
assert invoice["classification"] == "composite"

pdf = next(e for e in analyzed if e["goal"] == "PDF Export")
assert pdf["classification"] == "atomic"

async def test_complete_payload(self, sample_prd, mock_provider):
from codeframe.core.prd_stress_test import stress_test_prd_stream

events = [
ev async for ev in stress_test_prd_stream(sample_prd, mock_provider)
]
complete = events[-1]
assert complete["type"] == "complete"
assert complete["ambiguity_count"] == 1
assert "# Technical Specification" in complete["tech_spec_markdown"]
assert "AUTH SCOPE" in complete["ambiguity_report"]

async def test_provider_failure_yields_error_event(self, sample_prd):
from codeframe.core.prd_stress_test import stress_test_prd_stream

failing = MagicMock()
failing.complete.side_effect = RuntimeError("LLM unavailable")

events = [
ev async for ev in stress_test_prd_stream(sample_prd, failing)
]
assert events[-1]["type"] == "error"
assert "LLM unavailable" in events[-1]["message"]


# --- CLI Tests ---


Expand Down
Loading
Loading