Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 28 additions & 42 deletions packages/core/src/utils/promisebuffer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { rejectedSyncPromise, resolvedSyncPromise, SyncPromise } from './syncpromise';
import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';

export interface PromiseBuffer<T> {
// exposes the internal array so tests can assert on the state of it.
// XXX: this really should not be public api.
$: Array<PromiseLike<T>>;
$: PromiseLike<T>[];
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
drain(timeout?: number): PromiseLike<boolean>;
}
Expand All @@ -14,11 +14,11 @@ export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
* Creates an new PromiseBuffer object with the specified limit
* @param limit max number of promises that can be stored in the buffer
*/
export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
const buffer: Array<PromiseLike<T>> = [];
export function makePromiseBuffer<T>(limit: number = 100): PromiseBuffer<T> {
const buffer: Set<PromiseLike<T>> = new Set();

function isReady(): boolean {
return limit === undefined || buffer.length < limit;
return buffer.size < limit;
}

/**
Expand All @@ -27,8 +27,8 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
* @param task Can be any PromiseLike<T>
* @returns Removed promise.
*/
function remove(task: PromiseLike<T>): PromiseLike<T | void> {
return buffer.splice(buffer.indexOf(task), 1)[0] || Promise.resolve(undefined);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is necessary to wait on anything here, we can just remove this from the buffer directly and be done with it - we already execute the function anyhow without this.

function remove(task: PromiseLike<T>): void {
buffer.delete(task);
}

/**
Expand All @@ -48,19 +48,11 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {

// start the task and add its promise to the queue
const task = taskProducer();
if (buffer.indexOf(task) === -1) {
buffer.push(task);
}
void task
.then(() => remove(task))
// Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike`
// rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't
// have promises, so TS has to polyfill when down-compiling.)
.then(null, () =>
remove(task).then(null, () => {
// We have to add another catch here because `remove()` starts a new promise chain.
}),
);
buffer.add(task);
void task.then(
() => remove(task),
() => remove(task),
);
return task;
}

Expand All @@ -74,34 +66,28 @@ export function makePromiseBuffer<T>(limit?: number): PromiseBuffer<T> {
* `false` otherwise
*/
function drain(timeout?: number): PromiseLike<boolean> {
return new SyncPromise<boolean>((resolve, reject) => {
let counter = buffer.length;
if (!buffer.size) {
return resolvedSyncPromise(true);
}

if (!counter) {
return resolve(true);
}
// We want to resolve even if one of the promises rejects
const drainPromise = Promise.allSettled(Array.from(buffer)).then(() => true);

if (!timeout) {
return drainPromise;
}

// wait for `timeout` ms and then resolve to `false` (if not cancelled first)
const capturedSetTimeout = setTimeout(() => {
if (timeout && timeout > 0) {
resolve(false);
}
}, timeout);
const promises = [drainPromise, new Promise<boolean>(resolve => setTimeout(() => resolve(false), timeout))];

// if all promises resolve in time, cancel the timer and resolve to `true`
buffer.forEach(item => {
void resolvedSyncPromise(item).then(() => {
if (!--counter) {
clearTimeout(capturedSetTimeout);
resolve(true);
}
}, reject);
});
});
// Promise.race will resolve to the first promise that resolves or rejects
// So if the drainPromise resolves, the timeout promise will be ignored
return Promise.race(promises);
}

return {
$: buffer,
get $(): PromiseLike<T>[] {
return Array.from(buffer);
},
add,
drain,
};
Expand Down
180 changes: 152 additions & 28 deletions packages/core/test/lib/utils/promisebuffer.test.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,163 @@
import { describe, expect, test, vi } from 'vitest';
import { makePromiseBuffer } from '../../../src/utils/promisebuffer';
import { SyncPromise } from '../../../src/utils/syncpromise';
import { rejectedSyncPromise, resolvedSyncPromise } from '../../../src/utils/syncpromise';

describe('PromiseBuffer', () => {
describe('add()', () => {
test('no limit', () => {
const buffer = makePromiseBuffer();
const p = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
void buffer.add(p);
expect(buffer.$.length).toEqual(1);
test('enforces limit of promises', async () => {
const buffer = makePromiseBuffer(5);

const producer1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const producer3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const producer4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const producer5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const producer6 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));

void buffer.add(producer1);
void buffer.add(producer2);
void buffer.add(producer3);
void buffer.add(producer4);
void buffer.add(producer5);
await expect(buffer.add(producer6)).rejects.toThrowError();

expect(producer1).toHaveBeenCalledTimes(1);
expect(producer2).toHaveBeenCalledTimes(1);
expect(producer3).toHaveBeenCalledTimes(1);
expect(producer4).toHaveBeenCalledTimes(1);
expect(producer5).toHaveBeenCalledTimes(1);
expect(producer6).not.toHaveBeenCalled();

expect(buffer.$.length).toEqual(5);

await buffer.drain();

expect(buffer.$.length).toEqual(0);

expect(producer1).toHaveBeenCalledTimes(1);
expect(producer2).toHaveBeenCalledTimes(1);
expect(producer3).toHaveBeenCalledTimes(1);
expect(producer4).toHaveBeenCalledTimes(1);
expect(producer5).toHaveBeenCalledTimes(1);
expect(producer6).not.toHaveBeenCalled();
});

test('sync promises', async () => {
const buffer = makePromiseBuffer(1);
let task1;
const producer1 = vi.fn(() => {
task1 = resolvedSyncPromise();
return task1;
});
const producer2 = vi.fn(() => resolvedSyncPromise());
expect(buffer.add(producer1)).toEqual(task1);
const add2 = buffer.add(producer2);

// This is immediately executed and removed again from the buffer
expect(buffer.$.length).toEqual(0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see the slight change of timing semantics here, in that the promise is immediately added & removed from the buffer if it is a sync promise. This makes sense IMHO and is possibly a tiny micro-optimization, I suppose.


await expect(add2).resolves.toBeUndefined();

expect(producer1).toHaveBeenCalled();
expect(producer2).toHaveBeenCalled();
});

test('with limit', () => {
test('async promises', async () => {
const buffer = makePromiseBuffer(1);
let task1;
const producer1 = vi.fn(() => {
task1 = new SyncPromise(resolve => setTimeout(resolve));
task1 = new Promise(resolve => setTimeout(resolve, 1));
return task1;
});
const producer2 = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve)));
const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
expect(buffer.add(producer1)).toEqual(task1);
void expect(buffer.add(producer2)).rejects.toThrowError();
const add2 = buffer.add(producer2);

expect(buffer.$.length).toEqual(1);

await expect(add2).rejects.toThrowError();

expect(producer1).toHaveBeenCalled();
expect(producer2).not.toHaveBeenCalled();
});

test('handles multiple equivalent promises', async () => {
const buffer = makePromiseBuffer(10);

const promise = new Promise(resolve => setTimeout(resolve, 1));

const producer = vi.fn(() => promise);
const producer2 = vi.fn(() => promise);

expect(buffer.add(producer)).toEqual(promise);
expect(buffer.add(producer2)).toEqual(promise);

expect(buffer.$.length).toEqual(1);
Comment on lines +92 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this is a case we could even run into in real life? (don't think this is wrong per sé just a bit interesting because we handle this differently e.g. in client hook subscribers)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah not quite sure, I kept this behavior (this was already that way) the same as it was before - honestly I think it is not really "desired"/"needed" but just a way to make sure we can easily remove the promises from the buffer again 😅


expect(producer).toHaveBeenCalled();
expect(producer2).toHaveBeenCalled();

await buffer.drain();

expect(buffer.$.length).toEqual(0);
});
});

describe('drain()', () => {
test('without timeout', async () => {
test('drains all promises without timeout', async () => {
const buffer = makePromiseBuffer();
for (let i = 0; i < 5; i++) {
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve)));
}

const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));

[p1, p2, p3, p4, p5].forEach(p => {
void buffer.add(p);
});

expect(buffer.$.length).toEqual(5);
const result = await buffer.drain();
expect(result).toEqual(true);
expect(buffer.$.length).toEqual(0);

expect(p1).toHaveBeenCalled();
expect(p2).toHaveBeenCalled();
expect(p3).toHaveBeenCalled();
expect(p4).toHaveBeenCalled();
expect(p5).toHaveBeenCalled();
});

test('with timeout', async () => {
test('drains all promises with timeout', async () => {
const buffer = makePromiseBuffer();
for (let i = 0; i < 5; i++) {
void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100)));
}

const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2)));
const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 4)));
const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 6)));
const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 8)));
const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 10)));

