Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions source/event_hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ export class EventHub {
this._socketIo = io.connect(this._serverUrl, this._apiUser, this._apiKey);
this._socketIo.on("connect", this._onSocketConnected);
this._socketIo.on("ftrack.event", this._handle);
// SimpleSocketIOClient.connect is a singleton keyed on credentials, so
// a second EventHub built with the same credentials reuses an
// already-open socket. In that case the "connect" event has already
// fired and the listener above will never run on its own — run the
// handler now so this hub registers its reply subscription.
if (this._socketIo.socket.connected) {
this._onSocketConnected();
}
}

/**
Expand Down
217 changes: 217 additions & 0 deletions test/event_hub.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventHub } from "../source/event_hub.js";
import { Event } from "../source/event.js";
import SimpleSocketIOClient from "../source/simple_socketio.js";
import { vi, describe, expect, beforeEach, afterEach, test } from "vitest";

describe("EventHub", () => {
Expand Down Expand Up @@ -248,6 +249,222 @@ describe("EventHub", () => {
});
});

describe("EventHub sharing a SimpleSocketIOClient singleton", () => {
// Regression test for the publishAndWaitForReply drop-on-second-Session
// bug: SimpleSocketIOClient.connect() is a singleton keyed on
// (serverUrl, apiUser, apiKey). A second EventHub built with the same
// credentials reuses the already-open socket. Each EventHub generates
// its own _id and registers its reply subscription inside
// _onSocketConnected, wired via socketIo.on("connect", ...). Because
// the "connect" event has already fired by the time the second hub
// attaches its listener, the listener would never run without the
// already-connected check in EventHub#connect — so the second hub
// would never tell the server about a subscription with its own _id,
// and replies targeted at it would be silently dropped on the server.
function makeSharedSocket() {
const handlers: Record<string, Array<(data: unknown) => void>> = {};
return {
socket: { connected: false },
emit: vi.fn(),
reconnect: vi.fn(),
disconnect: vi.fn(),
on(name: string, cb: (data: unknown) => void) {
(handlers[name] ||= []).push(cb);
},
fire(name: string, data: unknown) {
handlers[name]?.forEach((cb) => cb(data));
},
};
}

function emittedReplySubscriberIds(
socket: ReturnType<typeof makeSharedSocket>,
) {
return socket.emit.mock.calls
.filter(
([eventName, payload]) =>
eventName === "ftrack.event" &&
(payload as any)?.topic === "ftrack.meta.subscribe" &&
(payload as any)?.data?.subscription === "topic=ftrack.meta.reply",
)
.map(([, payload]) => (payload as any)?.data?.subscriber?.id);
}

test("second hub registers its reply subscription even when the shared socket is already connected", async () => {
const sharedSocket = makeSharedSocket();
// Stand in for SimpleSocketIOClient.connect's credential-keyed
// singleton: both EventHubs get the same socket object back.
const connectSpy = vi
.spyOn(SimpleSocketIOClient, "connect")
.mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient);

try {
const hubA = new EventHub("http://ftrack.test", "user", "key") as any;
const hubB = new EventHub("http://ftrack.test", "user", "key") as any;

// Hub A connects before the socket opens (normal first-Session
// case). Its on("connect", ...) listener registers, then the
// socket finishes opening and fires "connect".
hubA.connect();
sharedSocket.socket.connected = true;
sharedSocket.fire("connect", {});
// publish() chains emits through a microtask; flush it.
await Promise.resolve();
await Promise.resolve();

const hubAReplySub = hubA._subscribers.find(
(s: any) => s.subscription === "topic=ftrack.meta.reply",
);
expect(hubAReplySub?.metadata.id).toBe(hubA._id);
expect(emittedReplySubscriberIds(sharedSocket)).toContain(hubA._id);

// Hub B connects AFTER the only "connect" event has fired. Without
// the already-connected fast-path in EventHub#connect, Hub B's
// _onSocketConnected would never run.
hubB.connect();
await Promise.resolve();
await Promise.resolve();

// With the fix in place, Hub B has its reply subscription
// registered locally and the server has been notified with Hub B's
// own _id — so future replies with target="id=<hubB._id>" will
// match a subscription on the server and reach this hub.
const hubBReplySub = hubB._subscribers.find(
(s: any) => s.subscription === "topic=ftrack.meta.reply",
);
expect(hubBReplySub?.metadata.id).toBe(hubB._id);
expect(emittedReplySubscriberIds(sharedSocket)).toContain(hubB._id);
} finally {
connectSpy.mockRestore();
}
});

test("publishAndWaitForReply routes replies to the correct hub on a shared socket", async () => {
const sharedSocket = makeSharedSocket();
const connectSpy = vi
.spyOn(SimpleSocketIOClient, "connect")
.mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient);

try {
const hubA = new EventHub("http://ftrack.test", "user", "key") as any;
const hubB = new EventHub("http://ftrack.test", "user", "key") as any;

hubA.connect();
sharedSocket.socket.connected = true;
sharedSocket.fire("connect", {});
hubB.connect();
await Promise.resolve();
await Promise.resolve();

sharedSocket.emit.mockClear();

// Each hub kicks off a publishAndWaitForReply. They share one
// underlying socket — replies need to land on the correct hub.
const replyAPromise = hubA.publishAndWaitForReply(
new Event("foo.bar", { who: "A" }),
);
const replyBPromise = hubB.publishAndWaitForReply(
new Event("foo.bar", { who: "B" }),
);
await Promise.resolve();
await Promise.resolve();

// Pull the emitted ftrack.event payloads to learn each hub's
// event id and confirm source.id reflects the publishing hub.
const emittedFooBar = sharedSocket.emit.mock.calls
.filter(
([eventName, payload]) =>
eventName === "ftrack.event" &&
(payload as any)?.topic === "foo.bar",
)
.map(([, payload]) => payload as any);
const eventA = emittedFooBar.find((p) => p.data.who === "A");
const eventB = emittedFooBar.find((p) => p.data.who === "B");
expect(eventA?.source?.id).toBe(hubA._id);
expect(eventB?.source?.id).toBe(hubB._id);

// Simulate the event server delivering a reply addressed to hub A.
// Both hubs receive every "ftrack.event" through the shared
// socket's handler chain — only the hub whose _replyCallbacks
// contains the matching inReplyToEvent should resolve.
sharedSocket.fire("ftrack.event", {
topic: "ftrack.meta.reply",
data: { result: "A-reply" },
source: { id: "responder" },
id: "server-reply-1",
inReplyToEvent: eventA.id,
target: `id=${hubA._id}`,
});
// …and a reply for hub B.
sharedSocket.fire("ftrack.event", {
topic: "ftrack.meta.reply",
data: { result: "B-reply" },
source: { id: "responder" },
id: "server-reply-2",
inReplyToEvent: eventB.id,
target: `id=${hubB._id}`,
});

const [replyA, replyB] = await Promise.all([
replyAPromise,
replyBPromise,
]);
expect((replyA as any).data.result).toBe("A-reply");
expect((replyB as any).data.result).toBe("B-reply");
} finally {
connectSpy.mockRestore();
}
});

test("reconnect re-runs _onSocketConnected for both hubs without duplicating local subscribers", async () => {
const sharedSocket = makeSharedSocket();
const connectSpy = vi
.spyOn(SimpleSocketIOClient, "connect")
.mockReturnValue(sharedSocket as unknown as SimpleSocketIOClient);

try {
const hubA = new EventHub("http://ftrack.test", "user", "key") as any;
const hubB = new EventHub("http://ftrack.test", "user", "key") as any;

hubA.connect();
sharedSocket.socket.connected = true;
sharedSocket.fire("connect", {});
hubB.connect();
await Promise.resolve();
await Promise.resolve();

const replySubs = (hub: any) =>
hub._subscribers.filter(
(s: any) => s.subscription === "topic=ftrack.meta.reply",
);
expect(replySubs(hubA)).toHaveLength(1);
expect(replySubs(hubB)).toHaveLength(1);

// Simulate a reconnect: real handleOpen() fires "connect" again
// after the socket recovers. Both hubs are now wired up for it.
sharedSocket.emit.mockClear();
sharedSocket.fire("connect", {});
await Promise.resolve();
await Promise.resolve();

// _onSocketConnected catches NotUniqueError from the duplicate
// subscribe() call, so the local subscriber list stays at one
// entry per hub.
expect(replySubs(hubA)).toHaveLength(1);
expect(replySubs(hubB)).toHaveLength(1);

// …but both hubs do re-notify the server (via the for-loop in
// _onSocketConnected), which is the intended behaviour for
// surviving an event-server-side state loss.
const reNotifiedIds = emittedReplySubscriberIds(sharedSocket);
expect(reNotifiedIds).toContain(hubA._id);
expect(reNotifiedIds).toContain(hubB._id);
} finally {
connectSpy.mockRestore();
}
});
});

test("EventHub constructor", async () => {
// Scenario 1
const eventHub1 = new EventHub("https://ftrack.test", "testUser", "testKey", {
Expand Down
Loading