Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .size-limit.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ module.exports = [
path: createCDNPath('bundle.tracing.replay.feedback.min.js'),
gzip: false,
brotli: false,
limit: '271 KB',
limit: '272 KB',
disablePlugins: ['@size-limit/esbuild'],
},
{
Expand Down
80 changes: 51 additions & 29 deletions packages/replay-internal/src/eventBuffer/WorkerHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { DEBUG_BUILD } from '../debug-build';
import type { WorkerRequest, WorkerResponse } from '../types';
import { debug } from '../util/logger';

interface PendingRequest {
method: WorkerRequest['method'];
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
}

/**
* Event buffer that uses a web worker to compress events.
* Exported only for testing.
Expand All @@ -10,10 +16,16 @@ export class WorkerHandler {
private _worker: Worker;
private _id: number;
private _ensureReadyPromise?: Promise<void>;
private _pending: Map<number, PendingRequest>;

public constructor(worker: Worker) {
this._worker = worker;
this._id = 0;
this._pending = new Map();
// A single long-lived listener routes responses by id. Per-request
// listeners would make worker dispatch O(n) per response, so a burst of N
// in-flight requests becomes O(n^2) main-thread work.
this._worker.addEventListener('message', this._onMessage);
}

/**
Expand Down Expand Up @@ -62,6 +74,9 @@ export class WorkerHandler {
*/
public destroy(): void {
DEBUG_BUILD && debug.log('Destroying compression worker');
this._worker.removeEventListener('message', this._onMessage);
this._pending.forEach(pending => pending.reject(new Error('Worker destroyed')));
this._pending.clear();
this._worker.terminate();
Comment thread
logaretm marked this conversation as resolved.
Comment thread
logaretm marked this conversation as resolved.
}

Expand All @@ -71,39 +86,46 @@ export class WorkerHandler {
public postMessage<T>(method: WorkerRequest['method'], arg?: WorkerRequest['arg']): Promise<T> {
const id = this._getAndIncrementId();

return new Promise((resolve, reject) => {
const listener = ({ data }: MessageEvent): void => {
const response = data as WorkerResponse;
if (response.method !== method) {
return;
}

// There can be multiple listeners for a single method, the id ensures
// that the response matches the caller.
if (response.id !== id) {
return;
}

// At this point, we'll always want to remove listener regardless of result status
this._worker.removeEventListener('message', listener);
return new Promise<T>((resolve, reject) => {
this._pending.set(id, {
method,
resolve: resolve as (value: unknown) => void,
reject,
});
try {
this._worker.postMessage({ id, method, arg });
} catch (error) {
// If postMessage throws synchronously (e.g. DataCloneError, worker
// already terminated), drop the pending entry so it doesn't leak.
this._pending.delete(id);
reject(error);
}
});
}

if (!response.success) {
// TODO: Do some error handling, not sure what
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
private _onMessage = ({ data }: MessageEvent): void => {
const response = data as WorkerResponse;
Comment thread
logaretm marked this conversation as resolved.
// The worker emits an init message with `id: undefined` on load, which is
// handled by `ensureReady()` via its own listener. Ignore anything that
// doesn't carry a numeric id we issued.
if (typeof response.id !== 'number') {
return;
}
const pending = this._pending.get(response.id);
if (!pending || pending.method !== response.method) {
return;
}

reject(new Error('Error in compression worker'));
return;
}
this._pending.delete(response.id);

resolve(response.response as T);
};
if (!response.success) {
DEBUG_BUILD && debug.error('Error in compression worker: ', response.response);
pending.reject(new Error('Error in compression worker'));
return;
}

// Note: we can't use `once` option because it's possible it needs to
// listen to multiple messages
this._worker.addEventListener('message', listener);
this._worker.postMessage({ id, method, arg });
});
}
pending.resolve(response.response);
};
Comment thread
cursor[bot] marked this conversation as resolved.

/** Get the current ID and increment it for the next call. */
private _getAndIncrementId(): number {
Expand Down
174 changes: 174 additions & 0 deletions packages/replay-internal/test/unit/eventBuffer/WorkerHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/**
* @vitest-environment jsdom
*/

import { describe, expect, it } from 'vitest';
import { WorkerHandler } from '../../../src/eventBuffer/WorkerHandler';
import type { WorkerResponse } from '../../../src/types';

/**
* Minimal Worker stub that lets tests control when responses dispatch and
* track how many 'message' listeners are attached at any time. Real workers
* are async; we model that with a queue we drain manually so the test can
* assert on the listener count while requests are in flight.
*/
class MockWorker implements Pick<Worker, 'addEventListener' | 'removeEventListener' | 'postMessage' | 'terminate'> {
public listenerCount = 0;
public terminated = false;

private _listeners = new Map<string, Set<EventListenerOrEventListenerObject>>();
private _pendingRequests: Array<{ id: number; method: string }> = [];

public addEventListener(type: string, listener: EventListenerOrEventListenerObject): void {
if (!this._listeners.has(type)) this._listeners.set(type, new Set());
this._listeners.get(type)!.add(listener);
if (type === 'message') this.listenerCount++;
}

public removeEventListener(type: string, listener: EventListenerOrEventListenerObject): void {
const set = this._listeners.get(type);
if (set?.delete(listener) && type === 'message') this.listenerCount--;
}

public postMessage(data: unknown): void {
const { id, method } = data as { id: number; method: string };
this._pendingRequests.push({ id, method });
}

public terminate(): void {
this.terminated = true;
}

/** Dispatch the queued response for a given id (FIFO order otherwise). */
public flushOne(overrides?: Partial<WorkerResponse>): void {
const next = this._pendingRequests.shift();
if (!next) return;
const response: WorkerResponse = {
id: next.id,
method: next.method,
success: true,
response: `result-${next.id}`,
...overrides,
};
this._dispatch('message', { data: response } as MessageEvent);
}

public flushAll(): void {
while (this._pendingRequests.length > 0) this.flushOne();
}

/** Dispatch a message that doesn't correspond to a queued request. */
public dispatchRaw(response: Partial<WorkerResponse>): void {
this._dispatch('message', { data: response } as MessageEvent);
}

public get pendingCount(): number {
return this._pendingRequests.length;
}

private _dispatch(type: string, event: MessageEvent): void {
const set = this._listeners.get(type);
if (!set) return;
for (const listener of set) {
if (typeof listener === 'function') listener(event);
else listener.handleEvent(event);
}
}
}

const makeHandler = () => {
const worker = new MockWorker();
const handler = new WorkerHandler(worker as unknown as Worker);
return { worker, handler };
};

describe('Unit | eventBuffer | WorkerHandler', () => {
it('does not attach a new message listener per postMessage call (regression: #20547)', async () => {
const { worker, handler } = makeHandler();

// One listener is attached at construction time.
expect(worker.listenerCount).toBe(1);

// Fire a burst of in-flight requests. The pre-fix implementation attached
// one listener per call, growing linearly; this would dispatch every
// response to all attached listeners (O(n^2) main-thread work).
const promises = Array.from({ length: 100 }, (_, i) => handler.postMessage('addEvent', `arg-${i}`));

expect(worker.listenerCount).toBe(1);
expect(worker.pendingCount).toBe(100);

worker.flushAll();
await Promise.all(promises);

// Listener count is still 1 after the burst drains.
expect(worker.listenerCount).toBe(1);
});

it('resolves concurrent postMessage calls with the correct response per id', async () => {
const { worker, handler } = makeHandler();

const p0 = handler.postMessage<string>('addEvent', 'a');
const p1 = handler.postMessage<string>('addEvent', 'b');
const p2 = handler.postMessage<string>('addEvent', 'c');

worker.flushAll();

await expect(p0).resolves.toBe('result-0');
await expect(p1).resolves.toBe('result-1');
await expect(p2).resolves.toBe('result-2');
});

it('rejects when the worker reports success: false', async () => {
const { worker, handler } = makeHandler();

const promise = handler.postMessage('addEvent', 'a');
worker.flushOne({ success: false, response: 'boom' });

await expect(promise).rejects.toThrow('Error in compression worker');
});

it('rejects and cleans up the pending entry when worker.postMessage throws synchronously', async () => {
const { worker, handler } = makeHandler();
const error = new Error('DataCloneError');
worker.postMessage = () => {
throw error;
};

await expect(handler.postMessage('addEvent', 'a')).rejects.toBe(error);

// A subsequent successful call should still work — the previous failure
// didn't leave a stale entry behind.
worker.postMessage = MockWorker.prototype.postMessage.bind(worker);
const promise = handler.postMessage<string>('addEvent', 'b');
worker.flushOne();
await expect(promise).resolves.toBe('result-1');
});

it('ignores messages without a numeric id (e.g. the worker init message)', async () => {
const { worker, handler } = makeHandler();

const promise = handler.postMessage<string>('addEvent', 'a');

// Simulate the init message the worker emits on load. Should be ignored
// and not crash.
worker.dispatchRaw({ id: undefined, method: 'init', success: true });

// The legitimate response still resolves.
worker.flushOne();
await expect(promise).resolves.toBe('result-0');
});

it('destroy() rejects pending requests and detaches the listener', async () => {
const { worker, handler } = makeHandler();

const p1 = handler.postMessage('addEvent', 'a');
const p2 = handler.postMessage('addEvent', 'b');

handler.destroy();

await expect(p1).rejects.toThrow('Worker destroyed');
await expect(p2).rejects.toThrow('Worker destroyed');
expect(worker.terminated).toBe(true);
expect(worker.listenerCount).toBe(0);
});
});
Loading