Conversation
73f8ae0 to 7408f06
879c6a9 to 86e97d0
FUDCo left a comment
Not done reading the code yet, but publishing the first batch of comments.
```ts
const connectionFactory = new ConnectionFactory(
  keySeed,
  knownRelays,
  logger,
  signal,
);

const libp2p = await createLibp2p({
  privateKey,
  addresses: {
    listen: [
      // TODO: Listen on tcp addresses for Node.js
      // '/ip4/0.0.0.0/tcp/0/ws',
      // '/ip4/0.0.0.0/tcp/0',
      // Browser: listen on WebRTC and circuit relay
      '/webrtc',
      '/p2p-circuit',
    ],
    appendAnnounce: ['/webrtc'],
  },
  transports: [
    webSockets(),
    webTransport(),
    webRTC({
      rtcConfiguration: {
        iceServers: [
          {
            urls: [
              'stun:stun.l.google.com:19302',
              'stun:global.stun.twilio.com:3478',
            ],
          },
        ],
      },
    }),
    circuitRelayTransport(),
  ],
  connectionEncrypters: [noise()],
  streamMuxers: [yamux()],
  connectionGater: {
    // Allow private addresses for local testing
    denyDialMultiaddr: async () => false,
  },
  peerDiscovery: [
    bootstrap({
      list: knownRelays,
    }),
  ],
  services: {
    identify: identify(),
    ping: ping(),
  },
});

// Detailed logging for libp2p events. Uncomment as needed. Arguably this
// should be controlled by an environment variable or some similar kind of
// runtime flag, but probably not worth the effort since when you're debugging
// you're likely going to be tweaking the code a lot anyway.
/*
const eventTypes = [
  'certificate:provision',
  'certificate:renew',
  'connection:close',
  'connection:open',
  'connection:prune',
  'peer:connect',
  'peer:disconnect',
  'peer:discovery',
  'peer:identify',
  'peer:reconnect-failure',
  'peer:update',
  'self:peer:update',
  'start',
  'stop',
  'transport:close',
  'transport:listening',
];
for (const et of eventTypes) {
  libp2p.addEventListener(et as keyof Libp2pEvents, (event) => {
    if (et === 'connection:open' || et === 'connection:close') {
      const legible = (raw: any): string =>
        JSON.stringify({
          direction: raw.direction,
          encryption: raw.encryption,
          id: raw.id,
          remoteAddr: raw.remoteAddr.toString(),
          remotePeer: raw.remotePeer.toString(),
        });
      logger.log(`@@@@ libp2p ${et} ${legible(event.detail)}`, event.detail);
    } else if (et === 'peer:identify') {
      const legible = (raw: any): string =>
        JSON.stringify({
          peerId: raw.peerId ? raw.peerId.toString() : 'undefined',
          protocolVersion: raw.protocolVersion,
          agentVersion: raw.agentVersion,
          observedAddr: raw.observedAddr ? raw.observedAddr.toString() : 'undefined',
          listenAddrs: raw.listenAddrs.map((addr: object) => (addr ? addr.toString() : 'undefined')),
          protocols: raw.protocols,
        });
      logger.log(`@@@@ libp2p ${et} ${legible(event.detail)}`, event.detail);
    } else if (et === 'transport:listening') {
      const legible = (raw: any): string => JSON.stringify(raw.getAddrs());
      logger.log(`@@@@ libp2p ${et} ${legible(event.detail)}`, event.detail);
    } else {
      logger.log(`@@@@ libp2p ${et} ${JSON.stringify(event.detail)}`, event.detail);
    }
  });
}
*/
// Initialize the connection factory
await connectionFactory.initialize();
```
I'm thinking perhaps ConnectionFactory should follow the pattern (as found in, e.g., Kernel, VatHandle, and RemoteHandle) of making the constructor and initialization methods private and providing a public static make method so that the uninitialized object is never exposed publicly.
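The pattern suggested above can be sketched roughly as follows. This is an illustrative outline, not the actual `ConnectionFactory` from the PR: the constructor and the initialization step are private, so the only way to obtain an instance is through a static `make` that returns it fully initialized.

```typescript
// Sketch of the private-constructor + static `make` pattern. The class
// body here is hypothetical; only the shape (private construction and
// initialization behind a public static factory) is the point.
class ConnectionFactorySketch {
  #initialized = false;

  // Private constructor: only reachable via `make`, so an uninitialized
  // instance can never leak to callers.
  private constructor(readonly keySeed: string) {}

  // Private async initialization step (stands in for real setup work).
  async #initialize(): Promise<void> {
    this.#initialized = true;
  }

  // The single public entry point; resolves to a fully initialized object.
  static async make(keySeed: string): Promise<ConnectionFactorySketch> {
    const factory = new ConnectionFactorySketch(keySeed);
    await factory.#initialize();
    return factory;
  }

  get isInitialized(): boolean {
    return this.#initialized;
  }
}
```

Callers then write `const factory = await ConnectionFactorySketch.make(seed);` and can rely on initialization having completed.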
```ts
async dialIdempotent(
  peerId: string,
  hints: string[],
  withRetry: boolean,
): Promise<Channel> {
  let promise = this.#inflightDials.get(peerId);
  if (!promise) {
    promise = (
      withRetry
        ? this.openChannelWithRetry(peerId, hints)
        : this.openChannelOnce(peerId, hints)
    ).finally(() => this.#inflightDials.delete(peerId));
    this.#inflightDials.set(peerId, promise);
  }
```
It feels to me like there's an impedance mismatch between the withRetry parameter and the collection of in-flight dials. Basically, the first caller determines the retry behavior and the flag is thenceforth ignored on successive attempts. Arguably this is not wrong, and I'm not sure I'd do it differently, but it smells off.
The first caller sets the retry behavior, and later concurrent calls for the same peer reuse the same in-flight promise, so their withRetry value is ignored, but this is intentional. We deduplicate in-flight dials (only one active dial per peer at a time), which requires picking a single retry strategy per attempt.
I think for this PR keeping the current approach is fine. Concurrent callers for the same peer get the same result (success or failure), and we avoid duplicate dials. But we could track dials separately per (peerId, withRetry) combination. wdyt?
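The alternative floated here, tracking dials per (peerId, withRetry) pair, can be sketched like this. The names (`DialDeduper`, `Dialer`) are illustrative, not the PR's API; the point is only that keying the in-flight map on both fields lets a retrying and a non-retrying caller proceed independently while still deduplicating identical concurrent requests.

```typescript
// Hypothetical sketch: deduplicate in-flight dials per (peerId, withRetry)
// pair instead of per peerId alone.
type Dialer = (peerId: string) => Promise<string>;

class DialDeduper {
  readonly #inflight = new Map<string, Promise<string>>();

  constructor(readonly dialOnce: Dialer, readonly dialWithRetry: Dialer) {}

  dialIdempotent(peerId: string, withRetry: boolean): Promise<string> {
    // Composite key: same peer + same retry strategy share one attempt.
    const key = `${peerId}:${withRetry}`;
    let promise = this.#inflight.get(key);
    if (!promise) {
      promise = (withRetry ? this.dialWithRetry(peerId) : this.dialOnce(peerId))
        // Clear the slot when the attempt settles, success or failure.
        .finally(() => this.#inflight.delete(key));
      this.#inflight.set(key, promise);
    }
    return promise;
  }
}
```

The trade-off is that one peer can now have two dials in flight at once (one per strategy), which reintroduces some of the duplicate-dial cost the current design avoids.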
```diff
  // Detect graceful disconnect
  const rtcProblem = problem as {
    errorDetail?: string;
    sctpCauseCode?: number;
  };
  if (
-   rtcProblem.errorDetail === 'sctp-failure' &&
+   rtcProblem?.errorDetail === 'sctp-failure' &&
```
Does "graceful disconnect" mean "the other end deliberately closed the connection"? Because in that case I don't think we should be reconnecting -- though I don't see how this code actually reacts to that case anyway. But there needs to be some way to close a connection on purpose.
Yes, "graceful disconnect" means the other end deliberately closed the connection. SCTP_USER_INITIATED_ABORT (cause code 12) indicates an intentional close by the remote peer, not a network failure.
You're right: the code detects this case (lines 135-139) but doesn't prevent reconnection. It only logs "remote disconnected" instead of an error, then calls handleConnectionLoss() on line 148, which triggers reconnection. We should distinguish intentional disconnects from transient failures and avoid auto-reconnecting for the former.
This is out of scope for this PR, which focuses on automatic reconnection with exponential backoff for network failures (timeouts, connection resets, etc.). Handling intentional disconnects would require:
- A mechanism to signal an intentional close
- Distinguishing intentional closes from failures in the error handling
- Preventing reconnection when the close was intentional
I suggest we create a follow-up issue to track intentional disconnect handling, including proper connection lifecycle management and explicit close operations. For now, this PR maintains current behavior (treating all disconnects as recoverable) while adding retry/backoff for network failures.
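The classification step that follow-up would need can be sketched as below. The cause-code value (12 for SCTP User-Initiated Abort) comes from the discussion above; the helper names and the `shouldReconnect` policy are illustrative assumptions, not code from this PR.

```typescript
// Sketch: classify a disconnect before deciding whether to reconnect.
type RtcProblem = { errorDetail?: string; sctpCauseCode?: number };

// SCTP cause code 12 = User-Initiated Abort: the remote peer closed on purpose.
const SCTP_USER_INITIATED_ABORT = 12;

function isIntentionalDisconnect(problem: RtcProblem): boolean {
  return (
    problem.errorDetail === 'sctp-failure' &&
    problem.sctpCauseCode === SCTP_USER_INITIATED_ABORT
  );
}

function shouldReconnect(problem: RtcProblem): boolean {
  // Transient failures trigger reconnection; deliberate remote closes do not.
  return !isIntentionalDisconnect(problem);
}
```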
```ts
 */
function handleConnectionLoss(peerId: string, hints: string[] = []): void {
  logger.log(`${peerId}:: connection lost, initiating reconnection`);
  channels.delete(peerId);
```
My original intent was that a "channel" was a logical abstraction that would survive loss of the underlying connection, whereas a "connection" was the abstraction of a concrete communications link that could be disrupted by network problems. This looks to me like that abstraction distinction got discarded by this refactor, though I obviously could be missing something in the big PR. It seems like that's an important distinction to keep track of, but maybe you've thought it through more deeply?
Right: Channel is currently tied to the physical connection. When the connection dies, we delete the channel (line 166), so channels don't survive connection loss.
The abstraction that persists is at a higher level:
- `messageQueues` buffer messages across disconnections
- `reconnectionManager` tracks reconnection state per peer
- New connections reuse the same peer identity and deliver queued messages

So messages and peer state persist, but Channel objects don't.
To make Channel a true logical abstraction that survives connection loss, we'd need to:
- Keep channel objects alive across connections
- Have channels reference the current connection (which changes)
- Separate channel lifecycle from connection lifecycle
That's a larger architectural change beyond this PR's scope. For now, queues and reconnection provide the logical abstraction, while Channel remains tied to the physical connection.
Should we track this as a follow-up?
@@ -0,0 +1,65 @@
```ts
/**
 * Options for configuring the wake detector.
```
The phrase "wake detector" here obviously refers to recognizing awakening, i.e., coming back from being asleep, but somehow it keeps registering to me as the thing that reports that your boat is going too fast inside the boundaries of the marina.
Should I rename it? :)
```ts
let last = Date.now();

const intervalId = setInterval(() => {
  const now = Date.now();
  if (now - last > intervalMs + jumpThreshold) {
    // Clock jumped forward significantly - probable wake from sleep
    onWake();
  }
  last = now;
}, intervalMs);
```
Since the interval is being tracked by state that is kept in memory, this code clearly is dealing with time discontinuities within a single executing process. But it seems to me that we also need to detect discontinuities across successive incarnations of a kernel. This is obviously not that (you wouldn't detect those by looking at the clock anyway), but is that case also handled somewhere (e.g., as part of kernel startup)?
Yes, the current detector only works within a single running process. It tracks time in memory, so it won't detect sleep/wake across kernel restarts.
Cross-incarnation detection would require different logic:
- Store the last known timestamp in persistent storage (e.g., kernel store) when shutting down
- On startup, compare the stored timestamp with the current time
- If the gap exceeds a threshold, treat it as a wake event
This isn't currently handled at kernel startup. We could add it if needed. It would need:
- Writing a timestamp on shutdown (or periodically)
- Checking it on startup in `initNetwork` (or during kernel initialization)
- Resetting backoffs if a time discontinuity is detected
We can track this as a follow-up:
Cross-incarnation wake detection: Detect time discontinuities across kernel restarts by storing the last known timestamp in persistent storage on shutdown and comparing it with the current time on startup. If the gap exceeds a threshold (indicating the system was asleep between shutdown and startup), reset reconnection backoffs similar to the runtime wake detector. This ensures reconnection backoffs are reset even when the kernel wasn't running during the sleep period.
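The startup check described in that follow-up can be sketched as below. The `TimestampStore` interface and the function name are illustrative stand-ins for whatever the kernel store actually provides.

```typescript
// Sketch: detect a time discontinuity across kernel incarnations by
// comparing a persisted timestamp with the current time at startup.
type TimestampStore = {
  get(): number | undefined;
  set(value: number): void;
};

function checkForOfflineGap(
  store: TimestampStore,
  thresholdMs: number,
  now: number = Date.now(),
): boolean {
  const last = store.get();
  // Record the current time for the next incarnation to compare against.
  store.set(now);
  // A large gap suggests the host slept (or was down) between incarnations,
  // in which case reconnection backoffs should be reset.
  return last !== undefined && now - last > thresholdMs;
}
```

In a fuller version the timestamp would also be refreshed periodically while running, so a hard crash still leaves a reasonably recent value behind.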
FUDCo left a comment
I think this is good as far as it goes, but it's stimulating a lot of concerns on my part that we haven't thoroughly thought through the lifecycle model of the relationship between communicating objects living in vats on separate machines, given that this relationship can be disrupted by not only the network but the uptime of the browsers that are hosting the respective kernels and the machines that are hosting the respective browsers.
In particular, we generally consider network disruptions to be transient errors (though the transience may span all kinds of things -- not just a TCP connection drop but possibly rehosting an endpoint at an entirely different address, such as when I recently switched my household internet from Comcast Business to AT&T Fiber).
On the other hand, browser or host uptime disruptions can include both transient events (such as power failures) and intentional acts by a user at one of the endpoints (e.g., I decide I don't want to run this thing any more). I think the state of our thinking on this stuff is currently rather muddled. I also strongly expect that the path out of this muddle is going to involve a lot of practical trial and error experience aside from whatever technical brilliance we may bring to the party. In other words, I think this may take time and experimentation rather than just raw intellect.
Stopping now because Consensys IT is insisting on rebooting my machine NOW.
```ts
 * @param error - The error to check if it is retryable.
 * @returns True if the error is retryable, false otherwise.
 */
export function isRetryableNetworkError(error: unknown): boolean {
```
Which conditions would constitute a non-retryable network error? I think we'll get an ECONNRESET if a kernel is shutdown and probably ECONNREFUSED or EHOSTUNREACH if it's just not running when you try to talk to it, so do we have a way to distinguish "it's not running now" from "it will never be running again" from "you have the wrong address"?
Yeah, the function doesn't distinguish between "not running now" (temporary), "will never be running again" (permanent), and "wrong address" (permanent). All listed network errors (ECONNRESET, ECONNREFUSED, EHOSTUNREACH, etc.) are treated as retryable, so we rely on maxAttempts to eventually stop retrying.
We can't distinguish these scenarios from error codes alone. I'll create a follow-up task:
Error pattern analysis and permanent failure detection: Implement heuristics to distinguish temporary network failures from permanent failures. Track error patterns over time (consecutive identical errors, error frequency) and classify persistent failures as permanently non-retryable:
- Track the pattern of errors across reconnection attempts
- If the same error code (e.g., `ECONNREFUSED`, `EHOSTUNREACH`) persists across many attempts without any success, classify it as permanently non-retryable
- Detect "wrong address" scenarios through persistent connection refusal patterns
- Integrate with reconnection logic to stop retrying when patterns indicate permanent failure
- This would enhance `isRetryableNetworkError` to become stateful (tracking patterns) or add a separate mechanism that feeds into retry decisions
```ts
      signal.addEventListener('abort', onAbort, { once: true });
    }
  });
}
```
I think Bugbot is right about this one.
```ts
 * Gracefully stop the kernel without deleting vats.
 */
async stop(): Promise<void> {
  await this.#remoteManager.stopRemoteComms();
```
I like having a provision for an orderly shutdown, but I fear that this will rarely happen in the wild. Instead, I think it far more likely that somebody just quits their browser, taking any running kernel and its vats with it. Outside parties that had a communications relationship with whatever had been running in there might have to wait a very long time before being able to reconnect. I'm not sure what this means in terms of maximum retry backoff limits, but I suspect it means something.
Will be handled in a follow-up.
```ts
 * Handles queueing of messages with their hints during reconnection.
 */
export class MessageQueue {
  readonly #queue: QueuedMessage[] = [];
```
I'm wondering if queued messages need to be backed up in persistent storage.
Yeah, we can also track this as a follow-up task.
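Backing the queue with persistent storage could look roughly like this sketch. The store interface (a plain `Map` here), class name, and message shape are illustrative assumptions, not the PR's `MessageQueue` API.

```typescript
// Sketch: mirror the in-memory queue into a persistent store so queued
// messages survive a kernel restart.
type QueuedMessage = { to: string; payload: string; hints: string[] };

class PersistentMessageQueue {
  #queue: QueuedMessage[];

  constructor(readonly store: Map<string, string>, readonly key: string) {
    // Rehydrate anything a previous incarnation left behind.
    const saved = store.get(key);
    this.#queue = saved ? (JSON.parse(saved) as QueuedMessage[]) : [];
  }

  enqueue(message: QueuedMessage): void {
    this.#queue.push(message);
    this.#persist();
  }

  // Hand back all queued messages (e.g., on reconnect) and clear the queue.
  drain(): QueuedMessage[] {
    const drained = this.#queue;
    this.#queue = [];
    this.#persist();
    return drained;
  }

  #persist(): void {
    this.store.set(this.key, JSON.stringify(this.#queue));
  }
}
```

In the real kernel the `Map` would be replaced by the kernel store, and the max-queue-size cap would apply before persisting.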
…r remote communications
86e97d0 to bd5dda1
FUDCo left a comment
There's lots still to do here, but I concur that it's all stuff for follow-on issues.
Let's plug it in and give it the smoke test.
Closes #655 #659
Implements resilient automatic reconnection for remote kernel communications with exponential backoff, message queuing, and intelligent error handling. Remote connections now recover seamlessly from network failures without manual intervention or message loss.
Motivation
Distributed kernel systems experience frequent network disruptions due to:
Previously, connections would simply drop and fail. This PR makes the system resilient to these real-world scenarios.
Key Features
Infinite Reconnection with Exponential Backoff
Message Queuing
Sleep/Wake Detection
Clean Lifecycle Management
- `stop()` function for graceful shutdown
- `AbortSignal` propagation cancels all in-flight operations
- Deduplicated dialing via `dialIdempotent()`
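The exponential backoff with jitter this PR describes can be sketched as follows. The doubling base, the cap, and the full-jitter scheme are illustrative assumptions, not the actual parameters of `calculateReconnectionBackoff`.

```typescript
// Sketch: exponential backoff with full jitter.
function backoffSketch(
  attempt: number,
  baseMs = 1_000,
  maxMs = 60_000,
  random: () => number = Math.random,
): number {
  // Double the delay each attempt, capped at maxMs.
  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);
  // Full jitter: pick uniformly in [0, exponential) so many peers
  // reconnecting at once don't all retry in lockstep.
  return random() * exponential;
}
```

The `random` parameter is injectable only to make the function testable; callers would normally omit it.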
Core Changes:
- `network.ts`: Reconnection orchestration, message queuing, wake detection
- `RemoteManager.ts`: Added `stopRemoteComms()` lifecycle method
- `remote-comms.ts`: Integrated stop functionality

New Utilities (in `kernel-utils`):
- `wake-detector.ts`: Reusable sleep/wake detection
- `calculateReconnectionBackoff()`: Shared backoff calculation
- `abortableDelay()`: AbortSignal-aware delays

New Error Handling (in `kernel-errors`):
- `isRetryableNetworkError()`: Classifies transient vs. permanent errors
- `AbortError` usage throughout

Architecture
Initialization Flow
Browser Environment:
Node.js Environment (Simpler):
Cleanup Flow
Note
Adds automatic reconnection with exponential backoff and message queuing for remote communications, plus a stopRemoteComms lifecycle, new error utilities, and full platform wiring with tests.
- `ConnectionFactory`, `MessageQueue`, `ReconnectionManager`, and overhaul `remotes/network.ts` for autodial, exponential backoff (with jitter), per-peer queues (max 200), and wake-from-sleep handling.
- `stop()` to network init; expose via `StopRemoteComms` and `RemoteComms.stopRemoteComms`.
- `stopRemoteComms` with spec/handler; integrated in `platform-services` (`store/methods/remote.ts`).
- `initializeRemoteComms`, `sendRemoteMessage`, and new `stopRemoteComms`.
- `PlatformServices`: mirror start/stop/send; manage handlers and cleanup.
- `remoteManager.stopRemoteComms()` during `Kernel.stop()`.
- `retry`, `retryWithBackoff`, `calculateReconnectionBackoff`, `abortableDelay`, and `installWakeDetector`; export from `index.ts`.
- `AbortError` and `isRetryableNetworkError` (libp2p/Node codes); export updates.
- `remotes/types.ts`; re-export in package index.
- `@libp2p/interface` dep; small tooling/settings tweaks.

Written by Cursor Bugbot for commit 8832e09. This will update automatically on new commits.