Skip to content

Commit

Permalink
feat(replay): Stop recording when hitting a rate limit (#7018)
Browse files Browse the repository at this point in the history
We want to stop recording the replay entirely once we receive a 429 rate-limit response. This patch removes all of the "pause on rate limit" logic. Because we already stop the replay when we receive any other HTTP error response, a 429 can simply go through that same code path instead.
  • Loading branch information
Lms24 committed Feb 1, 2023
1 parent a90ba73 commit 6ddc5cd
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 243 deletions.
34 changes: 1 addition & 33 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import { EventType, record } from '@sentry-internal/rrweb';
import { captureException } from '@sentry/core';
import type { Breadcrumb, ReplayRecordingMode } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { disabledUntil, logger } from '@sentry/utils';
import { logger } from '@sentry/utils';

import {
ERROR_CHECKOUT_TIME,
Expand Down Expand Up @@ -40,7 +39,6 @@ import { isExpired } from './util/isExpired';
import { isSessionExpired } from './util/isSessionExpired';
import { overwriteRecordDroppedEvent, restoreRecordDroppedEvent } from './util/monkeyPatchRecordDroppedEvent';
import { sendReplay } from './util/sendReplay';
import { RateLimitError } from './util/sendReplayRequest';

/**
* The main replay container class, which holds all the state and methods for recording and sending replays.
Expand Down Expand Up @@ -809,11 +807,6 @@ export class ReplayContainer implements ReplayContainerInterface {
} catch (err) {
this._handleException(err);

if (err instanceof RateLimitError) {
this._handleRateLimit(err.rateLimits);
return;
}

// This means we retried 3 times, and all of them failed
// In this case, we want to completely stop the replay - otherwise, we may get inconsistent segments
this.stop();
Expand Down Expand Up @@ -873,29 +866,4 @@ export class ReplayContainer implements ReplayContainerInterface {
saveSession(this.session);
}
}

/**
* Pauses the replay and resumes it after the rate-limit duration is over.
*
* Does nothing when the replay is already paused, or when the end of the
* 'replay' rate limit computed from `rateLimits` is not in the future.
*
* @param rateLimits Rate-limit state handed to `disabledUntil` to look up when
* the 'replay' category stops being rate limited.
*/
private _handleRateLimit(rateLimits: RateLimits): void {
// in case recording is already paused, we don't need to do anything, as we might have already paused because of a
// rate limit
if (this.isPaused()) {
return;
}

// The result is compared against Date.now() below, so it is treated as a timestamp in milliseconds.
const rateLimitEnd = disabledUntil(rateLimits, 'replay');
const rateLimitDuration = rateLimitEnd - Date.now();

// Only pause while the rate limit is still in effect.
if (rateLimitDuration > 0) {
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
this.pause();
// Cancel any pending debounced flush so nothing is sent while paused.
this._debouncedFlush.cancel();

// NOTE(review): the timer handle is not kept, so this callback cannot be cancelled; it looks like resume()
// would still run if the replay is stopped before the timeout fires — confirm.
setTimeout(() => {
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
this.resume();
}, rateLimitDuration);
}
}
}
4 changes: 2 additions & 2 deletions packages/replay/src/util/sendReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { captureException, setContext } from '@sentry/core';

import { RETRY_BASE_INTERVAL, RETRY_MAX_COUNT, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
import { RateLimitError, sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';
import { sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';

/**
* Finalize and send the current replay event to Sentry
Expand All @@ -25,7 +25,7 @@ export async function sendReplay(
await sendReplayRequest(replayData);
return true;
} catch (err) {
if (err instanceof RateLimitError || err instanceof TransportStatusCodeError) {
if (err instanceof TransportStatusCodeError) {
throw err;
}

Expand Down
20 changes: 1 addition & 19 deletions packages/replay/src/util/sendReplayRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getCurrentHub } from '@sentry/core';
import type { ReplayEvent, TransportMakeRequestResponse } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { isRateLimited, logger, updateRateLimits } from '@sentry/utils';
import { logger } from '@sentry/utils';

import { REPLAY_EVENT_NAME, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
Expand Down Expand Up @@ -125,11 +124,6 @@ export async function sendReplayRequest({
return response;
}

const rateLimits = updateRateLimits({}, response);
if (isRateLimited(rateLimits, 'replay')) {
throw new RateLimitError(rateLimits);
}

// If the status code is invalid, we want to immediately stop & not retry
if (typeof response.statusCode === 'number' && (response.statusCode < 200 || response.statusCode >= 300)) {
throw new TransportStatusCodeError(response.statusCode);
Expand All @@ -138,18 +132,6 @@ export async function sendReplayRequest({
return response;
}

/**
* This error indicates that we hit a rate limit API error.
*
* The error message is always 'Rate limit hit'; the rate limits passed to the
* constructor are exposed on `rateLimits` for whoever catches the error.
*/
export class RateLimitError extends Error {
/** The rate limits this error was constructed with. */
public rateLimits: RateLimits;

/**
* @param rateLimits Rate-limit state to attach to the error.
*/
public constructor(rateLimits: RateLimits) {
super('Rate limit hit');
this.rateLimits = rateLimits;
}
}

/**
* This error indicates that the transport returned an invalid status code.
*/
Expand Down
201 changes: 12 additions & 189 deletions packages/replay/test/integration/rateLimiting.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getCurrentHub } from '@sentry/core';
import type { Transport, TransportMakeRequestResponse } from '@sentry/types';
import type { Transport } from '@sentry/types';

import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants';
import type { ReplayContainer } from '../../src/replay';
Expand Down Expand Up @@ -64,120 +64,9 @@ describe('Integration | rate-limiting behaviour', () => {
replay && replay.stop();
});

it.each([
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30:replay',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': null,
'retry-after': '30',
},
},
] as TransportMakeRequestResponse[])(
'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over %j',
async rateLimitResponse => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve(rateLimitResponse);
});

mockRecord._emitter(TEST_EVENT);

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 5; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 35
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 },
]),
});

// and let's also emit a new event and check that it is recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 40
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 60
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.hasEvents).toBe(false);
},
);

it('handles rate-limits from a plain 429 response without any retry time', async () => {
it('handles rate-limit 429 responses by stopping the replay', async () => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');
jest.spyOn(replay, 'stop');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

Expand All @@ -194,99 +83,33 @@ describe('Integration | rate-limiting behaviour', () => {
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);
expect(replay.stop).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime
// let's simulate the default rate-limit time of inactivity (60secs) and check that we
// don't do anything in the meantime or after the time has passed
// 60secs are the default we fall back to in the plain 429 case in updateRateLimits()
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 11; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 60
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY * 12);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 },
]),
});
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

// and let's also emit a new event and check that it is recorded
// and let's also emit a new event and check that it is not recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 65
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 85
// T = base + 80
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.hasEvents).toBe(false);
});

it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => {
let paused = false;
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'isPaused').mockImplementation(() => {
return paused;
});
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve({ statusCode: 429 });
});

mockRecord._emitter(TEST_EVENT);
paused = true;

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.isPaused).toHaveBeenCalledTimes(2);
expect(replay.pause).not.toHaveBeenCalled();
});
});

0 comments on commit 6ddc5cd

Please sign in to comment.