[p1, p2, p3, p4, p5].forEach(p => {
void buffer.add(p);
});

expect(p1).toHaveBeenCalled();
expect(p2).toHaveBeenCalled();
expect(p3).toHaveBeenCalled();
expect(p4).toHaveBeenCalled();
expect(p5).toHaveBeenCalled();

expect(buffer.$.length).toEqual(5);
const result = await buffer.drain(50);
const result = await buffer.drain(8);
expect(result).toEqual(false);
// p5 is still in the buffer
expect(buffer.$.length).toEqual(1);

// Now drain final item
const result2 = await buffer.drain();
expect(result2).toEqual(true);
expect(buffer.$.length).toEqual(0);
});

test('on empty buffer', async () => {
Expand All @@ -56,11 +167,26 @@ describe('PromiseBuffer', () => {
expect(result).toEqual(true);
expect(buffer.$.length).toEqual(0);
});

test('resolves even if one of the promises rejects', async () => {
const buffer = makePromiseBuffer();
const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1)));
const p2 = vi.fn(() => new Promise((_, reject) => setTimeout(() => reject(new Error('whoops')), 1)));
void buffer.add(p1);
void buffer.add(p2);

const result = await buffer.drain();
expect(result).toEqual(true);
expect(buffer.$.length).toEqual(0);

expect(p1).toHaveBeenCalled();
expect(p2).toHaveBeenCalled();
});
});

