Skip to content

Commit

Permalink
Merge pull request #57 from aesy-engineering/weighted_semaphore
Browse files Browse the repository at this point in the history
Weighted semaphore
  • Loading branch information
DirtyHairy committed Jun 16, 2022
2 parents 2fb5f9c + 7a94809 commit 67efcfe
Show file tree
Hide file tree
Showing 8 changed files with 10,161 additions and 538 deletions.
9,567 changes: 9,567 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ class Mutex implements MutexInterface {
this._semaphore = new Semaphore(1, cancelError);
}

async weightedAcquire(): Promise<MutexInterface.Releaser> {
return this.acquire()
}

async acquire(): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire();

Expand Down
2 changes: 2 additions & 0 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
interface MutexInterface {
acquire(): Promise<MutexInterface.Releaser>;

weightedAcquire(): Promise<MutexInterface.Releaser>;

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T>;

waitForUnlock(): Promise<void>;
Expand Down
20 changes: 15 additions & 5 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ class Semaphore implements SemaphoreInterface {
}

acquire(): Promise<[number, SemaphoreInterface.Releaser]> {
return this.weightedAcquire(1)
}

weightedAcquire(weight: number): Promise<[number, SemaphoreInterface.Releaser]> {
const locked = this.isLocked();
const ticketPromise = new Promise<[number, SemaphoreInterface.Releaser]>((resolve, reject) =>
this._queue.push({ resolve, reject })
);

if (!locked) this._dispatch();
if (!locked) this._dispatch(weight);

return ticketPromise;
}
Expand Down Expand Up @@ -75,7 +79,7 @@ class Semaphore implements SemaphoreInterface {
this._queue = [];
}

private _dispatch(): void {
private _dispatch(weight: number): void {
const nextTicket = this._queue.shift();

if (!nextTicket) return;
Expand All @@ -85,13 +89,19 @@ class Semaphore implements SemaphoreInterface {
if (released) return;

released = true;
this._value++;
this._value = this._value + weight
this._resolveWaiters();

this._dispatch();
this._dispatch(weight);
};

nextTicket.resolve([this._value--, this._currentReleaser]);
nextTicket.resolve([this._valueMinusMinusWeight(weight), this._currentReleaser]);
}

private _valueMinusMinusWeight(weight: number): number {
const oldValue = this._value
this._value = this._value - weight
return oldValue
}

private _resolveWaiters() {
Expand Down
2 changes: 2 additions & 0 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
interface SemaphoreInterface {
acquire(): Promise<[number, SemaphoreInterface.Releaser]>;

weightedAcquire(weight: number): Promise<[number, SemaphoreInterface.Releaser]>;

runExclusive<T>(callback: SemaphoreInterface.Worker<T>): Promise<T>;

waitForUnlock(): Promise<void>;
Expand Down
8 changes: 6 additions & 2 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export function withTimeout(semaphore: SemaphoreInterface, timeout: number, time
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT) {
return {
acquire: (): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> =>
weightedAcquire: (weight: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> =>
new Promise(async (resolve, reject) => {
let isTimeout = false;

Expand All @@ -17,7 +17,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}, timeout);

try {
const ticket = await sync.acquire();
const ticket = await sync.weightedAcquire(weight);

if (isTimeout) {
const release = Array.isArray(ticket) ? ticket[1] : ticket;
Expand All @@ -36,6 +36,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}
}),

acquire(): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> {
return this.weightedAcquire(1)
},

async runExclusive<T>(callback: (value?: number) => Promise<T> | T): Promise<T> {
let release: () => void = () => undefined;

Expand Down
49 changes: 49 additions & 0 deletions test/semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,28 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
assert.deepStrictEqual(values.sort(), [1, 2]);
});

test('weightedAcquire does not block while the semaphore has not reached zero', async () => {
const values: Array<number> = [];

semaphore.weightedAcquire(1).then(([value]) => values.push(value));
semaphore.weightedAcquire(1).then(([value]) => values.push(value));

await clock.tickAsync(0);

assert.deepStrictEqual(values.sort(), [1, 2]);
});

test('weightedAcquire does block while the semaphore has reached zero', async () => {
const values: Array<number> = [];

semaphore.weightedAcquire(2).then(([value]) => values.push(value));
semaphore.weightedAcquire(1).then(([value]) => values.push(value));

await clock.tickAsync(0);

assert.deepStrictEqual(values.sort(), [2]);
});

test('acquire blocks when the semaphore has reached zero', async () => {
const values: Array<number> = [];

Expand All @@ -48,6 +70,33 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
assert.deepStrictEqual(values.sort(), [1, 1, 2]);
});

test('weightedAcquire blocks when the semaphore has reached zero and unblocked on release', async () => {
const values: Array<number> = [];

semaphore.weightedAcquire(1).then(([value, release]) => {
values.push(value)
release()
});
semaphore.weightedAcquire(3).then(([value, release]) => {
values.push(value);
setTimeout(() => {
release();
}, 100);
});
semaphore.weightedAcquire(1).then(([value, release]) => {
values.push(value)
release();
});

await clock.tickAsync(0);

assert.deepStrictEqual(values.sort(), [-1, 1, 2]);

await clock.runAllAsync();

assert.deepStrictEqual(values.sort(), [-1, 1, 2]);
});

test('the semaphore increments again after a release', async () => {
semaphore.acquire().then(([, release]) => setTimeout(release, 100));
semaphore.acquire().then(([, release]) => setTimeout(release, 200));
Expand Down

0 comments on commit 67efcfe

Please sign in to comment.