
v0.9.0 compliance: full upgrade against core/go reference#1

Open
Snider wants to merge 140 commits into main from dev

Conversation

Contributor

@Snider Snider commented Apr 28, 2026

Brings this repository to a verdict of COMPLIANT against the v0.9.0 audit.

🤖 Generated with Claude Code + Codex
Co-Authored-By: Codex noreply@openai.com
Co-Authored-By: Virgil virgil@lethean.io

Summary by CodeRabbit

Release Notes

  • New Features

    • Redis adapter now uses pub/sub transport for direct cross-hub message delivery.
    • TCP and ZMQ adapters enhanced with improved connection handshakes and authentication.
    • SSE and WebSocket adapters now support per-channel handlers for more granular control.
    • Core hub API adds publish/broadcast methods with source tracking to prevent message echo.
  • Refactor

    • Stream hub architecture migrated to queue-based message delivery for improved concurrency.
    • Authentication system refactored for consistent error handling and claim normalisation.
  • Documentation

    • Added extensive examples for all adapters and core functionality.
    • New compatibility layer for legacy WebSocket API.
  • Tests

    • Comprehensive test coverage added across all adapters and core stream functionality.

Virgil added 30 commits April 4, 2026 18:17
Co-Authored-By: Virgil <virgil@lethean.io>
Virgil and others added 28 commits April 5, 2026 02:30
Co-Authored-By: Virgil <virgil@lethean.io>
Add stream_test.go and stats_test.go with full Good/Bad/Ugly coverage
for ConnectionState, Envelope, Peer, Pipe, encodeTCPFrame, cloneFrame,
onceFunction, randomUUID, HubStats, PeerCount, ChannelCount,
ChannelSubscriberCount, AllPeers, AllChannels, and JSON serialisation.
Expand message_test.go from 1 test to 9 (all MessageType constants,
Message fields, and edge cases). Add usage-example comments to all
unexported helpers in hub.go, stream.go, auth.go, and hub_config.go
per AX principle 2 (comments as usage examples, not descriptions).

Co-Authored-By: Virgil <virgil@lethean.io>
Remove encoding/json from ws adapter tests (use core.JSONUnmarshal).
Remove strings from SSE adapter tests (use core.Trim). Convert prose
comments to usage-example comments in both compat layers (AX-2).

Co-Authored-By: Virgil <virgil@lethean.io>
- Bump dappco.re/go/* deps to v0.8.0-alpha.1 in go.mod (any forge.lthn.ai/core/* paths migrated to canonical dappco.re/go/* form)
- Add tests/cli/stream/Taskfile.yaml AX-10 scaffold (build/vet/test under default deps), per RFC-CORE-008-AGENT-EXPERIENCE.md §10

Co-Authored-By: Athena <athena@lthn.ai>
bash /tmp/v090/audit.sh . → verdict: COMPLIANT (all 7 dimensions zero).

Co-authored-by: Codex <noreply@openai.com>
Co-Authored-By: Virgil <virgil@lethean.io>
@coderabbitai

coderabbitai Bot commented Apr 28, 2026

📝 Walkthrough

Walkthrough

This pull request refactors the stream package and its adapters to implement message queuing in the hub, add connection state tracking across adapters, introduce per-source publish/broadcast methods, enhance authentication with nil-safety, and add comprehensive test coverage across all components. Redis, TCP, WebSocket, and ZMQ adapters are reworked for direct socket/pub-sub transport instead of in-process registries.

Changes

Cohort / File(s) Summary
Documentation & Configuration
CODEX.md, docs/specs/core/go/RFC.md, docs/specs/rfc/RFC-CORE-008-AGENT-EXPERIENCE.md, go.mod, tests/cli/stream/Taskfile.yaml
Added coding conventions guide, RFC mirrors, dependency updates (replacing dappco.re/go/core with dappco.re/go v0.9.0, adding miniredis, zmq4, redis/go-redis/v9), and test workflow task file.
Stream Core Package – Hub & Messaging
hub.go, hub_test.go, hub_config.go, stream.go, stream_test.go
Refactored hub to use separate queue channels for publish and broadcast, added peer-aware publish/broadcast methods (PublishFromPeer, PublishFromBridge, BroadcastFromPeer, BroadcastFromBridge), introduced subscriber hooks (SubscribePublished, SubscribeBroadcast), added CanSubscribePeer authorization check, split handler storage by type, added error-returning subscription variant (SubscribeE/SubscribeWithError), and enhanced Pipe to support channel-preserving forwarding with fallback to wildcard mode. Updated Peer with close hooks and UUID generation.
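The peer-aware publish path described above can be sketched minimally. The Hub, Frame, and PublishFrom names below are illustrative assumptions for this sketch, not the module's exported API; the point is how stamping a source ID lets delivery skip the originator and prevent message echo:

```go
package main

import "fmt"

// Frame is a hypothetical envelope carrying the ID of the peer or
// bridge that injected it, so delivery can skip the source.
type Frame struct {
	Source  string
	Channel string
	Payload string
}

// Hub is a minimal sketch: subscribers are keyed by peer ID.
type Hub struct {
	subscribers map[string]chan Frame
}

func NewHub() *Hub { return &Hub{subscribers: make(map[string]chan Frame)} }

func (h *Hub) Subscribe(id string) chan Frame {
	ch := make(chan Frame, 8)
	h.subscribers[id] = ch
	return ch
}

// PublishFrom delivers to every subscriber except the originating
// source, so a bridge never receives its own frames back.
func (h *Hub) PublishFrom(source string, f Frame) int {
	f.Source = source
	delivered := 0
	for id, ch := range h.subscribers {
		if id == source {
			continue
		}
		ch <- f
		delivered++
	}
	return delivered
}

func main() {
	hub := NewHub()
	hub.Subscribe("bridge-1")
	peer := hub.Subscribe("peer-a")
	n := hub.PublishFrom("bridge-1", Frame{Channel: "hashrate", Payload: "42"})
	fmt.Println(n, (<-peer).Payload)
}
```

The real PublishFromPeer/PublishFromBridge split presumably exists so hooks such as SubscribePublished can distinguish local peers from bridges; this sketch collapses both into one method.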
Stream Core Package – Authentication & Utilities
auth.go, auth_test.go, errors.go, message.go, message_test.go, stats.go, stats_test.go
Refactored authenticators for shared token parsing via bearerTokenFromRequest, added nil-safety checks, normalised AuthResult to always have non-nil Claims, added MessageType.String() method, and updated documentation examples. No exported API signature changes beyond parameter renames (r → request).
Stream Core Package – Tests
ax7_more_test.go, example_test.go
Added comprehensive test suites for authentication, hub lifecycle, subscription/broadcast behaviour, peer state, and statistics. Included usage examples for publishing, subscribing, piping hubs, and authentication flows.
Redis Adapter
adapter/redis/redis.go, redis_test.go, ax7_more_test.go, example_test.go
Refactored from in-process registry to direct Redis pub/sub using github.com/redis/go-redis/v9. NewBridge now creates a Redis client and generates random sourceID per instance. Start(ctx) sets up pattern subscription, event listening, and frame routing via PublishFromBridge/BroadcastFromBridge. Channel publishing now validates inputs and publishes directly to Redis keys. Added comprehensive tests for cross-hub delivery, source ID uniqueness, and error cases.
SSE Adapter
adapter/sse/sse.go, sse_test.go, ax7_more_test.go, example_test.go
Refactored to implement http.Handler via ServeHTTP method. Added OnAuthFailure callback support, pre-subscription checks via hub.CanSubscribePeer, peer close hooks with sync.Once, and switched output generation to helper functions with consistent SSE framing. Enhanced heartbeat and retry handling with local defaults and proper cleanup on request cancellation.
TCP Adapter
adapter/tcp/tcp.go, tcp_test.go, reconnect.go, ax7_more_test.go, example_test.go
Reworked to support post-dial handshake via HandshakeFrame/HandshakeChannel config. Dial now handles handshake exchange and validates against ConnAuthenticator. Added connection state tracking (StateConnecting/Connected/Disconnected), OnReconnect callback for reconnecting client, write concurrency safety via writeMutex, and per-write deadlines. Frame routing centralised through dispatchTCPFrame using peer-aware methods. Listener lifecycle now clears reference on close.
WebSocket Adapter
adapter/ws/ws.go, ws_test.go, ax7_more_test.go, example_test.go, reconnect.go
Refactored to implement http.Handler via ServeHTTP with shared internal logic. Added HandlerForChannel for channel-specific routes. Connect now includes context-based close hook via context.AfterFunc. Reorganised message dispatch to handle subscribe/unsubscribe/ping, with structured error messages on subscribe failures. Introduced writePump with optional heartbeat pings and pong-based read deadlines via hub.Config().PongTimeout.
WebSocket Legacy Compatibility Layer
ws/compat.go, compat_test.go, example_test.go
Added comprehensive compatibility layer re-exporting stream/adapter types as aliases, legacy constructors (NewHub, NewHubWithConfig, NewPeer, Pipe), hub wrapper with lazy-loaded adapter, and NewRedisBridge. Includes integration tests validating backward compatibility and hub-to-hub piping behaviour.
ZMQ Adapter
adapter/zmq/zmq.go, zmq_test.go, ax7_more_test.go, example_test.go
Reworked from registry-driven multi-adapter model to direct socket-based send/receive. Added ConnAuthenticator support with handshake timeout (defaulting to 5s). Start now validates mode/role, creates/connects socket, performs authenticated handshake, registers peer, then continuously receives and routes messages via hub.Broadcast/hub.Publish. Publish now validates role and started state. Added String() methods for Mode and Role. Comprehensive tests cover pubsub/push-pull, authentication, and error cases.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 8.11%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and concisely describes the main change: a v0.9.0 compliance upgrade against the core/go reference implementation, which is clearly reflected throughout the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.



@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 15

Note

Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.

🟡 Minor comments (9)
docs/specs/rfc/RFC-CORE-008-AGENT-EXPERIENCE.md-4-4 (1)

4-4: ⚠️ Potential issue | 🟡 Minor

Broken link: Remove environment-specific /workspace/ prefix.

The link path /workspace/docs/RFC-025-AGENT-EXPERIENCE.md contains an environment-specific prefix that will break when the repository is viewed on GitHub or cloned elsewhere. Use a repository-relative path instead.

🔗 Proposed fix
-[`docs/RFC-025-AGENT-EXPERIENCE.md`](/workspace/docs/RFC-025-AGENT-EXPERIENCE.md).
+[`docs/RFC-025-AGENT-EXPERIENCE.md`](/docs/RFC-025-AGENT-EXPERIENCE.md).

Alternatively, use a relative path:

-[`docs/RFC-025-AGENT-EXPERIENCE.md`](/workspace/docs/RFC-025-AGENT-EXPERIENCE.md).
+[`docs/RFC-025-AGENT-EXPERIENCE.md`](../../RFC-025-AGENT-EXPERIENCE.md).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/specs/rfc/RFC-CORE-008-AGENT-EXPERIENCE.md` at line 4, Replace the
environment-specific link `/workspace/docs/RFC-025-AGENT-EXPERIENCE.md` with a
repository-relative path so it works outside the current environment; update the
reference in RFC-CORE-008-AGENT-EXPERIENCE.md to
`docs/RFC-025-AGENT-EXPERIENCE.md` (or a relative
`./RFC-025-AGENT-EXPERIENCE.md` if you prefer) to remove the `/workspace/`
prefix.
docs/specs/core/go/RFC.md-3-3 (1)

3-3: ⚠️ Potential issue | 🟡 Minor

Remove environment-specific /workspace/ prefix from link.

The link path /workspace/docs/RFC.md contains an environment-specific prefix that will not resolve on GitHub or in standard repository clones. Use a repository-relative path instead.

Proposed fix
-The canonical implementation spec for this module is [`docs/RFC.md`](/workspace/docs/RFC.md).
+The canonical implementation spec for this module is [`docs/RFC.md`](/docs/RFC.md).

Alternatively, use a relative path:

-The canonical implementation spec for this module is [`docs/RFC.md`](/workspace/docs/RFC.md).
+The canonical implementation spec for this module is [`docs/RFC.md`](../../../RFC.md).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/specs/core/go/RFC.md` at line 3, Update the markdown link in the
sentence that currently points to "/workspace/docs/RFC.md" to use a
repository-relative path (e.g., "docs/RFC.md") or a relative path
("../docs/RFC.md") so the link resolves outside the environment; locate the
string "/workspace/docs/RFC.md" in RFC.md and replace it accordingly.
CODEX.md-16-16 (1)

16-16: ⚠️ Potential issue | 🟡 Minor

Consider using standard grammar: "prefer X to Y".

The phrase "Prefer predictable names over shorthand" uses informal grammar. Standard English prefers "prefer X to Y" rather than "prefer X over Y". As per static analysis, consider adjusting for consistency with formal writing conventions.

📝 Proposed grammar fix
-- Prefer predictable names over shorthand.
+- Prefer predictable names to shorthand.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@CODEX.md` at line 16, The sentence "Prefer predictable names over shorthand"
should be reworded to use standard "prefer X to Y" grammar; update the phrase to
"Prefer predictable names to shorthand" (or "Prefer predictable names to
shorthand names") wherever the phrase appears in the document (the existing
sentence "Prefer predictable names over shorthand").
example_test.go-78-88 (1)

78-88: ⚠️ Potential issue | 🟡 Minor

Wait for the hub before snapshotting stats.

This example reads hub.Stats() immediately after go hub.Run(ctx), while AddPeer/SubscribePeer errors are ignored. That makes the // Output: assertion scheduler-dependent and can leave this example printing 0 0 0 on a slower run. Reuse waitForHub(hub) here and fail fast on the returned errors before taking the snapshot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example_test.go` around lines 78 - 88, The example calls hub.Stats()
immediately after starting the hub (go hub.Run(ctx)) and ignores errors from
AddPeer/SubscribePeer, causing flaky output; call waitForHub(hub) after starting
the hub to ensure it is ready, check and fail fast on errors returned by
AddPeer(peer) and SubscribePeer(peer, "hashrate"), and only then call
hub.Stats() (referencing NewHub, hub.Run, AddPeer, SubscribePeer, waitForHub,
and Stats) so the snapshot is deterministic.
adapter/tcp/ax7_more_test.go-197-199 (1)

197-199: ⚠️ Potential issue | 🟡 Minor

Check the deferred pipe shutdown errors.

These deferred Close() calls are ignored at the moment, which is already failing errcheck for this file. Please wrap them in a deferred assertion/helper so the tests stay lint-clean.

Suggested tidy-up
-	defer left.Close()
-	defer right.Close()
+	defer func() { core.AssertNoError(t, left.Close()) }()
+	defer func() { core.AssertNoError(t, right.Close()) }()
-	defer right.Close()
+	defer func() { core.AssertNoError(t, right.Close()) }()

Also applies to: 245-246

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/ax7_more_test.go` around lines 197 - 199, The deferred Close()
calls on the pipe endpoints returned from core.NetPipe() (variables left and
right) ignore errors and fail errcheck; replace each plain defer left.Close() /
defer right.Close() with a deferred func that checks and asserts the error (e.g.
defer func() { if err := left.Close(); err != nil { t.Fatalf("left.Close failed:
%v", err) } }()) or use testify/require.NoError inside the deferred func; apply
the same change for the other pair at the later occurrence (the second
left/right closes) so both defers assert any Close() error.
adapter/zmq/zmq_test.go-449-455 (1)

449-455: ⚠️ Potential issue | 🟡 Minor

Handle the helper listener shutdown error.

randomTCPEndpoint() currently drops the listener.Close() result, so this helper stays red under errcheck. A deferred assertion/helper is enough here.

Suggested fix
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		t.Fatalf("Listen() error = %v", err)
 	}
-	defer listener.Close()
+	defer func() {
+		if closeErr := listener.Close(); closeErr != nil {
+			t.Fatalf("Close() error = %v", closeErr)
+		}
+	}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/zmq/zmq_test.go` around lines 449 - 455, randomTCPEndpoint currently
ignores the error returned by listener.Close(), which fails errcheck; modify the
defer to capture and assert the Close() error (e.g. defer a closure that calls
listener.Close() and calls t.Fatalf or t.Fatalf-like helper if it returns
non-nil), keeping the existing t.Helper() and referencing the randomTCPEndpoint
function and the listener variable so the test helper fails on Close errors
instead of dropping them.
adapter/ws/reconnect.go-89-104 (1)

89-104: ⚠️ Potential issue | 🟡 Minor

HTTP response body from WebSocket dial should be closed.

The DialContext method returns an *http.Response that should be closed even on successful connection. The static analysis correctly flags this as a potential resource leak.

🐛 Proposed fix
-		conn, _, err := dialer.DialContext(ctx, client.config.URL, client.config.Headers)
+		conn, resp, err := dialer.DialContext(ctx, client.config.URL, client.config.Headers)
+		if resp != nil && resp.Body != nil {
+			_ = resp.Body.Close()
+		}
 		if err != nil {
 			attempt++
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/ws/reconnect.go` around lines 89 - 104, Capture and close the HTTP
response returned by dialer.DialContext to avoid leaking the response body:
change the call to conn, resp, err := dialer.DialContext(ctx, client.config.URL,
client.config.Headers) and, on successful connect (err == nil), ensure you call
resp.Body.Close() (e.g. if resp != nil { resp.Body.Close() }) before continuing;
make this change in the reconnect loop around dialer.DialContext (the block that
updates attempt, calls client.setState, client.config.OnReconnect, sleepContext,
and nextBackoff).
adapter/sse/sse.go-150-158 (1)

150-158: ⚠️ Potential issue | 🟡 Minor

Peer may not be removed from hub if subscription fails after AddPeer.

The subscription loop sits at lines 146-158, before AddPeer (line 161) and the defer at line 164. If SubscribePeer fails and the handler returns at line 152 or 156 after calling peer.Close(), the defer hub.RemovePeer(peer) has not yet been registered, so the peer is never removed from the hub.

🐛 Proposed fix - move AddPeer and defer earlier
+	if err := adapter.hub.AddPeer(peer); err != nil {
+		return
+	}
+	defer adapter.hub.RemovePeer(peer)
+
 	for _, channel := range channels {
 		if channel == "" {
 			continue
 		}
 		if err := adapter.hub.SubscribePeer(peer, channel); err != nil {
-			http.Error(w, "stream hub not running", http.StatusInternalServerError)
+			peer.Close()
 			return
 		}
 	}

 	header.Set("Connection", "keep-alive")

 	_, _ = io.WriteString(w, "retry: "+strconv.Itoa(config.RetryMs)+"\n\n")
 	flusher.Flush()

-	if err := adapter.hub.AddPeer(peer); err != nil {
-		return
-	}
-	defer adapter.hub.RemovePeer(peer)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sse/sse.go` around lines 150 - 158, The subscription failure can
return before the defer that calls hub.RemovePeer(peer) is registered, leaving
the peer in the hub; fix by moving the call to adapter.hub.AddPeer(peer) and
immediately registering defer adapter.hub.RemovePeer(peer) (and any necessary
peer.Close() cleanup) to occur before calling adapter.hub.SubscribePeer(peer,
channel), so any early return from SubscribePeer triggers removal; update
references in this block (AddPeer, SubscribePeer, RemovePeer, peer, hub)
accordingly and preserve existing error handling and peer.Close() behavior.
adapter/ws/ws.go-130-142 (1)

130-142: ⚠️ Potential issue | 🟡 Minor

http.Error calls after WebSocket upgrade have no effect.

After a successful upgrader.Upgrade(), the HTTP response has been hijacked and converted to a WebSocket connection. Calling http.Error() at lines 132 and 140 will not send the error to the client. The connection should simply be closed.

🐛 Proposed fix
 	conn, err := upgrader.Upgrade(w, r, nil)
 	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}

 	if err := adapter.hub.AddPeer(peer); err != nil {
 		if closeErr := conn.Close(); closeErr != nil {
 			return
 		}
-		http.Error(w, "stream hub not running", http.StatusInternalServerError)
 		return
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/ws/ws.go` around lines 130 - 142, After upgrader.Upgrade succeeds the
HTTP response is hijacked so subsequent http.Error calls do nothing; in the
error path after calling adapter.hub.AddPeer(peer) (referencing
upgrader.Upgrade, adapter.hub.AddPeer and conn.Close) remove the http.Error call
and simply close the WebSocket connection (call conn.Close()), optionally log
the failure, and return; keep the existing http.Error only for the
upgrader.Upgrade error path.
🧹 Nitpick comments (11)
tests/cli/stream/Taskfile.yaml (3)

22-26: Consider adding race detection to test task.

The PR introduces concurrency features (hub message queuing, connection state tracking). Adding the -race flag would help catch data races during development and CI runs.

🏁 Proposed enhancement to add race detection
   test:
     desc: Run unit tests.
     dir: ../../..
     cmds:
-      - GOWORK=off go test -count=1 ./...
+      - GOWORK=off go test -race -count=1 ./...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cli/stream/Taskfile.yaml` around lines 22 - 26, The test task in
Taskfile.yaml currently runs "GOWORK=off go test -count=1 ./..." without race
detection; update the "test" task's cmds entry to include the Go race detector
by adding the "-race" flag to the go test invocation (i.e., use "GOWORK=off go
test -race -count=1 ./...") so concurrent issues in the new hub/message queuing
and connection-tracking code are caught during local and CI test runs.

10-26: Eliminate GOWORK=off repetition.

The GOWORK=off environment variable is repeated in every task. Consider extracting it to a global env section to follow the DRY principle.

♻️ Proposed refactor to use global env
 version: "3"
 
+env:
+  GOWORK: off
+
 tasks:
   default:
     deps:
       - build
       - vet
       - test
 
   build:
     desc: Compile every package in go-stream.
     dir: ../../..
     cmds:
-      - GOWORK=off go build ./...
+      - go build ./...
 
   vet:
     desc: Run go vet across the module.
     dir: ../../..
     cmds:
-      - GOWORK=off go vet ./...
+      - go vet ./...
 
   test:
     desc: Run unit tests.
     dir: ../../..
     cmds:
-      - GOWORK=off go test -count=1 ./...
+      - go test -count=1 ./...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cli/stream/Taskfile.yaml` around lines 10 - 26, The Taskfile repeats
the same env var in each task (build, vet, test); refactor by adding a top-level
env section with GOWORK: "off" (or GOWORK: off) and remove the inline GOWORK=off
prefix from the cmds for the build, vet, and test tasks so each task retains its
desc, dir, and cmds but inherits the global environment variable instead of
repeating it.

4-8: Consider sequential execution instead of parallel dependencies.

Task runs deps in parallel by default, meaning build, vet, and test will execute concurrently. This could cause vet and test to fail or behave unexpectedly if build hasn't completed successfully first.

🔄 Proposed fix to enforce sequential execution
   default:
-    deps:
+    cmds:
+      - task: build
+      - task: vet
+      - task: test
-      - build
-      - vet
-      - test
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cli/stream/Taskfile.yaml` around lines 4 - 8, The default task
currently lists build, vet, and test under deps which Task runs in parallel;
change the default task to invoke those tasks sequentially instead (e.g. replace
the deps array with a single command sequence that runs "task build && task vet
&& task test" or use explicit cmd/run steps that call the build, vet, then test
tasks in order) so that build finishes before vet and test run; update the
default task block that references deps, build, vet, and test accordingly.
adapter/redis/redis_test.go (1)

40-42: Avoid fixed start-up sleeps in these bridge tests.

These assertions depend on Start() having fully subscribed before the publish happens, and the hard-coded 100ms delay makes that timing-sensitive. A small poll/readiness helper would be more stable than sleeping and hoping the bridge is ready.

Also applies to: 123-124, 168-170

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis/redis_test.go` around lines 40 - 42, The tests currently use a
fixed 100ms sleep after calling bridge1.Start and bridge2.Start which is racy;
replace that sleep with a deterministic readiness check: modify the test to poll
or wait for a readiness signal from the bridge (for example add/use a Ready()
method or a channel the Start goroutine closes when subscription is established)
or loop with a short timeout trying to publish and assert receipt until success;
update occurrences around bridge1.Start/bridge2.Start and the other similar
blocks (lines around 123-124, 168-170) to wait on that readiness signal or
polling loop instead of time.Sleep so tests no longer rely on timing
assumptions.
adapter/redis/redis.go (3)

73-74: Unchecked error return value from client.Close().

The static analysis tool correctly identifies that the error from client.Close() is not checked. While the client is temporary (used only for ping validation), suppressing the error silently could hide connection issues.

♻️ Proposed fix
 	client := newRedisClient(config)
-	defer client.Close()
+	defer func() { _ = client.Close() }()

Alternatively, if you want to log or handle the error:

 	client := newRedisClient(config)
-	defer client.Close()
+	defer func() {
+		if err := client.Close(); err != nil {
+			// Consider logging the close error
+		}
+	}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis/redis.go` around lines 73 - 74, The call to client.Close() from
newRedisClient() is ignoring its error return; update the code where client :=
newRedisClient(config) is used (the temporary client used for ping validation)
to capture the returned error from client.Close(), check it, and handle it
appropriately (e.g., log via processLogger or return/wrap the error from the
surrounding function). Ensure you call client.Close() in a defer that assigns to
a named error variable or call Close() explicitly and handle/log any non-nil
error so connection-close failures are not silently dropped.

131-155: Cleanup defer block silently swallows errors.

The defer block at lines 131-155 discards errors from pubsub.Close() and client.Close(). Consider logging these errors or aggregating them for debugging purposes, especially in production environments.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis/redis.go` around lines 131 - 155, The defer currently drops
errors from pubsub.Close() and client.Close() and returns early; update the
defer in the function that manipulates bridge (references: bridge.mutex,
publishStop, broadcastStop, runCancel, pubsub.Close, client.Close) so that
Close() errors are not silently swallowed: capture their returned errors,
aggregate or wrap them into a named error return (or log them using the
project's logger on the bridge or a provided logger), and ensure
publishStop()/broadcastStop() and runCancel() always run regardless of Close()
results; do not use early returns in the defer — instead log/append errors and
let the defer finish.

264-267: Unchecked error return value from client.Close() in deferred function.

The static analysis tool flags this correctly. When creating a fallback client for publishing (when bridge.client is nil), the deferred close error is discarded.

♻️ Proposed fix
 	if client == nil {
 		client = newRedisClient(bridge.config)
-		defer client.Close()
+		defer func() { _ = client.Close() }()
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis/redis.go` around lines 264 - 267, Replace the plain deferred
call defer client.Close() with a deferred closure that checks the returned error
from client.Close() and logs it instead of discarding it; locate the creation
site where client is set via newRedisClient(bridge.config) in
adapter/redis/redis.go and change the defer to call client.Close(), capture its
error (if any) and forward it to the component logger (e.g.,
bridge.logger.Errorf or log.Printf) with a clear message like "failed to close
redis client" so the error is not silently ignored.
adapter/ws/reconnect.go (1)

174-186: Double-checked locking pattern for Send is correct but verbose.

The implementation first acquires a read lock to check if conn is available, then acquires a write lock before actually sending. This pattern works but the read-lock check at lines 174-179 is redundant since you acquire the write lock anyway at line 180.

♻️ Simplified version
-	client.mutex.RLock()
-	conn := client.conn
-	client.mutex.RUnlock()
-	if conn == nil {
-		return core.E("stream.ws", "not connected", nil)
-	}
 	client.mutex.Lock()
 	defer client.mutex.Unlock()
 	if client.conn == nil {
 		return core.E("stream.ws", "not connected", nil)
 	}
 	return client.conn.WriteMessage(websocket.TextMessage, payload.Value.([]byte))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/ws/reconnect.go` around lines 174 - 186, The initial RLock/RUnlock
before acquiring the write lock in the Send path is redundant; remove the
read-lock block and simply acquire client.mutex.Lock(), defer
client.mutex.Unlock(), then check if client.conn == nil and return
core.E("stream.ws", "not connected", nil) if so, otherwise call
client.conn.WriteMessage(websocket.TextMessage, payload.Value.([]byte)). Update
the method containing client.mutex and client.conn (the Send/write call) to use
this single-lock simplified pattern.
adapter/tcp/tcp.go (2)

187-191: Consider using net.ListenConfig for context-aware listening.

The static analysis suggests using (*net.ListenConfig).Listen instead of net.Listen. This allows the listen operation to respect context cancellation. However, for a long-running server listener, this is a minor improvement as the listener lifecycle is managed separately via the ctx.Done() goroutine.

♻️ Optional improvement
+	lc := net.ListenConfig{}
 	if adapter.config.TLS != nil {
-		listener, err = tls.Listen("tcp", adapter.config.Addr, adapter.config.TLS)
+		listener, err = lc.Listen(ctx, "tcp", adapter.config.Addr)
+		if err == nil {
+			listener = tls.NewListener(listener, adapter.config.TLS)
+		}
 	} else {
-		listener, err = net.Listen("tcp", adapter.config.Addr)
+		listener, err = lc.Listen(ctx, "tcp", adapter.config.Addr)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/tcp.go` around lines 187 - 191, Use a context-aware listen by
switching from net.Listen / tls.Listen to net.ListenConfig{}.Listen(ctx, "tcp",
adapter.config.Addr) to obtain the base net.Listener, and then if
adapter.config.TLS != nil wrap that listener with tls.NewListener(baseListener,
adapter.config.TLS); otherwise use the base listener directly. Update the code
path that currently calls tls.Listen(...) and net.Listen(...) to create the
listener via ListenConfig.Listen with the existing ctx so the accept can respect
context cancellation while still supporting TLS via tls.NewListener.

415-420: isClosedNetworkError may not catch all closed connection scenarios.

The function only checks for net.ErrClosed. Depending on the Go version and platform, other error types or wrapped errors might indicate a closed connection. However, for the current usage (suppressing expected errors during shutdown), this is likely sufficient.

♻️ More robust alternative
 func isClosedNetworkError(err error) bool {
 	if err == nil {
 		return false
 	}
-	return err == net.ErrClosed
+	// errors.Is also matches wrapped errors, which a plain == misses.
+	return errors.Is(err, net.ErrClosed)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/tcp.go` around lines 415 - 420, The current isClosedNetworkError
only compares err == net.ErrClosed and misses wrapped/other platform errors;
update isClosedNetworkError to use errors.Is against net.ErrClosed and
os.ErrClosed and also treat common connection-close indicators (e.g.
syscall.EPIPE, syscall.ECONNRESET, io.EOF) as closed-network errors, importing
errors, os, io and syscall as needed so wrapped errors are detected and
shutdown-suppressed errors are handled reliably.
adapter/redis/ax7_more_test.go (1)

151-152: Test assertions in goroutines may not fail the test properly.

Using core.AssertNoError inside goroutines won't cause the test to fail immediately since the assertion happens on a different goroutine. Consider using channels or t.Errorf with proper synchronisation to capture errors.

♻️ Suggested approach
-	go func() { core.AssertNoError(t, bridge1.Start(ctx)) }()
-	go func() { core.AssertNoError(t, bridge2.Start(ctx)) }()
+	errCh := make(chan error, 2)
+	go func() { errCh <- bridge1.Start(ctx) }()
+	go func() { errCh <- bridge2.Start(ctx) }()
 	core.Sleep(100 * core.Millisecond)
+	// Optionally check errCh after test completes
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis/ax7_more_test.go` around lines 151 - 152, The test spawns
goroutines that call core.AssertNoError for bridge1.Start(ctx) and
bridge2.Start(ctx), which won't fail the test reliably because assertions run
off the main test goroutine; change the pattern to capture errors from
bridge1.Start and bridge2.Start via channels or a sync.WaitGroup and report them
on the main test goroutine (e.g., send any non-nil error to an errCh and call
t.Fatalf/t.Errorf or core.AssertNoError on the received error), ensuring the
main test waits for both goroutines to finish before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapter/sse/sse_test.go`:
- Around line 30-34: Replace uses of http.Get(...) and deferred
response.Body.Close() in the sse tests with context-aware requests and cleanup:
create a context (e.g., ctx, cancel := context.WithTimeout(context.Background(),
time.Second*5); defer cancel()), build the request via
http.NewRequestWithContext(ctx, http.MethodGet, server.URL+"?channel=hashrate",
nil), execute it with http.DefaultClient.Do(req) (replacing the http.Get call
sites), and register t.Cleanup(func(){ if err := response.Body.Close(); err !=
nil { t.Fatalf("closing response body: %v", err) } }) instead of defer; update
all test functions (including TestAX7_Adapter_Handler_Ugly and the other tests
that currently call http.Get and defer response.Body.Close) and replace the old
http.Get and defer patterns accordingly.

In `@adapter/tcp/reconnect.go`:
- Around line 98-102: Connect() can continue running after Close() because Close
only flips closed and closes the socket; create an internal cancellable context
on the client (e.g., client.ctx and client.cancel) and use that context in
sleepContext, DialContext, and any handshake/wait loops so ongoing retries/dials
are cancelled when Close() calls client.cancel(); additionally, before calling
setConn(conn) (and after any successful Dial/handshake), re-check isClosed() and
abort/close the just-dialled conn if the client is closed to avoid publishing a
connection after shutdown; update Close() to call client.cancel() and ensure
functions like Connect(), sleepContext, DialContext usage, nextTCPBackoff loop,
and any places that previously only relied on the caller context observe
client.ctx cancellation.
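A minimal shape for the client-owned context described above might look like the following; field and method names are illustrative, not the adapter's real API, and the dial attempt is elided:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// client owns a cancellable context so Close() can interrupt an
// in-flight Connect() instead of only flipping a flag.
type client struct {
	ctx    context.Context
	cancel context.CancelFunc
	mu     sync.Mutex
	closed bool
}

func newClient() *client {
	ctx, cancel := context.WithCancel(context.Background())
	return &client{ctx: ctx, cancel: cancel}
}

func (c *client) Close() {
	c.mu.Lock()
	c.closed = true
	c.mu.Unlock()
	c.cancel() // unblocks sleepContext / DialContext inside Connect
}

func (c *client) isClosed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

func sleepContext(ctx context.Context, d time.Duration) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(d):
		return nil
	}
}

// Connect retries until cancelled; a real implementation would dial here
// and re-check isClosed() before publishing the connection via setConn.
func (c *client) Connect() error {
	for {
		if c.isClosed() {
			return context.Canceled
		}
		if err := sleepContext(c.ctx, 10*time.Millisecond); err != nil {
			return err
		}
	}
}

func main() {
	c := newClient()
	done := make(chan error, 1)
	go func() { done <- c.Connect() }()
	c.Close()
	fmt.Println(<-done) // context canceled
}
```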

In `@adapter/tcp/tcp_test.go`:
- Line 56: Replace plain net.Dial calls in tcp_test.go with cancellable
DialContext calls using the existing listenContext: where you currently call
net.Dial("tcp", address) (and the seven other instances at the same pattern),
use net.Dialer{}.DialContext(listenContext, "tcp", address) so the test dials
are cancellable and satisfy the noctx linter; update the variable/assignment
sites (e.g., the connection, err assignments) accordingly while keeping the same
error handling in the tests.
- Line 463: Add a targeted staticcheck suppression comment immediately above the
test call to adapter.Dial(nil, hub) to silence SA1012 for the intentional nil
context; e.g., insert a line like "//lint:ignore SA1012 nil context tested
intentionally: Adapter.Dial normalises nil contexts" directly above the
adapter.Dial(nil, hub) invocation so the test continues to exercise Dial's
nil-context normalisation without staticcheck failure.
- Around line 62-64: Tests call connection.Write(...) directly which can
short-write and corrupt framed messages; replace those direct Write calls with
the writeAll helper so the full encoded frame is sent. For each occurrence
(e.g., where you currently call connection.Write(encodeTCPFrame("",
[]byte("hello")))), change it to writeAll(connection, encodeTCPFrame(...)) and
handle the returned error like the previous Write error checks; use the existing
encodeTCPFrame to build the frame and keep readTCPFrame as-is for reads. Ensure
you update all listed locations that call connection.Write so they all use
writeAll(connection, payload) and fail the test on any write error.

In `@auth_test.go`:
- Line 13: Replace every call to httptest.NewRequest(...) in auth_test.go with
httptest.NewRequestWithContext(context.Background(), ...) (e.g., change request
:= httptest.NewRequest(http.MethodGet, "/stream/ws", nil) to use
NewRequestWithContext with context.Background()); ensure the file imports the
context package if not already present and update all 13 occurrences (the
request variable instantiations at the locations mentioned) so lint rule noctx
is satisfied.

In `@CODEX.md`:
- Line 3: Update the path reference in CODEX.md that currently points to the
absolute "/workspace/CLAUDE.md" and change it to a repository-relative reference
(e.g., "CLAUDE.md" or "./CLAUDE.md") so the link works for users who clone the
repo; locate the string "/workspace/CLAUDE.md" in CODEX.md and replace it with
the relative path.

In `@hub_test.go`:
- Around line 677-714: The test uses unsynchronised shared counters (panicked,
count) updated inside hub-delivery goroutines and read by the test goroutine
(e.g., the panicked variable used with hub.SubscribeE handlers and the count
variables in other cases), causing data races; change those to atomic counters
(e.g., uint32/uint64) and replace increments with atomic.AddUint32/Uint64 and
reads with atomic.LoadUint32/Uint64, or alternatively use a channel to signal
the test goroutine, and add the necessary import (sync/atomic) and atomic reads
in the wait loops so the test no longer races or flakes.
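The atomic-counter pattern suggested above can be sketched in isolation; countDeliveries is a hypothetical stand-in for the hub-delivery goroutines in the test:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countDeliveries simulates n delivery goroutines each bumping a shared
// counter. atomic.Uint64 (Go 1.19+) replaces the racy `count++`, so the
// test goroutine can read the value without a data race.
func countDeliveries(n int) uint64 {
	var count atomic.Uint64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			count.Add(1)
		}()
	}
	wg.Wait()
	return count.Load()
}

func main() {
	fmt.Println(countDeliveries(8)) // 8
}
```

On older Go versions the equivalent is a plain uint64 with atomic.AddUint64/atomic.LoadUint64, as the prompt suggests.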

In `@hub.go`:
- Around line 259-263: The code currently calls the caller-supplied
ChannelAuthoriser while holding hub.mutex (in the block around
hub.mutex.Lock()/Unlock() that checks hub.config.ChannelAuthoriser and calls
channel authoriser with peer and channel), which can block or deadlock; change
this so the mutex is not held during the authoriser call: grab the minimal state
under hub.mutex (e.g., read a local copy of hub.config.ChannelAuthoriser and any
required config or a boolean), release the mutex, then if the copied authoriser
is non-nil and channel != "*" call authoriser(peer, channel); if the authoriser
returns false then return ErrAuthRejected. Apply the same change to the other
similar block at the second occurrence (around the code at lines 291-295) so no
caller code runs while hub.mutex is held.
- Around line 353-366: The current default path spawns a new goroutine per
overflow (select -> go hub.enqueueBroadcast(...)), which can create unbounded
goroutines under pressure; instead introduce a bounded spill buffer and a single
background worker to serialize retries: add a hub.broadcastSpill (chan
broadcastDelivery) with fixed capacity, change the default case to
non-blockingly send the broadcastDelivery into broadcastSpill (or return an
error to the caller) rather than spawning a goroutine, and ensure hub.run() (or
hub.startWorker()) launches one long-lived goroutine that drains broadcastSpill
and calls hub.enqueueBroadcast/send to hub.broadcastQueue; apply the same
pattern to the publish path (the code around lines 744-748) so overflow handling
is bounded and centralized.
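The bounded spill-plus-worker idea can be sketched as follows; all names are illustrative, not the hub's real API, and shutdown of the worker is elided:

```go
package main

import "fmt"

type delivery struct{ payload string }

// hub pairs a small primary queue with a fixed-capacity spill channel
// drained by one long-lived worker, instead of spawning a goroutine per
// overflowing broadcast.
type hub struct {
	queue chan delivery
	spill chan delivery
}

func newHub(queueSize, spillSize int) *hub {
	h := &hub{
		queue: make(chan delivery, queueSize),
		spill: make(chan delivery, spillSize),
	}
	// Single background worker serialises retries. A real implementation
	// would close h.spill on shutdown to let this goroutine exit.
	go func() {
		for d := range h.spill {
			h.queue <- d // blocks until the primary queue drains
		}
	}()
	return h
}

// enqueue never spawns goroutines: it spills when the queue is full and
// reports an error once the spill buffer is also full, keeping the
// worst-case memory bounded.
func (h *hub) enqueue(d delivery) error {
	select {
	case h.queue <- d:
		return nil
	default:
		select {
		case h.spill <- d:
			return nil
		default:
			return fmt.Errorf("broadcast dropped: queue and spill buffer full")
		}
	}
}

func main() {
	h := newHub(1, 1)
	fmt.Println(h.enqueue(delivery{"a"})) // fits the queue
	fmt.Println(h.enqueue(delivery{"b"})) // fits the queue or the spill buffer
}
```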
- Around line 699-717: broadcastToPeers currently ignores its source parameter
(declared as _ *Peer) so PublishFromPeer/BroadcastFromPeer end up echoing frames
back to the origin; change the signature to accept a source *Peer (remove the
underscore), and inside the loop over peers skip calling sendBroadcastToPeer
when peer == source (use pointer equality). Also ensure
invokeHandlers/invokeBroadcastHandlers behavior remains unchanged, and apply the
same source-aware skip in the similar peer-broadcasting function referenced by
the other block (the one around invokeBroadcastHandlers usage).

In `@stream.go`:
- Around line 242-255: The current logic appends specialized subscriptions
separately which can leave only one of the hooks wired and thus disable the
generic fallback in Pipe(); change this so the specialized path is used only
when both hooks are present: perform type assertions for publishedFrameSource
(SubscribePublished) and broadcastFrameSource (SubscribeBroadcast) together and
only append the onceFunction subscriptions to stops if both assertions succeed,
otherwise fall back to the generic subscription path. Ensure you reference the
same symbols (publishedFrameSource, SubscribePublished, broadcastFrameSource,
SubscribeBroadcast, onceFunction, stops, Pipe) so the code change replaces the
two independent branches with a single guarded branch that requires both
interfaces before disabling the generic fallback.
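The both-hooks guard boils down to requiring two type assertions to succeed together; here the interfaces are reduced to markers purely for illustration:

```go
package main

import "fmt"

type SubscribePublished interface{ SubscribePublished() }
type SubscribeBroadcast interface{ SubscribeBroadcast() }

type fullHub struct{}

func (fullHub) SubscribePublished() {}
func (fullHub) SubscribeBroadcast() {}

type halfHub struct{}

func (halfHub) SubscribePublished() {}

// useSpecialisedPath requires BOTH hooks before disabling the generic
// fallback, so a hub implementing only one interface still pipes frames
// through the generic subscription path.
func useSpecialisedPath(hub any) bool {
	_, hasPublished := hub.(SubscribePublished)
	_, hasBroadcast := hub.(SubscribeBroadcast)
	return hasPublished && hasBroadcast
}

func main() {
	fmt.Println(useSpecialisedPath(fullHub{})) // true
	fmt.Println(useSpecialisedPath(halfHub{})) // false
}
```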
- Around line 153-160: SetCloseHook currently stores a hook that can be lost if
Close() won the race; update SetCloseHook to detect if the peer is already
closed (check the peer's closeOnce/closed state under peer.mutex) and, in that
case, invoke closeFunc immediately instead of just storing it. Specifically,
inside SetCloseHook acquire peer.mutex, if peer is closed (e.g. a closed bool or
closeOnce already executed), release the lock and call closeFunc right away;
otherwise set peer.closeHook = closeFunc as before. Ensure the immediate call
happens outside the mutex to avoid deadlocks and reference the SetCloseHook,
Close, closeOnce/closed, closeHook and mutex symbols when making the change.
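A sketch of the race-safe SetCloseHook, using a simplified Peer with a closed flag rather than the real closeOnce machinery; the hook runs outside the mutex in both paths:

```go
package main

import (
	"fmt"
	"sync"
)

type Peer struct {
	mutex     sync.Mutex
	closed    bool
	closeHook func()
}

// SetCloseHook stores the hook, or runs it immediately (outside the
// lock) when Close already won the race, so the hook is never lost.
func (peer *Peer) SetCloseHook(closeFunc func()) {
	peer.mutex.Lock()
	alreadyClosed := peer.closed
	if !alreadyClosed {
		peer.closeHook = closeFunc
	}
	peer.mutex.Unlock()

	if alreadyClosed {
		closeFunc() // invoked outside the mutex to avoid deadlocks
	}
}

func (peer *Peer) Close() {
	peer.mutex.Lock()
	if peer.closed {
		peer.mutex.Unlock()
		return
	}
	peer.closed = true
	hook := peer.closeHook
	peer.mutex.Unlock()
	if hook != nil {
		hook()
	}
}

func main() {
	peer := &Peer{}
	peer.Close()
	ran := false
	peer.SetCloseHook(func() { ran = true }) // Close already happened
	fmt.Println(ran)                         // true
}
```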
- Around line 285-288: randomUUID currently ignores errors from crypto/rand.Read
causing duplicate zeroed UUIDs on failure; change randomUUID to return (string,
error), check the return value/error from rand.Read and return a non-nil error
if it fails (or panic if you choose non-recoverable), then update NewPeer (and
any callers) to accept the error from randomUUID and propagate it (return an
error from NewPeer or handle the panic) so peer creation fails instead of
producing identical IDs; update all call sites of NewPeer accordingly.
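An error-returning randomUUID might look like this minimal version 4 sketch; the wrapping message is illustrative:

```go
package main

import (
	crand "crypto/rand"
	"fmt"
)

// randomUUID returns an error instead of silently producing zeroed IDs
// when crypto/rand fails; callers such as NewPeer should propagate it so
// peer creation fails rather than minting duplicate identifiers.
func randomUUID() (string, error) {
	var b [16]byte
	if _, err := crand.Read(b[:]); err != nil {
		return "", fmt.Errorf("stream: generating peer id: %w", err)
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := randomUUID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id)) // 36
}
```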

In `@ws/compat.go`:
- Around line 142-153: The NewRedisBridge function fails to handle a literal nil
interface and thus returns the default "unsupported hub type"; before the type
switch in NewRedisBridge, add an explicit nil check for the hub parameter (if
hub == nil) and call adapterredis.NewBridge(nil, config) to preserve the
documented nil-compatible path; keep the existing type switch for cases *Hub and
*stream.Hub and return errors unchanged for unsupported types.
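The underlying gotcha is that a literal nil interface never matches a concrete-pointer case in a type switch, so the nil check must come first. A simplified sketch (types and return values invented for illustration):

```go
package main

import "fmt"

type Hub struct{}
type streamHub struct{}

// newRedisBridge handles a literal nil interface before the type switch;
// without that check, nil would fall through to the default branch and
// return "unsupported hub type" instead of the documented nil path.
func newRedisBridge(hub any) (string, error) {
	if hub == nil {
		return "bridge(nil hub)", nil // documented nil-compatible path
	}
	switch hub.(type) {
	case *Hub:
		return "bridge(compat hub)", nil
	case *streamHub:
		return "bridge(stream hub)", nil
	default:
		return "", fmt.Errorf("ws: unsupported hub type %T", hub)
	}
}

func main() {
	out, err := newRedisBridge(nil)
	fmt.Println(out, err) // bridge(nil hub) <nil>
}
```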

---

Minor comments:
In `@adapter/sse/sse.go`:
- Around line 150-158: The subscription failure can return before the defer that
calls hub.RemovePeer(peer) is registered, leaving the peer in the hub; fix by
moving the call to adapter.hub.AddPeer(peer) and immediately registering defer
adapter.hub.RemovePeer(peer) (and any necessary peer.Close() cleanup) to occur
before calling adapter.hub.SubscribePeer(peer, channel), so any early return
from SubscribePeer triggers removal; update references in this block (AddPeer,
SubscribePeer, RemovePeer, peer, hub) accordingly and preserve existing error
handling and peer.Close() behavior.

In `@adapter/tcp/ax7_more_test.go`:
- Around line 197-199: The deferred Close() calls on the pipe endpoints returned
from core.NetPipe() (variables left and right) ignore errors and fail errcheck;
replace each plain defer left.Close() / defer right.Close() with a deferred func
that checks and asserts the error (e.g. defer func() { if err := left.Close();
err != nil { t.Fatalf("left.Close failed: %v", err) } }()) or use
testify/require.NoError inside the deferred func; apply the same change for the
other pair at the later occurrence (the second left/right closes) so both defers
assert any Close() error.

In `@adapter/ws/reconnect.go`:
- Around line 89-104: Capture and close the HTTP response returned by
dialer.DialContext to avoid leaking the response body: change the call to conn,
resp, err := dialer.DialContext(ctx, client.config.URL, client.config.Headers)
and, on successful connect (err == nil), ensure you call resp.Body.Close() (e.g.
if resp != nil { resp.Body.Close() }) before continuing; make this change in the
reconnect loop around dialer.DialContext (the block that updates attempt, calls
client.setState, client.config.OnReconnect, sleepContext, and nextBackoff).

In `@adapter/ws/ws.go`:
- Around line 130-142: After upgrader.Upgrade succeeds the HTTP response is
hijacked so subsequent http.Error calls do nothing; in the error path after
calling adapter.hub.AddPeer(peer) (referencing upgrader.Upgrade,
adapter.hub.AddPeer and conn.Close) remove the http.Error call and simply close
the WebSocket connection (call conn.Close()), optionally log the failure, and
return; keep the existing http.Error only for the upgrader.Upgrade error path.

In `@adapter/zmq/zmq_test.go`:
- Around line 449-455: randomTCPEndpoint currently ignores the error returned by
listener.Close(), which fails errcheck; modify the defer to capture and assert
the Close() error (e.g. defer a closure that calls listener.Close() and calls
t.Fatalf or t.Fatalf-like helper if it returns non-nil), keeping the existing
t.Helper() and referencing the randomTCPEndpoint function and the listener
variable so the test helper fails on Close errors instead of dropping them.

In `@CODEX.md`:
- Line 16: The sentence "Prefer predictable names over shorthand" should be
reworded to use standard "prefer X to Y" grammar; update the phrase to "Prefer
predictable names to shorthand" (or "Prefer predictable names to shorthand
names") wherever the phrase appears in the document (the existing sentence
"Prefer predictable names over shorthand").

In `@docs/specs/core/go/RFC.md`:
- Line 3: Update the markdown link in the sentence that currently points to
"/workspace/docs/RFC.md" to use a repository-relative path (e.g., "docs/RFC.md")
or a relative path ("../docs/RFC.md") so the link resolves outside the
environment; locate the string "/workspace/docs/RFC.md" in RFC.md and replace it
accordingly.

In `@docs/specs/rfc/RFC-CORE-008-AGENT-EXPERIENCE.md`:
- Line 4: Replace the environment-specific link
`/workspace/docs/RFC-025-AGENT-EXPERIENCE.md` with a repository-relative path so
it works outside the current environment; update the reference in
RFC-CORE-008-AGENT-EXPERIENCE.md to `docs/RFC-025-AGENT-EXPERIENCE.md` (or a
relative `./RFC-025-AGENT-EXPERIENCE.md` if you prefer) to remove the
`/workspace/` prefix.

In `@example_test.go`:
- Around line 78-88: The example calls hub.Stats() immediately after starting
the hub (go hub.Run(ctx)) and ignores errors from AddPeer/SubscribePeer, causing
flaky output; call waitForHub(hub) after starting the hub to ensure it is ready,
check and fail fast on errors returned by AddPeer(peer) and SubscribePeer(peer,
"hashrate"), and only then call hub.Stats() (referencing NewHub, hub.Run,
AddPeer, SubscribePeer, waitForHub, and Stats) so the snapshot is deterministic.

---

Nitpick comments:
In `@adapter/redis/ax7_more_test.go`:
- Around line 151-152: The test spawns goroutines that call core.AssertNoError
for bridge1.Start(ctx) and bridge2.Start(ctx), which won't fail the test
reliably because assertions run off the main test goroutine; change the pattern
to capture errors from bridge1.Start and bridge2.Start via channels or a
sync.WaitGroup and report them on the main test goroutine (e.g., send any
non-nil error to an errCh and call t.Fatalf/t.Errorf or core.AssertNoError on
the received error), ensuring the main test waits for both goroutines to finish
before returning.

In `@adapter/redis/redis_test.go`:
- Around line 40-42: The tests currently use a fixed 100ms sleep after calling
bridge1.Start and bridge2.Start which is racy; replace that sleep with a
deterministic readiness check: modify the test to poll or wait for a readiness
signal from the bridge (for example add/use a Ready() method or a channel the
Start goroutine closes when subscription is established) or loop with a short
timeout trying to publish and assert receipt until success; update occurrences
around bridge1.Start/bridge2.Start and the other similar blocks (lines around
123-124, 168-170) to wait on that readiness signal or polling loop instead of
time.Sleep so tests no longer rely on timing assumptions.

In `@adapter/redis/redis.go`:
- Around line 73-74: The call to client.Close() from newRedisClient() is
ignoring its error return; update the code where client :=
newRedisClient(config) is used (the temporary client used for ping validation)
to capture the returned error from client.Close(), check it, and handle it
appropriately (e.g., log via processLogger or return/wrap the error from the
surrounding function). Ensure you call client.Close() in a defer that assigns to
a named error variable or call Close() explicitly and handle/log any non-nil
error so connection-close failures are not silently dropped.
- Around line 131-155: The defer currently drops errors from pubsub.Close() and
client.Close() and returns early; update the defer in the function that
manipulates bridge (references: bridge.mutex, publishStop, broadcastStop,
runCancel, pubsub.Close, client.Close) so that Close() errors are not silently
swallowed: capture their returned errors, aggregate or wrap them into a named
error return (or log them using the project's logger on the bridge or a provided
logger), and ensure publishStop()/broadcastStop() and runCancel() always run
regardless of Close() results; do not use early returns in the defer — instead
log/append errors and let the defer finish.
- Around line 264-267: Replace the plain deferred call defer client.Close() with
a deferred closure that checks the returned error from client.Close() and logs
it instead of discarding it; locate the creation site where client is set via
newRedisClient(bridge.config) in adapter/redis/redis.go and change the defer to
call client.Close(), capture its error (if any) and forward it to the component
logger (e.g., bridge.logger.Errorf or log.Printf) with a clear message like
"failed to close redis client" so the error is not silently ignored.

In `@adapter/tcp/tcp.go`:
- Around line 187-191: Use a context-aware listen by switching from net.Listen /
tls.Listen to net.ListenConfig{}.Listen(ctx, "tcp", adapter.config.Addr) to
obtain the base net.Listener, and then if adapter.config.TLS != nil wrap that
listener with tls.NewListener(baseListener, adapter.config.TLS); otherwise use
the base listener directly. Update the code path that currently calls
tls.Listen(...) and net.Listen(...) to create the listener via
ListenConfig.Listen with the existing ctx so the accept can respect context
cancellation while still supporting TLS via tls.NewListener.
- Around line 415-420: The current isClosedNetworkError only compares err ==
net.ErrClosed and misses wrapped/other platform errors; update
isClosedNetworkError to use errors.Is against net.ErrClosed and os.ErrClosed and
also treat common connection-close indicators (e.g. syscall.EPIPE,
syscall.ECONNRESET, io.EOF) as closed-network errors, importing errors, os, io
and syscall as needed so wrapped errors are detected and shutdown-suppressed
errors are handled reliably.

In `@adapter/ws/reconnect.go`:
- Around line 174-186: The initial RLock/RUnlock before acquiring the write lock
in the Send path is redundant; remove the read-lock block and simply acquire
client.mutex.Lock(), defer client.mutex.Unlock(), then check if client.conn ==
nil and return core.E("stream.ws", "not connected", nil) if so, otherwise call
client.conn.WriteMessage(websocket.TextMessage, payload.Value.([]byte)). Update
the method containing client.mutex and client.conn (the Send/write call) to use
this single-lock simplified pattern.

In `@tests/cli/stream/Taskfile.yaml`:
- Around line 22-26: The test task in Taskfile.yaml currently runs "GOWORK=off
go test -count=1 ./..." without race detection; update the "test" task's cmds
entry to include the Go race detector by adding the "-race" flag to the go test
invocation (i.e., use "GOWORK=off go test -race -count=1 ./...") so concurrent
issues in the new hub/message queuing and connection-tracking code are caught
during local and CI test runs.
- Around line 10-26: The Taskfile repeats the same env var in each task (build,
vet, test); refactor by adding a top-level env section with GOWORK: "off" (or
GOWORK: off) and remove the inline GOWORK=off prefix from the cmds for the
build, vet, and test tasks so each task retains its desc, dir, and cmds but
inherits the global environment variable instead of repeating it.
- Around line 4-8: The default task currently lists build, vet, and test under
deps which Task runs in parallel; change the default task to invoke those tasks
sequentially instead (e.g. replace the deps array with a single command sequence
that runs "task build && task vet && task test" or use explicit cmd/run steps
that call the build, vet, then test tasks in order) so that build finishes
before vet and test run; update the default task block that references deps,
build, vet, and test accordingly.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4f1e9eda-3e52-45da-a512-e9708fea5091

📥 Commits

Reviewing files that changed from the base of the PR and between fbac8c7 and 56dd1b1.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (47)
  • CODEX.md
  • adapter/redis/ax7_more_test.go
  • adapter/redis/example_test.go
  • adapter/redis/redis.go
  • adapter/redis/redis_test.go
  • adapter/sse/ax7_more_test.go
  • adapter/sse/example_test.go
  • adapter/sse/sse.go
  • adapter/sse/sse_test.go
  • adapter/tcp/ax7_more_test.go
  • adapter/tcp/example_test.go
  • adapter/tcp/reconnect.go
  • adapter/tcp/tcp.go
  • adapter/tcp/tcp_test.go
  • adapter/ws/ax7_more_test.go
  • adapter/ws/compat.go
  • adapter/ws/compat_test.go
  • adapter/ws/example_test.go
  • adapter/ws/reconnect.go
  • adapter/ws/ws.go
  • adapter/ws/ws_test.go
  • adapter/zmq/ax7_more_test.go
  • adapter/zmq/example_test.go
  • adapter/zmq/zmq.go
  • adapter/zmq/zmq_test.go
  • auth.go
  • auth_test.go
  • ax7_more_test.go
  • docs/specs/core/go/RFC.md
  • docs/specs/rfc/RFC-CORE-008-AGENT-EXPERIENCE.md
  • errors.go
  • example_test.go
  • go.mod
  • hub.go
  • hub_config.go
  • hub_test.go
  • message.go
  • message_test.go
  • stats.go
  • stats_test.go
  • stream.go
  • stream_test.go
  • tests/cli/stream/Taskfile.yaml
  • ws/ax7_more_test.go
  • ws/compat.go
  • ws/compat_test.go
  • ws/example_test.go

Comment thread adapter/sse/sse_test.go
Comment on lines +30 to +34
response, err := http.Get(server.URL + "?channel=hashrate")
if err != nil {
t.Fatalf("Get() error = %v", err)
}
defer response.Body.Close()

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n 'http\.Get\(|response\.Body\.Close\(' adapter/sse/sse_test.go -C1

Repository: dAppCore/go-stream

Length of output: 1363


🏁 Script executed:

#!/bin/bash
# Check file size and beginning to understand structure
wc -l adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 90


🏁 Script executed:

#!/bin/bash
# Look for any lint configuration in the repo
find . -maxdepth 2 -type f \( -name ".golangci.yml" -o -name "golangci.yml" -o -name ".golintci" -o -name "Makefile" -o -name "go.mod" \) | head -20

Repository: dAppCore/go-stream

Length of output: 71


🏁 Script executed:

#!/bin/bash
# Check the imports in the test file to understand what's available
head -30 adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 675


🏁 Script executed:

#!/bin/bash
# Look at one complete test function to understand the pattern
sed -n '25,40p' adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 460


🏁 Script executed:

#!/bin/bash
# Check for linter configuration
cat go.mod | head -20

Repository: dAppCore/go-stream

Length of output: 642


🏁 Script executed:

#!/bin/bash
# Look for .golangci.yml or similar
fd -type f -name ".*lint*" -o -name "Makefile" -o -name "*.yml" | grep -E "(lint|golangci|Makefile)" | head -10

Repository: dAppCore/go-stream

Length of output: 233


🏁 Script executed:

#!/bin/bash
# Check if there's an existing pattern for handling http requests with context elsewhere in the codebase
rg "http\.NewRequestWithContext|NewRequest.*Context" --type go | head -20

Repository: dAppCore/go-stream

Length of output: 277


🏁 Script executed:

#!/bin/bash
# Examine one more test to see if there are variations in how Body.Close is handled
sed -n '225,235p' adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 224


🏁 Script executed:

#!/bin/bash
# Find which test function uses http.NewRequestWithContext
grep -n "http\.NewRequestWithContext\|func Test" adapter/sse/sse_test.go | head -20

Repository: dAppCore/go-stream

Length of output: 776


🏁 Script executed:

#!/bin/bash
# Look at the test that uses requestContext to understand the pattern
sed -n '155,175p' adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 695


🏁 Script executed:

#!/bin/bash
# Search for t.Cleanup usage in the file
rg "t\.Cleanup" adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 44


🏁 Script executed:

#!/bin/bash
# Look at the complete TestAX7_Adapter_Handler_Ugly function that uses NewRequestWithContext
sed -n '207,235p' adapter/sse/sse_test.go

Repository: dAppCore/go-stream

Length of output: 897


🏁 Script executed:

#!/bin/bash
# Check if golangci.yml exists in the repo
find . -name ".golangci.yml" -o -name "golangci.yml"

Repository: dAppCore/go-stream

Length of output: 44


🏁 Script executed:

#!/bin/bash
# Look for any CI/CD configuration that might run linters
find . -maxdepth 2 -type f \( -name "*.yml" -o -name "*.yaml" \) | grep -v node_modules | head -10

Repository: dAppCore/go-stream

Length of output: 44


Use context-aware HTTP requests and stop discarding close errors.

These streaming tests currently rely on http.Get() in eight of the nine test functions, which trips noctx, whilst the deferred response.Body.Close() calls trip errcheck. Building the request with http.NewRequestWithContext(...) and handling the close via t.Cleanup() with error checking keeps the tests cancellable and lint-clean. One test (TestAX7_Adapter_Handler_Ugly) already uses context-aware requests but inconsistently handles the close—update all nine tests to use the pattern below uniformly.

Proposed pattern
- response, err := http.Get(server.URL + "?channel=hashrate")
+ request, err := http.NewRequestWithContext(context.Background(), http.MethodGet, server.URL+"?channel=hashrate", nil)
+ if err != nil {
+ 	t.Fatalf("NewRequestWithContext() error = %v", err)
+ }
+ response, err := http.DefaultClient.Do(request)
  if err != nil {
- 	t.Fatalf("Get() error = %v", err)
+ 	t.Fatalf("Do() error = %v", err)
  }
- defer response.Body.Close()
+ t.Cleanup(func() {
+ 	if closeErr := response.Body.Close(); closeErr != nil {
+ 		t.Errorf("Body.Close() error = %v", closeErr)
+ 	}
+ })

Applies to lines 30–34, 65–69, 100–104, 147–151, 168–172, 195–199, 248–252, 283–287, 318–322.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-response, err := http.Get(server.URL + "?channel=hashrate")
-if err != nil {
-	t.Fatalf("Get() error = %v", err)
-}
-defer response.Body.Close()
+request, err := http.NewRequestWithContext(context.Background(), http.MethodGet, server.URL+"?channel=hashrate", nil)
+if err != nil {
+	t.Fatalf("NewRequestWithContext() error = %v", err)
+}
+response, err := http.DefaultClient.Do(request)
+if err != nil {
+	t.Fatalf("Do() error = %v", err)
+}
+t.Cleanup(func() {
+	if closeErr := response.Body.Close(); closeErr != nil {
+		t.Errorf("Body.Close() error = %v", closeErr)
+	}
+})
🧰 Tools
🪛 golangci-lint (2.11.4)

[error] 34-34: Error return value of response.Body.Close is not checked

(errcheck)


[error] 30-30: net/http.Get must not be called. use net/http.NewRequestWithContext and (*net/http.Client).Do(*http.Request)

(noctx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sse/sse_test.go` around lines 30 - 34, Replace uses of http.Get(...)
and deferred response.Body.Close() in the sse tests with context-aware requests
and cleanup: create a context (e.g., ctx, cancel :=
context.WithTimeout(context.Background(), time.Second*5); defer cancel()), build
the request via http.NewRequestWithContext(ctx, http.MethodGet,
server.URL+"?channel=hashrate", nil), execute it with http.DefaultClient.Do(req)
(replacing the http.Get call sites), and register t.Cleanup(func(){ if err :=
response.Body.Close(); err != nil { t.Fatalf("closing response body: %v", err) }
}) instead of defer; update all test functions (including
TestAX7_Adapter_Handler_Ugly and the other tests that currently call http.Get
and defer response.Body.Close) and replace the old http.Get and defer patterns
accordingly.

Comment thread adapter/tcp/reconnect.go
Comment on lines 98 to +102
if err := sleepContext(ctx, backoff); err != nil {
return err
}
- backoff = nextTCPBackoff(backoff, rc.config.BackoffMultiplier, rc.config.MaxBackoff)
+ backoff = nextTCPBackoff(backoff, client.config.BackoffMultiplier, client.config.MaxBackoff)
continue

⚠️ Potential issue | 🟠 Major

Close() does not actually stop an in-flight Connect().

At the moment Close() only flips closed and closes the current socket. If Connect() is sleeping between retries, blocked in DialContext, or has just finished the handshake, it keeps going until the caller-supplied context is cancelled; in the success path it can even still run setConn(conn) after the client was closed. That leaves a closed client reconnecting in the background and makes shutdown latency depend on the current backoff/dial duration.

Please give the client its own cancellation path and re-check isClosed() before publishing a newly dialled connection.

Also applies to: 123-145, 161-164, 198-211

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/reconnect.go` around lines 98 - 102, Connect() can continue
running after Close() because Close only flips closed and closes the socket;
create an internal cancellable context on the client (e.g., client.ctx and
client.cancel) and use that context in sleepContext, DialContext, and any
handshake/wait loops so ongoing retries/dials are cancelled when Close() calls
client.cancel(); additionally, before calling setConn(conn) (and after any
successful Dial/handshake), re-check isClosed() and abort/close the just-dialled
conn if the client is closed to avoid publishing a connection after shutdown;
update Close() to call client.cancel() and ensure functions like Connect(),
sleepContext, DialContext usage, nextTCPBackoff loop, and any places that
previously only relied on the caller context observe client.ctx cancellation.

Comment thread adapter/tcp/tcp_test.go
}()

address := waitForListenerAddress(t, adapter)
connection, err := net.Dial("tcp", address)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n '\bnet\.Dial\("tcp",' adapter/tcp/tcp_test.go



🏁 Script executed:

# First, let's look at the test file structure around these lines
sed -n '50,95p' adapter/tcp/tcp_test.go



🏁 Script executed:

# Also check around the other instances to understand the pattern
sed -n '125,140p' adapter/tcp/tcp_test.go
sed -n '180,195p' adapter/tcp/tcp_test.go



🏁 Script executed:

# Check if listenContext is defined anywhere in the test file
rg -n 'listenContext' adapter/tcp/tcp_test.go



🏁 Script executed:

# Check the beginning of the test file to see how context is set up
head -50 adapter/tcp/tcp_test.go



Switch these test dials to DialContext with the existing listenContext.

These direct net.Dial calls trip the noctx linter and make test dials uncancellable if a case stalls. Using (&net.Dialer{}).DialContext(listenContext, "tcp", address) with the test context keeps the suite lint-clean and easier to tear down. All eight instances have listenContext available in scope.

Proposed pattern
- connection, err := net.Dial("tcp", address)
+ connection, err := (&net.Dialer{}).DialContext(listenContext, "tcp", address)

Also applies to: 87, 131, 185, 242, 277, 305, 424

🧰 Tools
🪛 golangci-lint (2.11.4)

[error] 56-56: net.Dial must not be called. use (*net.Dialer).DialContext

(noctx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/tcp_test.go` at line 56, Replace plain net.Dial calls in
tcp_test.go with cancellable DialContext calls using the existing listenContext:
where you currently call net.Dial("tcp", address) (and the seven other instances
at the same pattern), use (&net.Dialer{}).DialContext(listenContext, "tcp",
address) so the test dials are cancellable and satisfy the noctx linter; update
the variable/assignment sites (e.g., the connection, err assignments)
accordingly while keeping the same error handling in the tests.

Comment thread adapter/tcp/tcp_test.go
Comment on lines +62 to +64
if _, err := connection.Write(encodeTCPFrame("", []byte("hello"))); err != nil {
t.Fatalf("Write() error = %v", err)
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n 'Write\(encodeTCPFrame\(' adapter/tcp/tcp_test.go



🏁 Script executed:

#!/bin/bash
# Check if writeAll helper exists and examine its implementation
sed -n '795,805p' adapter/tcp/tcp_test.go



🏁 Script executed:

#!/bin/bash
# Find all usages of writeAll in the file
rg -n 'writeAll\(' adapter/tcp/tcp_test.go



🏁 Script executed:

#!/bin/bash
# Examine the context around one of the direct Write calls to understand the test structure
sed -n '55,75p' adapter/tcp/tcp_test.go



🏁 Script executed:

#!/bin/bash
# Search for writeAll function definition
rg -n 'func writeAll' adapter/tcp/tcp_test.go -A 5



🏁 Script executed:

#!/bin/bash
# Search for writeAll definition across the repo
rg -n 'func writeAll' adapter/tcp/ -A 5



🏁 Script executed:

#!/bin/bash
# Check imports and top of tcp_test.go
head -30 adapter/tcp/tcp_test.go



Replace direct Write calls with writeAll for framed socket writes.

A single net.Conn.Write() can short-write. In tests with framed protocols, this leaves truncated frames on the wire and causes intermittent readTCPFrame() failures. The writeAll helper in tcp.go (lines 391–397) loops until all payload is written.

Proposed pattern
- if _, err := connection.Write(encodeTCPFrame("block", []byte("template"))); err != nil {
- 	t.Fatalf("Write() error = %v", err)
+ if err := writeAll(connection, encodeTCPFrame("block", []byte("template"))); err != nil {
+ 	t.Fatalf("writeAll() error = %v", err)
+ }

Applies to: lines 62–64, 99–101, 143–145, 191–193, 248–250, 318–320, 431–433, and 458.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if _, err := connection.Write(encodeTCPFrame("", []byte("hello"))); err != nil {
t.Fatalf("Write() error = %v", err)
}
if err := writeAll(connection, encodeTCPFrame("", []byte("hello"))); err != nil {
t.Fatalf("writeAll() error = %v", err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/tcp_test.go` around lines 62 - 64, Tests call
connection.Write(...) directly which can short-write and corrupt framed
messages; replace those direct Write calls with the writeAll helper so the full
encoded frame is sent. For each occurrence (e.g., where you currently call
connection.Write(encodeTCPFrame("", []byte("hello")))), change it to
writeAll(connection, encodeTCPFrame(...)) and handle the returned error like the
previous Write error checks; use the existing encodeTCPFrame to build the frame
and keep readTCPFrame as-is for reads. Ensure you update all listed locations
that call connection.Write so they all use writeAll(connection, payload) and
fail the test on any write error.
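The review points at an existing writeAll helper in tcp.go; a loop-until-written helper of that kind typically has the shape sketched below. This is an illustrative reconstruction, not the repository's code, and the shortWriter harness is invented purely to simulate partial writes:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeAll keeps calling Write until the whole payload is on the wire,
// guarding against short writes that would truncate a framed message.
func writeAll(w io.Writer, payload []byte) error {
	for len(payload) > 0 {
		n, err := w.Write(payload)
		if err != nil {
			return err
		}
		payload = payload[n:]
	}
	return nil
}

// shortWriter accepts at most one byte per Write call, simulating a
// socket that performs partial writes (it deliberately bends the
// io.Writer contract to exercise the loop).
type shortWriter struct{ buf bytes.Buffer }

func (s *shortWriter) Write(p []byte) (int, error) {
	if len(p) > 1 {
		p = p[:1]
	}
	return s.buf.Write(p)
}

func main() {
	w := &shortWriter{}
	if err := writeAll(w, []byte("hello")); err != nil {
		panic(err)
	}
	fmt.Println(w.buf.String()) // → hello, despite one-byte writes
}
```

A bare net.Conn.Write in a test has no such loop, which is why framed-protocol tests that write directly can leave truncated frames on the wire.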

Comment thread adapter/tcp/tcp_test.go
}()

adapter := New(Config{Addr: listener.Addr().String()})
peer, err := adapter.Dial(nil, hub)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n 'Dial\(nil,\s*hub\)' adapter/tcp/tcp_test.go -C2



🏁 Script executed:

#!/bin/bash
# Find the Dial method definition
fd -t f -e go adapter/tcp/ | head -20



🏁 Script executed:

#!/bin/bash
# Search for the Dial method definition
rg -n 'func.*Dial.*\(' adapter/tcp/ -A3



🏁 Script executed:

#!/bin/bash
# Check the Adapter struct and its Dial method signature
rg -n 'func.*\(a \*Adapter\).*Dial' adapter/tcp/ -A5



🏁 Script executed:

#!/bin/bash
# Get the full Dial method implementation
rg -n 'func.*\(adapter \*Adapter\).*Dial' adapter/tcp/tcp.go -A30



🏁 Script executed:

#!/bin/bash
# Get the full test function to understand intent
rg -n 'func TestTCP_Dial_NilContext_Good' adapter/tcp/tcp_test.go -A30



Add a lint suppression for intentional nil-context testing.

The test explicitly exercises Adapter.Dial's nil-context normalisation (lines 129–131 in tcp.go), but staticcheck will flag the literal nil argument with SA1012. Add a targeted suppression to keep the test suite passing:

- peer, err := adapter.Dial(nil, hub)
+ //nolint:staticcheck // exercising Adapter.Dial nil-context fallback behaviour
+ peer, err := adapter.Dial(nil, hub)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
peer, err := adapter.Dial(nil, hub)
//nolint:staticcheck // exercising Adapter.Dial nil-context fallback behaviour
peer, err := adapter.Dial(nil, hub)
🧰 Tools
🪛 golangci-lint (2.11.4)

[error] 463-463: SA1012: do not pass a nil Context, even if a function permits it; pass context.TODO if you are unsure about which Context to use

(staticcheck)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/tcp/tcp_test.go` at line 463, Add a targeted staticcheck suppression
comment immediately above the test call to adapter.Dial(nil, hub) to silence
SA1012 for the intentional nil context; e.g., insert a line like "//lint:ignore
SA1012 nil context tested intentionally: Adapter.Dial normalises nil contexts"
directly above the adapter.Dial(nil, hub) invocation so the test continues to
exercise Dial's nil-context normalisation without staticcheck failure.

Comment thread hub.go
Comment on lines +699 to +717
func (hub *Hub) broadcastToPeers(_ *Peer, frame []byte, notifyBroadcastSubscribers bool) {
if hub == nil {
return
}
- if peer.Transport == "tcp" {
- 	_ = peer.Send(encodeTCPFrame(channel, frame))
hub.mutex.RLock()
peers := make([]*Peer, 0, len(hub.peers))
for peer := range hub.peers {
peers = append(peers, peer)
}
handlers := cloneChannelHandlers(hub.channelHandlers["*"])
broadcastHandlers := cloneBroadcastHandlers(hub.broadcastHandlers)
hub.mutex.RUnlock()
for _, peer := range peers {
hub.sendBroadcastToPeer(peer, frame)
}
hub.invokeHandlers(handlers, frame)
if notifyBroadcastSubscribers {
hub.invokeBroadcastHandlers(broadcastHandlers, frame)
}

⚠️ Potential issue | 🟠 Major

Use the source peer to suppress self-echo.

The source passed in at Line 149 and Line 328 is discarded here, so PublishFromPeer/BroadcastFromPeer still deliver back to the originating peer. That undermines the new source-aware API and can bounce frames straight back to the sender.

💡 Suggested fix
-func (hub *Hub) broadcastToPeers(_ *Peer, frame []byte, notifyBroadcastSubscribers bool) {
+func (hub *Hub) broadcastToPeers(source *Peer, frame []byte, notifyBroadcastSubscribers bool) {
 	if hub == nil {
 		return
 	}
 	hub.mutex.RLock()
@@
 	hub.mutex.RUnlock()
 	for _, peer := range peers {
+		if peer == source {
+			continue
+		}
 		hub.sendBroadcastToPeer(peer, frame)
 	}
@@
-func (hub *Hub) collectChannelPeersLocked(channel string, _ *Peer) []*Peer {
+func (hub *Hub) collectChannelPeersLocked(channel string, source *Peer) []*Peer {
 	combined := map[*Peer]struct{}{}
 	for peer := range hub.channels[channel] {
 		combined[peer] = struct{}{}
@@
 	}
 	peers := make([]*Peer, 0, len(combined))
 	for peer := range combined {
+		if peer == source {
+			continue
+		}
 		peers = append(peers, peer)
 	}

Also applies to: 843-866

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hub.go` around lines 699 - 717, broadcastToPeers currently ignores its source
parameter (declared as _ *Peer) so PublishFromPeer/BroadcastFromPeer end up
echoing frames back to the origin; change the signature to accept a source *Peer
(remove the underscore), and inside the loop over peers skip calling
sendBroadcastToPeer when peer == source (use pointer equality). Also ensure
invokeHandlers/invokeBroadcastHandlers behavior remains unchanged, and apply the
same source-aware skip in the similar peer-broadcasting function referenced by
the other block (the one around invokeBroadcastHandlers usage).

Comment thread stream.go
Comment on lines +153 to +160
// peer.SetCloseHook(func() { _ = conn.Close() })
func (peer *Peer) SetCloseHook(closeFunc func()) {
if peer == nil {
return
}
peer.mutex.Lock()
defer peer.mutex.Unlock()
peer.closeHook = closeFunc

⚠️ Potential issue | 🟠 Major

Run late close hooks immediately once the peer is already closed.

A concurrent Close() can win before SetCloseHook(). In that case this setter stores a hook that will never execute, because closeOnce has already been consumed. That leaks the underlying transport on the exact race this hook is meant to cover.

Suggested fix
 type Peer struct {
 	send          chan []byte
 	subscriptions map[string]bool
 	closeHook     func()
+	closed        bool
 	mutex         sync.RWMutex
 	closeOnce     sync.Once
 }
 
 func (peer *Peer) Close() {
 	if peer == nil {
 		return
 	}
 	peer.closeOnce.Do(func() {
 		peer.mutex.Lock()
 		send := peer.send
 		closeHook := peer.closeHook
 		peer.closeHook = nil
+		peer.closed = true
 		peer.mutex.Unlock()
 		if send != nil {
 			close(send)
 		}
 		if closeHook != nil {
 			closeHook()
 		}
 	})
 }
 
 func (peer *Peer) SetCloseHook(closeFunc func()) {
 	if peer == nil {
 		return
 	}
+	runNow := false
 	peer.mutex.Lock()
-	defer peer.mutex.Unlock()
-	peer.closeHook = closeFunc
+	if peer.closed {
+		runNow = closeFunc != nil
+	} else {
+		peer.closeHook = closeFunc
+	}
+	peer.mutex.Unlock()
+	if runNow {
+		closeFunc()
+	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream.go` around lines 153 - 160, SetCloseHook currently stores a hook that
can be lost if Close() won the race; update SetCloseHook to detect if the peer
is already closed (check the peer's closeOnce/closed state under peer.mutex)
and, in that case, invoke closeFunc immediately instead of just storing it.
Specifically, inside SetCloseHook acquire peer.mutex, if peer is closed (e.g. a
closed bool or closeOnce already executed), release the lock and call closeFunc
right away; otherwise set peer.closeHook = closeFunc as before. Ensure the
immediate call happens outside the mutex to avoid deadlocks and reference the
SetCloseHook, Close, closeOnce/closed, closeHook and mutex symbols when making
the change.

Comment thread stream.go
Comment on lines +242 to +255
if publisher, ok := src.(publishedFrameSource); ok {
stops = append(stops, onceFunction(publisher.SubscribePublished(func(channel string, frame []byte) {
if err := dst.Publish(channel, cloneFrame(frame)); err != nil {
return
}
})))
}
if broadcaster, ok := src.(broadcastFrameSource); ok {
stops = append(stops, onceFunction(broadcaster.SubscribeBroadcast(func(frame []byte) {
if err := dst.Broadcast(cloneFrame(frame)); err != nil {
return
}
})))
}

⚠️ Potential issue | 🟠 Major

Do not silently drop frames when only one optional hook is implemented.

If src implements SubscribePublished but not SubscribeBroadcast (or the other way round), len(stops) > 0 disables the generic fallback and Pipe() forwards only half the stream. That breaks valid Stream implementations which expose just one of these optional hooks. Either require both specialised hooks before taking this path, or tighten the contract so partial implementations cannot compile.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream.go` around lines 242 - 255, The current logic appends specialized
subscriptions separately which can leave only one of the hooks wired and thus
disable the generic fallback in Pipe(); change this so the specialized path is
used only when both hooks are present: perform type assertions for
publishedFrameSource (SubscribePublished) and broadcastFrameSource
(SubscribeBroadcast) together and only append the onceFunction subscriptions to
stops if both assertions succeed, otherwise fall back to the generic
subscription path. Ensure you reference the same symbols (publishedFrameSource,
SubscribePublished, broadcastFrameSource, SubscribeBroadcast, onceFunction,
stops, Pipe) so the code change replaces the two independent branches with a
single guarded branch that requires both interfaces before disabling the generic
fallback.
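The "require both hooks" option can be sketched with a single guarded branch. The interface shapes below are illustrative stand-ins (the real publishedFrameSource and broadcastFrameSource declarations live in stream.go):

```go
package main

import "fmt"

// Illustrative stand-ins for the two optional hook interfaces.
type publishedFrameSource interface {
	SubscribePublished(func(channel string, frame []byte)) func()
}

type broadcastFrameSource interface {
	SubscribeBroadcast(func(frame []byte)) func()
}

// useSpecialised reports whether src may take the fast path: both
// optional hooks must be present, otherwise Pipe must keep the generic
// fallback so neither half of the stream is silently dropped.
func useSpecialised(src any) bool {
	_, hasPublished := src.(publishedFrameSource)
	_, hasBroadcast := src.(broadcastFrameSource)
	return hasPublished && hasBroadcast
}

// partialSource implements only SubscribePublished, so it must fall back.
type partialSource struct{}

func (partialSource) SubscribePublished(func(string, []byte)) func() { return func() {} }

// fullSource implements both hooks and may take the specialised path.
type fullSource struct{ partialSource }

func (fullSource) SubscribeBroadcast(func([]byte)) func() { return func() {} }

func main() {
	fmt.Println(useSpecialised(partialSource{})) // → false: generic fallback
	fmt.Println(useSpecialised(fullSource{}))    // → true: specialised path
}
```

Replacing the two independent type assertions with this single guard means a Stream that exposes only one optional hook still gets both publish and broadcast frames via the generic path.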

Comment thread stream.go
Comment on lines +285 to +288
func randomUUID() string {
var raw [16]byte
_, _ = rand.Read(raw[:])
raw[6] = (raw[6] & 0x0f) | 0x40

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "stream.go" -type f



🏁 Script executed:

cat -n stream.go | sed -n '275,300p'



🏁 Script executed:

rg "randomUUID" stream.go



🏁 Script executed:

rg -B 5 -A 5 "ID:\s*randomUUID" stream.go



🏁 Script executed:

head -50 stream.go | cat -n



🏁 Script executed:

rg "NewPeer\(" stream.go



🏁 Script executed:

rg "NewPeer" --type go -l



🏁 Script executed:

rg "NewPeer\(" --type go -A 2 -B 2 | head -60



Handle crypto/rand.Read failure in randomUUID().

If rand.Read fails, the raw byte array remains zeroed, and the function still returns a string in valid UUID format. This causes all peers created during the failure to receive identical IDs, violating the assumption that peer identifiers are unique. Consider returning an error from randomUUID() and propagating it through NewPeer(), or panicking if random UUID generation is non-negotiable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream.go` around lines 285 - 288, randomUUID currently ignores errors from
crypto/rand.Read causing duplicate zeroed UUIDs on failure; change randomUUID to
return (string, error), check the return value/error from rand.Read and return a
non-nil error if it fails (or panic if you choose non-recoverable), then update
NewPeer (and any callers) to accept the error from randomUUID and propagate it
(return an error from NewPeer or handle the panic) so peer creation fails
instead of producing identical IDs; update all call sites of NewPeer
accordingly.
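A sketch of the error-returning variant, assuming the same v4 formatting as the original helper (only the error path is new; the bit-twiddling mirrors the quoted lines):

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// randomUUID returns a v4 UUID or an error if the entropy source fails,
// instead of silently emitting an all-zero identifier.
func randomUUID() (string, error) {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return "", fmt.Errorf("stream: read random bytes: %w", err)
	}
	raw[6] = (raw[6] & 0x0f) | 0x40 // version 4
	raw[8] = (raw[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x",
		raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:16]), nil
}

func main() {
	id, err := randomUUID()
	if err != nil {
		panic(err) // a caller such as NewPeer would propagate this instead
	}
	fmt.Println(len(id)) // → 36: 32 hex digits plus 4 hyphens
}
```

Callers then fail peer creation on entropy exhaustion rather than minting duplicate identifiers.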

Comment thread ws/compat.go
Comment on lines +142 to +153
func NewRedisBridge(hub any, config adapterredis.Config) (*RedisBridge, error) {
switch typedHub := hub.(type) {
case *Hub:
if typedHub == nil {
return adapterredis.NewBridge(nil, config)
}
return adapterredis.NewBridge(typedHub.Hub, config)
case *stream.Hub:
return adapterredis.NewBridge(typedHub, config)
default:
return nil, core.E("stream.ws", "unsupported hub type", nil)
}

⚠️ Potential issue | 🟠 Major

Handle a literal nil hub before the type switch.

ws.NewRedisBridge(nil, cfg) currently falls into the default branch and returns “unsupported hub type”, because a nil interface does not match case *Hub or case *stream.Hub. That breaks the documented nil-compatible path.

💡 Suggested fix
 func NewRedisBridge(hub any, config adapterredis.Config) (*RedisBridge, error) {
+	if hub == nil {
+		return adapterredis.NewBridge(nil, config)
+	}
 	switch typedHub := hub.(type) {
 	case *Hub:
 		if typedHub == nil {
 			return adapterredis.NewBridge(nil, config)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewRedisBridge(hub any, config adapterredis.Config) (*RedisBridge, error) {
switch typedHub := hub.(type) {
case *Hub:
if typedHub == nil {
return adapterredis.NewBridge(nil, config)
}
return adapterredis.NewBridge(typedHub.Hub, config)
case *stream.Hub:
return adapterredis.NewBridge(typedHub, config)
default:
return nil, core.E("stream.ws", "unsupported hub type", nil)
}
func NewRedisBridge(hub any, config adapterredis.Config) (*RedisBridge, error) {
if hub == nil {
return adapterredis.NewBridge(nil, config)
}
switch typedHub := hub.(type) {
case *Hub:
if typedHub == nil {
return adapterredis.NewBridge(nil, config)
}
return adapterredis.NewBridge(typedHub.Hub, config)
case *stream.Hub:
return adapterredis.NewBridge(typedHub, config)
default:
return nil, core.E("stream.ws", "unsupported hub type", nil)
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ws/compat.go` around lines 142 - 153, The NewRedisBridge function fails to
handle a literal nil interface and thus returns the default "unsupported hub
type"; before the type switch in NewRedisBridge, add an explicit nil check for
the hub parameter (if hub == nil) and call adapterredis.NewBridge(nil, config)
to preserve the documented nil-compatible path; keep the existing type switch
for cases *Hub and *stream.Hub and return errors unchanged for unsupported
types.
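The underlying Go subtlety: a literal nil interface value carries no dynamic type, so it matches no typed case in a type switch and falls through to default. A small standalone illustration (the `classify` helper and its labels are invented for this example; only the guard pattern mirrors the suggested fix):

```go
package main

import "fmt"

type Hub struct{}

// classify mirrors the NewRedisBridge type switch: a literal nil
// interface matches neither typed case, so it needs an explicit
// `hub == nil` guard before the switch.
func classify(hub any) string {
	if hub == nil {
		return "nil interface (explicit guard)"
	}
	switch typed := hub.(type) {
	case *Hub:
		if typed == nil {
			return "typed nil *Hub"
		}
		return "non-nil *Hub"
	default:
		return "unsupported"
	}
}

func main() {
	var typedNil *Hub
	fmt.Println(classify(nil))      // → nil interface (explicit guard)
	fmt.Println(classify(typedNil)) // → typed nil *Hub: matches case *Hub
	fmt.Println(classify(&Hub{}))   // → non-nil *Hub
}
```

This is why the existing `if typedHub == nil` check inside case *Hub never fires for a literal nil argument: that value never enters the case in the first place.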