test('resolved promises should not show up in buffer length', async () => {
const buffer = makePromiseBuffer();
const producer = () => new SyncPromise(resolve => setTimeout(resolve));
const producer = () => new Promise(resolve => setTimeout(resolve, 1));
const task = buffer.add(producer);
expect(buffer.$.length).toEqual(1);
await task;
Expand All @@ -69,20 +195,18 @@ describe('PromiseBuffer', () => {

test('rejected promises should not show up in buffer length', async () => {
const buffer = makePromiseBuffer();
const producer = () => new SyncPromise((_, reject) => setTimeout(reject));
const error = new Error('whoops');
const producer = () => new Promise((_, reject) => setTimeout(() => reject(error), 1));
const task = buffer.add(producer);
expect(buffer.$.length).toEqual(1);
try {
await task;
} catch {
// no-empty
}

await expect(task).rejects.toThrow(error);
expect(buffer.$.length).toEqual(0);
});

test('resolved task should give an access to the return value', async () => {
const buffer = makePromiseBuffer<string>();
const producer = () => new SyncPromise<string>(resolve => setTimeout(() => resolve('test')));
const producer = () => resolvedSyncPromise('test');
const task = buffer.add(producer);
const result = await task;
expect(result).toEqual('test');
Expand All @@ -91,7 +215,7 @@ describe('PromiseBuffer', () => {
test('rejected task should give an access to the return value', async () => {
expect.assertions(1);
const buffer = makePromiseBuffer<string>();
const producer = () => new SyncPromise<string>((_, reject) => setTimeout(() => reject(new Error('whoops'))));
const producer = () => rejectedSyncPromise(new Error('whoops'));
const task = buffer.add(producer);
try {
await task;
Expand Down