From 4c77797ce9634582bb4255247d70574c9f7b28d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ramirez=20Vargas=2C=20Jos=C3=A9=20Pablo?= Date: Wed, 11 Sep 2024 22:29:28 -0600 Subject: [PATCH] feat: Add Semaphore and Mutex --- README.md | 10 +- pages/package-lock.json | 16 +- pages/src/lib/CrossOriginNotIsolated.svelte | 2 + pages/src/lib/Instructions.svelte | 33 +++- pages/src/lib/Primes.svelte | 22 ++- pages/src/lib/Stopwatch.svelte | 80 ++++++++ pages/src/lib/Timer.svelte | 109 +++++++++++ pages/src/workers/exampleWorker.ts | 10 +- src/cancellation/CancellationSource.ts | 25 ++- src/cancellation/TaskCancelledError.ts | 2 +- src/events/AutoResetEvent.ts | 77 -------- src/events/Event.ts | 41 ----- src/events/ManualResetEvent.ts | 71 -------- src/index.ts | 16 +- src/sync/AutoResetEvent.ts | 77 ++++++++ src/sync/Event.ts | 19 ++ src/sync/ManualResetEvent.ts | 70 +++++++ src/sync/Mutex.ts | 63 +++++++ src/sync/Semaphore.ts | 191 ++++++++++++++++++++ src/sync/SyncObject.ts | 30 +++ src/sync/identifiers.ts | 50 +++++ src/workers/InternalSharedWorker.ts | 1 - src/workers/InternalWorker.ts | 1 - 23 files changed, 781 insertions(+), 235 deletions(-) create mode 100644 pages/src/lib/Stopwatch.svelte create mode 100644 pages/src/lib/Timer.svelte delete mode 100644 src/events/AutoResetEvent.ts delete mode 100644 src/events/Event.ts delete mode 100644 src/events/ManualResetEvent.ts create mode 100644 src/sync/AutoResetEvent.ts create mode 100644 src/sync/Event.ts create mode 100644 src/sync/ManualResetEvent.ts create mode 100644 src/sync/Mutex.ts create mode 100644 src/sync/Semaphore.ts create mode 100644 src/sync/SyncObject.ts create mode 100644 src/sync/identifiers.ts diff --git a/README.md b/README.md index 29ad995..6e56064 100644 --- a/README.md +++ b/README.md @@ -259,7 +259,7 @@ This package provides synchronization objects that use `Atomics` to cross the th This is the web worker equivalent of `AbortController` and provides a token that can be passed to workers in a task's payload. -The worker can use `CancellationSource.isSignalled()` or `CancellationSource.throwIfSignalled()` through polling in order +The worker can use `CancellationSource.isSignaled()` or `CancellationSource.throwIfSignaled()` through polling in order to abort the current work: ```typescript @@ -268,7 +268,7 @@ import { CancellationSource, type Token } from '@wjfe/async-workers'; function computeSomeStuff(cancelToken?: Token) { for (let i = 0; i < Number.MAX_SAFE_INTEGER; ++i) { // No thowing will be done if cancelToken is undefined. - CancellationSource.throwIfSignalled(cancelToken); + CancellationSource.throwIfSignaled(cancelToken); ... } } @@ -300,7 +300,7 @@ If you're using `CancellationSource` on your own: ### ManualResetEvent -This is a synchronization object that can be used to signal multiple threads at once because it will remain signalled +This is a synchronization object that can be used to signal multiple threads at once because it will remain signaled until the `reset()` event is invoked. A typical use case is to use it for pausing a worker's work. ### AutoResetEvent @@ -373,7 +373,7 @@ work properly in Node. | - | - | - | | [x] ManualResetEvent | [x] Simple request/response scenario | [x] Simple request/response scenario | | [x] AutoResetEvent | [x] Request/multiple response scenario | [x] Request/multiple response scenario | -| [ ] Semaphore | [x] Strongly-typed tasks | [x] Strongly-typed tasks | +| [x] Semaphore | [x] Strongly-typed tasks | [x] Strongly-typed tasks | | [x] CancellationSource | [x] Worker termination | | -| | [x] Out-of-order work items | [x] Out-of-order work items | +| [x] Mutex | [x] Out-of-order work items | [x] Out-of-order work items | | | [x] Task cancellation | [x] Task cancellation| diff --git a/pages/package-lock.json b/pages/package-lock.json index dccb104..0027238 100644 --- a/pages/package-lock.json +++ b/pages/package-lock.json @@ -3405,23 +3405,23 @@ } }, "node_modules/svelte": { - "version": "5.0.0-next.237", - "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.0.0-next.237.tgz", - "integrity": "sha512-EkNhMFq6cjfrQzBv+YWaxs8mgRyHlfjMTYMlVkx/lsWq4EUTHTRYzNf2nfWn7cXMYE1fXg8DsR3CZ6zv1qlQeQ==", + "version": "5.0.0-next.244", + "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.0.0-next.244.tgz", + "integrity": "sha512-whSOcKdpuAFd5xD9J2EhuHeRs4J4nHis6NSUKRXpC3HQoCmsoKhyIldMjiv6QFkQpe6QMsid8lwvgLXkZTSC/A==", "license": "MIT", "dependencies": { - "@ampproject/remapping": "^2.2.1", - "@jridgewell/sourcemap-codec": "^1.4.15", + "@ampproject/remapping": "^2.3.0", + "@jridgewell/sourcemap-codec": "^1.5.0", "@types/estree": "^1.0.5", - "acorn": "^8.11.3", + "acorn": "^8.12.1", "acorn-typescript": "^1.4.13", "aria-query": "^5.3.0", - "axobject-query": "^4.0.0", + "axobject-query": "^4.1.0", "esm-env": "^1.0.0", "esrap": "^1.2.2", "is-reference": "^3.0.2", "locate-character": "^3.0.0", - "magic-string": "^0.30.5", + "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" }, "engines": { diff --git a/pages/src/lib/CrossOriginNotIsolated.svelte b/pages/src/lib/CrossOriginNotIsolated.svelte index 50b7964..2f55e94 100644 --- a/pages/src/lib/CrossOriginNotIsolated.svelte +++ b/pages/src/lib/CrossOriginNotIsolated.svelte @@ -15,6 +15,8 @@
  • ManualResetEvent
  • AutoResetEvent
  • CancellationSource
  • +
  • Semaphore
  • +
  • Mutex
  • In order to solve this problem in your own project/site, diff --git a/pages/src/lib/Instructions.svelte b/pages/src/lib/Instructions.svelte index f5863c4..520ae45 100644 --- a/pages/src/lib/Instructions.svelte +++ b/pages/src/lib/Instructions.svelte @@ -2,8 +2,8 @@

     Instructions

    Play around with the three work items below. Each will run the same example worker, which is a worker that - calculates prime numbers using a not-so-good algorithm that runs up to the shown number. All three work items - use the same worker object, so they run serially between each other. + calculates prime numbers using a not-so-good algorithm that runs up to the specified number in the slider. All + three work items use the same worker object, so they run serially between each other.

    {#if crossOriginIsolated}

    @@ -13,13 +13,30 @@ here), you may pause or cancel the worker at any time.

    - NOTE: Notice how significant the pause wait is in terms of performance. Waiting on the - token is expensive for sure, so do this only on well-thought-out scenarios. + NOTE ON PERFORMANCE: Atomic operations are less performant than polling. Don't go crazy + with the synchronization objects, and see if polling fits the bill first. For example, the primes worker + in this demo page does polling to condition the wait as opposed to blindly waiting: +

    +
    if (!ManualResetEvent.isSignaled(pause)) {
    +    ManualResetEvent.wait(pause);
    +}
    +

    + Still, even with this modification, you'll see a big difference in times between a pausable and a + non-pausable run for the same numerical limit.

    {:else} - Since cross origin is not isolated (see - - the requirements - here), you may only cancel the worker before it starts. Furthermore, you cannot pause the worker. +

    + Since cross origin is not isolated (see + + the requirements + here), you may only cancel the worker before it starts. Furthermore, you cannot pause the worker. +

    +

    + To experience the power of the synchronization objects, head to the + GitHub repository and clone + the source code. This demonstration's code is inside the folder named pages. Open a console, + change to the pages folder, install the packages running npm ci, and then execute + the development server with npm run dev. +

    {/if} \ No newline at end of file diff --git a/pages/src/lib/Primes.svelte b/pages/src/lib/Primes.svelte index d4d3316..e2bde26 100644 --- a/pages/src/lib/Primes.svelte +++ b/pages/src/lib/Primes.svelte @@ -1,7 +1,9 @@ + + + + {#key hh} + {f(hh)} + {/key} + : + {#key mm} + {f(mm)} + {/key} + : + {#key ss} + {f(ss)} + {/key} + + + + diff --git a/pages/src/lib/Timer.svelte b/pages/src/lib/Timer.svelte new file mode 100644 index 0000000..8348f2b --- /dev/null +++ b/pages/src/lib/Timer.svelte @@ -0,0 +1,109 @@ + + + + + {#key hh} + {f(hh)} + {/key} + : + {#key mm} + {f(mm)} + {/key} + : + {#key ss} + {f(ss)} + {/key} + + + + diff --git a/pages/src/workers/exampleWorker.ts b/pages/src/workers/exampleWorker.ts index 17ae34c..6fd885a 100644 --- a/pages/src/workers/exampleWorker.ts +++ b/pages/src/workers/exampleWorker.ts @@ -3,7 +3,7 @@ import { CancellationSource, ManualResetEvent, workerListener, type PostFn, type function isPrime(n: number, cancelToken?: Token) { // Made unecessarily inefficient for demo purposes. for (let i = 2; i <= n / 2; ++i) { - CancellationSource.throwIfSignalled(cancelToken); + CancellationSource.throwIfSignaled(cancelToken); if (n % i === 0) { return false; } @@ -14,8 +14,10 @@ function isPrime(n: number, cancelToken?: Token) { function isPrimePausable(n: number, pause: Token, cancelToken?: Token) { // Made unecessarily inefficient for demo purposes. for (let i = 2; i <= n / 2; ++i) { - CancellationSource.throwIfSignalled(cancelToken); - ManualResetEvent.wait(pause); + CancellationSource.throwIfSignaled(cancelToken); + if (!ManualResetEvent.isSignaled(pause)) { + ManualResetEvent.wait(pause); + } if (n % i === 0) { return false; } @@ -33,7 +35,7 @@ function getAllPrimes(max: number, pause: Token | undefined, post: PostFn, cance } export const exampleWorker = { - sayHello(payload: { name: string; }) { + iExistToShowOffIntellisense(payload: { name: string; }) { console.log('Hello, %s!', payload.name); }, calculatePrimes(payload: { to: number; pause?: Token }, post: PostFn, cancelToken?: Token) { diff --git a/src/cancellation/CancellationSource.ts b/src/cancellation/CancellationSource.ts index a2befc3..93be82b 100644 --- a/src/cancellation/CancellationSource.ts +++ b/src/cancellation/CancellationSource.ts @@ -1,27 +1,36 @@ -import { ManualResetEvent } from "../events/ManualResetEvent.js"; +import { Event } from "../sync/Event.js"; +import { cancellationSourceIdentityData } from "../sync/identifiers.js"; +import { isSignaled } from "../sync/ManualResetEvent.js"; import type { Token } from "../workers.js"; import { TaskCancelledError } from "./TaskCancelledError.js"; /** * Specialized synchronization event object for the purposes of signalling cancellation intent. */ -export class CancellationSource extends ManualResetEvent { +export class CancellationSource extends Event { + constructor() { + super(cancellationSourceIdentityData[0], undefined); + } /** - * Do not call. Cancellation tokens cannot be reset. + * Checks whether or not a cancellation source's token is in its signaled state. + * + * This method may be used by worker threads in polling mode. + * @param token Token to check. + * @returns `true` if the token is signaled, or `false` otherwise. */ - reset(): void { - throw new Error("Cancellation tokens cannot be reset."); + static isSignaled(token: Token) { + return isSignaled(cancellationSourceIdentityData, token); } /** * Checks the given cancellation token and throws an instance of `TaskCancelledError` if the token is in its - * signalled state. + * signaled state. * @param token Cancellation token to check. */ - static throwIfSignalled(token: Token | undefined) { + static throwIfSignaled(token: Token | undefined) { if (!token) { return; } - if (this.isSignalled(token)) { + if (this.isSignaled(token)) { throw new TaskCancelledError(); } } diff --git a/src/cancellation/TaskCancelledError.ts b/src/cancellation/TaskCancelledError.ts index 5113967..fd43130 100644 --- a/src/cancellation/TaskCancelledError.ts +++ b/src/cancellation/TaskCancelledError.ts @@ -1,5 +1,5 @@ /** - * Error class used by `CancellationSource.throwIfSignalled` to abort a worker thread's current work. + * Error class used by `CancellationSource.throwIfSignaled` to abort a worker thread's current work. */ export class TaskCancelledError extends Error { constructor(message?: string, options?: ErrorOptions) { diff --git a/src/events/AutoResetEvent.ts b/src/events/AutoResetEvent.ts deleted file mode 100644 index 8094f3b..0000000 --- a/src/events/AutoResetEvent.ts +++ /dev/null @@ -1,77 +0,0 @@ -import type { Token } from "../workers.js"; -import { checkToken, Event } from "./Event.js"; - -const identifier = 0x02; -const checkData = [ - identifier, - 'automatically-reset event', - 'an' -] as const; - -/** - * Synchronization event object that automatically resets whenever a worker thread is unlocked by it. - * - * It is useful to ensure that only one thread at a time runs at any given time. - */ -export class AutoResetEvent extends Event { - constructor() { - super(identifier); - } - - /** - * Signals the event, unblocking at most one blocked thread. - */ - signal(): void { - super.signal(); - Atomics.notify(super.token, 0, 1); - } - /** - * Checks whether or not an auto-resettable event's token is in its signalled state. - * - * Auto-reset events automatically reset when a thread is freed by it, so expect this function to only return - * `true` if the token has been signalled and there were no threads blocked by it. - * @param token Auto-resettable token to check. - * @returns `true` if the token is signalled, or `false` otherwise. - */ - static isSignalled(token: Token) { - checkToken(token, ...checkData); - return Atomics.load(token, 0) === 1; - } - /** - * Waits on the specified auto-resettable token to be signalled. - * - * Use this method to block a worker's thread for whatever reason. - * @param token Auto-reset token to wait on. - * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. - * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout - * elapsed and the token did not signal, or `not-equal` if no wait took place (which should not happen for - * auto-reset events). - */ - static wait(token: Token, timeout?: number) { - checkToken(token, ...checkData); - const result = Atomics.wait(token, 0, 0, timeout); - if (result === 'ok') { - Atomics.store(token, 0, 0); - } - return result; - } - /** - * Asynchronously waits on the specified auto-reset token to be signalled. - * - * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps). - * @param token Auto-reset token to wait on. - * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. - * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout - * elapsed and the token did not signal, or `not-equal` if no wait took place (which should not happen for - * auto-reset events). - */ - static async waitAsync(token: Token, timeout?: number) { - checkToken(token, ...checkData); - const result = Atomics.waitAsync(token, 0, 0, timeout); - const finalResult = result.async ? await result.value : result.value; - if (finalResult !== 'timed-out') { - Atomics.store(token, 0, 0); - } - return finalResult; - } -}; diff --git a/src/events/Event.ts b/src/events/Event.ts deleted file mode 100644 index 8bf1ec9..0000000 --- a/src/events/Event.ts +++ /dev/null @@ -1,41 +0,0 @@ -import type { Token } from "../workers"; - -export class Event { - #buffer; - #array; - #identifier; - constructor(identifier: number) { - if (!crossOriginIsolated) { - throw new Error('Cannot operate: Cross origin is not isolated. See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements for details.'); - } - this.#buffer = new SharedArrayBuffer(8); - this.#array = new Int32Array(this.#buffer); - this.#identifier = identifier; - Atomics.store(this.#array, 1, identifier); - } - /** - * Gets the synchronization event's token. - */ - get token(): Token { - return this.#array; - } - /** - * Signals the event. - */ - signal() { - Atomics.store(this.#array, 0, 1); - } -}; - -/** - * Ensures the given token is of the expected type by throwing an error if this is not the case. - * @param token Token to check for. - * @param identifier Token type identifier. - * @param objectName Object name, used for constructing the error's message. - * @param article Article for the object name, so the error's message is written in proper English. - */ -export function checkToken(token: Token, identifier: number, objectName: string, article: string) { - if (Atomics.load(token, 1) !== identifier) { - throw new Error(`The provided token is not that of ${article} ${objectName}.`); - } -} diff --git a/src/events/ManualResetEvent.ts b/src/events/ManualResetEvent.ts deleted file mode 100644 index 613fe9e..0000000 --- a/src/events/ManualResetEvent.ts +++ /dev/null @@ -1,71 +0,0 @@ -import type { Token } from "../workers.js"; -import { checkToken, Event } from "./Event.js"; - -const identifier = 0x01; -const checkData = [ - identifier, - 'manually-reset event', - 'a' -] as const; - -/** - * Synchronization event object that can be signalled on-demand whenever it is appropriate. - * - * It is useful in cases where external control is the priority, such as pausing work. - */ -export class ManualResetEvent extends Event { - constructor() { - super(identifier); - } - /** - * Signals the event, unblocking all threads that are waiting on it. Use `reset()` to revert the signalled state. - */ - signal(): void { - super.signal(); - Atomics.notify(super.token, 0); - } - /** - * Resets the token. - */ - reset() { - Atomics.store(super.token, 0, 0); - } - /** - * Checks whether or not a manually-resettable event's token is in its signalled state. - * - * This method may be used by worker threads in polling mode. - * @param token Manually-resettable token to check. - * @returns `true` if the token is signalled, or `false` otherwise. - */ - static isSignalled(token: Token) { - checkToken(token, ...checkData); - return Atomics.load(token, 0) === 1; - } - /** - * Waits on the specified manually-resettable token to be signalled. - * - * Use this method to block a worker's thread for whatever reason. - * @param token Manually-resettable token to wait on. - * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. - * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout - * elapsed and the token did not signal, or `not-equal` if no wait took place. - */ - static wait(token: Token, timeout?: number) { - checkToken(token, ...checkData); - return Atomics.wait(token, 0, 0, timeout); - } - /** - * Asynchronously waits on the specified manually-resettable token to be signalled. - * - * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps). - * @param token Manually-resettable token to wait on. - * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. - * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout - * elapsed and the token did not signal, or `not-equal` if no wait took place. - */ - static async waitAsync(token: Token, timeout?: number) { - checkToken(token, ...checkData); - const result = Atomics.waitAsync(token, 0, 0, timeout); - return result.async ? await result.value : result.value; - } -}; diff --git a/src/index.ts b/src/index.ts index 4b345d4..9128e6d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,10 +1,12 @@ // cancellation -export * from "./cancellation/CancellationSource.js"; -export * from "./cancellation/CancelledMessage.js"; -export * from "./cancellation/TaskCancelledError.js"; -// events -export * from "./events/AutoResetEvent.js"; -export * from "./events/ManualResetEvent.js"; +export { CancellationSource } from "./cancellation/CancellationSource.js"; +export { CancelledMessage } from "./cancellation/CancelledMessage.js"; +export { TaskCancelledError } from "./cancellation/TaskCancelledError.js"; +// synchronization objects +export { AutoResetEvent } from "./sync/AutoResetEvent.js"; +export { ManualResetEvent } from "./sync/ManualResetEvent.js"; +export { Mutex } from "./sync/Mutex.js"; +export { Semaphore } from "./sync/Semaphore.js"; // workers export * from './workers/AsyncWorker.js'; export * from "./workers/workerListener.js"; @@ -37,5 +39,3 @@ export type AsyncMessage any>> = cancelToken?: Token; payload?: WorkerTasks[keyof Tasks]['payload']; }; - -export type ProcessMessageFn = (payload: any, cancelToken?: Token) => boolean; diff --git a/src/sync/AutoResetEvent.ts b/src/sync/AutoResetEvent.ts new file mode 100644 index 0000000..b2204d8 --- /dev/null +++ b/src/sync/AutoResetEvent.ts @@ -0,0 +1,77 @@ +import type { Token } from "../workers.js"; +import { Event } from "./Event.js"; +import { autoResetEventIdentityData, checkToken } from "./identifiers.js"; + +/** + * Synchronization event object that automatically resets whenever a worker thread is unlocked by it. + * + * It is useful to ensure that only one thread at a time runs at any given time. + */ +export class AutoResetEvent extends Event { + constructor() { + super(autoResetEventIdentityData[0], 1); + } + /** + * Checks whether or not an auto-resettable event's token is in its signaled state. + * + * Auto-reset events automatically reset when a thread is freed by it, so expect this function to only return + * `true` if the token has been signaled and there were no threads blocked by it. + * @param token Token to check. + * @returns `true` if the token is signaled, or `false` otherwise. + */ + static isSignaled(token: Token) { + checkToken(token, ...autoResetEventIdentityData); + return Atomics.load(token, 0) === 1; + } + /** + * Waits on the specified auto-resettable event's token to be signaled. + * + * Use this method to block a worker's thread for whatever reason. + * @param token Token to wait on. + * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. + * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the + * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place. + */ + static wait(token: Token, timeout?: number) { + checkToken(token, ...autoResetEventIdentityData); + // Performance optimization: Blind attempt. + if (Atomics.compareExchange(token, 0, 1, 0) === 1) { + return 'not-equal'; + } + while (true) { + const result = Atomics.wait(token, 0, 0, timeout); + if (result === 'timed-out') { + return result; + } + if (Atomics.compareExchange(token, 0, 1, 0) === 1) { + return result; + } + } + } + /** + * Asynchronously waits on the specified auto-reset token to be signaled. + * + * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps). + * @param token Token to wait on. + * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. + * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the + * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place. + */ + static async waitAsync(token: Token, timeout?: number) { + checkToken(token, ...autoResetEventIdentityData); + // Performance optimization: Blind attempt. + if (Atomics.compareExchange(token, 0, 1, 0) === 1) { + return 'not-equal'; + } + while (true) { + const result = Atomics.waitAsync(token, 0, 0, timeout); + const finalResult = result.async ? await result.value : result.value; + if (finalResult === 'timed-out') { + return finalResult; + } + if (Atomics.compareExchange(token, 0, 1, 0) === 1) { + return finalResult; + } + } + } +}; diff --git a/src/sync/Event.ts b/src/sync/Event.ts new file mode 100644 index 0000000..080800c --- /dev/null +++ b/src/sync/Event.ts @@ -0,0 +1,19 @@ +import { SyncObject } from "./SyncObject.js"; + +/** + * Base class for event synchronization objects. + */ +export class Event extends SyncObject { + #threadsToNofity; + constructor(identifier: number, threadsToNotify: number | undefined) { + super(identifier); + this.#threadsToNofity = threadsToNotify; + } + /** + * Signals the event. + */ + signal() { + Atomics.store(this.token, 0, 1); + Atomics.notify(this.token, 0, this.#threadsToNofity); + } +} diff --git a/src/sync/ManualResetEvent.ts b/src/sync/ManualResetEvent.ts new file mode 100644 index 0000000..c974d54 --- /dev/null +++ b/src/sync/ManualResetEvent.ts @@ -0,0 +1,70 @@ +import type { Token } from "../workers.js"; +import { Event } from './Event.js'; +import { checkToken, manualResetEventIdentityData, type IdentifierData } from "./identifiers.js"; + +export function isSignaled(identifierData: IdentifierData, token: Token) { + checkToken(token, ...identifierData); + return Atomics.load(token, 0) === 1; +} + +export function wait(identifierData: IdentifierData, token: Token, timeout?: number) { + checkToken(token, ...identifierData); + return Atomics.wait(token, 0, 0, timeout); +} + +export async function waitAsync(identifierData: IdentifierData, token: Token, timeout?: number) { + checkToken(token, ...identifierData); + const result = Atomics.waitAsync(token, 0, 0, timeout); + return result.async ? await result.value : result.value; +} + +/** + * Synchronization event object that can be signaled on-demand whenever it is appropriate. + * + * It is useful in cases where external control is the priority, such as pausing work. + */ +export class ManualResetEvent extends Event { + constructor() { + super(manualResetEventIdentityData[0], undefined); + } + /** + * Resets the token. + */ + reset() { + Atomics.store(super.token, 0, 0); + } + /** + * Checks whether or not a manually-resettable event's token is in its signaled state. + * + * This method may be used by worker threads in polling mode. + * @param token Token to check. + * @returns `true` if the token is signaled, or `false` otherwise. + */ + static isSignaled(token: Token) { + return isSignaled(manualResetEventIdentityData, token); + } + /** + * Waits on the specified manually-resettable token to be signaled. + * + * Use this method to block a worker's thread for whatever reason. + * @param token Token to wait on. + * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. + * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the + * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place. + */ + static wait(token: Token, timeout?: number) { + return wait(manualResetEventIdentityData, token, timeout); + } + /** + * Asynchronously waits on the specified manually-resettable token to be signaled. + * + * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps). + * @param token Token to wait on. + * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely. + * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the + * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place. + */ + static waitAsync(token: Token, timeout?: number) { + return waitAsync(manualResetEventIdentityData, token, timeout); + } +}; diff --git a/src/sync/Mutex.ts b/src/sync/Mutex.ts new file mode 100644 index 0000000..554c195 --- /dev/null +++ b/src/sync/Mutex.ts @@ -0,0 +1,63 @@ +import { Token } from "../workers"; +import { mutexIdentityData } from "./identifiers"; +import { acquire, acquireAsync, SemaphoreInternal, type Releaser } from "./Semaphore"; + +/** + * Synchronization object that can be used to grant a single thread exclusive access to a resource or critical section. + */ +export class Mutex extends SemaphoreInternal { + constructor(createDisabled: boolean = false) { + super(mutexIdentityData[0], 1, createDisabled); + } + + /** + * Acquires exclusivity from the specified mutex's token. + * + * **IMPORTANT**: Acquiring a mutex halts other work. Releasing the mutex by invoking the releaser function that + * this method returns is *imperative*. Use a `try..finally` construct to make sure that the releaser function is + * always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = await Mutex.acquireAsync(myMutexToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Mutex token to be acquired. + * @returns A releaser object that can and should be used for releasing the mutex. + */ + static acquire(token: Token): Releaser; + /** + * Acquires exclusivity from the specified mutex's token. + * + * **IMPORTANT**: Acquiring a mutex halts other work. Releasing the mutex by invoking the releaser function that + * this method returns is *imperative*. Use a `try..finally` construct to make sure that the releaser function is + * always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = await Mutex.acquireAsync(myMutexToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Mutex token to be acquired. + * @param timeout Timeout value in milliseconds to wait for acquisition. + * @returns A releaser object that can and should be used for releasing the mutex, or the value `'timed-out'` if + * the mutex could not be acquired before the specified timeout time elapsed. + */ + static acquire(token: Token, timeout: number): Releaser | "timed-out"; + static acquire(token: Token, timeout?: number) { + return acquire(mutexIdentityData, token, timeout); + } + static acquireAsync(token: Token, timeout?: number) { + return acquireAsync(mutexIdentityData, token, timeout); + } +}; diff --git a/src/sync/Semaphore.ts b/src/sync/Semaphore.ts new file mode 100644 index 0000000..ff894be --- /dev/null +++ b/src/sync/Semaphore.ts @@ -0,0 +1,191 @@ +import type { Token } from "../workers.js"; +import { checkToken, semaphoreIdentityData, type IdentifierData } from "./identifiers.js"; +import { SyncObject } from "./SyncObject.js"; + +/** + * Function used to release an acquired synchronization object. + */ +export type Releaser = () => void; + +export class SemaphoreInternal extends SyncObject { + #capacity; + #disabled; + constructor(identifier: number, capacity: number, createDisabled: boolean) { + if (capacity <= 0 || !Number.isInteger(capacity)) { + throw new Error("A semaphore's capacity can only be a postive integer."); + } + super(identifier, createDisabled ? 0 : capacity); + this.#capacity = capacity; + this.#disabled = createDisabled; + } + + /** + * Enables the synchronization object if it was created in a disabled state and hasn't been enabled yet. + * @returns `true` if the synchronization object was disabled and got enabled, or `false` otherwise. + */ + enable() { + if (!this.#disabled) { + return false; + } + this.#disabled = false; + Atomics.store(this.token, 0, this.#capacity); + Atomics.notify(this.token, 0); + return true; + } +}; + +function buildReleaser(token: Token) { + let released = false; + return (() => { + if (released) { + throw new Error('The semaphore has already been released and cannot be released again.'); + } + Atomics.add(token, 0, 1); + Atomics.notify(token, 0, 1); + released = true; + }) as Releaser; +} + +export function acquire(identifierData: IdentifierData, token: Token, timeout?: number) { + checkToken(token, ...identifierData); + while (true) { + const available = Atomics.load(token, 0); + if (available === 0) { + const waitResult = Atomics.wait(token, 0, 0, timeout); + if (waitResult === 'timed-out') { + return waitResult; + } + } + else { + if (Atomics.compareExchange(token, 0, available, available - 1) === available) { + return buildReleaser(token); + } + } + } +} + +export async function acquireAsync(identifierData: IdentifierData, token: Token, timeout?: number) { + checkToken(token, ...identifierData); + while (true) { + const available = Atomics.load(token, 0); + if (available === 0) { + const waitResult = Atomics.waitAsync(token, 0, 0, timeout); + let finalResult = waitResult.async ? await waitResult.value : waitResult.value; + if (finalResult === 'timed-out') { + return finalResult; + } + } + else { + if (Atomics.compareExchange(token, 0, available, available - 1) === available) { + return buildReleaser(token); + } + } + } +} + +/** + * Synchronization object that defines a maximum concurrency value (capacity). + * + * Useful to throttle operations or similar work, such as limiting the number of simultaneous HTTP requests. + */ +export class Semaphore extends SemaphoreInternal { + constructor(capacity: number, createDisabled: boolean = false) { + super(semaphoreIdentityData[0], capacity, createDisabled); + const x = Semaphore.acquire(this.token, 1000); + } + /** + * Acquires a slot from the specified semaphore's token. + * + * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to + * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to + * make sure that the releaser function is always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = Semaphore.acquire(mySemaphoreToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Semaphore token to be acquired. + * @returns A releaser object that can and should be used for releasing the semaphore. + */ + static acquire(token: Token): Releaser; + /** + * Acquires a slot from the specified semaphore's token. + * + * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to + * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to + * make sure that the releaser function is always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = Semaphore.acquire(mySemaphoreToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Semaphore token to be acquired. + * @param timeout Optional timeout value in milliseconds to wait for acquisition. + * @returns A releaser object that can and should be used for releasing the semaphore, or the value `'timed-out'` + * if the sempahore could not be acquired before the specified timeout time elapsed. + */ + static acquire(token: Token, timeout: number): 'timed-out' | Releaser; + static acquire(token: Token, timeout?: number) { + return acquire(semaphoreIdentityData, token, timeout); + } + + /** + * Asynchronously acquires a slot from the specified semaphore's token. + * + * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to + * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to + * make sure that the releaser function is always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = await Semaphore.acquireAsync(mySemaphoreToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Semaphore token to be acquired. + * @returns A releaser object that can and should be used for releasing the semaphore. + */ + static acquireAsync(token: Token): Promise; + /** + * Asynchronously acquires a slot from the specified semaphore's token. + * + * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to + * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to + * make sure that the releaser function is always executed, even in the event of an error. + * + * @example + * ```typescript + * const releaser = await Semaphore.acquireAsync(mySemaphoreToken); + * try { + * ... + * } + * finally { + * releaser(); + * } + * ``` + * @param token Semaphore token to be acquired. + * @param timeout Timeout value in milliseconds to wait for acquisition. + * @returns A releaser object that can and should be used for releasing the semaphore, or the value `'timed-out'` + * if the sempahore could not be acquired before the specified timeout time elapsed. + */ + static acquireAsync(token: Token, timeout: number): Promise<"timed-out" | Releaser>; + static acquireAsync(token: Token, timeout?: number) { + return acquireAsync(semaphoreIdentityData, token, timeout); + } +} diff --git a/src/sync/SyncObject.ts b/src/sync/SyncObject.ts new file mode 100644 index 0000000..acafe52 --- /dev/null +++ b/src/sync/SyncObject.ts @@ -0,0 +1,30 @@ +import { Token } from "../workers"; + +/** + * Base class for synchronization objects. + */ +export class SyncObject { + #buffer; + #array; + /** + * Initializes a new instance of this class. + * @param identifier Token identifier value. It is used to ensure that actions taken on the token are compatible + * with the synchronization object that created it. + * @param initialValue Initial value to store in the shared array buffer's first slot. + */ + constructor(identifier: number, initialValue: number = 0) { + if (globalThis.crossOriginIsolated !== undefined && !crossOriginIsolated) { + throw new Error('Cannot operate: Cross origin is not isolated. See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements for details.'); + } + this.#buffer = new SharedArrayBuffer(8); + this.#array = new Int32Array(this.#buffer); + Atomics.store(this.#array, 1, identifier); + Atomics.store(this.#array, 0, initialValue); + } + /** + * Gets the synchronization object's token. + */ + get token(): Token { + return this.#array; + } +}; diff --git a/src/sync/identifiers.ts b/src/sync/identifiers.ts new file mode 100644 index 0000000..c2891b7 --- /dev/null +++ b/src/sync/identifiers.ts @@ -0,0 +1,50 @@ +import { type Token } from "../workers"; + +export type IdentifierData = [ + number, + string, + string +]; + +export const manualResetEventIdentityData = [ + 1, + 'manually-resettable event', + 'a' +] as const satisfies IdentifierData; + +export const autoResetEventIdentityData = [ + 2, + 'automatically-resettable event', + 'an' +] as const satisfies IdentifierData; + +export const cancellationSourceIdentityData = [ + 3, + 'cancellation source', + 'a' +] as const satisfies IdentifierData; + +export const semaphoreIdentityData = [ + 4, + 'semaphore', + 'a' +] as const satisfies IdentifierData; + +export const mutexIdentityData = [ + 5, + 'mutex', + 'a' +] as const satisfies IdentifierData; + +/** + * Ensures the given token is of the expected type by throwing an error if this is not the case. + * @param token Token to check for. + * @param identifier Token type identifier. + * @param objectName Object name, used for constructing the error's message. + * @param article Article for the object name, so the error's message is written in proper English. + */ +export function checkToken(token: Token, identifier: number, objectName: string, article: string) { + if (Atomics.load(token, 1) !== identifier) { + throw new Error(`The provided token is not that of ${article} ${objectName}.`); + } +} diff --git a/src/workers/InternalSharedWorker.ts b/src/workers/InternalSharedWorker.ts index 10f946a..1b860e0 100644 --- a/src/workers/InternalSharedWorker.ts +++ b/src/workers/InternalSharedWorker.ts @@ -20,7 +20,6 @@ export class InternalSharedWorker implements IWorker { #listenerFactory(id: number, processMessage: ProcessMessageFn, resolve: (data:any) => void) { return (ev: MessageEvent) => { if (isTaskCancelledMessage(ev.data) && ev.data.workItemId === id) { - console.log('Received a cancellation: %o', ev.data); this.#rejectMap.get(id)?.(new CancelledMessage(false)); } else if (isAsyncResponse(ev.data) && (ev.data.workItemId === id)) { if (processMessage(ev.data.payload)) { diff --git a/src/workers/InternalWorker.ts b/src/workers/InternalWorker.ts index 533caaf..b814bba 100644 --- a/src/workers/InternalWorker.ts +++ b/src/workers/InternalWorker.ts @@ -23,7 +23,6 @@ export class InternalWorker implements IWorker { #listenerFactory(id: number, processMessage: ProcessMessageFn, resolve: (data:any) => void) { return (ev: MessageEvent) => { if (isTaskCancelledMessage(ev.data) && ev.data.workItemId === id) { - console.log('Received a cancellation: %o', ev.data); this.#rejectMap.get(id)?.(new CancelledMessage(false)); } else if (isAsyncResponse(ev.data) && (ev.data.workItemId === id)) { if (processMessage(ev.data.payload)) {