From b86ac10ed7b67bf403c66864a4d714c24699edc5 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 26 Jan 2023 12:39:03 -0500 Subject: [PATCH] fix(replay): Handle compression worker errors more gracefully (#6936) --- packages/replay/src/constants.ts | 3 + .../src/eventBuffer/EventBufferArray.ts | 54 +++++ .../EventBufferCompressionWorker.ts} | 185 +++--------------- .../src/eventBuffer/EventBufferProxy.ts | 92 +++++++++ packages/replay/src/eventBuffer/index.ts | 33 ++++ packages/replay/src/replay.ts | 11 +- .../test/integration/errorSampleRate.test.ts | 10 +- packages/replay/test/unit/eventBuffer.test.ts | 63 +++--- 8 files changed, 263 insertions(+), 188 deletions(-) create mode 100644 packages/replay/src/eventBuffer/EventBufferArray.ts rename packages/replay/src/{eventBuffer.ts => eventBuffer/EventBufferCompressionWorker.ts} (50%) create mode 100644 packages/replay/src/eventBuffer/EventBufferProxy.ts create mode 100644 packages/replay/src/eventBuffer/index.ts diff --git a/packages/replay/src/constants.ts b/packages/replay/src/constants.ts index 4cdae5c8199f..434c64158cb4 100644 --- a/packages/replay/src/constants.ts +++ b/packages/replay/src/constants.ts @@ -27,5 +27,8 @@ export const MASK_ALL_TEXT_SELECTOR = 'body *:not(style), body *:not(script)'; export const DEFAULT_FLUSH_MIN_DELAY = 5_000; export const DEFAULT_FLUSH_MAX_DELAY = 5_000; +/* How long to wait for error checkouts */ +export const ERROR_CHECKOUT_TIME = 60_000; + export const RETRY_BASE_INTERVAL = 5000; export const RETRY_MAX_COUNT = 3; diff --git a/packages/replay/src/eventBuffer/EventBufferArray.ts b/packages/replay/src/eventBuffer/EventBufferArray.ts new file mode 100644 index 000000000000..59eda189e60c --- /dev/null +++ b/packages/replay/src/eventBuffer/EventBufferArray.ts @@ -0,0 +1,54 @@ +import type { AddEventResult, EventBuffer, RecordingEvent } from '../types'; + +/** + * A basic event buffer that does not do any compression. + * Used as fallback if the compression worker cannot be loaded or is disabled. + */ +export class EventBufferArray implements EventBuffer { + private _events: RecordingEvent[]; + + public constructor() { + this._events = []; + } + + /** @inheritdoc */ + public get pendingLength(): number { + return this._events.length; + } + + /** + * Returns the raw events that are buffered. In `EventBufferArray`, this is the + * same as `this._events`. + */ + public get pendingEvents(): RecordingEvent[] { + return this._events; + } + + /** @inheritdoc */ + public destroy(): void { + this._events = []; + } + + /** @inheritdoc */ + public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { + if (isCheckout) { + this._events = [event]; + return; + } + + this._events.push(event); + return; + } + + /** @inheritdoc */ + public finish(): Promise { + return new Promise(resolve => { + // Make a copy of the events array reference and immediately clear the + // events member so that we do not lose new events while uploading + // attachment. + const eventsRet = this._events; + this._events = []; + resolve(JSON.stringify(eventsRet)); + }); + } +} diff --git a/packages/replay/src/eventBuffer.ts b/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts similarity index 50% rename from packages/replay/src/eventBuffer.ts rename to packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts index 94aa0870bf73..66cb92db849b 100644 --- a/packages/replay/src/eventBuffer.ts +++ b/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts @@ -1,153 +1,7 @@ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ -// TODO: figure out member access types and remove the line above - import type { ReplayRecordingData } from '@sentry/types'; import { logger } from '@sentry/utils'; -import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest } from './types'; -import workerString from './worker/worker.js'; - -interface CreateEventBufferParams { - useCompression: boolean; -} - -/** - * Create an event buffer for replays. - */ -export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer { - // eslint-disable-next-line no-restricted-globals - if (useCompression && window.Worker) { - const workerBlob = new Blob([workerString]); - const workerUrl = URL.createObjectURL(workerBlob); - - __DEBUG_BUILD__ && logger.log('[Replay] Using compression worker'); - const worker = new Worker(workerUrl); - return new EventBufferProxy(worker); - } - - __DEBUG_BUILD__ && logger.log('[Replay] Using simple buffer'); - return new EventBufferArray(); -} - -/** - * This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there. - * This can happen e.g. if the worker cannot be loaded. - * Exported only for testing. - */ -export class EventBufferProxy implements EventBuffer { - private _fallback: EventBufferArray; - private _compression: EventBufferCompressionWorker; - private _used: EventBuffer; - - public constructor(worker: Worker) { - this._fallback = new EventBufferArray(); - this._compression = new EventBufferCompressionWorker(worker); - this._used = this._fallback; - - void this._ensureWorkerIsLoaded(); - } - - /** @inheritDoc */ - public get pendingLength(): number { - return this._used.pendingLength; - } - - /** @inheritDoc */ - public get pendingEvents(): RecordingEvent[] { - return this._used.pendingEvents; - } - - /** @inheritDoc */ - public destroy(): void { - this._fallback.destroy(); - this._compression.destroy(); - } - - /** - * Add an event to the event buffer. - * - * Returns true if event was successfully added. - */ - public addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { - return this._used.addEvent(event, isCheckout); - } - - /** @inheritDoc */ - public finish(): Promise { - return this._used.finish(); - } - - /** Ensure the worker has loaded. */ - private async _ensureWorkerIsLoaded(): Promise { - try { - await this._compression.ensureReady(); - } catch (error) { - // If the worker fails to load, we fall back to the simple buffer. - // Nothing more to do from our side here - __DEBUG_BUILD__ && logger.log('[Replay] Failed to load the compression worker, falling back to simple buffer'); - return; - } - - // Compression worker is ready, we can use it - // Now we need to switch over the array buffer to the compression worker - const addEventPromises: Promise[] = []; - for (const event of this._fallback.pendingEvents) { - addEventPromises.push(this._compression.addEvent(event)); - } - - // We switch over to the compression buffer immediately - any further events will be added - // after the previously buffered ones - this._used = this._compression; - - // Wait for original events to be re-added before resolving - await Promise.all(addEventPromises); - } -} - -class EventBufferArray implements EventBuffer { - private _events: RecordingEvent[]; - - public constructor() { - this._events = []; - } - - public get pendingLength(): number { - return this._events.length; - } - - /** - * Returns the raw events that are buffered. In `EventBufferArray`, this is the - * same as `this._events`. - */ - public get pendingEvents(): RecordingEvent[] { - return this._events; - } - - public destroy(): void { - this._events = []; - } - - public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { - if (isCheckout) { - this._events = [event]; - return; - } - - this._events.push(event); - return; - } - - public finish(): Promise { - return new Promise(resolve => { - // Make a copy of the events array reference and immediately clear the - // events member so that we do not lose new events while uploading - // attachment. - const eventsRet = this._events; - this._events = []; - resolve(JSON.stringify(eventsRet)); - }); - } -} +import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types'; /** * Event buffer that uses a web worker to compress events. @@ -164,6 +18,7 @@ export class EventBufferCompressionWorker implements EventBuffer { private _worker: Worker; private _eventBufferItemLength: number = 0; private _id: number = 0; + private _ensureReadyPromise?: Promise; public constructor(worker: Worker) { this._worker = worker; @@ -190,11 +45,16 @@ export class EventBufferCompressionWorker implements EventBuffer { * This will either resolve when the worker is ready, or reject if an error occured. */ public ensureReady(): Promise { - return new Promise((resolve, reject) => { + // Ensure we only check once + if (this._ensureReadyPromise) { + return this._ensureReadyPromise; + } + + this._ensureReadyPromise = new Promise((resolve, reject) => { this._worker.addEventListener( 'message', ({ data }: MessageEvent) => { - if (data.success) { + if ((data as WorkerResponse).success) { resolve(); } else { reject(); @@ -211,6 +71,8 @@ export class EventBufferCompressionWorker implements EventBuffer { { once: true }, ); }); + + return this._ensureReadyPromise; } /** @@ -248,8 +110,15 @@ export class EventBufferCompressionWorker implements EventBuffer { /** * Finish the event buffer and return the compressed data. */ - public finish(): Promise { - return this._finishRequest(this._getAndIncrementId()); + public async finish(): Promise { + try { + return await this._finishRequest(this._getAndIncrementId()); + } catch (error) { + __DEBUG_BUILD__ && logger.error('[Replay] Error when trying to compress events', error); + // fall back to uncompressed + const events = this.pendingEvents; + return JSON.stringify(events); + } } /** @@ -257,30 +126,30 @@ export class EventBufferCompressionWorker implements EventBuffer { */ private _postMessage({ id, method, args }: WorkerRequest): Promise { return new Promise((resolve, reject) => { - // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - const listener = ({ data }: MessageEvent) => { - if (data.method !== method) { + const listener = ({ data }: MessageEvent): void => { + const response = data as WorkerResponse; + if (response.method !== method) { return; } // There can be multiple listeners for a single method, the id ensures // that the response matches the caller. - if (data.id !== id) { + if (response.id !== id) { return; } // At this point, we'll always want to remove listener regardless of result status this._worker.removeEventListener('message', listener); - if (!data.success) { + if (!response.success) { // TODO: Do some error handling, not sure what - __DEBUG_BUILD__ && logger.error('[Replay]', data.response); + __DEBUG_BUILD__ && logger.error('[Replay]', response.response); reject(new Error('Error in compression worker')); return; } - resolve(data.response); + resolve(response.response as T); }; let stringifiedArgs; diff --git a/packages/replay/src/eventBuffer/EventBufferProxy.ts b/packages/replay/src/eventBuffer/EventBufferProxy.ts new file mode 100644 index 000000000000..f438cd5a6ef9 --- /dev/null +++ b/packages/replay/src/eventBuffer/EventBufferProxy.ts @@ -0,0 +1,92 @@ +import type { ReplayRecordingData } from '@sentry/types'; +import { logger } from '@sentry/utils'; + +import type { AddEventResult, EventBuffer, RecordingEvent } from '../types'; +import { EventBufferArray } from './EventBufferArray'; +import { EventBufferCompressionWorker } from './EventBufferCompressionWorker'; + +/** + * This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there. + * This can happen e.g. if the worker cannot be loaded. + * Exported only for testing. + */ +export class EventBufferProxy implements EventBuffer { + private _fallback: EventBufferArray; + private _compression: EventBufferCompressionWorker; + private _used: EventBuffer; + private _ensureWorkerIsLoadedPromise: Promise; + + public constructor(worker: Worker) { + this._fallback = new EventBufferArray(); + this._compression = new EventBufferCompressionWorker(worker); + this._used = this._fallback; + + this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded().catch(() => { + // Ignore errors here + }); + } + + /** @inheritDoc */ + public get pendingLength(): number { + return this._used.pendingLength; + } + + /** @inheritDoc */ + public get pendingEvents(): RecordingEvent[] { + return this._used.pendingEvents; + } + + /** @inheritDoc */ + public destroy(): void { + this._fallback.destroy(); + this._compression.destroy(); + } + + /** + * Add an event to the event buffer. + * + * Returns true if event was successfully added. + */ + public addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { + return this._used.addEvent(event, isCheckout); + } + + /** @inheritDoc */ + public async finish(): Promise { + // Ensure the worker is loaded, so the sent event is compressed + await this.ensureWorkerIsLoaded(); + + return this._used.finish(); + } + + /** Ensure the worker has loaded. */ + public ensureWorkerIsLoaded(): Promise { + return this._ensureWorkerIsLoadedPromise; + } + + /** Actually check if the worker has been loaded. */ + private async _ensureWorkerIsLoaded(): Promise { + try { + await this._compression.ensureReady(); + } catch (error) { + // If the worker fails to load, we fall back to the simple buffer. + // Nothing more to do from our side here + __DEBUG_BUILD__ && logger.log('[Replay] Failed to load the compression worker, falling back to simple buffer'); + return; + } + + // Compression worker is ready, we can use it + // Now we need to switch over the array buffer to the compression worker + const addEventPromises: Promise[] = []; + for (const event of this._fallback.pendingEvents) { + addEventPromises.push(this._compression.addEvent(event)); + } + + // We switch over to the compression buffer immediately - any further events will be added + // after the previously buffered ones + this._used = this._compression; + + // Wait for original events to be re-added before resolving + await Promise.all(addEventPromises); + } +} diff --git a/packages/replay/src/eventBuffer/index.ts b/packages/replay/src/eventBuffer/index.ts new file mode 100644 index 000000000000..a928d6832d1e --- /dev/null +++ b/packages/replay/src/eventBuffer/index.ts @@ -0,0 +1,33 @@ +import { logger } from '@sentry/utils'; + +import type { EventBuffer } from '../types'; +import workerString from '../worker/worker.js'; +import { EventBufferArray } from './EventBufferArray'; +import { EventBufferProxy } from './EventBufferProxy'; + +interface CreateEventBufferParams { + useCompression: boolean; +} + +/** + * Create an event buffer for replays. + */ +export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer { + // eslint-disable-next-line no-restricted-globals + if (useCompression && window.Worker) { + try { + const workerBlob = new Blob([workerString]); + const workerUrl = URL.createObjectURL(workerBlob); + + __DEBUG_BUILD__ && logger.log('[Replay] Using compression worker'); + const worker = new Worker(workerUrl); + return new EventBufferProxy(worker); + } catch (error) { + __DEBUG_BUILD__ && logger.log('[Replay] Failed to create compression worker'); + // Fall back to use simple event buffer array + } + } + + __DEBUG_BUILD__ && logger.log('[Replay] Using simple buffer'); + return new EventBufferArray(); +} diff --git a/packages/replay/src/replay.ts b/packages/replay/src/replay.ts index fd31e5e9ef5d..651d430e6b0d 100644 --- a/packages/replay/src/replay.ts +++ b/packages/replay/src/replay.ts @@ -5,7 +5,13 @@ import type { Breadcrumb, ReplayRecordingMode } from '@sentry/types'; import type { RateLimits } from '@sentry/utils'; import { disabledUntil, logger } from '@sentry/utils'; -import { MAX_SESSION_LIFE, SESSION_IDLE_DURATION, VISIBILITY_CHANGE_TIMEOUT, WINDOW } from './constants'; +import { + ERROR_CHECKOUT_TIME, + MAX_SESSION_LIFE, + SESSION_IDLE_DURATION, + VISIBILITY_CHANGE_TIMEOUT, + WINDOW, +} from './constants'; import { setupPerformanceObserver } from './coreHandlers/performanceObserver'; import { createEventBuffer } from './eventBuffer'; import { getSession } from './session/getSession'; @@ -196,7 +202,7 @@ export class ReplayContainer implements ReplayContainerInterface { // When running in error sampling mode, we need to overwrite `checkoutEveryNms` // Without this, it would record forever, until an error happens, which we don't want // instead, we'll always keep the last 60 seconds of replay before an error happened - ...(this.recordingMode === 'error' && { checkoutEveryNms: 60000 }), + ...(this.recordingMode === 'error' && { checkoutEveryNms: ERROR_CHECKOUT_TIME }), emit: this._handleRecordingEmit, }); } catch (err) { @@ -536,6 +542,7 @@ export class ReplayContainer implements ReplayContainerInterface { // replays (e.g. opening and closing a tab quickly), but these can be // filtered on the UI. if (this.recordingMode === 'session') { + // We want to ensure the worker is ready, as otherwise we'd always send the first event uncompressed void this.flushImmediate(); } diff --git a/packages/replay/test/integration/errorSampleRate.test.ts b/packages/replay/test/integration/errorSampleRate.test.ts index 2b52e6c7d96c..e95b2b1594b9 100644 --- a/packages/replay/test/integration/errorSampleRate.test.ts +++ b/packages/replay/test/integration/errorSampleRate.test.ts @@ -1,6 +1,12 @@ import { captureException } from '@sentry/core'; -import { DEFAULT_FLUSH_MIN_DELAY, REPLAY_SESSION_KEY, VISIBILITY_CHANGE_TIMEOUT, WINDOW } from '../../src/constants'; +import { + DEFAULT_FLUSH_MIN_DELAY, + ERROR_CHECKOUT_TIME, + REPLAY_SESSION_KEY, + VISIBILITY_CHANGE_TIMEOUT, + WINDOW, +} from '../../src/constants'; import type { ReplayContainer } from '../../src/replay'; import { addEvent } from '../../src/util/addEvent'; import { PerformanceEntryResource } from '../fixtures/performanceEntry/resource'; @@ -322,7 +328,7 @@ describe('Integration | errorSampleRate', () => { }); it('has correct timestamps when error occurs much later than initial pageload/checkout', async () => { - const ELAPSED = 60000; + const ELAPSED = ERROR_CHECKOUT_TIME; const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }; mockRecord._emitter(TEST_EVENT); diff --git a/packages/replay/test/unit/eventBuffer.test.ts b/packages/replay/test/unit/eventBuffer.test.ts index 1a48cc333638..b9c96b225cc3 100644 --- a/packages/replay/test/unit/eventBuffer.test.ts +++ b/packages/replay/test/unit/eventBuffer.test.ts @@ -2,7 +2,8 @@ import 'jsdom-worker'; import pako from 'pako'; -import { createEventBuffer, EventBufferProxy } from './../../src/eventBuffer'; +import { EventBufferProxy } from '../../src/eventBuffer/EventBufferProxy'; +import { createEventBuffer } from './../../src/eventBuffer'; import { BASE_TIMESTAMP } from './../index'; const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }; @@ -57,7 +58,7 @@ describe('Unit | eventBuffer', () => { expect(buffer).toBeInstanceOf(EventBufferProxy); // Ensure worker is ready - await buffer['_ensureWorkerIsLoaded'](); + await buffer.ensureWorkerIsLoaded(); buffer.addEvent(TEST_EVENT); buffer.addEvent(TEST_EVENT); @@ -79,7 +80,7 @@ describe('Unit | eventBuffer', () => { expect(buffer).toBeInstanceOf(EventBufferProxy); // Ensure worker is ready - await buffer['_ensureWorkerIsLoaded'](); + await buffer.ensureWorkerIsLoaded(); await buffer.addEvent(TEST_EVENT); await buffer.addEvent(TEST_EVENT); @@ -102,7 +103,7 @@ describe('Unit | eventBuffer', () => { expect(buffer).toBeInstanceOf(EventBufferProxy); // Ensure worker is ready - await buffer['_ensureWorkerIsLoaded'](); + await buffer.ensureWorkerIsLoaded(); buffer.addEvent(TEST_EVENT); @@ -126,11 +127,12 @@ describe('Unit | eventBuffer', () => { expect(buffer).toBeInstanceOf(EventBufferProxy); // Ensure worker is ready - await buffer['_ensureWorkerIsLoaded'](); + await buffer.ensureWorkerIsLoaded(); buffer.addEvent(TEST_EVENT); const promise1 = buffer.finish(); + await new Promise(process.nextTick); buffer.addEvent({ ...TEST_EVENT, type: 5 }); const promise2 = buffer.finish(); @@ -144,6 +146,31 @@ describe('Unit | eventBuffer', () => { expect(restored1).toEqual(JSON.stringify([TEST_EVENT])); expect(restored2).toEqual(JSON.stringify([{ ...TEST_EVENT, type: 5 }])); }); + + it('handles an error when compressing the payload', async function () { + const buffer = createEventBuffer({ + useCompression: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); + + // @ts-ignore Mock this private so it triggers an error + const postMessageSpy = jest.spyOn(buffer._compression, '_postMessage').mockImplementation(() => { + return Promise.reject('test worker error'); + }); + + const result = await buffer.finish(); + + expect(postMessageSpy).toHaveBeenCalledTimes(1); + + expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); + }); }); describe('EventBufferProxy fallback', () => { @@ -158,7 +185,7 @@ describe('Unit | eventBuffer', () => { consoleErrorSpy.mockRestore(); }); - it('first uses simple buffer, and switches over once worker is loaded', async function () { + it('waits for the worker to be loaded when calling finish', async function () { const buffer = createEventBuffer({ useCompression: true, }) as EventBufferProxy; @@ -170,26 +197,10 @@ describe('Unit | eventBuffer', () => { expect(buffer.pendingEvents).toEqual([TEST_EVENT, TEST_EVENT]); - // Finish before the worker is loaded const result = await buffer.finish(); - expect(typeof result).toBe('string'); - expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); - - // Now actually finish loading the worker - await buffer['_ensureWorkerIsLoaded'](); - - buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT); - - expect(buffer.pendingEvents).toEqual([TEST_EVENT, TEST_EVENT, TEST_EVENT]); - - const result2 = await buffer.finish(); - expect(result2).toBeInstanceOf(Uint8Array); - - const restored2 = pako.inflate(result2 as Uint8Array, { to: 'string' }); - - expect(restored2).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT, TEST_EVENT])); + expect(result).toBeInstanceOf(Uint8Array); + const restored = pako.inflate(result as Uint8Array, { to: 'string' }); + expect(restored).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); }); it('keeps using simple buffer if worker cannot be loaded', async function () { @@ -210,7 +221,7 @@ describe('Unit | eventBuffer', () => { expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); // Now actually finish loading the worker - which triggers an error - await buffer['_ensureWorkerIsLoaded'](); + await buffer.ensureWorkerIsLoaded(); buffer.addEvent(TEST_EVENT); buffer.addEvent(TEST_EVENT);