From 08a08124dd1556229663c6f73f0c3a99eb2dfb19 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Mon, 27 Apr 2026 21:27:21 -0400 Subject: [PATCH 1/4] fix(replay): Avoid main-thread blocking in WorkerHandler under event bursts WorkerHandler.postMessage attached a fresh 'message' listener per request, removed only when the matching response arrived. Under a burst of N in-flight requests, every worker response was dispatched to all N attached listeners, giving O(n^2) main-thread dispatch work. Replace with a single long-lived listener attached in the constructor, and route responses through a Map. destroy() also rejects pending requests instead of leaving them hanging. Public API unchanged. Closes #20547 --- .../src/eventBuffer/WorkerHandler.ts | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/packages/replay-internal/src/eventBuffer/WorkerHandler.ts b/packages/replay-internal/src/eventBuffer/WorkerHandler.ts index 790185712b1c..33684df61c56 100644 --- a/packages/replay-internal/src/eventBuffer/WorkerHandler.ts +++ b/packages/replay-internal/src/eventBuffer/WorkerHandler.ts @@ -2,6 +2,12 @@ import { DEBUG_BUILD } from '../debug-build'; import type { WorkerRequest, WorkerResponse } from '../types'; import { debug } from '../util/logger'; +interface PendingRequest { + method: WorkerRequest['method']; + resolve: (value: unknown) => void; + reject: (reason: unknown) => void; +} + /** * Event buffer that uses a web worker to compress events. * Exported only for testing. @@ -10,10 +16,16 @@ export class WorkerHandler { private _worker: Worker; private _id: number; private _ensureReadyPromise?: Promise; + private _pending: Map; public constructor(worker: Worker) { this._worker = worker; this._id = 0; + this._pending = new Map(); + // A single long-lived listener routes responses by id. Per-request + // listeners would make worker dispatch O(n) per response, so a burst of N + // in-flight requests becomes O(n^2) main-thread work. + this._worker.addEventListener('message', this._onMessage); } /** @@ -62,6 +74,9 @@ export class WorkerHandler { */ public destroy(): void { DEBUG_BUILD && debug.log('Destroying compression worker'); + this._worker.removeEventListener('message', this._onMessage); + this._pending.forEach(pending => pending.reject(new Error('Worker destroyed'))); + this._pending.clear(); this._worker.terminate(); } @@ -71,39 +86,33 @@ export class WorkerHandler { public postMessage(method: WorkerRequest['method'], arg?: WorkerRequest['arg']): Promise { const id = this._getAndIncrementId(); - return new Promise((resolve, reject) => { - const listener = ({ data }: MessageEvent): void => { - const response = data as WorkerResponse; - if (response.method !== method) { - return; - } - - // There can be multiple listeners for a single method, the id ensures - // that the response matches the caller. - if (response.id !== id) { - return; - } - - // At this point, we'll always want to remove listener regardless of result status - this._worker.removeEventListener('message', listener); + return new Promise((resolve, reject) => { + this._pending.set(id, { + method, + resolve: resolve as (value: unknown) => void, + reject, + }); + this._worker.postMessage({ id, method, arg }); + }); + } - if (!response.success) { - // TODO: Do some error handling, not sure what - DEBUG_BUILD && debug.error('Error in compression worker: ', response.response); + private _onMessage = ({ data }: MessageEvent): void => { + const response = data as WorkerResponse; + const pending = this._pending.get(response.id); + if (!pending || pending.method !== response.method) { + return; + } - reject(new Error('Error in compression worker')); - return; - } + this._pending.delete(response.id); - resolve(response.response as T); - }; + if (!response.success) { + DEBUG_BUILD && debug.error('Error in compression worker: ', response.response); + pending.reject(new Error('Error in compression worker')); + return; + } - // Note: we can't use `once` option because it's possible it needs to - // listen to multiple messages - this._worker.addEventListener('message', listener); - this._worker.postMessage({ id, method, arg }); - }); - } + pending.resolve(response.response); + }; /** Get the current ID and increment it for the next call. */ private _getAndIncrementId(): number { From 26199632496d378a5a892376c5e14bb068315667 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Mon, 27 Apr 2026 21:50:51 -0400 Subject: [PATCH 2/4] test(replay): Add WorkerHandler unit tests --- .../unit/eventBuffer/WorkerHandler.test.ts | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts diff --git a/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts b/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts new file mode 100644 index 000000000000..fc0a407d7256 --- /dev/null +++ b/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts @@ -0,0 +1,138 @@ +/** + * @vitest-environment jsdom + */ + +import { describe, expect, it } from 'vitest'; +import { WorkerHandler } from '../../../src/eventBuffer/WorkerHandler'; +import type { WorkerResponse } from '../../../src/types'; + +/** + * Minimal Worker stub that lets tests control when responses dispatch and + * track how many 'message' listeners are attached at any time. Real workers + * are async; we model that with a queue we drain manually so the test can + * assert on the listener count while requests are in flight. + */ +class MockWorker implements Pick { + public listenerCount = 0; + public terminated = false; + + private _listeners = new Map>(); + private _pendingRequests: Array<{ id: number; method: string }> = []; + + public addEventListener(type: string, listener: EventListenerOrEventListenerObject): void { + if (!this._listeners.has(type)) this._listeners.set(type, new Set()); + this._listeners.get(type)!.add(listener); + if (type === 'message') this.listenerCount++; + } + + public removeEventListener(type: string, listener: EventListenerOrEventListenerObject): void { + const set = this._listeners.get(type); + if (set?.delete(listener) && type === 'message') this.listenerCount--; + } + + public postMessage(data: unknown): void { + const { id, method } = data as { id: number; method: string }; + this._pendingRequests.push({ id, method }); + } + + public terminate(): void { + this.terminated = true; + } + + /** Dispatch the queued response for a given id (FIFO order otherwise). */ + public flushOne(overrides?: Partial): void { + const next = this._pendingRequests.shift(); + if (!next) return; + const response: WorkerResponse = { + id: next.id, + method: next.method, + success: true, + response: `result-${next.id}`, + ...overrides, + }; + this._dispatch('message', { data: response } as MessageEvent); + } + + public flushAll(): void { + while (this._pendingRequests.length > 0) this.flushOne(); + } + + public get pendingCount(): number { + return this._pendingRequests.length; + } + + private _dispatch(type: string, event: MessageEvent): void { + const set = this._listeners.get(type); + if (!set) return; + for (const listener of set) { + if (typeof listener === 'function') listener(event); + else listener.handleEvent(event); + } + } +} + +const makeHandler = () => { + const worker = new MockWorker(); + const handler = new WorkerHandler(worker as unknown as Worker); + return { worker, handler }; +}; + +describe('Unit | eventBuffer | WorkerHandler', () => { + it('does not attach a new message listener per postMessage call (regression: #20547)', async () => { + const { worker, handler } = makeHandler(); + + // One listener is attached at construction time. + expect(worker.listenerCount).toBe(1); + + // Fire a burst of in-flight requests. The pre-fix implementation attached + // one listener per call, growing linearly; this would dispatch every + // response to all attached listeners (O(n^2) main-thread work). + const promises = Array.from({ length: 100 }, (_, i) => handler.postMessage('addEvent', `arg-${i}`)); + + expect(worker.listenerCount).toBe(1); + expect(worker.pendingCount).toBe(100); + + worker.flushAll(); + await Promise.all(promises); + + // Listener count is still 1 after the burst drains. + expect(worker.listenerCount).toBe(1); + }); + + it('resolves concurrent postMessage calls with the correct response per id', async () => { + const { worker, handler } = makeHandler(); + + const p0 = handler.postMessage('addEvent', 'a'); + const p1 = handler.postMessage('addEvent', 'b'); + const p2 = handler.postMessage('addEvent', 'c'); + + worker.flushAll(); + + await expect(p0).resolves.toBe('result-0'); + await expect(p1).resolves.toBe('result-1'); + await expect(p2).resolves.toBe('result-2'); + }); + + it('rejects when the worker reports success: false', async () => { + const { worker, handler } = makeHandler(); + + const promise = handler.postMessage('addEvent', 'a'); + worker.flushOne({ success: false, response: 'boom' }); + + await expect(promise).rejects.toThrow('Error in compression worker'); + }); + + it('destroy() rejects pending requests and detaches the listener', async () => { + const { worker, handler } = makeHandler(); + + const p1 = handler.postMessage('addEvent', 'a'); + const p2 = handler.postMessage('addEvent', 'b'); + + handler.destroy(); + + await expect(p1).rejects.toThrow('Worker destroyed'); + await expect(p2).rejects.toThrow('Worker destroyed'); + expect(worker.terminated).toBe(true); + expect(worker.listenerCount).toBe(0); + }); +}); From 327897de18cbfd2f1b7087b4dd50f76b993ffb03 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Mon, 27 Apr 2026 21:55:41 -0400 Subject: [PATCH 3/4] fix(replay): Handle WorkerHandler edge cases from review - Wrap worker.postMessage in try/catch so a synchronous throw (e.g. DataCloneError, terminated worker) cleans up the pending entry and rejects the returned promise instead of leaking. - Skip messages without a numeric id in _onMessage. The worker emits { id: undefined, method: 'init', ... } on load; this was already handled implicitly via Map.get(undefined), but a typeof guard makes the contract explicit and is robust against future malformed messages. --- .../src/eventBuffer/WorkerHandler.ts | 15 +++++++- .../unit/eventBuffer/WorkerHandler.test.ts | 36 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/packages/replay-internal/src/eventBuffer/WorkerHandler.ts b/packages/replay-internal/src/eventBuffer/WorkerHandler.ts index 33684df61c56..dba3c858b711 100644 --- a/packages/replay-internal/src/eventBuffer/WorkerHandler.ts +++ b/packages/replay-internal/src/eventBuffer/WorkerHandler.ts @@ -92,12 +92,25 @@ export class WorkerHandler { resolve: resolve as (value: unknown) => void, reject, }); - this._worker.postMessage({ id, method, arg }); + try { + this._worker.postMessage({ id, method, arg }); + } catch (error) { + // If postMessage throws synchronously (e.g. DataCloneError, worker + // already terminated), drop the pending entry so it doesn't leak. + this._pending.delete(id); + reject(error); + } }); } private _onMessage = ({ data }: MessageEvent): void => { const response = data as WorkerResponse; + // The worker emits an init message with `id: undefined` on load, which is + // handled by `ensureReady()` via its own listener. Ignore anything that + // doesn't carry a numeric id we issued. + if (typeof response.id !== 'number') { + return; + } const pending = this._pending.get(response.id); if (!pending || pending.method !== response.method) { return; diff --git a/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts b/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts index fc0a407d7256..0b28cec37348 100644 --- a/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts +++ b/packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts @@ -57,6 +57,11 @@ class MockWorker implements Pick 0) this.flushOne(); } + /** Dispatch a message that doesn't correspond to a queued request. */ + public dispatchRaw(response: Partial): void { + this._dispatch('message', { data: response } as MessageEvent); + } + public get pendingCount(): number { return this._pendingRequests.length; } @@ -122,6 +127,37 @@ describe('Unit | eventBuffer | WorkerHandler', () => { await expect(promise).rejects.toThrow('Error in compression worker'); }); + it('rejects and cleans up the pending entry when worker.postMessage throws synchronously', async () => { + const { worker, handler } = makeHandler(); + const error = new Error('DataCloneError'); + worker.postMessage = () => { + throw error; + }; + + await expect(handler.postMessage('addEvent', 'a')).rejects.toBe(error); + + // A subsequent successful call should still work — the previous failure + // didn't leave a stale entry behind. + worker.postMessage = MockWorker.prototype.postMessage.bind(worker); + const promise = handler.postMessage('addEvent', 'b'); + worker.flushOne(); + await expect(promise).resolves.toBe('result-1'); + }); + + it('ignores messages without a numeric id (e.g. the worker init message)', async () => { + const { worker, handler } = makeHandler(); + + const promise = handler.postMessage('addEvent', 'a'); + + // Simulate the init message the worker emits on load. Should be ignored + // and not crash. + worker.dispatchRaw({ id: undefined, method: 'init', success: true }); + + // The legitimate response still resolves. + worker.flushOne(); + await expect(promise).resolves.toBe('result-0'); + }); + it('destroy() rejects pending requests and detaches the listener', async () => { const { worker, handler } = makeHandler(); From 9d9136ac707d3e0d356619a0c615fb5777eb2904 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 28 Apr 2026 11:49:46 -0400 Subject: [PATCH 4/4] chore: Bump size limit for CDN Bundle (Tracing, Replay, Feedback) uncompressed --- .size-limit.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.size-limit.js b/.size-limit.js index 38880cc91247..18580bf7b182 100644 --- a/.size-limit.js +++ b/.size-limit.js @@ -326,7 +326,7 @@ module.exports = [ path: createCDNPath('bundle.tracing.replay.feedback.min.js'), gzip: false, brotli: false, - limit: '271 KB', + limit: '272 KB', disablePlugins: ['@size-limit/esbuild'], }, {