Skip to content

fix(event-hub): register reply subscription when reusing an open socket#386

Merged
jimmycallin merged 1 commit into
mainfrom
fix/event-hub-shared-socket-reply-routing
May 18, 2026
Merged

fix(event-hub): register reply subscription when reusing an open socket#386
jimmycallin merged 1 commit into
mainfrom
fix/event-hub-shared-socket-reply-routing

Conversation

@jimmycallin
Copy link
Copy Markdown
Contributor

  • I have added automatic tests where applicable
  • The PR title is suitable as a release note
  • The PR contains a description of what has been changed
  • The description contains manual test instructions

Changes

When a JS process constructs two or more new Session(...) instances with the same serverUrl + apiUser + apiKey, the second-and-onward Sessions silently lost publishAndWaitForReply replies, surfacing as timeouts. In a 50/50 split between two Sessions, exactly half the calls timed out.

Root cause

SimpleSocketIOClient.connect() is a singleton keyed on (serverUrl, apiUser, apiKey) (source/simple_socketio.ts:85), so a second EventHub built with the same credentials reuses an already-open socket. Each EventHub generates its own _id and registers topic=ftrack.meta.reply inside _onSocketConnected, which is wired via socketIo.on("connect", ...). By the time the second hub attaches that listener, the "connect" event has already fired and won't fire again, so the listener never runs — the hub never tells the server about a subscription with its own _id. The event server enforces strict id-equality on the reply target, so replies addressed to the second hub match zero server-side subscriptions and are dropped.

Fix

EventHub#connect() now invokes _onSocketConnected() synchronously when the underlying socket is already connected, so late-arriving hubs still register their reply subscription. The handler is idempotent (it catches NotUniqueError), so it remains safe on the existing reconnect path.

Tests

Three new tests in test/event_hub.test.ts under describe("EventHub sharing a SimpleSocketIOClient singleton"):

  1. Second hub registers its reply subscription when the shared socket is already connected — proves the root cause is addressed.
  2. publishAndWaitForReply routes replies to the correct hub on a shared socket — drives two hubs publishing concurrently, fires simulated reply events back through the shared socket, asserts each hub's promise resolves with its own reply.
  3. Reconnect re-runs _onSocketConnected for both hubs without duplicating local subscribers — confirms the fast-path composes cleanly with the existing reconnect flow.

All three fail in the expected places when the fix is reverted.

Resolves F-990

Test

Automated coverage above. To verify manually:

import { Session, Event } from "@ftrack/api";

const a = new Session(serverUrl, apiUser, apiKey, { autoConnectEventHub: true });
const b = new Session(serverUrl, apiUser, apiKey, { autoConnectEventHub: true });

await Promise.all([
  a.eventHub.subscribe("topic=ftrack.test.ping", () => "pong-A"),
  b.eventHub.subscribe("topic=ftrack.test.ping", () => "pong-B"),
]);

// Before the fix: one of these two calls times out.
// After the fix: both resolve.
const [resA, resB] = await Promise.all([
  a.eventHub.publishAndWaitForReply(new Event("ftrack.test.ping", { from: "A" })),
  b.eventHub.publishAndWaitForReply(new Event("ftrack.test.ping", { from: "B" })),
]);

SimpleSocketIOClient.connect() is a singleton keyed on
(serverUrl, apiUser, apiKey), so a second EventHub built with the
same credentials reuses an already-open socket. The connect handler
that registers each hub's topic=ftrack.meta.reply subscription was
wired via socketIo.on("connect", ...) inside EventHub#connect — but
the "connect" event had already fired by the time the second hub
attached, so its listener never ran. With the server enforcing
strict id-equality on the reply target, replies addressed to the
second hub matched zero server-side subscriptions and were silently
dropped, surfacing as publishAndWaitForReply timeouts whenever two
Sessions shared credentials.

Run _onSocketConnected() synchronously inside connect() when the
underlying socket is already connected so late-arriving hubs still
register their reply subscription. The handler is idempotent (it
catches NotUniqueError) so it stays safe for the regular reconnect
path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jimmycallin jimmycallin requested a review from a team as a code owner May 12, 2026 08:18
@seebergs
Copy link
Copy Markdown

Tried to verify manually but unsure if this is correct?

async function test() {
    const a = new Session('', '', '', { autoConnectEventHub: true })
    const b = new Session('', '', '', { autoConnectEventHub: true });
    
    await Promise.all([
      a.eventHub.subscribe("topic=ftrack.test.ping", () => "pong-A"),
      b.eventHub.subscribe("topic=ftrack.test.ping", () => "pong-B"),
    ])
    
    const [resA, resB] = await Promise.all([
      a.eventHub.publishAndWaitForReply(new Event("ftrack.test.ping", { from: "A" })),
      b.eventHub.publishAndWaitForReply(new Event("ftrack.test.ping", { from: "B" })),
    ])
  
    return { resA: resA.data, resB: resB.data };
  }
  
  test().then((data) => {
    console.log("Test completed:", data)
  }).catch((error) => {
    console.error("Test failed:", error)
  })

