Skip to content

Commit

Permalink
fix(replay): Handle compression worker errors more gracefully (#6936)
Browse files Browse the repository at this point in the history
  • Loading branch information
mydea committed Jan 26, 2023
1 parent 6098879 commit b86ac10
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 188 deletions.
3 changes: 3 additions & 0 deletions packages/replay/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ export const MASK_ALL_TEXT_SELECTOR = 'body *:not(style), body *:not(script)';
export const DEFAULT_FLUSH_MIN_DELAY = 5_000;
export const DEFAULT_FLUSH_MAX_DELAY = 5_000;

/* How long to wait for error checkouts */
export const ERROR_CHECKOUT_TIME = 60_000;

export const RETRY_BASE_INTERVAL = 5000;
export const RETRY_MAX_COUNT = 3;
54 changes: 54 additions & 0 deletions packages/replay/src/eventBuffer/EventBufferArray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';

/**
* A basic event buffer that does not do any compression.
* Used as fallback if the compression worker cannot be loaded or is disabled.
*/
export class EventBufferArray implements EventBuffer {
private _events: RecordingEvent[];

public constructor() {
this._events = [];
}

/** @inheritdoc */
public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
}

/** @inheritdoc */
public destroy(): void {
this._events = [];
}

/** @inheritdoc */
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this._events = [event];
return;
}

this._events.push(event);
return;
}

/** @inheritdoc */
public finish(): Promise<string> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
resolve(JSON.stringify(eventsRet));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,153 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
// TODO: figure out member access types and remove the line above

import type { ReplayRecordingData } from '@sentry/types';
import { logger } from '@sentry/utils';

import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest } from './types';
import workerString from './worker/worker.js';

interface CreateEventBufferParams {
useCompression: boolean;
}

/**
* Create an event buffer for replays.
*/
export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer {
// eslint-disable-next-line no-restricted-globals
if (useCompression && window.Worker) {
const workerBlob = new Blob([workerString]);
const workerUrl = URL.createObjectURL(workerBlob);

__DEBUG_BUILD__ && logger.log('[Replay] Using compression worker');
const worker = new Worker(workerUrl);
return new EventBufferProxy(worker);
}

__DEBUG_BUILD__ && logger.log('[Replay] Using simple buffer');
return new EventBufferArray();
}

/**
* This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there.
* This can happen e.g. if the worker cannot be loaded.
* Exported only for testing.
*/
export class EventBufferProxy implements EventBuffer {
private _fallback: EventBufferArray;
private _compression: EventBufferCompressionWorker;
private _used: EventBuffer;

public constructor(worker: Worker) {
this._fallback = new EventBufferArray();
this._compression = new EventBufferCompressionWorker(worker);
this._used = this._fallback;

void this._ensureWorkerIsLoaded();
}

/** @inheritDoc */
public get pendingLength(): number {
return this._used.pendingLength;
}

/** @inheritDoc */
public get pendingEvents(): RecordingEvent[] {
return this._used.pendingEvents;
}

/** @inheritDoc */
public destroy(): void {
this._fallback.destroy();
this._compression.destroy();
}

/**
* Add an event to the event buffer.
*
* Returns true if event was successfully added.
*/
public addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
return this._used.addEvent(event, isCheckout);
}

/** @inheritDoc */
public finish(): Promise<ReplayRecordingData> {
return this._used.finish();
}

/** Ensure the worker has loaded. */
private async _ensureWorkerIsLoaded(): Promise<void> {
try {
await this._compression.ensureReady();
} catch (error) {
// If the worker fails to load, we fall back to the simple buffer.
// Nothing more to do from our side here
__DEBUG_BUILD__ && logger.log('[Replay] Failed to load the compression worker, falling back to simple buffer');
return;
}

// Compression worker is ready, we can use it
// Now we need to switch over the array buffer to the compression worker
const addEventPromises: Promise<void>[] = [];
for (const event of this._fallback.pendingEvents) {
addEventPromises.push(this._compression.addEvent(event));
}

// We switch over to the compression buffer immediately - any further events will be added
// after the previously buffered ones
this._used = this._compression;

// Wait for original events to be re-added before resolving
await Promise.all(addEventPromises);
}
}

class EventBufferArray implements EventBuffer {
private _events: RecordingEvent[];

public constructor() {
this._events = [];
}

public get pendingLength(): number {
return this._events.length;
}

/**
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
* same as `this._events`.
*/
public get pendingEvents(): RecordingEvent[] {
return this._events;
}

public destroy(): void {
this._events = [];
}

public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
if (isCheckout) {
this._events = [event];
return;
}

this._events.push(event);
return;
}

public finish(): Promise<string> {
return new Promise<string>(resolve => {
// Make a copy of the events array reference and immediately clear the
// events member so that we do not lose new events while uploading
// attachment.
const eventsRet = this._events;
this._events = [];
resolve(JSON.stringify(eventsRet));
});
}
}
import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types';

/**
* Event buffer that uses a web worker to compress events.
Expand All @@ -164,6 +18,7 @@ export class EventBufferCompressionWorker implements EventBuffer {
private _worker: Worker;
private _eventBufferItemLength: number = 0;
private _id: number = 0;
private _ensureReadyPromise?: Promise<void>;

public constructor(worker: Worker) {
this._worker = worker;
Expand All @@ -190,11 +45,16 @@ export class EventBufferCompressionWorker implements EventBuffer {
* This will either resolve when the worker is ready, or reject if an error occured.
*/
public ensureReady(): Promise<void> {
return new Promise((resolve, reject) => {
// Ensure we only check once
if (this._ensureReadyPromise) {
return this._ensureReadyPromise;
}

this._ensureReadyPromise = new Promise((resolve, reject) => {
this._worker.addEventListener(
'message',
({ data }: MessageEvent) => {
if (data.success) {
if ((data as WorkerResponse).success) {
resolve();
} else {
reject();
Expand All @@ -211,6 +71,8 @@ export class EventBufferCompressionWorker implements EventBuffer {
{ once: true },
);
});

return this._ensureReadyPromise;
}

/**
Expand Down Expand Up @@ -248,39 +110,46 @@ export class EventBufferCompressionWorker implements EventBuffer {
/**
* Finish the event buffer and return the compressed data.
*/
public finish(): Promise<Uint8Array> {
return this._finishRequest(this._getAndIncrementId());
public async finish(): Promise<ReplayRecordingData> {
try {
return await this._finishRequest(this._getAndIncrementId());
} catch (error) {
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to compress events', error);
// fall back to uncompressed
const events = this.pendingEvents;
return JSON.stringify(events);
}
}

/**
* Post message to worker and wait for response before resolving promise.
*/
private _postMessage<T>({ id, method, args }: WorkerRequest): Promise<T> {
return new Promise((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
const listener = ({ data }: MessageEvent) => {
if (data.method !== method) {
const listener = ({ data }: MessageEvent): void => {
const response = data as WorkerResponse;
if (response.method !== method) {
return;
}

// There can be multiple listeners for a single method, the id ensures
// that the response matches the caller.
if (data.id !== id) {
if (response.id !== id) {
return;
}

// At this point, we'll always want to remove listener regardless of result status
this._worker.removeEventListener('message', listener);

if (!data.success) {
if (!response.success) {
// TODO: Do some error handling, not sure what
__DEBUG_BUILD__ && logger.error('[Replay]', data.response);
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);

reject(new Error('Error in compression worker'));
return;
}

resolve(data.response);
resolve(response.response as T);
};

let stringifiedArgs;
Expand Down

0 comments on commit b86ac10

Please sign in to comment.