diff --git a/source/event_hub.ts b/source/event_hub.ts index 2eb0e446..6dfa92c7 100644 --- a/source/event_hub.ts +++ b/source/event_hub.ts @@ -182,6 +182,14 @@ export class EventHub { this._socketIo = io.connect(this._serverUrl, this._apiUser, this._apiKey); this._socketIo.on("connect", this._onSocketConnected); this._socketIo.on("ftrack.event", this._handle); + // SimpleSocketIOClient.connect is a singleton keyed on credentials, so + // a second EventHub built with the same credentials reuses an + // already-open socket. In that case the "connect" event has already + // fired and the listener above will never run on its own — run the + // handler now so this hub registers its reply subscription. + if (this._socketIo.socket.connected) { + this._onSocketConnected(); + } } /** diff --git a/test/event_hub.test.ts b/test/event_hub.test.ts index 9be97f6b..9653c6dc 100644 --- a/test/event_hub.test.ts +++ b/test/event_hub.test.ts @@ -1,5 +1,6 @@ import { EventHub } from "../source/event_hub.js"; import { Event } from "../source/event.js"; +import SimpleSocketIOClient from "../source/simple_socketio.js"; import { vi, describe, expect, beforeEach, afterEach, test } from "vitest"; describe("EventHub", () => { @@ -248,6 +249,222 @@ describe("EventHub", () => { }); }); +describe("EventHub sharing a SimpleSocketIOClient singleton", () => { + // Regression test for the publishAndWaitForReply drop-on-second-Session + // bug: SimpleSocketIOClient.connect() is a singleton keyed on + // (serverUrl, apiUser, apiKey). A second EventHub built with the same + // credentials reuses the already-open socket. Each EventHub generates + // its own _id and registers its reply subscription inside + // _onSocketConnected, wired via socketIo.on("connect", ...). Because + // the "connect" event has already fired by the time the second hub + // attaches its listener, the listener would never run without the + // already-connected check in EventHub#connect — so the second hub + // would never tell the server about a subscription with its own _id, + // and replies targeted at it would be silently dropped on the server. + function makeSharedSocket() { + const handlers: Record void>> = {}; + return { + socket: { connected: false }, + emit: vi.fn(), + reconnect: vi.fn(), + disconnect: vi.fn(), + on(name: string, cb: (data: unknown) => void) { + (handlers[name] ||= []).push(cb); + }, + fire(name: string, data: unknown) { + handlers[name]?.forEach((cb) => cb(data)); + }, + }; + } + + function emittedReplySubscriberIds( + socket: ReturnType, + ) { + return socket.emit.mock.calls + .filter( + ([eventName, payload]) => + eventName === "ftrack.event" && + (payload as any)?.topic === "ftrack.meta.subscribe" && + (payload as any)?.data?.subscription === "topic=ftrack.meta.reply", + ) + .map(([, payload]) => (payload as any)?.data?.subscriber?.id); + } + + test("second hub registers its reply subscription even when the shared socket is already connected", async () => { + const sharedSocket = makeSharedSocket(); + // Stand in for SimpleSocketIOClient.connect's credential-keyed + // singleton: both EventHubs get the same socket object back. + const connectSpy = vi + .spyOn(SimpleSocketIOClient, "connect") + .mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient); + + try { + const hubA = new EventHub("http://ftrack.test", "user", "key") as any; + const hubB = new EventHub("http://ftrack.test", "user", "key") as any; + + // Hub A connects before the socket opens (normal first-Session + // case). Its on("connect", ...) listener registers, then the + // socket finishes opening and fires "connect". + hubA.connect(); + sharedSocket.socket.connected = true; + sharedSocket.fire("connect", {}); + // publish() chains emits through a microtask; flush it. + await Promise.resolve(); + await Promise.resolve(); + + const hubAReplySub = hubA._subscribers.find( + (s: any) => s.subscription === "topic=ftrack.meta.reply", + ); + expect(hubAReplySub?.metadata.id).toBe(hubA._id); + expect(emittedReplySubscriberIds(sharedSocket)).toContain(hubA._id); + + // Hub B connects AFTER the only "connect" event has fired. Without + // the already-connected fast-path in EventHub#connect, Hub B's + // _onSocketConnected would never run. + hubB.connect(); + await Promise.resolve(); + await Promise.resolve(); + + // With the fix in place, Hub B has its reply subscription + // registered locally and the server has been notified with Hub B's + // own _id — so future replies with target="id=" will + // match a subscription on the server and reach this hub. + const hubBReplySub = hubB._subscribers.find( + (s: any) => s.subscription === "topic=ftrack.meta.reply", + ); + expect(hubBReplySub?.metadata.id).toBe(hubB._id); + expect(emittedReplySubscriberIds(sharedSocket)).toContain(hubB._id); + } finally { + connectSpy.mockRestore(); + } + }); + + test("publishAndWaitForReply routes replies to the correct hub on a shared socket", async () => { + const sharedSocket = makeSharedSocket(); + const connectSpy = vi + .spyOn(SimpleSocketIOClient, "connect") + .mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient); + + try { + const hubA = new EventHub("http://ftrack.test", "user", "key") as any; + const hubB = new EventHub("http://ftrack.test", "user", "key") as any; + + hubA.connect(); + sharedSocket.socket.connected = true; + sharedSocket.fire("connect", {}); + hubB.connect(); + await Promise.resolve(); + await Promise.resolve(); + + sharedSocket.emit.mockClear(); + + // Each hub kicks off a publishAndWaitForReply. They share one + // underlying socket — replies need to land on the correct hub. + const replyAPromise = hubA.publishAndWaitForReply( + new Event("foo.bar", { who: "A" }), + ); + const replyBPromise = hubB.publishAndWaitForReply( + new Event("foo.bar", { who: "B" }), + ); + await Promise.resolve(); + await Promise.resolve(); + + // Pull the emitted ftrack.event payloads to learn each hub's + // event id and confirm source.id reflects the publishing hub. + const emittedFooBar = sharedSocket.emit.mock.calls + .filter( + ([eventName, payload]) => + eventName === "ftrack.event" && + (payload as any)?.topic === "foo.bar", + ) + .map(([, payload]) => payload as any); + const eventA = emittedFooBar.find((p) => p.data.who === "A"); + const eventB = emittedFooBar.find((p) => p.data.who === "B"); + expect(eventA?.source?.id).toBe(hubA._id); + expect(eventB?.source?.id).toBe(hubB._id); + + // Simulate the event server delivering a reply addressed to hub A. + // Both hubs receive every "ftrack.event" through the shared + // socket's handler chain — only the hub whose _replyCallbacks + // contains the matching inReplyToEvent should resolve. + sharedSocket.fire("ftrack.event", { + topic: "ftrack.meta.reply", + data: { result: "A-reply" }, + source: { id: "responder" }, + id: "server-reply-1", + inReplyToEvent: eventA.id, + target: `id=${hubA._id}`, + }); + // …and a reply for hub B. + sharedSocket.fire("ftrack.event", { + topic: "ftrack.meta.reply", + data: { result: "B-reply" }, + source: { id: "responder" }, + id: "server-reply-2", + inReplyToEvent: eventB.id, + target: `id=${hubB._id}`, + }); + + const [replyA, replyB] = await Promise.all([ + replyAPromise, + replyBPromise, + ]); + expect((replyA as any).data.result).toBe("A-reply"); + expect((replyB as any).data.result).toBe("B-reply"); + } finally { + connectSpy.mockRestore(); + } + }); + + test("reconnect re-runs _onSocketConnected for both hubs without duplicating local subscribers", async () => { + const sharedSocket = makeSharedSocket(); + const connectSpy = vi + .spyOn(SimpleSocketIOClient, "connect") + .mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient); + + try { + const hubA = new EventHub("http://ftrack.test", "user", "key") as any; + const hubB = new EventHub("http://ftrack.test", "user", "key") as any; + + hubA.connect(); + sharedSocket.socket.connected = true; + sharedSocket.fire("connect", {}); + hubB.connect(); + await Promise.resolve(); + await Promise.resolve(); + + const replySubs = (hub: any) => + hub._subscribers.filter( + (s: any) => s.subscription === "topic=ftrack.meta.reply", + ); + expect(replySubs(hubA)).toHaveLength(1); + expect(replySubs(hubB)).toHaveLength(1); + + // Simulate a reconnect: real handleOpen() fires "connect" again + // after the socket recovers. Both hubs are now wired up for it. + sharedSocket.emit.mockClear(); + sharedSocket.fire("connect", {}); + await Promise.resolve(); + await Promise.resolve(); + + // _onSocketConnected catches NotUniqueError from the duplicate + // subscribe() call, so the local subscriber list stays at one + // entry per hub. + expect(replySubs(hubA)).toHaveLength(1); + expect(replySubs(hubB)).toHaveLength(1); + + // …but both hubs do re-notify the server (via the for-loop in + // _onSocketConnected), which is the intended behaviour for + // surviving an event-server-side state loss. + const reNotifiedIds = emittedReplySubscriberIds(sharedSocket); + expect(reNotifiedIds).toContain(hubA._id); + expect(reNotifiedIds).toContain(hubB._id); + } finally { + connectSpy.mockRestore(); + } + }); +}); + test("EventHub constructor", async () => { // Scenario 1 const eventHub1 = new EventHub("https://ftrack.test", "testUser", "testKey", {