Skip to content

feat(transport): CEP-41 open-ended streams#71

Merged
ContextVM-org merged 9 commits into
masterfrom
feat/cep-41
May 7, 2026
Merged

feat(transport): CEP-41 open-ended streams#71
ContextVM-org merged 9 commits into
masterfrom
feat/cep-41

Conversation

@ContextVM-org
Copy link
Copy Markdown
Contributor

- add CEP-41 end-to-end and unit coverage for stream lifecycle,
  accept-gated bootstrap, malformed progress payloads, and keepalive behavior
- add writer tests for ping/pong nonce handling and optional close.lastChunkIndex
- update open-stream frame types/builders and writer behavior for nonce support
  and relaxed close metadata
- harden open-stream registry/session handling and apply the lint-safe deferred
  typing fix
@1amKhush
Copy link
Copy Markdown
Contributor

1amKhush commented May 6, 2026

@ContextVM-org

Hey! Awesome work on this draft PR. The architecture looks really solid. (The callToolStream free helper is a great design choice for the consumer API and the deferred-based async iterator for the session handles backpressure beautifully)

I did a deep dive into the code and found a couple of bugs and edge cases to look at before we finalize:

1. ping() consumes two progress values

In writer.ts, the ping() method calls this.nextProgress() twice:

public async ping(): Promise<void> {
  if (!this.active) return;
  const nonce = String(this.nextProgress()); // <--- progress bumped here
  await this.publishFrame(
    buildOpenStreamPingFrame({
      progressToken: this.progressToken,
      progress: this.nextProgress(),         // <--- bumped again!
      nonce,
    }),
  );
}

This means every ping skips a progress number. The nonce should probably just reuse the single progress value generated for that frame. (Note: The writer.test.ts test on line 3749 actually encodes this bug by expecting progress: 3 and nonce: "2"!).2. onClose fires even on abort (Double delete)

2. Abort currently runs close cleanup semantics

OpenStreamSession.finish() always calls onClose, even when the stream finished with an error/abort. Local abort() also calls onAbort, so registry cleanup can run through both paths. Remote aborts via processFrame() go through finish(error) and therefore currently call onClose, not onAbort.

This works because Map.delete() is idempotent, but the lifecycle semantics are muddy. It would be cleaner to split successful close cleanup from failed/aborted cleanup.

3. Unbounded buffers on outbound client sessions

In nostr-client-transport.tscreateOutboundOpenStreamSession hardcodes the limits to Number.MAX\_SAFE\_INTEGER:

maxBufferedChunks: Number.MAX_SAFE_INTEGER,
maxBufferedBytes: Number.MAX_SAFE_INTEGER,

Since this session consumes chunks from the server, a buggy or malicious peer can drive unbounded buffering. This should reuse the configured open-stream policy/defaults, or the receiver/registry should expose a create path that applies those defaults automatically.

4. Missing lastChunkIndex validation in the receiver

The spec mentions that if close.lastChunkIndex is present, the receiver must verify it got all chunks up to that index. In session.ts, the processFrame switch statement for close handles the graceful finish, but it currently ignores the lastChunkIndex field on the frame.

5. Monkey-patching the Writer

In nostr-server-transport.ts, the close and abort methods on the OpenStreamWriter are overridden (monkey-patched) via .bind() to flush the pending JSON-RPC response. This totally works, but it's a bit fragile (e.g. if someone subclasses the writer later). We might want to use a callback hook on the writer constructor (like onClose / onAbort) to trigger the response flush instead.

6. Timers (Just a note)

I noticed the idle timeout, probe timeout, and close grace period timers aren't implemented yet in the session/writer (though they are in the policy types). I'm assuming that's just deferred for later since this is a draft, but wanted to point it out just in case!

7. Stale chunkIndex values can wedge the session

bufferChunk() rejects duplicate chunk indexes only if they are still in bufferedChunks. Once a chunk has already been flushed and nextExpectedChunkIndex has advanced, a later duplicate/stale chunk index is accepted into bufferedChunks.