Result:

Screenshot 2026-05-18 at 12 58 55

@jimmycallin
Copy link
Copy Markdown
Contributor Author

Hey @seebergs — the result you got (resA: 'pong-A', resB: 'pong-A') is actually the expected behaviour of ftrack's pub/sub model, not a regression. Let me walk through it.

What you've set up

Two Sessions, same credentials. SimpleSocketIOClient.connect is a credential-keyed singleton, so a.eventHub and b.eventHub share one underlying WebSocket. That sharing is exactly what this PR is making safe — without the fix, hub B would silently fail to register its reply subscription and b.publishAndWaitForReply would time out after 30s. With the fix, both hubs are correctly subscribed, which is what lets your test resolve at all.

Each hub then registers a callback for topic=ftrack.test.ping:

  • hub A: () => "pong-A"
  • hub B: () => "pong-B"

These are two independent subscriptions on the event server. Crucially, subscriptions are global topic filters — they're not scoped to "hub A's events" or "hub B's events". They both say to the server: "whenever anyone publishes ftrack.test.ping, call me."

What happens when hub A publishes its ping

  1. Server matches the event against its subscription table → finds both A's and B's ftrack.test.ping subscribers.
  2. Server fans the event out to both of them.
  3. A's callback runs → returns "pong-A" → hub A publishes a reply with target=id=<hubA._id>.
  4. B's callback also runs → returns "pong-B" → hub B publishes a reply with target=id=<hubA._id>.
  5. Server routes both replies to hub A's reply subscription. Both come back over the socket.
  6. publishAndWaitForReply resolves with the first reply that arrives, then removes its callback. The second reply is dropped.

Which one arrives first? A's. Both _handle listeners are attached to the shared socket in construction order (A before B), so A's listener iterates A's subscribers first, queues publishReply as a microtask, then B does the same. A's reply gets emitted to the wire before B's, the server processes them in order, and A's reply comes back first. → resA = "pong-A".

The exact same thing happens when hub B publishes:

  1. Server fans out to both subscribers.
  2. A's callback returns "pong-A", B's returns "pong-B". Both reply with target=id=<hubB._id>.
  3. A's reply arrives first (same registration-order reason) → resB = "pong-A".

Hence { resA: "pong-A", resB: "pong-A" }.

Why this isn't a bug

The intuition "I called b.subscribe(...), so only b-related events trigger it" is appealing but isn't the contract ftrack's event system offers. From the event server's point of view, hub A and hub B are just two clients (that happen to share a TCP connection) — when one of them publishes ftrack.test.ping, every subscriber to that topic gets notified, regardless of who created the subscription.

A useful sanity check: imagine running this code in two separate browser tabs, both logged in as the same user. Tab A publishes a ping → server fans out → both tabs receive and reply → tab A gets the first reply back. Identical behaviour. The socket-sharing in the singleton case isn't what causes the "cross-talk" — it's just pub/sub fan-out, which would happen across any number of sockets.

Also: publishAndWaitForReply is "first reply wins" by design — see its doc comment ("resolved with reply event if received within timeout"). It doesn't filter by responder.

What this PR fixes vs. what it doesn't

Fixes: the second hub on a shared socket now reliably tells the server about its target=id=<hubB._id> reply subscription, so replies addressed to it are no longer dropped server-side. Concretely: pre-fix, b.publishAndWaitForReply(...) in your test would have timed out; post-fix it resolves (with "pong-A", for the reasons above).

Doesn't change: the pub/sub fan-out semantics. If two hubs subscribe to the same topic, both fire — that's by design, and changing it would mean changing the subscription protocol, not the socket layer.

A test that targets this PR specifically

If you want to manually verify the PR's actual fix in isolation, try this — only one subscriber, and a small delay so the second hub is constructed after the shared socket has opened:

async function test() {
  const a = new Session('', '', '', { autoConnectEventHub: true });
  await new Promise(r => setTimeout(r, 1000)); // let the shared socket finish opening
  const b = new Session('', '', '', { autoConnectEventHub: true });

  a.eventHub.subscribe("topic=ftrack.test.ping", () => "pong");

  // Pre-fix: hangs for 30s, then throws EventServerReplyTimeoutError —
  //          because hub B never registered its reply subscription
  //          with the server, so the reply targeted at hub B was
  //          dropped server-side.
  // Post-fix: resolves quickly with "pong".
  const res = await b.eventHub.publishAndWaitForReply(
    new Event("ftrack.test.ping", {}),
  );
  return res.data;
}

Happy to dig further if anything's still murky.

Copy link
Copy Markdown

@seebergs seebergs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested, works as intended.

@jimmycallin jimmycallin merged commit e353bb2 into main May 18, 2026
2 checks passed
@jimmycallin jimmycallin deleted the fix/event-hub-shared-socket-reply-routing branch May 18, 2026 13:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants