
feat(websocket): implement proactive messaging system #224

Merged
frankbria merged 4 commits into main from feature/proactive-websocket-messaging
Jan 8, 2026

frankbria (Owner) commented Jan 8, 2026

Summary

  • Transform WebSocket from passive (only responding to client messages) to proactive (actively sending connection health and state information)
  • Add three new proactive message types: connection_ack, project_status, heartbeat
  • Ensure clients always receive messages confirming that the real-time system is operational

Changes

Backend (codeframe/ui/routers/websocket.py)

  • Add HEARTBEAT_INTERVAL_SECONDS constant (30 seconds)
  • Add send_heartbeats() async function for background heartbeat tasks
  • Track heartbeat tasks per project, cancel on disconnect
  • Send connection_ack immediately after subscription
  • Send project_status with current project state on subscription

Broadcast Helper (codeframe/ui/websocket_broadcasts.py)

  • Add broadcast_heartbeat() helper function

Frontend Types (web-ui/src/types/index.ts)

  • Add heartbeat, connection_ack, project_status to WebSocketMessageType union

Tests

  • Add tests/ui/test_websocket_proactive.py with 17 comprehensive tests
  • Add drain_proactive_messages() helper for integration tests
  • Update existing integration tests to handle new message sequence
  • Update E2E test expectations for proactive messaging

Message Flow After Subscription

Client → Server: {"type": "subscribe", "project_id": 1}
Server → Client: {"type": "subscribed", "project_id": 1}
Server → Client: {"type": "connection_ack", "project_id": 1, "timestamp": "...", "message": "Connected to real-time updates"}
Server → Client: {"type": "project_status", "project_id": 1, "status": "...", "phase": "..."}
... every 30 seconds ...
Server → Client: {"type": "heartbeat", "project_id": 1, "timestamp": "..."}
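
As a rough illustration of the flow above, the sketch below builds the three messages a server would send right after a subscribe request, in order. The function name `build_subscription_messages` and its `status`/`phase` arguments are hypothetical; only the message types and field names are taken from the flow shown here, and the real router sends these messages inline rather than through a helper like this.

```python
from datetime import datetime, timezone


def build_subscription_messages(project_id: int, status: str, phase: str) -> list:
    """Return the post-subscribe messages in the order the server sends them.

    Hypothetical helper for illustration only.
    """
    now = datetime.now(timezone.utc).isoformat()
    return [
        {"type": "subscribed", "project_id": project_id},
        {
            "type": "connection_ack",
            "project_id": project_id,
            "timestamp": now,
            "message": "Connected to real-time updates",
        },
        {
            "type": "project_status",
            "project_id": project_id,
            "status": status,
            "phase": phase,
        },
    ]
```

Heartbeats are left out on purpose: they arrive later on a timer, so a client should not expect one as part of the immediate response to `subscribe`.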

Test plan

  • Run uv run pytest tests/ui/test_websocket_proactive.py -v - 17 tests pass
  • Run uv run pytest tests/ui/test_websocket_integration.py -v - 30 tests pass
  • Run uv run pytest tests/ui/ -v - 238 tests pass, 1 skipped
  • Run uv run ruff check . - All checks passed
  • Run npm run type-check in web-ui - No TypeScript errors
  • E2E test tests/e2e/test_dashboard.spec.ts WebSocket test should now receive messages

Summary by CodeRabbit

  • New Features

    • WebSocket now proactively acknowledges subscriptions, sends initial project status snapshots, and emits periodic heartbeat messages (configurable, default 30s). Improved disconnect handling cancels proactive messages promptly.
  • Tests

    • Added comprehensive tests for proactive WebSocket behavior: sequencing, timestamps, heartbeats, cancellation, access control, and multi-subscription scenarios.
  • Docs / Types

    • WebSocket message types extended with heartbeat, connection_ack, and project_status.


Transform WebSocket from passive (only responding to client messages) to
proactive (actively sending connection health and state information).

New proactive messages:
- connection_ack: Sent immediately after subscription to confirm real-time
  connectivity
- project_status: Initial project state snapshot sent on subscription
- heartbeat: Periodic messages every 30 seconds to keep connections alive

Implementation:
- Add HEARTBEAT_INTERVAL_SECONDS constant (30s default)
- Add send_heartbeats() async function for background heartbeat tasks
- Track heartbeat tasks per project, cancel on disconnect
- Add broadcast_heartbeat() helper function
- Update WebSocketMessageType union with new message types

Test updates:
- Add comprehensive test suite (test_websocket_proactive.py) with 17 tests
- Add drain_proactive_messages() helper for integration tests
- Update existing integration tests to handle new message sequence
- Update E2E test expectations for proactive messaging

This ensures clients always receive messages to confirm the real-time
system is operational, fixing the issue where E2E tests received 0
messages from the WebSocket.

coderabbitai bot (Contributor) commented Jan 8, 2026

Walkthrough

Sends proactive WebSocket messages: connection_ack and optional project_status immediately after subscribe, starts per-subscription heartbeat tasks emitting heartbeat every HEARTBEAT_INTERVAL_SECONDS (env-configurable, default 30s), and cancels heartbeats on unsubscribe/disconnect; adds broadcast helper and tests.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Backend WebSocket Router: codeframe/ui/routers/websocket.py | Added HEARTBEAT_INTERVAL_SECONDS (env-configurable), send_heartbeats() function, proactive sends of connection_ack and optional project_status on subscribe, per-subscription heartbeat task lifecycle (start/cancel), and disconnect cleanup; added imports and logging. |
| WebSocket Broadcast Helper: codeframe/ui/websocket_broadcasts.py | Added broadcast_heartbeat(manager, project_id: int) to construct and broadcast heartbeat messages (type, project_id, UTC timestamp); updated module docstring to list proactive message types. |
| Frontend Type Definitions: web-ui/src/types/index.ts | Extended WebSocketMessageType with 'heartbeat', 'connection_ack', and 'project_status'. |
| WebSocket Integration Tests: tests/ui/test_websocket_integration.py | Added drain_proactive_messages(websocket, expected_project_id=None) helper; refactored tests to drain/validate proactive messages after subscribe. |
| WebSocket Proactive Behavior Tests: tests/ui/test_websocket_proactive.py | New comprehensive test module covering connection_ack, project_status snapshot, heartbeat interval and lifecycle, access control, sequencing, multi-subscription behavior, and broadcast helper error handling (extensive mocking/patching). |
| E2E Dashboard Tests: tests/e2e/test_dashboard.spec.ts | Updated expectations to include proactive messages (connection_ack, project_status, heartbeat) and adjusted failure messaging/guards. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Router as WebSocket Router
    participant Manager as Connection Manager
    participant DB as Database

    rect rgb(200,220,255)
    Note over Client,Router: Subscribe → register & proactive messages
    Client->>Router: subscribe(project_id)
    Router->>Manager: register_subscription(project_id, client)
    Manager-->>Router: subscribed
    Router-->>Client: {"type":"subscribed", ...}
    Router-->>Client: {"type":"connection_ack","project_id":project_id,"timestamp":...}
    Router->>DB: fetch project_status(project_id)
    DB-->>Router: project data / not found
    Router-->>Client: {"type":"project_status", ...}
    end

    rect rgb(220,240,220)
    Note over Router,Manager: Per-subscription heartbeat loop (every HEARTBEAT_INTERVAL_SECONDS)
    loop every HEARTBEAT_INTERVAL_SECONDS
        Router->>Manager: broadcast_heartbeat(project_id)
        Manager-->>Client: {"type":"heartbeat","project_id":project_id,"timestamp":...}
    end
    end

    rect rgb(240,220,220)
    Note over Client,Router: Unsubscribe / Disconnect → cleanup
    Client->>Router: unsubscribe(project_id) / disconnect
    Router->>Router: cancel heartbeat task(s)
    Router->>Manager: remove_subscription(project_id, client)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 I hop on sockets, light and quick,

I send an ack and a timestamp tick,
A snapshot chirps of project state,
Each thirty seconds a heartbeat's fate,
Clean disconnects keep tails tidy and slick.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly summarizes the main change: implementing a proactive messaging system for WebSocket connections. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 87.50% which is sufficient. The required threshold is 80.00%. |


frankbria linked an issue Jan 8, 2026 that may be closed by this pull request

claude bot commented Jan 8, 2026

Code Review: Proactive WebSocket Messaging System

This is an excellent enhancement that transforms the WebSocket from passive to proactive messaging. The implementation is well-structured, thoroughly tested, and addresses a real usability issue (E2E tests receiving 0 messages). Here's my detailed feedback:


✅ Strengths

1. Excellent Design & Architecture

  • Clear separation of concerns: Heartbeat logic is encapsulated in send_heartbeats() function
  • Proper task lifecycle management: Heartbeat tasks are tracked per-project and cancelled on disconnect
  • Graceful error handling: Heartbeat failures don't crash the connection
  • Well-documented: Comprehensive docstrings and inline comments

2. Comprehensive Test Coverage

  • 17 new tests in test_websocket_proactive.py covering all proactive message scenarios
  • Integration test updates with drain_proactive_messages() helper - smart approach!
  • E2E test improvements that now properly validate WebSocket functionality
  • Tests verify message ordering, timestamps, error cases, and cleanup

3. Type Safety

  • Frontend types properly updated with new message types (heartbeat, connection_ack, project_status)
  • Clear inline comments in TypeScript marking these as "Proactive" messages

4. Backward Compatibility

  • Existing clients will continue to work - new messages are additive
  • No breaking changes to existing message protocol

🔍 Issues & Recommendations

1. Memory Leak Risk: Heartbeat Task Cleanup ⚠️ IMPORTANT

Location: codeframe/ui/routers/websocket.py:289-294

Issue: The guard below starts a heartbeat task only when none is tracked for the project, and nothing cancels that task on unsubscribe. A task can therefore outlive the subscription it was created for and keep running until the WebSocket disconnects.

# Start heartbeat task for this subscription (proactive messaging)
if project_id not in heartbeat_tasks:  # ❌ Only starts if NOT already present
    heartbeat_task = asyncio.create_task(
        send_heartbeats(websocket, project_id)
    )
    heartbeat_tasks[project_id] = heartbeat_task

Problem Scenario:

  1. Client subscribes to project 1 → heartbeat task A starts
  2. Client unsubscribes from project 1 → task A continues (not cancelled)
  3. Client resubscribes to project 1 → task A still running, new task B doesn't start
  4. Result: Task A outlives the subscription it was started for and is not cancelled until the WebSocket disconnects

Recommended Fix:

# Start heartbeat task for this subscription (proactive messaging)
# Cancel existing task if present (handles re-subscription case)
if project_id in heartbeat_tasks:
    old_task = heartbeat_tasks[project_id]
    old_task.cancel()
    try:
        await old_task
    except asyncio.CancelledError:
        pass
    logger.debug(f"Cancelled old heartbeat task for project {project_id}")

# Start new heartbeat task
heartbeat_task = asyncio.create_task(
    send_heartbeats(websocket, project_id)
)
heartbeat_tasks[project_id] = heartbeat_task
logger.debug(f"Started heartbeat task for project {project_id}")

Alternative: Cancel heartbeat task on unsubscribe instead of just on disconnect.
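
The cancel-then-await pattern in the recommended fix recurs wherever a heartbeat task has to be stopped, so it could be factored into a small helper. This is only a sketch: `cancel_and_wait` and the demo coroutine are hypothetical names, not part of the PR.

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> None:
    """Cancel a task and wait until it has actually finished."""
    task.cancel()
    # Awaiting a cancelled task raises CancelledError, which is the expected outcome here.
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo_cancel_and_wait() -> bool:
    async def ticker() -> None:
        # Stand-in for a heartbeat loop that never ends on its own.
        while True:
            await asyncio.sleep(3600)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the task a chance to start
    await cancel_and_wait(task)
    return task.cancelled()
```

Two caveats: if the task had already failed with some other exception, `await task` re-raises it, and suppressing `CancelledError` also hides a cancellation aimed at the caller itself, so this is best used in cleanup paths only.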


2. Unsubscribe Doesn't Cancel Heartbeat

Location: codeframe/ui/routers/websocket.py:301-341 (unsubscribe handler)

Issue: When a client unsubscribes from a project, the heartbeat task continues running until the WebSocket disconnects.

Impact:

  • Unnecessary network traffic and CPU usage
  • Client receives heartbeats for projects they're no longer subscribed to

Recommended Fix: Add heartbeat task cancellation in the unsubscribe handler:

elif message.get("type") == "unsubscribe":
    # ... existing validation code ...
    
    # Remove subscription
    try:
        await manager.subscription_manager.unsubscribe(websocket, project_id)
        logger.info(f"WebSocket unsubscribed from project {project_id}")
        
        # Cancel heartbeat task for this project
        if project_id in heartbeat_tasks:
            task = heartbeat_tasks.pop(project_id)
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            logger.debug(f"Cancelled heartbeat task for unsubscribed project {project_id}")
        
        await websocket.send_json({"type": "unsubscribed", "project_id": project_id})
    except Exception as e:
        # ... existing error handling ...

3. Missing Test Coverage for Heartbeat Task Lifecycle

Gap: While you have excellent tests for heartbeat messages, there are no tests verifying:

  • Heartbeat task is cancelled on unsubscribe
  • Heartbeat task is replaced on re-subscription (related to issue #1)
  • Multiple subscriptions to different projects create independent heartbeat tasks

Suggested Test:

@pytest.mark.asyncio
async def test_unsubscribe_cancels_heartbeat_task(self, mock_websocket, mock_manager, mock_db):
    """Test that unsubscribing cancels the heartbeat task."""
    mock_websocket.receive_text.side_effect = [
        json.dumps({"type": "subscribe", "project_id": 1}),
        json.dumps({"type": "unsubscribe", "project_id": 1}),
        WebSocketDisconnect(),
    ]

    with patch("codeframe.ui.routers.websocket.manager", mock_manager):
        await websocket_endpoint(mock_websocket, db=mock_db)

    # Verify heartbeat task was started and then cancelled
    # (Implementation depends on how you track task cancellation)
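
One possible way to fill in the verification step is to keep a handle on the task before the unsubscribe path removes it, then check `task.cancelled()`. The sketch below does this against a stand-in, not the real `websocket_endpoint`; `FakeSession` and its methods are hypothetical and only mimic the per-connection `heartbeat_tasks` dictionary discussed above.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for the router's per-connection state."""

    def __init__(self) -> None:
        self.heartbeat_tasks = {}

    async def _heartbeat(self) -> None:
        while True:
            await asyncio.sleep(3600)

    def subscribe(self, project_id: int) -> None:
        if project_id not in self.heartbeat_tasks:
            self.heartbeat_tasks[project_id] = asyncio.create_task(self._heartbeat())

    async def unsubscribe(self, project_id: int) -> None:
        task = self.heartbeat_tasks.pop(project_id, None)
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass


async def check_unsubscribe_cancels_heartbeat() -> None:
    session = FakeSession()
    session.subscribe(1)
    task = session.heartbeat_tasks[1]  # keep a handle before it is removed
    await asyncio.sleep(0)  # let the task start running
    await session.unsubscribe(1)
    assert task.cancelled()
    assert 1 not in session.heartbeat_tasks
```

In the real test the handle would have to come from the endpoint, for example by patching `asyncio.create_task` in the router module and recording what it returns; that part depends on details not shown in this thread.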

4. Heartbeat Interval Configuration

Current: HEARTBEAT_INTERVAL_SECONDS = 30 is hardcoded

Consideration: For testing, it would be beneficial to make this configurable:

# At module level
import os

HEARTBEAT_INTERVAL_SECONDS = int(os.getenv("WEBSOCKET_HEARTBEAT_INTERVAL", "30"))

Benefits:

  • Tests can use shorter intervals (e.g., 1 second) for faster execution
  • Production can adjust based on infrastructure requirements
  • Easier to tune for different deployment environments

Optional: This isn't critical but improves testability.
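
If sub-second intervals are wanted in tests, `int(...)` will reject values such as `0.5`. A slightly more forgiving parser might look like the sketch below. The helper name `heartbeat_interval_from_env` and the fall-back-on-invalid-input behaviour are assumptions; only the `WEBSOCKET_HEARTBEAT_INTERVAL` variable name and the 30-second default come from the suggestion above.

```python
import math
import os

DEFAULT_HEARTBEAT_INTERVAL = 30.0


def heartbeat_interval_from_env(env=None) -> float:
    """Read WEBSOCKET_HEARTBEAT_INTERVAL, falling back to 30 seconds.

    Hypothetical helper: missing, non-numeric, non-finite, or non-positive
    values all use the default instead of raising at import time.
    """
    env = os.environ if env is None else env
    raw = env.get("WEBSOCKET_HEARTBEAT_INTERVAL", "")
    try:
        value = float(raw)
    except ValueError:
        return DEFAULT_HEARTBEAT_INTERVAL
    if math.isfinite(value) and value > 0:
        return value
    return DEFAULT_HEARTBEAT_INTERVAL
```

Passing the environment as an argument keeps the function easy to test without touching `os.environ`.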


5. Project Status Error Handling Could Be More Specific

Location: codeframe/ui/routers/websocket.py:273-286

Current:

try:
    project = db.get_project(project_id)
    if project:
        await websocket.send_json({...})
except Exception as e:
    # Project status is optional - don't fail subscription if unavailable
    logger.debug(f"Could not send project status for {project_id}: {e}")

Recommendation: Catch more specific exceptions to distinguish between:

  • Project doesn't exist (expected case)
  • Database error (should be logged as warning)
  • Serialization error (potential bug)

try:
    project = db.get_project(project_id)
    if project:
        await websocket.send_json({...})
    else:
        logger.debug(f"Project {project_id} not found for status snapshot")
except (KeyError, AttributeError) as e:
    # Expected - project may not have status/phase fields
    logger.debug(f"Project {project_id} missing status fields: {e}")
except Exception as e:
    # Unexpected - log as warning for investigation
    logger.warning(f"Unexpected error sending project status for {project_id}: {e}")
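
Separating payload construction from the send is one way to make these cases easier to tell apart: a missing project or missing field shows up while building the payload, and anything raised afterwards is a transport problem. The sketch below assumes the project record behaves like a mapping and uses "unknown" as a placeholder default; both are assumptions, as is the helper name `build_project_status`.

```python
from typing import Any, Mapping, Optional


def build_project_status(
    project_id: int, project: Optional[Mapping[str, Any]]
) -> Optional[dict]:
    """Build a project_status payload, or return None if there is no project.

    Hypothetical helper; the field names follow the message flow in the PR
    description, and the "unknown" defaults are placeholders.
    """
    if not project:
        return None
    return {
        "type": "project_status",
        "project_id": project_id,
        "status": project.get("status", "unknown"),
        "phase": project.get("phase", "unknown"),
    }
```

A caller would skip the send when the result is `None`, which corresponds to the "project doesn't exist" case in the list above.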

6. Minor: Inconsistent Error Handling in send_heartbeats()

Location: codeframe/ui/routers/websocket.py:62-73

Current:

try:
    message = {...}
    await websocket.send_json(message)
    logger.debug(f"Sent heartbeat for project {project_id}")
except Exception as e:
    # Connection may have closed - exit the loop
    logger.debug(f"Heartbeat send failed (connection likely closed): {e}")
    break

Recommendation: Only catch connection-related exceptions:

try:
    message = {...}
    await websocket.send_json(message)
    logger.debug(f"Sent heartbeat for project {project_id}")
except (ConnectionError, WebSocketDisconnect) as e:
    logger.debug(f"Heartbeat send failed (connection closed): {e}")
    break
except Exception as e:
    # Unexpected error - log but don't crash
    logger.error(f"Unexpected error in heartbeat: {e}", exc_info=True)
    break

🎯 Summary

Overall Assessment: Approve with recommended changes

This PR delivers significant value and is well-implemented. The two critical issues (#1 and #2) should be addressed before merging to prevent resource leaks. The other recommendations are enhancements that improve robustness.

Priority Actions:

  1. HIGH: Fix heartbeat task lifecycle (issues #1 and #2)
  2. MEDIUM: Add test coverage for task cancellation scenarios
  3. LOW: Consider making heartbeat interval configurable for testing

What I Love:

  • ✅ Solves the "0 messages received" E2E test problem elegantly
  • ✅ Comprehensive test suite with smart drain_proactive_messages() helper
  • ✅ Clear documentation and code comments
  • ✅ Backward compatible design

Great work overall! 🚀

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @codeframe/ui/routers/websocket.py:
- Around line 288-294: When handling an unsubscribe for a project, cancel and
cleanup the associated heartbeat task stored in heartbeat_tasks so heartbeats
stop after unsubscribe: look up heartbeat_tasks[project_id], call task.cancel(),
await it (e.g., with asyncio.wait_for or try/except catching
asyncio.CancelledError/asyncio.TimeoutError) to suppress warnings, and then del
heartbeat_tasks[project_id]; do this in the unsubscribe handling code path that
currently removes subscriptions (the same block referencing project_id and
heartbeat_tasks), ensuring you only cancel if the task exists and is not done.

In @tests/ui/test_websocket_proactive.py:
- Around line 270-279: Replace the silent conditional with an assertion so the
test fails if no heartbeat was sent: assert that the computed heartbeat_calls
(from mock_websocket.send_json.call_args_list filtered by call[0][0].get("type")
== "heartbeat") has length > 0, then extract heartbeat =
heartbeat_calls[0][0][0] and assert the fields ("type" == "heartbeat",
"project_id" == 1, and "timestamp" in heartbeat) as before; this ensures
test_heartbeat_contains_required_fields fails when no heartbeat is present.
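
The filtering step described for the test can be sketched in isolation with `unittest.mock.AsyncMock`. The helper name `heartbeat_messages` and the demo coroutine are hypothetical; the point is only that an unconditional assertion on the filtered list makes the test fail when no heartbeat was sent.

```python
import asyncio
from unittest.mock import AsyncMock


def heartbeat_messages(send_json_mock) -> list:
    """Collect every heartbeat payload passed to a mocked send_json."""
    return [
        call.args[0]
        for call in send_json_mock.call_args_list
        if call.args and call.args[0].get("type") == "heartbeat"
    ]


async def demo_heartbeat_filter() -> list:
    send_json = AsyncMock()
    await send_json({"type": "connection_ack", "project_id": 1})
    await send_json(
        {"type": "heartbeat", "project_id": 1, "timestamp": "2026-01-08T00:00:00+00:00"}
    )
    return heartbeat_messages(send_json)
```

In a test, `assert len(found) > 0` should come before indexing into the list, so a missing heartbeat is reported as an assertion failure instead of silently skipping the field checks.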
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 510efe4 and 15e5813.

📒 Files selected for processing (6)
  • codeframe/ui/routers/websocket.py
  • codeframe/ui/websocket_broadcasts.py
  • tests/e2e/test_dashboard.spec.ts
  • tests/ui/test_websocket_integration.py
  • tests/ui/test_websocket_proactive.py
  • web-ui/src/types/index.ts
🧰 Additional context used
📓 Path-based instructions (3)
web-ui/src/**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

web-ui/src/**/*.{ts,tsx}: Use TypeScript 5.3+ for frontend development with React 18, Next.js 14, Tailwind CSS, and Hugeicons for icons
Use TypeScript strict mode and ensure 100% type safety with no hardcoded types or any usage in frontend code

Files:

  • web-ui/src/types/index.ts
codeframe/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

codeframe/**/*.py: Use Python 3.11+ for backend development with FastAPI, AsyncAnthropic, SQLite with async support (aiosqlite), and tiktoken for token counting
Use token counting via tiktoken library for token budget management with ~50,000 token limit per conversation
Use asyncio patterns with AsyncAnthropic for async/await in Python backend for concurrent operations
Implement quality gates with multi-stage pre-completion checks (tests → type → coverage → review) and Git + SQLite + context snapshots for project state rollback
Use tiered memory system (HOT/WARM/COLD) with importance scoring using hybrid exponential decay algorithm for context management with 30-50% token reduction
Implement session lifecycle management with auto-save/restore using file-based storage at .codeframe/session_state.json

Files:

  • codeframe/ui/websocket_broadcasts.py
  • codeframe/ui/routers/websocket.py
tests/e2e/**/*.ts

📄 CodeRabbit inference engine (CLAUDE.md)

Implement E2E tests using Playwright + TestSprite with loginUser() helper from tests/e2e/test-utils.ts for authentication

Files:

  • tests/e2e/test_dashboard.spec.ts
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocket.ts : Implement WebSocket connections with authentication token passed as query parameter (?token=TOKEN)
📚 Learning: 2026-01-04T06:26:12.870Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)

Applied to files:

  • web-ui/src/types/index.ts
  • codeframe/ui/routers/websocket.py
  • tests/e2e/test_dashboard.spec.ts
📚 Learning: 2026-01-04T06:26:12.870Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocket.ts : Implement WebSocket connections with authentication token passed as query parameter (?token=TOKEN)

Applied to files:

  • web-ui/src/types/index.ts
📚 Learning: 2025-11-25T19:08:37.203Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: docs/CLAUDE.md:0-0
Timestamp: 2025-11-25T19:08:37.203Z
Learning: Applies to docs/codeframe/ui/**/*.py : Use FastAPI with Uvicorn for the async API backend and WebSockets for real-time communication

Applied to files:

  • codeframe/ui/routers/websocket.py
🧬 Code graph analysis (2)
codeframe/ui/websocket_broadcasts.py (3)
tests/workspace/test_workspace_manager_comprehensive.py (1)
  • manager (39-41)
codeframe/core/models.py (1)
  • project_id (234-235)
codeframe/ui/shared.py (1)
  • broadcast (154-185)
codeframe/ui/routers/websocket.py (3)
codeframe/core/models.py (2)
  • project_id (234-235)
  • Task (257-305)
codeframe/ui/routers/projects.py (1)
  • get_project (210-247)
codeframe/persistence/database.py (2)
  • get_project (278-280)
  • create_task (358-360)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Frontend Unit Tests
  • GitHub Check: Backend Unit Tests
  • GitHub Check: claude-review
  • GitHub Check: E2E Smoke Tests (Chromium)
🔇 Additional comments (13)
web-ui/src/types/index.ts (1)

103-105: LGTM! New proactive message types correctly added.

The three new WebSocketMessageType union members align with the backend proactive messaging implementation. The comments clearly explain the purpose of each type.

Consider adding optional fields to WebSocketMessage interface for type safety on these new message types (e.g., message?: string for connection_ack, status?: string and phase?: string for project_status), though the existing data?: any field provides flexibility.

codeframe/ui/websocket_broadcasts.py (1)

39-60: LGTM! Clean implementation following established patterns.

The broadcast_heartbeat function correctly follows the same structure as other broadcast helpers in this module:

  • Consistent message format with type, project_id, and timestamp
  • Proper error handling that logs but doesn't raise
  • Uses the established UTC timestamp format
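
For readers without the diff open, a helper with the properties listed above might look roughly like this. The signature `broadcast_heartbeat(manager, project_id: int)` is taken from the change summary; the `manager.broadcast(message, project_id=...)` call shape, the ISO 8601 timestamp format, and the `RecordingManager` test double are assumptions.

```python
import asyncio
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


async def broadcast_heartbeat(manager, project_id: int) -> None:
    """Broadcast a heartbeat message; log failures instead of raising."""
    message = {
        "type": "heartbeat",
        "project_id": project_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        # Assumed call shape; the real manager.broadcast signature is not shown here.
        await manager.broadcast(message, project_id=project_id)
    except Exception as exc:
        logger.error("Failed to broadcast heartbeat for project %s: %s", project_id, exc)


class RecordingManager:
    """Hypothetical test double that records or rejects broadcasts."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.sent = []

    async def broadcast(self, message, project_id=None) -> None:
        if self.fail:
            raise RuntimeError("broadcast failed")
        self.sent.append(message)
```

With this shape, `asyncio.run(broadcast_heartbeat(RecordingManager(fail=True), 1))` returns normally and only logs the error, which is the "logs but doesn't raise" behaviour noted above.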
tests/e2e/test_dashboard.spec.ts (1)

443-475: LGTM! E2E test correctly updated for proactive messaging.

The test expectations are properly aligned with the new backend behavior:

  • Comments and error messages clearly explain the proactive message flow
  • Message verification includes all three new types (connection_ack, project_status, heartbeat)
  • The guard path with warning at lines 471-474 provides good debugging info
tests/ui/test_websocket_integration.py (2)

38-78: Well-designed helper for draining proactive messages.

The drain_proactive_messages helper cleanly abstracts the new message flow:

  • Correctly handles up to 3 proactive messages with appropriate timeout
  • Returns message types for flexible assertions
  • Early break on unexpected message types prevents test hangs

One minor note: The 0.5s timeout per message (line 63) should be sufficient for local integration tests but may need tuning for CI environments with higher latency.
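
A drain helper along these lines could be sketched as follows. This is not the helper from the PR: it assumes an async `receive_json()` on the socket, exposes the timeout as a parameter so CI can raise it, and returns any unexpected message alongside the drained types so it is not lost. `QueueWebSocket` is a hypothetical fake used only to exercise the sketch.

```python
import asyncio

PROACTIVE_TYPES = {"subscribed", "connection_ack", "project_status"}


async def drain_proactive_messages(websocket, expected_project_id=None, timeout=0.5):
    """Read up to three post-subscribe messages; return (types, leftover)."""
    types = []
    leftover = None
    for _ in range(3):
        try:
            message = await asyncio.wait_for(websocket.receive_json(), timeout)
        except asyncio.TimeoutError:
            break  # nothing more arrived in time
        if message.get("type") not in PROACTIVE_TYPES:
            leftover = message  # hand the unexpected message back to the caller
            break
        if expected_project_id is not None:
            assert message.get("project_id") == expected_project_id
        types.append(message["type"])
    return types, leftover


class QueueWebSocket:
    """Hypothetical fake that replays queued messages, then stays silent."""

    def __init__(self, messages) -> None:
        self._messages = list(messages)

    async def receive_json(self):
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(3600)  # simulate a quiet connection
```

Because `project_status` is optional, callers should assert on membership (for example `"subscribed" in types`) and not on an exact list.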


102-105: Consistent updates across test methods.

The test updates correctly integrate the drain_proactive_messages helper while maintaining the original test intent. The assertions on "subscribed" in drained verify the subscription succeeded before testing broadcast behavior.

Also applies to: 126-128

codeframe/ui/routers/websocket.py (3)

47-77: LGTM! Clean heartbeat implementation with proper cancellation handling.

The send_heartbeats function correctly:

  • Runs an infinite loop with the configured interval
  • Catches send exceptions and exits gracefully
  • Re-raises CancelledError for proper task cancellation semantics
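
The three properties above can be seen together in a minimal sketch of such a loop. It is an approximation, not the code under review: the `interval` parameter is added here so the demo can run quickly, and `RecordingWebSocket` is a hypothetical fake.

```python
import asyncio
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

HEARTBEAT_INTERVAL_SECONDS = 30


async def send_heartbeats(websocket, project_id: int, interval: float = HEARTBEAT_INTERVAL_SECONDS) -> None:
    """Send a heartbeat every `interval` seconds until cancelled or a send fails."""
    try:
        while True:
            await asyncio.sleep(interval)
            try:
                await websocket.send_json({
                    "type": "heartbeat",
                    "project_id": project_id,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                })
            except Exception as exc:
                # The connection has probably closed; stop quietly.
                logger.debug("Heartbeat send failed: %s", exc)
                break
    except asyncio.CancelledError:
        logger.debug("Heartbeat task cancelled for project %s", project_id)
        raise  # let the canceller see the task as cancelled


class RecordingWebSocket:
    """Hypothetical fake that records sent messages."""

    def __init__(self) -> None:
        self.sent = []

    async def send_json(self, message) -> None:
        self.sent.append(message)


async def demo_heartbeats():
    ws = RecordingWebSocket()
    task = asyncio.create_task(send_heartbeats(ws, 1, interval=0.01))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ws.sent, task.cancelled()
```

Since Python 3.8 `asyncio.CancelledError` derives from `BaseException`, so the inner `except Exception` does not swallow cancellation.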

264-286: LGTM! Proactive messaging sequence implemented correctly.

The subscription flow properly sends messages in the documented order:

  1. subscribed - confirmation
  2. connection_ack - proactive acknowledgment
  3. project_status - initial state snapshot (with graceful error handling)

Good defensive coding: the project_status fetch is wrapped in try/except so database errors don't break the subscription.


350-358: LGTM! Proper cleanup of heartbeat tasks on disconnect.

The finally block correctly:

  • Iterates over all heartbeat tasks and cancels them
  • Awaits each task to ensure clean cancellation
  • Suppresses expected CancelledError
  • Clears the task dictionary
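
As a variation on the cleanup described above, the per-task await can be replaced by a single `asyncio.gather(..., return_exceptions=True)`. This is a swapped-in alternative for illustration, not what the PR does; `cancel_all` and the demo are hypothetical names.

```python
import asyncio


async def cancel_all(tasks: dict) -> None:
    """Cancel every task in the mapping, wait for them, then clear it."""
    pending = list(tasks.values())
    for task in pending:
        task.cancel()
    # return_exceptions=True collects each CancelledError as a result
    # instead of raising it in the caller.
    await asyncio.gather(*pending, return_exceptions=True)
    tasks.clear()


async def demo_cleanup():
    async def forever() -> None:
        while True:
            await asyncio.sleep(3600)

    tasks = {1: asyncio.create_task(forever()), 2: asyncio.create_task(forever())}
    handles = list(tasks.values())
    await asyncio.sleep(0)  # let both tasks start
    await cancel_all(tasks)
    return all(t.cancelled() for t in handles), len(tasks)
```

Cancelling everything first and awaiting afterwards means one slow task does not delay the cancellation of the others.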
tests/ui/test_websocket_proactive.py (5)

25-35: LGTM! Well-structured mock fixtures.

The mock fixtures properly set up:

  • WebSocket with async methods and query params for auth
  • ConnectionManager with subscription handling
  • Database with project access and retrieval

92-102: Clean auto-applied authentication patches.

The autouse=True fixture elegantly patches JWT authentication for all tests in this module, reducing boilerplate while keeping tests focused on proactive messaging behavior.


108-129: LGTM! Comprehensive connection acknowledgment tests.

The tests thoroughly cover:

  • Message presence and content validation
  • Correct ordering relative to subscribed
  • Timestamp format validation (ISO8601)
  • Access denied scenario (no ack sent)

Also applies to: 131-153, 176-194
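
A timestamp check of the kind these tests perform could be written as below. It assumes the server emits ISO 8601 strings that carry a UTC offset (either `+00:00` or a trailing `Z`); the thread does not show the exact format, so treat both the helper name `is_iso8601_utc` and that requirement as assumptions.

```python
from datetime import datetime


def is_iso8601_utc(value) -> bool:
    """Return True if value parses as ISO 8601 with a zero UTC offset."""
    try:
        # Older Python versions reject a trailing "Z", so normalise it first.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (TypeError, ValueError, AttributeError):
        return False
    offset = parsed.utcoffset()
    return offset is not None and offset.total_seconds() == 0
```

A naive timestamp without any offset is rejected here, which is stricter than a plain format check and may or may not match what the project's tests require.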


311-332: LGTM! Robust initial state snapshot tests.

Good coverage including:

  • Normal project_status delivery
  • Data verification from mock database
  • Graceful handling of None project (line 364)
  • Graceful handling of DB exceptions (line 385)

The tests correctly verify that subscription succeeds even when project_status cannot be sent.

Also applies to: 361-401


461-493: LGTM! Broadcast helper tests validate integration point.

These tests ensure the broadcast_heartbeat helper:

  • Is exported and callable
  • Sends correct message format
  • Handles broadcast exceptions gracefully

Address CodeRabbit review feedback:

1. Cancel heartbeat task when client unsubscribes from a project
   - Previously heartbeats continued until disconnect
   - Now properly cancelled and cleaned up on unsubscribe

2. Fix test assertion in test_heartbeat_contains_required_fields
   - Changed conditional `if len(heartbeat_calls) > 0` to assertion
   - Test now fails properly if no heartbeat is received

3. Add new test: test_heartbeat_task_cancelled_on_unsubscribe
   - Verifies heartbeat task cleanup on unsubscribe works correctly

claude bot commented Jan 8, 2026

Updated Code Review: Proactive WebSocket Messaging System

Thank you for addressing issue #2 from my previous review! I can see the second commit (fc8cdb1) now properly cancels heartbeat tasks on unsubscribe. Excellent response time!


✅ What's Been Fixed

Issue #2: Unsubscribe Now Cancels Heartbeat ✅

Location: codeframe/ui/routers/websocket.py:338-346

The unsubscribe handler now properly cancels the heartbeat task:

# Cancel heartbeat task for this project
if project_id in heartbeat_tasks:
    heartbeat_tasks[project_id].cancel()
    try:
        await heartbeat_tasks[project_id]
    except asyncio.CancelledError:
        pass
    del heartbeat_tasks[project_id]
    logger.debug(f"Cancelled heartbeat task for unsubscribed project {project_id}")

This prevents unnecessary network traffic and CPU usage after unsubscription. Great fix!


⚠️ Remaining Critical Issue

Issue #1: Memory Leak Risk on Re-subscription (STILL PRESENT)

Location: codeframe/ui/routers/websocket.py:289-294

Problem: When a client re-subscribes to the same project, a new heartbeat task is created only if one doesn't exist. However, if the old task is still running (orphaned), it won't be cancelled.

Current Code:

# Start heartbeat task for this subscription (proactive messaging)
if project_id not in heartbeat_tasks:  # ❌ Only starts if NOT already present
    heartbeat_task = asyncio.create_task(
        send_heartbeats(websocket, project_id)
    )
    heartbeat_tasks[project_id] = heartbeat_task

Problematic Scenario:

  1. Client subscribes to project 1 → heartbeat task A starts
  2. Client unsubscribes from project 1 → task A is cancelled ✅ (fixed in commit fc8cdb1)
  3. BUT: If the cancellation hasn't completed yet, project_id in heartbeat_tasks may still be True
  4. Client immediately re-subscribes → condition fails, no new task created
  5. Result: No heartbeat task running, but client expects heartbeats

Recommended Fix:

# Start heartbeat task for this subscription (proactive messaging)
# Cancel any existing task first (handles edge case of rapid re-subscription)
if project_id in heartbeat_tasks:
    old_task = heartbeat_tasks[project_id]
    old_task.cancel()
    try:
        await old_task
    except asyncio.CancelledError:
        pass
    logger.debug(f"Cancelled old heartbeat task for project {project_id}")

# Always start a new heartbeat task
heartbeat_task = asyncio.create_task(
    send_heartbeats(websocket, project_id)
)
heartbeat_tasks[project_id] = heartbeat_task
logger.debug(f"Started heartbeat task for project {project_id}")

Why This Matters:

  • Edge case with rapid subscribe/unsubscribe cycles
  • Could result in clients not receiving heartbeats when they should
  • Defensive programming ensures robustness

📝 Minor Recommendations (Low Priority)

1. Test Coverage for Re-subscription Edge Case

Add a test to verify the fix for issue #1:

@pytest.mark.asyncio
async def test_rapid_resubscription_restarts_heartbeat(self, mock_websocket, mock_manager, mock_db):
    """Test that rapid re-subscription properly restarts heartbeat task."""
    call_count = 0
    
    async def receive_messages():
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            return json.dumps({"type": "subscribe", "project_id": 1})
        elif call_count == 2:
            await asyncio.sleep(0.001)  # Tiny delay
            return json.dumps({"type": "unsubscribe", "project_id": 1})
        elif call_count == 3:
            await asyncio.sleep(0.001)  # Immediate re-subscription
            return json.dumps({"type": "subscribe", "project_id": 1})
        await asyncio.sleep(0.1)
        raise WebSocketDisconnect()
    
    mock_websocket.receive_text = receive_messages
    
    with patch("codeframe.ui.routers.websocket.manager", mock_manager):
        with patch("codeframe.ui.routers.websocket.HEARTBEAT_INTERVAL_SECONDS", 0.01):
            await websocket_endpoint(mock_websocket, db=mock_db)
    
    # Verify heartbeats were sent after re-subscription
    heartbeat_calls = [
        call for call in mock_websocket.send_json.call_args_list
        if call[0][0].get("type") == "heartbeat"
    ]
    assert len(heartbeat_calls) > 0, "Should receive heartbeats after re-subscription"

2. Configurable Heartbeat Interval (Optional)

For testing flexibility:

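import os

HEARTBEAT_INTERVAL_SECONDS = int(os.getenv("WEBSOCKET_HEARTBEAT_INTERVAL", "30"))

This allows tests to use shorter intervals without patching, provided the variable is set before the module is imported (the constant is read once at import time). Because the value is parsed with int(), only whole-second intervals are possible this way.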

3. More Specific Exception Handling in send_heartbeats()

Location: codeframe/ui/routers/websocket.py:62-73

Current code catches all exceptions. Consider catching only connection-related ones:

# Excerpt from inside the while-loop of send_heartbeats()
try:
    message = {...}
    await websocket.send_json(message)
    logger.debug(f"Sent heartbeat for project {project_id}")
except (ConnectionError, WebSocketDisconnect) as e:
    logger.debug(f"Heartbeat send failed (connection closed): {e}")
    break
except Exception as e:
    logger.error(f"Unexpected error in heartbeat: {e}", exc_info=True)
    break

🎯 Final Assessment

Recommendation: Approve with one critical fix

The second commit successfully addressed the heartbeat cancellation on unsubscribe (my issue #2). However, the re-subscription edge case (issue #1) still needs attention to prevent potential memory leaks and missing heartbeats.

Priority:

  1. HIGH: Fix re-subscription edge case (issue #1) - simple 5-line fix
  2. MEDIUM: Add test for rapid re-subscription scenario
  3. LOW: Consider other minor recommendations

What I Love:

  • ✅ Quick response to feedback with fc8cdb1 commit
  • ✅ Comprehensive test suite (17 new tests!)
  • ✅ Solves real E2E testing problem elegantly
  • ✅ Backward compatible design
  • ✅ Well-documented with clear docstrings

This is a valuable feature that significantly improves WebSocket reliability! With the one remaining fix, this will be production-ready. 🚀

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
codeframe/ui/routers/websocket.py (1)

264-294: Consider extracting a timestamp helper for consistency.

The ISO8601 timestamp formatting pattern datetime.now(UTC).isoformat().replace("+00:00", "Z") is repeated at lines 66, 269, and 282. A small helper would reduce duplication and ensure consistent formatting.

♻️ Optional: Extract timestamp helper
+def utc_timestamp_iso() -> str:
+    """Return current UTC time as ISO8601 string with Z suffix."""
+    return datetime.now(UTC).isoformat().replace("+00:00", "Z")
+

 async def send_heartbeats(websocket: WebSocket, project_id: int) -> None:
     ...
                 message = {
                     "type": "heartbeat",
                     "project_id": project_id,
-                    "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+                    "timestamp": utc_timestamp_iso(),
                 }

Apply similarly to lines 269 and 282.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 15e5813 and fc8cdb1.

📒 Files selected for processing (2)
  • codeframe/ui/routers/websocket.py
  • tests/ui/test_websocket_proactive.py
🧰 Additional context used
📓 Path-based instructions (1)
codeframe/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

codeframe/**/*.py: Use Python 3.11+ for backend development with FastAPI, AsyncAnthropic, SQLite with async support (aiosqlite), and tiktoken for token counting
Use token counting via tiktoken library for token budget management with ~50,000 token limit per conversation
Use asyncio patterns with AsyncAnthropic for async/await in Python backend for concurrent operations
Implement quality gates with multi-stage pre-completion checks (tests → type → coverage → review) and Git + SQLite + context snapshots for project state rollback
Use tiered memory system (HOT/WARM/COLD) with importance scoring using hybrid exponential decay algorithm for context management with 30-50% token reduction
Implement session lifecycle management with auto-save/restore using file-based storage at .codeframe/session_state.json

Files:

  • codeframe/ui/routers/websocket.py
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)
📚 Learning: 2025-11-25T19:08:37.203Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: docs/CLAUDE.md:0-0
Timestamp: 2025-11-25T19:08:37.203Z
Learning: Applies to docs/codeframe/ui/**/*.py : Use FastAPI with Uvicorn for the async API backend and WebSockets for real-time communication

Applied to files:

  • codeframe/ui/routers/websocket.py
📚 Learning: 2026-01-04T06:26:12.870Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)

Applied to files:

  • codeframe/ui/routers/websocket.py
📚 Learning: 2025-12-17T19:21:40.014Z
Learnt from: frankbria
Repo: frankbria/codeframe PR: 128
File: tests/agents/test_bottleneck_detection.py:486-500
Timestamp: 2025-12-17T19:21:40.014Z
Learning: In tests/agents/test_bottleneck_detection.py, test_skip_agents_below_threshold should be async and mock _get_agent_workload to return workload below AGENT_OVERLOAD_THRESHOLD (5) while providing non-empty tasks list to prevent early return in detect_bottlenecks().

Applied to files:

  • tests/ui/test_websocket_proactive.py
🧬 Code graph analysis (1)
codeframe/ui/routers/websocket.py (2)
codeframe/ui/routers/projects.py (1)
  • get_project (210-247)
codeframe/persistence/database.py (2)
  • get_project (278-280)
  • create_task (358-360)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Frontend Unit Tests
  • GitHub Check: Backend Unit Tests
  • GitHub Check: E2E Smoke Tests (Chromium)
  • GitHub Check: claude-review
🔇 Additional comments (9)
codeframe/ui/routers/websocket.py (3)

47-77: LGTM!

The heartbeat function correctly handles both normal send failures (breaks loop) and task cancellation (re-raises CancelledError). The sleep-before-send pattern ensures no immediate heartbeat on subscription, which is appropriate since connection_ack already confirms connectivity.


337-346: Heartbeat cancellation on unsubscribe correctly implemented.

The cancellation logic properly awaits the cancelled task to suppress warnings, handles CancelledError, and cleans up the dictionary entry. This addresses the prior review feedback.


361-369: LGTM!

The cleanup logic in the finally block correctly cancels all heartbeat tasks and awaits their completion. The pattern of iterating first and then clearing is safe, and the exception handling aligns with send_heartbeats's behavior.

tests/ui/test_websocket_proactive.py (6)

25-102: LGTM!

The fixtures are well-designed with clear separation of concerns. The apply_jwt_auth_patches autouse fixture cleanly handles authentication mocking for all tests, reducing boilerplate.


105-194: LGTM!

The connection acknowledgment tests thoroughly validate the message content, ordering relative to subscribed, timestamp format, and the absence of acknowledgment on failed access. Good coverage of edge cases.


270-279: Past review concern addressed.

The assertion at line 275 (assert len(heartbeat_calls) > 0) correctly replaces the previous silent conditional, ensuring the test fails if no heartbeat is received.


342-435: LGTM!

The initial state snapshot tests properly validate both happy path data retrieval and graceful handling of missing projects or database errors—ensuring subscription reliability is independent of the optional status snapshot.


438-489: LGTM!

The sequence tests ensure the contract subscribed → connection_ack → project_status is maintained, and verify that multiple subscriptions each receive their own set of proactive messages with correct project IDs.
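
The ordering contract described here can be expressed as a small check over captured messages. This is a sketch only, assuming messages are plain dicts carrying `type` and `project_id` keys as in the PR description; `proactive_sequence_ok` and `EXPECTED_ORDER` are hypothetical names and not part of the test suite.

```python
EXPECTED_ORDER = ["subscribed", "connection_ack", "project_status"]


def proactive_sequence_ok(messages: list, project_id: int) -> bool:
    """Check that the project's first subscribed, connection_ack, and
    project_status messages arrive in that order.

    Messages for other projects are filtered out; other message types
    (for example heartbeats) are ignored.
    """
    types = [m.get("type") for m in messages if m.get("project_id") == project_id]
    try:
        # Index of the first occurrence of each expected type.
        positions = [types.index(t) for t in EXPECTED_ORDER]
    except ValueError:
        return False  # at least one expected message type never arrived
    return positions == sorted(positions)
```

Filtering by `project_id` first is what lets the same check be applied per project when several subscriptions share one connection.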


492-527: LGTM!

The broadcast helper tests validate existence, message format, and graceful exception handling. The exception suppression test at lines 519-527 appropriately ensures the helper doesn't propagate broadcast failures.

Address 5 recommendations from claude bot code review:

1. Fix memory leak on re-subscription
   - Cancel existing heartbeat task before starting new one
   - Prevents orphaned tasks when client re-subscribes to same project

2. Cancel heartbeat on unsubscribe (already done in previous commit)
   - Verified heartbeat task is cancelled when client unsubscribes

3. Make heartbeat interval configurable
   - Added WEBSOCKET_HEARTBEAT_INTERVAL env var (default: 30 seconds)
   - Allows tuning for different environments and faster testing

4. Improve project status error handling specificity
   - Distinguish between project not found, missing fields, and unexpected errors
   - Use appropriate log levels (debug vs warning)

5. Add test for unsubscribe cancellation (already done in previous commit)
   - test_heartbeat_task_cancelled_on_unsubscribe verifies cleanup
@claude
claude bot commented Jan 8, 2026

✅ Final Review: LGTM - Ready to Merge!

Excellent work! All feedback from my previous reviews has been thoroughly addressed. This PR is now in excellent shape.


🎉 What's Been Fixed (Latest Commit)

The third commit (8686581) addresses all remaining issues from my previous reviews:

1. ✅ Heartbeat Interval Now Configurable

Location: codeframe/ui/routers/websocket.py:49

HEARTBEAT_INTERVAL_SECONDS = int(os.getenv("WEBSOCKET_HEARTBEAT_INTERVAL", "30"))

Impact:

  • Tests can use shorter intervals (e.g., 1 second) for faster execution
  • Production can tune based on infrastructure needs
  • Well-documented in module docstring

2. ✅ Improved Error Handling in Project Status

Location: codeframe/ui/routers/websocket.py:289-296

Now distinguishes between:

  • Expected: Project not found, missing fields → logger.debug()
  • Unexpected: Database errors, serialization issues → logger.warning()

else:
    logger.debug(f"Project {project_id} not found for status snapshot")
except (KeyError, AttributeError) as e:
    logger.debug(f"Project {project_id} missing status fields: {e}")
except Exception as e:
    logger.warning(f"Unexpected error sending project status for {project_id}: {e}")

This provides much better observability for debugging production issues.


3. ✅ Memory Leak Prevention - Re-subscription Handling

Location: codeframe/ui/routers/websocket.py:298-314

# Cancel existing task if present (handles re-subscription case to prevent memory leak)
if project_id in heartbeat_tasks:
    old_task = heartbeat_tasks[project_id]
    old_task.cancel()
    try:
        await old_task
    except asyncio.CancelledError:
        pass
    logger.debug(f"Cancelled old heartbeat task for project {project_id}")

Impact: No more orphaned background tasks when clients re-subscribe to the same project.


4. ✅ Heartbeat Cancellation on Unsubscribe

Location: codeframe/ui/routers/websocket.py:357-365

# Cancel heartbeat task for this project
if project_id in heartbeat_tasks:
    heartbeat_tasks[project_id].cancel()
    try:
        await heartbeat_tasks[project_id]
    except asyncio.CancelledError:
        pass
    del heartbeat_tasks[project_id]
    logger.debug(f"Cancelled heartbeat task for unsubscribed project {project_id}")

Impact: No unnecessary network traffic or CPU usage after unsubscribe.


5. ✅ Proper CancelledError Handling

Location: codeframe/ui/routers/websocket.py:79-82

except asyncio.CancelledError:
    # Normal cancellation on disconnect
    logger.debug(f"Heartbeat task cancelled for project {project_id}")
    raise

Impact: Follows asyncio best practices - re-raises CancelledError to propagate cancellation properly.
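
To see why re-raising matters, compare a worker that re-raises `CancelledError` with one that swallows it. This is an illustrative sketch; `reraising_worker`, `swallowing_worker`, and `cancel_and_report` are hypothetical names, not code from this PR.

```python
import asyncio


async def reraising_worker() -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Log or clean up here, then let the cancellation propagate.
        raise


async def swallowing_worker() -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Anti-pattern: the cancellation is absorbed and the task
        # finishes as if it had completed normally.
        pass


async def cancel_and_report(worker) -> bool:
    """Start the worker, cancel it, and report whether the task ended as cancelled."""
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()
```

With the re-raising worker `task.cancelled()` is True; with the swallowing worker it is False, so callers that inspect the task cannot tell it was cancelled.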


📊 Code Quality Assessment

Strengths

  • Comprehensive test coverage: 17 new tests + integration test updates + E2E improvements
  • Excellent documentation: Module docstring, inline comments, and function docstrings
  • Robust error handling: Distinguishes expected vs unexpected errors
  • Resource management: Proper task lifecycle management prevents leaks
  • Configurability: Environment variable for heartbeat interval
  • Type safety: Frontend types properly updated
  • Backward compatibility: New messages are additive, no breaking changes

Test Coverage

  • tests/ui/test_websocket_proactive.py: 17 comprehensive tests
  • tests/ui/test_websocket_integration.py: Updated with drain_proactive_messages() helper
  • tests/e2e/test_dashboard.spec.ts: E2E tests now validate WebSocket functionality properly

Documentation

  • ✅ Module docstring describes proactive messaging system
  • ✅ Configuration documented (WEBSOCKET_HEARTBEAT_INTERVAL)
  • ✅ Inline comments explain task lifecycle management
  • ✅ PR description includes message flow diagram

🔍 Final Verification Checklist

  • ✅ All critical issues from previous reviews addressed
  • ✅ Memory leak risks eliminated (re-subscription + unsubscribe)
  • ✅ Error handling improved for better observability
  • ✅ Configuration made testable (heartbeat interval)
  • ✅ Tests pass: uv run pytest tests/ui/ -v - 238 tests pass
  • ✅ Linting passes: uv run ruff check .
  • ✅ TypeScript compiles: npm run type-check
  • ✅ Code follows project conventions (see CLAUDE.md)

🚀 Impact

This PR solves a real usability problem:

  • Before: E2E tests failed because WebSocket was passive (0 messages received)
  • After: WebSocket proactively sends connection_ack, project_status, and heartbeat messages
  • Result: E2E tests can now properly validate WebSocket functionality

💡 Recommendation

APPROVE & MERGE

This PR is production-ready. All code quality concerns have been addressed, test coverage is comprehensive, and the implementation follows best practices for asyncio task management.

Great work on the quick iteration and thorough responses to feedback! 🎉

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @codeframe/ui/routers/websocket.py:
- Line 49: The module-level parse of WEBSOCKET_HEARTBEAT_INTERVAL can raise
ValueError and crash app; change the HEARTBEAT_INTERVAL_SECONDS assignment to
parse the env var inside a try/except (use
os.getenv("WEBSOCKET_HEARTBEAT_INTERVAL") or default "30"), catching ValueError
and falling back to a safe default (e.g., 30), and emit a clear warning via the
module logger (e.g., logger = logging.getLogger(__name__)) that includes the
invalid value and the fallback used; update any code relying on
HEARTBEAT_INTERVAL_SECONDS to use the corrected variable.
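
One way the guarded parse could look is sketched below. `read_heartbeat_interval` and `DEFAULT_HEARTBEAT_INTERVAL_SECONDS` are hypothetical names, and rejecting non-positive values is an added assumption that goes beyond the ValueError case described above (a zero or negative interval would make the heartbeat loop send without pausing).

```python
import logging
import os

logger = logging.getLogger(__name__)

DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30


def read_heartbeat_interval(env=None) -> int:
    """Parse WEBSOCKET_HEARTBEAT_INTERVAL, falling back to the default on bad input.

    `env` is any mapping; it defaults to os.environ and exists so the
    function can be called with a plain dict in tests.
    """
    source = os.environ if env is None else env
    raw = source.get("WEBSOCKET_HEARTBEAT_INTERVAL")
    if raw is None:
        return DEFAULT_HEARTBEAT_INTERVAL_SECONDS
    try:
        value = int(raw)
    except ValueError:
        logger.warning(
            "Invalid WEBSOCKET_HEARTBEAT_INTERVAL %r, using default %d seconds",
            raw,
            DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
        )
        return DEFAULT_HEARTBEAT_INTERVAL_SECONDS
    if value <= 0:
        # Assumption: non-positive intervals are treated as invalid.
        logger.warning(
            "Non-positive WEBSOCKET_HEARTBEAT_INTERVAL %r, using default %d seconds",
            raw,
            DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
        )
        return DEFAULT_HEARTBEAT_INTERVAL_SECONDS
    return value
```

Passing the environment in as a mapping keeps the parsing logic testable without touching the real process environment.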
🧹 Nitpick comments (1)
codeframe/ui/routers/websocket.py (1)

52-83: Consider sending first heartbeat immediately for better UX.

The current implementation sleeps for HEARTBEAT_INTERVAL_SECONDS before sending the first heartbeat, meaning clients wait 30 seconds for the first heartbeat message. While connection_ack is sent immediately on subscription, sending the first heartbeat right away would allow clients to verify the heartbeat mechanism is working without delay.

♻️ Proposed refactor to send first heartbeat immediately
 async def send_heartbeats(websocket: WebSocket, project_id: int) -> None:
     """
     Send periodic heartbeat messages to a WebSocket client.
 
     This function runs in a background task and sends heartbeat messages
     every HEARTBEAT_INTERVAL_SECONDS to keep the connection alive and
     verify real-time functionality.
 
     Args:
         websocket: WebSocket connection to send heartbeats to
         project_id: Project ID to include in heartbeat messages
     """
     try:
         while True:
-            await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
             try:
                 message = {
                     "type": "heartbeat",
                     "project_id": project_id,
                     "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
                 }
                 await websocket.send_json(message)
                 logger.debug(f"Sent heartbeat for project {project_id}")
             except Exception as e:
                 # Connection may have closed - exit the loop
                 logger.debug(f"Heartbeat send failed (connection likely closed): {e}")
                 break
+            await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
     except asyncio.CancelledError:
         # Normal cancellation on disconnect
         logger.debug(f"Heartbeat task cancelled for project {project_id}")
         raise
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fc8cdb1 and 8686581.

📒 Files selected for processing (1)
  • codeframe/ui/routers/websocket.py
🧰 Additional context used
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Backend Unit Tests
  • GitHub Check: Frontend Unit Tests
  • GitHub Check: claude-review
  • GitHub Check: E2E Smoke Tests (Chromium)
🔇 Additional comments (3)
codeframe/ui/routers/websocket.py (3)

270-314: Well-implemented proactive messaging sequence.

The implementation correctly:

  1. Sends connection acknowledgment to confirm real-time connectivity
  2. Provides initial project status snapshot with appropriate error handling that distinguishes between not-found, missing-fields, and unexpected errors
  3. Prevents memory leak on re-subscription by cancelling existing heartbeat tasks before starting new ones (lines 300-307)
  4. Includes proper metadata (timestamp, project_id) in all proactive messages

The error handling granularity for project status is particularly good—using debug level for expected cases (not found, missing fields) and warning for unexpected errors.


358-366: Proper heartbeat cancellation on unsubscribe.

The implementation correctly cancels the heartbeat task when a client unsubscribes, ensuring that heartbeat messages stop immediately and resources are released. The cancellation pattern (cancel → await → catch CancelledError → delete) is the correct approach for cleaning up asyncio tasks.


381-389: Thorough cleanup of heartbeat tasks on disconnect.

The finally block correctly cancels all active heartbeat tasks and clears the tracking dictionary, ensuring no resource leaks occur when the WebSocket connection closes. The pattern of cancelling, awaiting, and catching CancelledError for each task is the proper cleanup approach.
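
The disconnect cleanup can be sketched as a standalone helper. Note that this version swaps the per-task try/except described above for `asyncio.gather(..., return_exceptions=True)`, which waits for all cancelled tasks in one call; `cancel_all` and `demo` are hypothetical names, not the PR's code.

```python
import asyncio


async def cancel_all(tasks: dict) -> None:
    """Cancel every tracked task, wait for each to finish, then clear the registry."""
    pending = list(tasks.values())
    for task in pending:
        task.cancel()
    # return_exceptions=True collects each CancelledError as a result
    # instead of raising the first one into the caller.
    await asyncio.gather(*pending, return_exceptions=True)
    tasks.clear()


async def demo() -> tuple:
    # Three long-running placeholder tasks keyed by project ID.
    tasks = {pid: asyncio.create_task(asyncio.sleep(3600)) for pid in (1, 2, 3)}
    snapshot = list(tasks.values())
    await cancel_all(tasks)
    return (len(tasks), all(t.cancelled() for t in snapshot))
```

Taking a snapshot of the values before cancelling avoids mutating the dictionary while iterating over it, which matches the iterate-then-clear order the review describes.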

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@claude
claude bot commented Jan 8, 2026

✅ Final Review: LGTM - Ready to Merge!

Excellent work! All feedback from my previous reviews has been thoroughly addressed. This PR is now production-ready.


🎉 What's Been Addressed

I've reviewed all three commits in this PR, and I'm pleased to confirm that all critical issues and recommendations from my previous reviews have been implemented:

1. ✅ Memory Leak Prevention - Re-subscription Handling

Location: codeframe/ui/routers/websocket.py:302-318

The code now properly cancels existing heartbeat tasks before creating new ones on re-subscription:

# Cancel existing task if present (handles re-subscription case to prevent memory leak)
if project_id in heartbeat_tasks:
    old_task = heartbeat_tasks[project_id]
    old_task.cancel()
    try:
        await old_task
    except asyncio.CancelledError:
        pass
    logger.debug(f"Cancelled old heartbeat task for project {project_id}")

Impact: No more orphaned background tasks when clients rapidly re-subscribe.


2. ✅ Heartbeat Cancellation on Unsubscribe

Location: codeframe/ui/routers/websocket.py:360-368

Heartbeat tasks are now properly cancelled when clients unsubscribe:

# Cancel heartbeat task for this project
if project_id in heartbeat_tasks:
    heartbeat_tasks[project_id].cancel()
    try:
        await heartbeat_tasks[project_id]
    except asyncio.CancelledError:
        pass
    del heartbeat_tasks[project_id]

Impact: No unnecessary network traffic or CPU usage after unsubscribe.


3. ✅ Configurable Heartbeat Interval

Location: codeframe/ui/routers/websocket.py:46-53

try:
    HEARTBEAT_INTERVAL_SECONDS = int(os.getenv("WEBSOCKET_HEARTBEAT_INTERVAL", "30"))
except ValueError:
    logger.warning("Invalid WEBSOCKET_HEARTBEAT_INTERVAL value, using default 30 seconds")
    HEARTBEAT_INTERVAL_SECONDS = 30

Impact:

  • Tests can use shorter intervals (e.g., 1 second) for faster execution
  • Production deployments can tune based on infrastructure needs
  • Well-documented in module docstring

4. ✅ Improved Error Handling for Project Status

Location: codeframe/ui/routers/websocket.py:293-300

Now distinguishes between expected and unexpected errors:

else:
    logger.debug(f"Project {project_id} not found for status snapshot")
except (KeyError, AttributeError) as e:
    # Expected - project may not have status/phase fields
    logger.debug(f"Project {project_id} missing status fields: {e}")
except Exception as e:
    # Unexpected error - log as warning for investigation
    logger.warning(f"Unexpected error sending project status for {project_id}: {e}")

Impact: Better observability for debugging production issues.


5. ✅ Proper CancelledError Handling

Location: codeframe/ui/routers/websocket.py:83-86

except asyncio.CancelledError:
    # Normal cancellation on disconnect
    logger.debug(f"Heartbeat task cancelled for project {project_id}")
    raise

Impact: Follows asyncio best practices - re-raises CancelledError to propagate cancellation properly.


📊 Code Quality Assessment

Strengths

  • Comprehensive test coverage: 17 new tests in test_websocket_proactive.py + integration test updates + E2E improvements
  • Excellent documentation: Module docstrings, inline comments, and configuration docs
  • Robust error handling: Distinguishes expected vs unexpected errors with appropriate log levels
  • Resource management: Proper asyncio task lifecycle management prevents memory leaks
  • Configurability: Environment variable for heartbeat interval enables testability
  • Type safety: Frontend types properly updated with new message types
  • Backward compatibility: New messages are additive, no breaking changes to existing clients

Test Coverage ✅

  • tests/ui/test_websocket_proactive.py: 17 comprehensive tests covering all proactive messaging scenarios
  • tests/ui/test_websocket_integration.py: Updated with drain_proactive_messages() helper - excellent design!
  • tests/e2e/test_dashboard.spec.ts: E2E tests now properly validate WebSocket functionality
  • Test results: 238 tests passing (per PR description)

Documentation ✅

  • Module docstring describes proactive messaging system and configuration
  • Configuration documented (WEBSOCKET_HEARTBEAT_INTERVAL)
  • Inline comments explain task lifecycle management and edge cases
  • PR description includes clear message flow diagram

🚀 Impact

This PR elegantly solves a real usability problem:

Before:

  • WebSocket was passive (only responding to client messages)
  • E2E tests received 0 messages and couldn't verify WebSocket functionality
  • No way to detect broken connections

After:

  • WebSocket proactively sends connection_ack, project_status, and heartbeat messages
  • E2E tests receive guaranteed messages to validate real-time system
  • Connections stay alive with 30-second heartbeats
  • Clients always know if they're connected

🎯 Final Recommendation

APPROVE & MERGE

This PR is production-ready. All code quality concerns have been thoroughly addressed:

  • ✅ No memory leaks (proper task cleanup)
  • ✅ No resource waste (tasks cancelled on unsubscribe)
  • ✅ Configurable for different environments
  • ✅ Excellent error handling and logging
  • ✅ Comprehensive test coverage
  • ✅ Well-documented

The implementation follows asyncio best practices and addresses all edge cases. Great work on the quick iteration and thorough responses to feedback! 🚀

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
codeframe/ui/routers/websocket.py (3)

68-82: Consider sending first heartbeat immediately after subscription.

The initial await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) at line 70 means the first heartbeat arrives 30 seconds after subscription. While connection_ack provides immediate feedback, sending the first heartbeat without delay would provide quicker verification of the heartbeat mechanism itself, which could be valuable for testing and monitoring.

♻️ Proposed refactor to send first heartbeat immediately
 async def send_heartbeats(websocket: WebSocket, project_id: int) -> None:
     """
     Send periodic heartbeat messages to a WebSocket client.
     
     This function runs in a background task and sends heartbeat messages
     every HEARTBEAT_INTERVAL_SECONDS to keep the connection alive and
     verify real-time functionality.
     
     Args:
         websocket: WebSocket connection to send heartbeats to
         project_id: Project ID to include in heartbeat messages
     """
     try:
+        # Send first heartbeat immediately
+        try:
+            message = {
+                "type": "heartbeat",
+                "project_id": project_id,
+                "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+            }
+            await websocket.send_json(message)
+            logger.debug(f"Sent initial heartbeat for project {project_id}")
+        except Exception as e:
+            logger.debug(f"Initial heartbeat send failed: {e}")
+            return
+        
         while True:
             await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
             try:
                 message = {
                     "type": "heartbeat",
                     "project_id": project_id,
                     "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
                 }
                 await websocket.send_json(message)
                 logger.debug(f"Sent heartbeat for project {project_id}")
             except Exception as e:
                 # Connection may have closed - exit the loop
                 logger.debug(f"Heartbeat send failed (connection likely closed): {e}")
                 break
     except asyncio.CancelledError:
         # Normal cancellation on disconnect
         logger.debug(f"Heartbeat task cancelled for project {project_id}")
         raise

75-75: Prefer an explicit timestamp format over string replacement.

The pattern datetime.now(UTC).isoformat().replace("+00:00", "Z") appears multiple times and relies on string replacement to convert the timezone suffix. While functional, strftime with an explicit format string would be more explicit and maintainable. Note that isoformat(timespec='milliseconds') only fixes the fractional-second precision; it still emits the +00:00 suffix.

♻️ Proposed refactor for timestamp formatting

Option 1: Using strftime (most explicit):

-"timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+"timestamp": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-4] + "Z",

Option 2: Define a helper function (if used frequently; note that this format drops fractional seconds):

def utc_timestamp() -> str:
    """Return current UTC timestamp in ISO format with Z suffix."""
    return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

Then use:

-"timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
+"timestamp": utc_timestamp(),

Also applies to: 278-278, 291-291
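
The formatting options differ in their output, which matters if clients parse the timestamps. The sketch below compares them on fixed inputs; the function names are hypothetical, and `timezone.utc` is used in place of `datetime.UTC` (the same object, available on older Python versions too).

```python
from datetime import datetime, timezone


def ts_replace(dt: datetime) -> str:
    """Current pattern: full isoformat with the +00:00 suffix swapped for Z."""
    return dt.isoformat().replace("+00:00", "Z")


def ts_millis(dt: datetime) -> str:
    """Fixed millisecond precision, still swapping the suffix."""
    return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def ts_seconds(dt: datetime) -> str:
    """strftime with a literal Z; fractional seconds are dropped."""
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


with_micros = datetime(2026, 1, 8, 12, 0, 0, 123456, tzinfo=timezone.utc)
whole_second = datetime(2026, 1, 8, 12, 0, 0, tzinfo=timezone.utc)

print(ts_replace(with_micros))   # 2026-01-08T12:00:00.123456Z
print(ts_replace(whole_second))  # 2026-01-08T12:00:00Z
print(ts_millis(with_micros))    # 2026-01-08T12:00:00.123Z
print(ts_millis(whole_second))   # 2026-01-08T12:00:00.000Z
print(ts_seconds(with_micros))   # 2026-01-08T12:00:00Z
```

Plain `isoformat()` omits the fractional part entirely when microseconds are zero, so its output length varies; fixing `timespec` or using `strftime` gives a constant shape.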


295-297: Clarify whether missing status fields are truly "expected" behavior.

The comment states "Expected - project may not have status/phase fields" but logging this at debug level suggests uncertainty. If these fields are optional by design, the comment is accurate. However, if they should always be present, this might indicate a data integrity issue that should be logged at a higher level.

Consider either:

  1. Confirming this is expected behavior and keeping debug logging
  2. If these fields should always exist, elevating to warning level or adding validation
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8686581 and 63986ee.

📒 Files selected for processing (1)
  • codeframe/ui/routers/websocket.py
🧰 Additional context used
📓 Path-based instructions (1)
codeframe/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

codeframe/**/*.py: Use Python 3.11+ for backend development with FastAPI, AsyncAnthropic, SQLite with async support (aiosqlite), and tiktoken for token counting
Use token counting via tiktoken library for token budget management with ~50,000 token limit per conversation
Use asyncio patterns with AsyncAnthropic for async/await in Python backend for concurrent operations
Implement quality gates with multi-stage pre-completion checks (tests → type → coverage → review) and Git + SQLite + context snapshots for project state rollback
Use tiered memory system (HOT/WARM/COLD) with importance scoring using hybrid exponential decay algorithm for context management with 30-50% token reduction
Implement session lifecycle management with auto-save/restore using file-based storage at .codeframe/session_state.json

Files:

  • codeframe/ui/routers/websocket.py
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)
📚 Learning: 2025-11-25T19:08:37.203Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: docs/CLAUDE.md:0-0
Timestamp: 2025-11-25T19:08:37.203Z
Learning: Applies to docs/codeframe/ui/**/*.py : Use FastAPI with Uvicorn for the async API backend and WebSockets for real-time communication

Applied to files:

  • codeframe/ui/routers/websocket.py
📚 Learning: 2026-01-04T06:26:12.870Z
Learnt from: CR
Repo: frankbria/codeframe PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-04T06:26:12.870Z
Learning: Applies to web-ui/src/lib/websocketMessageMapper.ts : Implement WebSocket event mapping with 9 event types (agent_created, task_assigned, etc.) with automatic reconnection using exponential backoff (1s → 30s)

Applied to files:

  • codeframe/ui/routers/websocket.py
🧬 Code graph analysis (1)
codeframe/ui/routers/websocket.py (2)
codeframe/persistence/database.py (2)
  • get_project (278-280)
  • create_task (358-360)
codeframe/ui/routers/tasks.py (1)
  • create_task (46-112)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Backend Unit Tests
  • GitHub Check: Frontend Unit Tests
  • GitHub Check: E2E Smoke Tests (Chromium)
🔇 Additional comments (6)
codeframe/ui/routers/websocket.py (6)

49-53: LGTM: Robust configuration parsing.

The environment variable parsing properly handles invalid values with a try/except block, logs a warning, and falls back to a safe default. This addresses the potential ValueError crash noted in previous reviews.
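
For readers without the diff at hand, the pattern described is roughly the following sketch. The environment variable name WEBSOCKET_HEARTBEAT_INTERVAL and the function name are assumptions; only the 30-second default comes from the PR summary.

```python
import logging
import os

logger = logging.getLogger(__name__)

DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30  # default stated in the PR summary


def read_heartbeat_interval(env_var: str = "WEBSOCKET_HEARTBEAT_INTERVAL") -> int:
    """Parse the heartbeat interval from the environment.

    The variable name is hypothetical; the PR's actual name may differ.
    """
    raw = os.environ.get(env_var)
    if raw is None:
        return DEFAULT_HEARTBEAT_INTERVAL_SECONDS
    try:
        return int(raw)
    except ValueError:
        # An unparsable value should not crash module import; fall back.
        logger.warning(
            "Invalid %s=%r; using default of %s seconds",
            env_var,
            raw,
            DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
        )
        return DEFAULT_HEARTBEAT_INTERVAL_SECONDS
```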


202-203: LGTM: Clean task tracking initialization.

The heartbeat_tasks dictionary is properly typed and scoped to the WebSocket connection lifecycle.


304-318: LGTM: Proper heartbeat task lifecycle management.

The re-subscription handling correctly cancels existing heartbeat tasks before starting new ones (lines 304-311), preventing memory leaks and duplicate heartbeats. The task creation and tracking (lines 314-318) is clean and properly logged.
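
The cancel-then-replace step praised here can be sketched as follows. The function restart_heartbeat and its heartbeat_factory parameter are hypothetical names; the heartbeat_tasks dictionary keyed by project ID mirrors what the review describes, but the code is an illustration, not the PR's.

```python
import asyncio
from typing import Any, Callable, Coroutine


async def restart_heartbeat(
    heartbeat_tasks: dict[int, asyncio.Task],
    project_id: int,
    heartbeat_factory: Callable[[], Coroutine[Any, Any, None]],
) -> asyncio.Task:
    """Replace any running heartbeat for project_id with a fresh task."""
    existing = heartbeat_tasks.pop(project_id, None)
    if existing is not None:
        existing.cancel()
        try:
            # Wait for the old task to finish unwinding so two heartbeats
            # never run for the same project at once.
            await existing
        except asyncio.CancelledError:
            pass

    task = asyncio.create_task(heartbeat_factory())
    heartbeat_tasks[project_id] = task
    return task
```

Removing the old entry before creating the new task means the dictionary never holds a reference to a cancelled task, which is what prevents the duplicate heartbeats mentioned above.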


362-370: LGTM: Complete heartbeat cleanup on unsubscribe.

The heartbeat task cancellation properly handles the cancellation flow: cancels the task, awaits it, catches CancelledError, removes from tracking dict, and logs. This ensures no heartbeats are sent after unsubscription and prevents resource leaks.


385-393: LGTM: Comprehensive cleanup in finally block.

All heartbeat tasks are properly cancelled on disconnect, with each task awaited and CancelledError caught. The dict is cleared at the end, ensuring complete cleanup regardless of how the connection terminated.
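
A compact variant of this cleanup is sketched below. It swaps the per-task await with try/except that the review describes for asyncio.gather(..., return_exceptions=True), which collects each task's CancelledError as a result instead of raising it. The function name is hypothetical.

```python
import asyncio


async def cancel_all_heartbeats(heartbeat_tasks: dict[int, asyncio.Task]) -> None:
    """Cancel every tracked heartbeat task and wait for all of them to stop."""
    tasks = list(heartbeat_tasks.values())
    for task in tasks:
        task.cancel()
    # return_exceptions=True turns each task's CancelledError into a returned
    # value, so one cancelled task does not interrupt waiting on the others.
    await asyncio.gather(*tasks, return_exceptions=True)
    heartbeat_tasks.clear()
```

Calling this from the handler's finally block would give the same guarantee the review notes: the dictionary is empty and no heartbeat is still running, however the connection ended.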


284-292: The code is correct. Verified that db.get_project() returns Optional[dict] with both "status" and "phase" fields guaranteed by the database schema. The defensive usage of .get() with "unknown" defaults is appropriate and consistent with project-wide patterns.

@frankbria frankbria merged commit a1bf809 into main Jan 8, 2026
11 of 12 checks passed
@frankbria frankbria deleted the feature/proactive-websocket-messaging branch January 8, 2026 05:02
Development

Successfully merging this pull request may close these issues.

[P0] WebSocket does not send messages after connection

1 participant