Example: receive and flush chunk 0, then receive another chunk 0 with higher progress. It gets buffered, but flushContiguousChunks() is waiting for chunk 1, so that stale chunk remains buffered. If close arrives later, maybeFinishGracefully() sees bufferedChunks.size > 0 and the session never finishes.

The receiver should reject frame.chunkIndex < nextExpectedChunkIndex.

Overall, incredibly clean implementation. Really looking forward to seeing this land! :)

@1amKhush
Copy link
Copy Markdown
Contributor

1amKhush commented May 6, 2026

Btw I’d keep 'Monkey-patching writer methods' only as a non-blocking design comment (The close/abort reassignment is brittle. I’d classify this as design risk, not a correctness bug). Lmk your thoughts on this!

…for open streams

Add idle timeout, probe timeout, and close grace period support to the CEP-41
open stream implementation. Sessions now send periodic ping frames when idle
and abort if no matching pong is received within the probe timeout. Also add
lifecycle hooks (onClose, onAbort) to OpenStreamWriter for proper cleanup after
terminal frames are published. The registry now supports a getSessionOptions
callback for deriving session configuration at creation time.
@ContextVM-org
Copy link
Copy Markdown
Contributor Author

Please review again @1amKhush . Your comments were attached in the last commit

@1amKhush
Copy link
Copy Markdown
Contributor

1amKhush commented May 6, 2026

Looking great @ContextVM-org ! Looks great, all the issues I flagged are addressed 👍

Just had 2 minor/ 'good to have' things (the current implementation is great as is, so i think these can be ignored)

  • The keepalive timers are fully implemented in session.ts, but the e2e test suite doesn't have a test that actually exercises the idle timeout → automatic ping → pong → continued stream flow. There are unit tests for the session, but an e2e test through the full transport would catch wiring issues. This is a "nice to have" though, not a blocker.
  • In createOutboundOpenStreamSession, the client-side sendPing/sendPong/sendAbort callbacks each maintain a local let progress = 0 counter. But the server is also sending frames on the same stream with its own progress counter. The spec says progress must be monotonically increasing across the stream, if the client sends a ping at progress=1 and the server had already sent a chunk at progress=5, the client's ping would have a lower progress value than what the receiver already saw.
    (Though imo this probably doesn't matter in practice since the server is the one sending chunks and the client is the one receiving, and the session validates progress per-direction. Just wanted to know your thoughts on this)

Rest the implementation is really solid at this point! All 6 original issues were fixed properly, the keepalive timers are fully wired, and the buffer limits use real policy values.

Add ping frame handling to NostrServerTransport that responds with pong, enabling
client-side keepalive probes to be acknowledged. Also add comprehensive e2e tests
verifying stream keepalive across idle timeouts, probe timeout abort behavior, and
interleaved control frame handling.
@ContextVM-org
Copy link
Copy Markdown
Contributor Author

Added these test and some more. Let me know your thoughts

@1amKhush
Copy link
Copy Markdown
Contributor

1amKhush commented May 6, 2026

Added these test and some more. Let me know your thoughts

Looking great!

Add assertions to verify that the dispose method properly closes sessions
after tests complete, ensuring proper cleanup of resources in both e2e and
unit tests for the CEP-41 stream implementation.
…on tests

This change makes the optional advisory fields in CEP-41 open-stream frames (contentType, lastChunkIndex, reason) truly optional by only including them when defined, rather than always sending undefined values. It also adds comprehensive unit tests for:
- Start frames with advisory metadata omitted
- Malformed progress payload rejection
- Accept frame sequence validation
- Unexpected pong frame handling

Additionally improves ping nonce generation to use a unique token-based format for better debugging.
Copy link
Copy Markdown
Contributor

@1amKhush 1amKhush left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good to merge 👍

@ContextVM-org ContextVM-org merged commit 91dc1c7 into master May 7, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants