Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(replay): Stop recording when hitting a rate limit #7018

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 1 addition & 33 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import { EventType, record } from '@sentry-internal/rrweb';
import { captureException } from '@sentry/core';
import type { Breadcrumb, ReplayRecordingMode } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { disabledUntil, logger } from '@sentry/utils';
import { logger } from '@sentry/utils';

import {
ERROR_CHECKOUT_TIME,
Expand Down Expand Up @@ -40,7 +39,6 @@ import { isExpired } from './util/isExpired';
import { isSessionExpired } from './util/isSessionExpired';
import { overwriteRecordDroppedEvent, restoreRecordDroppedEvent } from './util/monkeyPatchRecordDroppedEvent';
import { sendReplay } from './util/sendReplay';
import { RateLimitError } from './util/sendReplayRequest';

/**
* The main replay container class, which holds all the state and methods for recording and sending replays.
Expand Down Expand Up @@ -809,11 +807,6 @@ export class ReplayContainer implements ReplayContainerInterface {
} catch (err) {
this._handleException(err);

if (err instanceof RateLimitError) {
this._handleRateLimit(err.rateLimits);
return;
}

// This means we retried 3 times, and all of them failed
// In this case, we want to completely stop the replay - otherwise, we may get inconsistent segments
this.stop();
Expand Down Expand Up @@ -873,29 +866,4 @@ export class ReplayContainer implements ReplayContainerInterface {
saveSession(this.session);
}
}

/**
* Pauses the replay and resumes it after the rate-limit duration is over.
*/
private _handleRateLimit(rateLimits: RateLimits): void {
// in case recording is already paused, we don't need to do anything, as we might have already paused because of a
// rate limit
if (this.isPaused()) {
return;
}

const rateLimitEnd = disabledUntil(rateLimits, 'replay');
const rateLimitDuration = rateLimitEnd - Date.now();

if (rateLimitDuration > 0) {
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
this.pause();
this._debouncedFlush.cancel();

setTimeout(() => {
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
this.resume();
}, rateLimitDuration);
}
}
}
4 changes: 2 additions & 2 deletions packages/replay/src/util/sendReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { captureException, setContext } from '@sentry/core';

import { RETRY_BASE_INTERVAL, RETRY_MAX_COUNT, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
import { RateLimitError, sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';
import { sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';

/**
* Finalize and send the current replay event to Sentry
Expand All @@ -25,7 +25,7 @@ export async function sendReplay(
await sendReplayRequest(replayData);
return true;
} catch (err) {
if (err instanceof RateLimitError || err instanceof TransportStatusCodeError) {
if (err instanceof TransportStatusCodeError) {
throw err;
}

Expand Down
20 changes: 1 addition & 19 deletions packages/replay/src/util/sendReplayRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getCurrentHub } from '@sentry/core';
import type { ReplayEvent, TransportMakeRequestResponse } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { isRateLimited, logger, updateRateLimits } from '@sentry/utils';
import { logger } from '@sentry/utils';

import { REPLAY_EVENT_NAME, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
Expand Down Expand Up @@ -125,11 +124,6 @@ export async function sendReplayRequest({
return response;
}

const rateLimits = updateRateLimits({}, response);
if (isRateLimited(rateLimits, 'replay')) {
throw new RateLimitError(rateLimits);
}

// If the status code is invalid, we want to immediately stop & not retry
if (typeof response.statusCode === 'number' && (response.statusCode < 200 || response.statusCode >= 300)) {
throw new TransportStatusCodeError(response.statusCode);
Expand All @@ -138,18 +132,6 @@ export async function sendReplayRequest({
return response;
}

/**
* This error indicates that we hit a rate limit API error.
*/
export class RateLimitError extends Error {
public rateLimits: RateLimits;

public constructor(rateLimits: RateLimits) {
super('Rate limit hit');
this.rateLimits = rateLimits;
}
}

/**
* This error indicates that the transport returned an invalid status code.
*/
Expand Down
201 changes: 12 additions & 189 deletions packages/replay/test/integration/rateLimiting.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getCurrentHub } from '@sentry/core';
import type { Transport, TransportMakeRequestResponse } from '@sentry/types';
import type { Transport } from '@sentry/types';

import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants';
import type { ReplayContainer } from '../../src/replay';
Expand Down Expand Up @@ -64,120 +64,9 @@ describe('Integration | rate-limiting behaviour', () => {
replay && replay.stop();
});

it.each([
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we don't need these tests anymore, as they only tested that we pause correctly for different rate limit timeouts we might receive from the Sentry backend. Just ensuring that we don't do anything after a 429 response should be good enough

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense! 👍 We could even move it to the general sendReplay tests I guess?

Copy link
Member Author

@Lms24 Lms24 Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I think it makes sense but given that we have this neat comment under the last test,

// NOTE: If you add a test after the last one, make sure to adjust the test setup
// As this ends with a `stopped()` replay, which may prevent future tests from working
// Sadly, fixing this turned out to be much more annoying than expected, so leaving this warning here for now

and that I ran exactly into this problem, wasting 30minutes trying fix it, I'd vote we leave it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, right 🙈 damn we should fix those xD

{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30:replay',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': null,
'retry-after': '30',
},
},
] as TransportMakeRequestResponse[])(
'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over %j',
async rateLimitResponse => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve(rateLimitResponse);
});

mockRecord._emitter(TEST_EVENT);

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 5; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 35
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 },
]),
});

// and let's also emit a new event and check that it is recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 40
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 60
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.hasEvents).toBe(false);
},
);

it('handles rate-limits from a plain 429 response without any retry time', async () => {
it('handles rate-limit 429 responses by stopping the replay', async () => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');
jest.spyOn(replay, 'stop');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

Expand All @@ -194,99 +83,33 @@ describe('Integration | rate-limiting behaviour', () => {
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);
expect(replay.stop).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime
// let's simulate the default rate-limit time of inactivity (60secs) and check that we
// don't do anything in the meantime or after the time has passed
// 60secs are the default we fall back to in the plain 429 case in updateRateLimits()
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 11; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 60
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY * 12);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 },
]),
});
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

// and let's also emit a new event and check that it is recorded
// and let's also emit a new event and check that it is not recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 65
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 85
// T = base + 80
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.hasEvents).toBe(false);
});

it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => {
let paused = false;
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'isPaused').mockImplementation(() => {
return paused;
});
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve({ statusCode: 429 });
});

mockRecord._emitter(TEST_EVENT);
paused = true;

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.isPaused).toHaveBeenCalledTimes(2);
expect(replay.pause).not.toHaveBeenCalled();
});
});