Skip to content

Commit

Permalink
feat: add scatter and onEmpty
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiphe committed Sep 4, 2023
1 parent 1605abc commit 09b345f
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ npm i ichschwoer
- `createQueue`
- `createRateLimit`
- `createBatchResolve`
- `createScatter`

## WTF is that name?

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export { default as createBatchResolve } from './batch-resolve';
export { default as Deferred } from './defer';
export { default as createQueue } from './queue';
export { default as createRateLimit } from './rate-limit';
export { default as createScatter } from './scatter';
10 changes: 10 additions & 0 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type Job<T> = () => PromiseLike<T> | T;
export default function createQueue(maxParallel: number = 1) {
let running = 0;
const jobs: Job<any>[] = [];
const onEmpty: (() => void)[] = [];

const trigger = () => {
if (running < maxParallel && jobs.length) {
Expand All @@ -16,13 +17,22 @@ export default function createQueue(maxParallel: number = 1) {
running--;
trigger();
});
} else if (!jobs.length && running === 0) {
onEmpty.forEach((cb) => cb());
}
};

return {
get length() {
return jobs.length + running;
},
onEmpty(cb: () => void) {
onEmpty.push(cb);

return () => {
onEmpty.splice(onEmpty.indexOf(cb), 1);
};
},
push<T>(job: Job<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
jobs.push(() => Promise.resolve(job()).then(resolve, reject));
Expand Down
12 changes: 12 additions & 0 deletions src/rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ export type Job<T> = () => Promise<T> | T;
export default function createRateLimit(max: number, windowMs: number) {
let window = 0;
const jobs: Job<any>[] = [];
const onEmpty: (() => void)[] = [];

const trigger = () => {
if (window === 0) {
setTimeout(() => {
window = 0;
const next = Math.min(max, jobs.length);
if (next === 0) {
onEmpty.forEach((cb) => cb());
return;
}
for (let i = 0; i < next; i++) {
trigger();
}
Expand All @@ -28,6 +33,13 @@ export default function createRateLimit(max: number, windowMs: number) {
get length() {
return jobs.length + window;
},
onEmpty(cb: () => void) {
onEmpty.push(cb);

return () => {
onEmpty.splice(onEmpty.indexOf(cb), 1);
};
},
push<T>(job: Job<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
jobs.push(() => Promise.resolve(job()).then(resolve, reject));
Expand Down
56 changes: 56 additions & 0 deletions src/scatter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
export type Job<T> = () => PromiseLike<T> | T;

/**
* Makes sure all callbacks are executed with at least set ms delay
*/
export default function createScatter(delay: number) {
let lastExecution = -Infinity;
let waiting = false;
const jobs: Job<any>[] = [];
const onEmpty: (() => void)[] = [];

const trigger = () => {
if (jobs.length && !waiting) {
const now = Date.now();

if (lastExecution + delay <= now) {
lastExecution = now;
const job = jobs.shift()!;
job().finally(() => {
trigger();
});
if (!jobs.length) {
onEmpty.forEach((cb) => cb());
}
} else {
waiting = true;
setTimeout(() => {
waiting = false;
trigger();
}, lastExecution + delay - now);
}
}
};

return {
get length() {
return jobs.length;
},
onEmpty(cb: () => void) {
onEmpty.push(cb);

return () => {
onEmpty.splice(onEmpty.indexOf(cb), 1);
};
},
push<T>(job: Job<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
jobs.push(() => Promise.resolve(job()).then(resolve, reject));
trigger();
});
},
clear() {
jobs.length = 0;
},
};
}
4 changes: 3 additions & 1 deletion test/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ test('queue runs jobs in parallel', async () => {

const check: string[] = [];

singleLine.onEmpty(() => check.push('Empty'));

singleLine.push(() => {
check.push('One');
return d1.promise;
Expand Down Expand Up @@ -112,7 +114,7 @@ test('queue runs jobs in parallel', async () => {
d1.resolve('1');
await tick();

assert.deepEqual(check, ['One', 'Two', 'Three', 'Four']);
assert.deepEqual(check, ['One', 'Two', 'Three', 'Four', 'Empty']);
assert.deepEqual(singleLine.length, 0);
});

Expand Down
3 changes: 2 additions & 1 deletion test/rate-limit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ test('rateLimit executes only set jobs within time window', async () => {
rateLimit.push(() => {
check.push('Three');
});
rateLimit.onEmpty(() => check.push('Empty'));

assert.deepEqual(check, ['One']);
assert.deepEqual(rateLimit.length, 3);
Expand All @@ -36,7 +37,7 @@ test('rateLimit executes only set jobs within time window', async () => {
assert.deepEqual(rateLimit.length, 1);

clock.tick(100);
assert.deepEqual(check, ['One', 'Two', 'Three']);
assert.deepEqual(check, ['One', 'Two', 'Three', 'Empty']);
assert.deepEqual(rateLimit.length, 0);

clock.uninstall();
Expand Down
36 changes: 36 additions & 0 deletions test/scatter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import test from 'node:test';
import assert from 'node:assert';
import { install } from '@sinonjs/fake-timers';
import { createScatter } from '../src';

test('scatter executes jobs after set delay', async () => {
const clock = install();
const scatter = createScatter(100);

const check: string[] = [];

scatter.push(() => {
check.push('One');
});
scatter.push(() => {
check.push('Two');
});
scatter.onEmpty(() => check.push('Empty'));

assert.deepEqual(check, ['One']);
assert.deepEqual(scatter.length, 1);

clock.tick(100);
assert.deepEqual(check, ['One', 'Two', 'Empty']);
assert.deepEqual(scatter.length, 0);

clock.tick(100);
scatter.push(() => {
check.push('Three');
});

assert.deepEqual(check, ['One', 'Two', 'Empty', 'Three', 'Empty']);
assert.deepEqual(scatter.length, 0);

clock.uninstall();
});

0 comments on commit 09b345f

Please sign in to comment.