diff --git a/README.md b/README.md
index 29ad995..6e56064 100644
--- a/README.md
+++ b/README.md
@@ -259,7 +259,7 @@ This package provides synchronization objects that use `Atomics` to cross the th
This is the web worker equivalent of `AbortController` and provides a token that can be passed to workers in a task's
payload.
-The worker can use `CancellationSource.isSignalled()` or `CancellationSource.throwIfSignalled()` through polling in order
+The worker can use `CancellationSource.isSignaled()` or `CancellationSource.throwIfSignaled()` through polling in order
to abort the current work:
```typescript
@@ -268,7 +268,7 @@ import { CancellationSource, type Token } from '@wjfe/async-workers';
function computeSomeStuff(cancelToken?: Token) {
for (let i = 0; i < Number.MAX_SAFE_INTEGER; ++i) {
// No thowing will be done if cancelToken is undefined.
- CancellationSource.throwIfSignalled(cancelToken);
+ CancellationSource.throwIfSignaled(cancelToken);
...
}
}
@@ -300,7 +300,7 @@ If you're using `CancellationSource` on your own:
### ManualResetEvent
-This is a synchronization object that can be used to signal multiple threads at once because it will remain signalled
+This is a synchronization object that can be used to signal multiple threads at once because it will remain signaled
until the `reset()` event is invoked. A typical use case is to use it for pausing a worker's work.
### AutoResetEvent
@@ -373,7 +373,7 @@ work properly in Node.
| - | - | - |
| [x] ManualResetEvent | [x] Simple request/response scenario | [x] Simple request/response scenario |
| [x] AutoResetEvent | [x] Request/multiple response scenario | [x] Request/multiple response scenario |
-| [ ] Semaphore | [x] Strongly-typed tasks | [x] Strongly-typed tasks |
+| [x] Semaphore | [x] Strongly-typed tasks | [x] Strongly-typed tasks |
| [x] CancellationSource | [x] Worker termination | |
-| | [x] Out-of-order work items | [x] Out-of-order work items |
+| [x] Mutex | [x] Out-of-order work items | [x] Out-of-order work items |
| | [x] Task cancellation | [x] Task cancellation|
diff --git a/pages/package-lock.json b/pages/package-lock.json
index dccb104..0027238 100644
--- a/pages/package-lock.json
+++ b/pages/package-lock.json
@@ -3405,23 +3405,23 @@
}
},
"node_modules/svelte": {
- "version": "5.0.0-next.237",
- "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.0.0-next.237.tgz",
- "integrity": "sha512-EkNhMFq6cjfrQzBv+YWaxs8mgRyHlfjMTYMlVkx/lsWq4EUTHTRYzNf2nfWn7cXMYE1fXg8DsR3CZ6zv1qlQeQ==",
+ "version": "5.0.0-next.244",
+ "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.0.0-next.244.tgz",
+ "integrity": "sha512-whSOcKdpuAFd5xD9J2EhuHeRs4J4nHis6NSUKRXpC3HQoCmsoKhyIldMjiv6QFkQpe6QMsid8lwvgLXkZTSC/A==",
"license": "MIT",
"dependencies": {
- "@ampproject/remapping": "^2.2.1",
- "@jridgewell/sourcemap-codec": "^1.4.15",
+ "@ampproject/remapping": "^2.3.0",
+ "@jridgewell/sourcemap-codec": "^1.5.0",
"@types/estree": "^1.0.5",
- "acorn": "^8.11.3",
+ "acorn": "^8.12.1",
"acorn-typescript": "^1.4.13",
"aria-query": "^5.3.0",
- "axobject-query": "^4.0.0",
+ "axobject-query": "^4.1.0",
"esm-env": "^1.0.0",
"esrap": "^1.2.2",
"is-reference": "^3.0.2",
"locate-character": "^3.0.0",
- "magic-string": "^0.30.5",
+ "magic-string": "^0.30.11",
"zimmerframe": "^1.1.2"
},
"engines": {
diff --git a/pages/src/lib/CrossOriginNotIsolated.svelte b/pages/src/lib/CrossOriginNotIsolated.svelte
index 50b7964..2f55e94 100644
--- a/pages/src/lib/CrossOriginNotIsolated.svelte
+++ b/pages/src/lib/CrossOriginNotIsolated.svelte
@@ -15,6 +15,8 @@
ManualResetEvent
AutoResetEvent
CancellationSource
+ Semaphore
+ Mutex
In order to solve this problem in your own project/site,
diff --git a/pages/src/lib/Instructions.svelte b/pages/src/lib/Instructions.svelte
index f5863c4..520ae45 100644
--- a/pages/src/lib/Instructions.svelte
+++ b/pages/src/lib/Instructions.svelte
@@ -2,8 +2,8 @@
Instructions
Play around with the three work items below. Each will run the same example worker, which is a worker that
- calculates prime numbers using a not-so-good algorithm that runs up to the shown number. All three work items
- use the same worker object, so they run serially between each other.
+ calculates prime numbers using a not-so-good algorithm that runs up to the specified number in the slider. All
+ three work items use the same worker object, so they run serially between each other.
{#if crossOriginIsolated}
@@ -13,13 +13,30 @@
here), you may pause or cancel the worker at any time.
- NOTE: Notice how significant the pause wait is in terms of performance. Waiting on the
- token is expensive for sure, so do this only on well-thought-out scenarios.
+ NOTE ON PERFORMANCE: Atomic operations are less performant than polling. Don't go crazy
+ with the synchronization objects, and see if polling fits the bill first. For example, the primes worker
+ in this demo page does polling to condition the wait as opposed to blindly waiting:
+
+ if (!ManualResetEvent.isSignaled(pause)) {
+ ManualResetEvent.wait(pause);
+}
+
+ Still, even with this modification, you'll see a big difference in times between a pausable and a
+ non-pausable run for the same numerical limit.
{:else}
- Since cross origin is not isolated (see
-
- the requirements
- here), you may only cancel the worker before it starts. Furthermore, you cannot pause the worker.
+
+ Since cross origin is not isolated (see
+
+ the requirements
+ here), you may only cancel the worker before it starts. Furthermore, you cannot pause the worker.
+
+
+ To experience the power of the synchronization objects, head to the
+ GitHub repository and clone
+ the source code. This demonstration's code is inside the folder named pages. Open a console,
+ change to the pages folder, install the packages running npm ci, and then execute
+ the development server with npm run dev.
+
{/if}
\ No newline at end of file
diff --git a/pages/src/lib/Primes.svelte b/pages/src/lib/Primes.svelte
index d4d3316..e2bde26 100644
--- a/pages/src/lib/Primes.svelte
+++ b/pages/src/lib/Primes.svelte
@@ -1,7 +1,9 @@
+
+
+
+ {#key hh}
+ {f(hh)}
+ {/key}
+ :
+ {#key mm}
+ {f(mm)}
+ {/key}
+ :
+ {#key ss}
+ {f(ss)}
+ {/key}
+
+
+
+
diff --git a/pages/src/lib/Timer.svelte b/pages/src/lib/Timer.svelte
new file mode 100644
index 0000000..8348f2b
--- /dev/null
+++ b/pages/src/lib/Timer.svelte
@@ -0,0 +1,109 @@
+
+
+
+
+ {#key hh}
+ {f(hh)}
+ {/key}
+ :
+ {#key mm}
+ {f(mm)}
+ {/key}
+ :
+ {#key ss}
+ {f(ss)}
+ {/key}
+
+
+
+
diff --git a/pages/src/workers/exampleWorker.ts b/pages/src/workers/exampleWorker.ts
index 17ae34c..6fd885a 100644
--- a/pages/src/workers/exampleWorker.ts
+++ b/pages/src/workers/exampleWorker.ts
@@ -3,7 +3,7 @@ import { CancellationSource, ManualResetEvent, workerListener, type PostFn, type
function isPrime(n: number, cancelToken?: Token) {
// Made unecessarily inefficient for demo purposes.
for (let i = 2; i <= n / 2; ++i) {
- CancellationSource.throwIfSignalled(cancelToken);
+ CancellationSource.throwIfSignaled(cancelToken);
if (n % i === 0) {
return false;
}
@@ -14,8 +14,10 @@ function isPrime(n: number, cancelToken?: Token) {
function isPrimePausable(n: number, pause: Token, cancelToken?: Token) {
// Made unecessarily inefficient for demo purposes.
for (let i = 2; i <= n / 2; ++i) {
- CancellationSource.throwIfSignalled(cancelToken);
- ManualResetEvent.wait(pause);
+ CancellationSource.throwIfSignaled(cancelToken);
+ if (!ManualResetEvent.isSignaled(pause)) {
+ ManualResetEvent.wait(pause);
+ }
if (n % i === 0) {
return false;
}
@@ -33,7 +35,7 @@ function getAllPrimes(max: number, pause: Token | undefined, post: PostFn, cance
}
export const exampleWorker = {
- sayHello(payload: { name: string; }) {
+ iExistToShowOffIntellisense(payload: { name: string; }) {
console.log('Hello, %s!', payload.name);
},
calculatePrimes(payload: { to: number; pause?: Token }, post: PostFn, cancelToken?: Token) {
diff --git a/src/cancellation/CancellationSource.ts b/src/cancellation/CancellationSource.ts
index a2befc3..93be82b 100644
--- a/src/cancellation/CancellationSource.ts
+++ b/src/cancellation/CancellationSource.ts
@@ -1,27 +1,36 @@
-import { ManualResetEvent } from "../events/ManualResetEvent.js";
+import { Event } from "../sync/Event.js";
+import { cancellationSourceIdentityData } from "../sync/identifiers.js";
+import { isSignaled } from "../sync/ManualResetEvent.js";
import type { Token } from "../workers.js";
import { TaskCancelledError } from "./TaskCancelledError.js";
/**
* Specialized synchronization event object for the purposes of signalling cancellation intent.
*/
-export class CancellationSource extends ManualResetEvent {
+export class CancellationSource extends Event {
+ constructor() {
+ super(cancellationSourceIdentityData[0], undefined);
+ }
/**
- * Do not call. Cancellation tokens cannot be reset.
+ * Checks whether or not a cancellation source's token is in its signaled state.
+ *
+ * This method may be used by worker threads in polling mode.
+ * @param token Token to check.
+ * @returns `true` if the token is signaled, or `false` otherwise.
*/
- reset(): void {
- throw new Error("Cancellation tokens cannot be reset.");
+ static isSignaled(token: Token) {
+ return isSignaled(cancellationSourceIdentityData, token);
}
/**
* Checks the given cancellation token and throws an instance of `TaskCancelledError` if the token is in its
- * signalled state.
+ * signaled state.
* @param token Cancellation token to check.
*/
- static throwIfSignalled(token: Token | undefined) {
+ static throwIfSignaled(token: Token | undefined) {
if (!token) {
return;
}
- if (this.isSignalled(token)) {
+ if (this.isSignaled(token)) {
throw new TaskCancelledError();
}
}
diff --git a/src/cancellation/TaskCancelledError.ts b/src/cancellation/TaskCancelledError.ts
index 5113967..fd43130 100644
--- a/src/cancellation/TaskCancelledError.ts
+++ b/src/cancellation/TaskCancelledError.ts
@@ -1,5 +1,5 @@
/**
- * Error class used by `CancellationSource.throwIfSignalled` to abort a worker thread's current work.
+ * Error class used by `CancellationSource.throwIfSignaled` to abort a worker thread's current work.
*/
export class TaskCancelledError extends Error {
constructor(message?: string, options?: ErrorOptions) {
diff --git a/src/events/AutoResetEvent.ts b/src/events/AutoResetEvent.ts
deleted file mode 100644
index 8094f3b..0000000
--- a/src/events/AutoResetEvent.ts
+++ /dev/null
@@ -1,77 +0,0 @@
-import type { Token } from "../workers.js";
-import { checkToken, Event } from "./Event.js";
-
-const identifier = 0x02;
-const checkData = [
- identifier,
- 'automatically-reset event',
- 'an'
-] as const;
-
-/**
- * Synchronization event object that automatically resets whenever a worker thread is unlocked by it.
- *
- * It is useful to ensure that only one thread at a time runs at any given time.
- */
-export class AutoResetEvent extends Event {
- constructor() {
- super(identifier);
- }
-
- /**
- * Signals the event, unblocking at most one blocked thread.
- */
- signal(): void {
- super.signal();
- Atomics.notify(super.token, 0, 1);
- }
- /**
- * Checks whether or not an auto-resettable event's token is in its signalled state.
- *
- * Auto-reset events automatically reset when a thread is freed by it, so expect this function to only return
- * `true` if the token has been signalled and there were no threads blocked by it.
- * @param token Auto-resettable token to check.
- * @returns `true` if the token is signalled, or `false` otherwise.
- */
- static isSignalled(token: Token) {
- checkToken(token, ...checkData);
- return Atomics.load(token, 0) === 1;
- }
- /**
- * Waits on the specified auto-resettable token to be signalled.
- *
- * Use this method to block a worker's thread for whatever reason.
- * @param token Auto-reset token to wait on.
- * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
- * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout
- * elapsed and the token did not signal, or `not-equal` if no wait took place (which should not happen for
- * auto-reset events).
- */
- static wait(token: Token, timeout?: number) {
- checkToken(token, ...checkData);
- const result = Atomics.wait(token, 0, 0, timeout);
- if (result === 'ok') {
- Atomics.store(token, 0, 0);
- }
- return result;
- }
- /**
- * Asynchronously waits on the specified auto-reset token to be signalled.
- *
- * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps).
- * @param token Auto-reset token to wait on.
- * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
- * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout
- * elapsed and the token did not signal, or `not-equal` if no wait took place (which should not happen for
- * auto-reset events).
- */
- static async waitAsync(token: Token, timeout?: number) {
- checkToken(token, ...checkData);
- const result = Atomics.waitAsync(token, 0, 0, timeout);
- const finalResult = result.async ? await result.value : result.value;
- if (finalResult !== 'timed-out') {
- Atomics.store(token, 0, 0);
- }
- return finalResult;
- }
-};
diff --git a/src/events/Event.ts b/src/events/Event.ts
deleted file mode 100644
index 8bf1ec9..0000000
--- a/src/events/Event.ts
+++ /dev/null
@@ -1,41 +0,0 @@
-import type { Token } from "../workers";
-
-export class Event {
- #buffer;
- #array;
- #identifier;
- constructor(identifier: number) {
- if (!crossOriginIsolated) {
- throw new Error('Cannot operate: Cross origin is not isolated. See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements for details.');
- }
- this.#buffer = new SharedArrayBuffer(8);
- this.#array = new Int32Array(this.#buffer);
- this.#identifier = identifier;
- Atomics.store(this.#array, 1, identifier);
- }
- /**
- * Gets the synchronization event's token.
- */
- get token(): Token {
- return this.#array;
- }
- /**
- * Signals the event.
- */
- signal() {
- Atomics.store(this.#array, 0, 1);
- }
-};
-
-/**
- * Ensures the given token is of the expected type by throwing an error if this is not the case.
- * @param token Token to check for.
- * @param identifier Token type identifier.
- * @param objectName Object name, used for constructing the error's message.
- * @param article Article for the object name, so the error's message is written in proper English.
- */
-export function checkToken(token: Token, identifier: number, objectName: string, article: string) {
- if (Atomics.load(token, 1) !== identifier) {
- throw new Error(`The provided token is not that of ${article} ${objectName}.`);
- }
-}
diff --git a/src/events/ManualResetEvent.ts b/src/events/ManualResetEvent.ts
deleted file mode 100644
index 613fe9e..0000000
--- a/src/events/ManualResetEvent.ts
+++ /dev/null
@@ -1,71 +0,0 @@
-import type { Token } from "../workers.js";
-import { checkToken, Event } from "./Event.js";
-
-const identifier = 0x01;
-const checkData = [
- identifier,
- 'manually-reset event',
- 'a'
-] as const;
-
-/**
- * Synchronization event object that can be signalled on-demand whenever it is appropriate.
- *
- * It is useful in cases where external control is the priority, such as pausing work.
- */
-export class ManualResetEvent extends Event {
- constructor() {
- super(identifier);
- }
- /**
- * Signals the event, unblocking all threads that are waiting on it. Use `reset()` to revert the signalled state.
- */
- signal(): void {
- super.signal();
- Atomics.notify(super.token, 0);
- }
- /**
- * Resets the token.
- */
- reset() {
- Atomics.store(super.token, 0, 0);
- }
- /**
- * Checks whether or not a manually-resettable event's token is in its signalled state.
- *
- * This method may be used by worker threads in polling mode.
- * @param token Manually-resettable token to check.
- * @returns `true` if the token is signalled, or `false` otherwise.
- */
- static isSignalled(token: Token) {
- checkToken(token, ...checkData);
- return Atomics.load(token, 0) === 1;
- }
- /**
- * Waits on the specified manually-resettable token to be signalled.
- *
- * Use this method to block a worker's thread for whatever reason.
- * @param token Manually-resettable token to wait on.
- * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
- * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout
- * elapsed and the token did not signal, or `not-equal` if no wait took place.
- */
- static wait(token: Token, timeout?: number) {
- checkToken(token, ...checkData);
- return Atomics.wait(token, 0, 0, timeout);
- }
- /**
- * Asynchronously waits on the specified manually-resettable token to be signalled.
- *
- * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps).
- * @param token Manually-resettable token to wait on.
- * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
- * @returns `'ok'` when the waiting is over because the token signalled, `timed-out` when the specified timeout
- * elapsed and the token did not signal, or `not-equal` if no wait took place.
- */
- static async waitAsync(token: Token, timeout?: number) {
- checkToken(token, ...checkData);
- const result = Atomics.waitAsync(token, 0, 0, timeout);
- return result.async ? await result.value : result.value;
- }
-};
diff --git a/src/index.ts b/src/index.ts
index 4b345d4..9128e6d 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,10 +1,12 @@
// cancellation
-export * from "./cancellation/CancellationSource.js";
-export * from "./cancellation/CancelledMessage.js";
-export * from "./cancellation/TaskCancelledError.js";
-// events
-export * from "./events/AutoResetEvent.js";
-export * from "./events/ManualResetEvent.js";
+export { CancellationSource } from "./cancellation/CancellationSource.js";
+export { CancelledMessage } from "./cancellation/CancelledMessage.js";
+export { TaskCancelledError } from "./cancellation/TaskCancelledError.js";
+// synchronization objects
+export { AutoResetEvent } from "./sync/AutoResetEvent.js";
+export { ManualResetEvent } from "./sync/ManualResetEvent.js";
+export { Mutex } from "./sync/Mutex.js";
+export { Semaphore } from "./sync/Semaphore.js";
// workers
export * from './workers/AsyncWorker.js';
export * from "./workers/workerListener.js";
@@ -37,5 +39,3 @@ export type AsyncMessage any>> =
cancelToken?: Token;
payload?: WorkerTasks[keyof Tasks]['payload'];
};
-
-export type ProcessMessageFn = (payload: any, cancelToken?: Token) => boolean;
diff --git a/src/sync/AutoResetEvent.ts b/src/sync/AutoResetEvent.ts
new file mode 100644
index 0000000..b2204d8
--- /dev/null
+++ b/src/sync/AutoResetEvent.ts
@@ -0,0 +1,77 @@
+import type { Token } from "../workers.js";
+import { Event } from "./Event.js";
+import { autoResetEventIdentityData, checkToken } from "./identifiers.js";
+
+/**
+ * Synchronization event object that automatically resets whenever a worker thread is unlocked by it.
+ *
+ * It is useful to ensure that only one thread at a time runs at any given time.
+ */
+export class AutoResetEvent extends Event {
+ constructor() {
+ super(autoResetEventIdentityData[0], 1);
+ }
+ /**
+ * Checks whether or not an auto-resettable event's token is in its signaled state.
+ *
+ * Auto-reset events automatically reset when a thread is freed by it, so expect this function to only return
+ * `true` if the token has been signaled and there were no threads blocked by it.
+ * @param token Token to check.
+ * @returns `true` if the token is signaled, or `false` otherwise.
+ */
+ static isSignaled(token: Token) {
+ checkToken(token, ...autoResetEventIdentityData);
+ return Atomics.load(token, 0) === 1;
+ }
+ /**
+ * Waits on the specified auto-resettable event's token to be signaled.
+ *
+ * Use this method to block a worker's thread for whatever reason.
+ * @param token Token to wait on.
+ * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
+ * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the
+ * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place.
+ */
+ static wait(token: Token, timeout?: number) {
+ checkToken(token, ...autoResetEventIdentityData);
+ // Performance optimization: Blind attempt.
+ if (Atomics.compareExchange(token, 0, 1, 0) === 1) {
+ return 'not-equal';
+ }
+ while (true) {
+ const result = Atomics.wait(token, 0, 0, timeout);
+ if (result === 'timed-out') {
+ return result;
+ }
+ if (Atomics.compareExchange(token, 0, 1, 0) === 1) {
+ return result;
+ }
+ }
+ }
+ /**
+ * Asynchronously waits on the specified auto-reset token to be signaled.
+ *
+ * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps).
+ * @param token Token to wait on.
+ * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
+ * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the
+ * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place.
+ */
+ static async waitAsync(token: Token, timeout?: number) {
+ checkToken(token, ...autoResetEventIdentityData);
+ // Performance optimization: Blind attempt.
+ if (Atomics.compareExchange(token, 0, 1, 0) === 1) {
+ return 'not-equal';
+ }
+ while (true) {
+ const result = Atomics.waitAsync(token, 0, 0, timeout);
+ const finalResult = result.async ? await result.value : result.value;
+ if (finalResult === 'timed-out') {
+ return finalResult;
+ }
+ if (Atomics.compareExchange(token, 0, 1, 0) === 1) {
+ return finalResult;
+ }
+ }
+ }
+};
diff --git a/src/sync/Event.ts b/src/sync/Event.ts
new file mode 100644
index 0000000..080800c
--- /dev/null
+++ b/src/sync/Event.ts
@@ -0,0 +1,19 @@
+import { SyncObject } from "./SyncObject.js";
+
+/**
+ * Base class for event synchronization objects.
+ */
+export class Event extends SyncObject {
+ #threadsToNofity;
+ constructor(identifier: number, threadsToNotify: number | undefined) {
+ super(identifier);
+ this.#threadsToNofity = threadsToNotify;
+ }
+ /**
+ * Signals the event.
+ */
+ signal() {
+ Atomics.store(this.token, 0, 1);
+ Atomics.notify(this.token, 0, this.#threadsToNofity);
+ }
+}
diff --git a/src/sync/ManualResetEvent.ts b/src/sync/ManualResetEvent.ts
new file mode 100644
index 0000000..c974d54
--- /dev/null
+++ b/src/sync/ManualResetEvent.ts
@@ -0,0 +1,70 @@
+import type { Token } from "../workers.js";
+import { Event } from './Event.js';
+import { checkToken, manualResetEventIdentityData, type IdentifierData } from "./identifiers.js";
+
+export function isSignaled(identifierData: IdentifierData, token: Token) {
+ checkToken(token, ...identifierData);
+ return Atomics.load(token, 0) === 1;
+}
+
+export function wait(identifierData: IdentifierData, token: Token, timeout?: number) {
+ checkToken(token, ...identifierData);
+ return Atomics.wait(token, 0, 0, timeout);
+}
+
+export async function waitAsync(identifierData: IdentifierData, token: Token, timeout?: number) {
+ checkToken(token, ...identifierData);
+ const result = Atomics.waitAsync(token, 0, 0, timeout);
+ return result.async ? await result.value : result.value;
+}
+
+/**
+ * Synchronization event object that can be signaled on-demand whenever it is appropriate.
+ *
+ * It is useful in cases where external control is the priority, such as pausing work.
+ */
+export class ManualResetEvent extends Event {
+ constructor() {
+ super(manualResetEventIdentityData[0], undefined);
+ }
+ /**
+ * Resets the token.
+ */
+ reset() {
+ Atomics.store(super.token, 0, 0);
+ }
+ /**
+ * Checks whether or not a manually-resettable event's token is in its signaled state.
+ *
+ * This method may be used by worker threads in polling mode.
+ * @param token Token to check.
+ * @returns `true` if the token is signaled, or `false` otherwise.
+ */
+ static isSignaled(token: Token) {
+ return isSignaled(manualResetEventIdentityData, token);
+ }
+ /**
+ * Waits on the specified manually-resettable token to be signaled.
+ *
+ * Use this method to block a worker's thread for whatever reason.
+ * @param token Token to wait on.
+ * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
+ * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the
+ * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place.
+ */
+ static wait(token: Token, timeout?: number) {
+ return wait(manualResetEventIdentityData, token, timeout);
+ }
+ /**
+ * Asynchronously waits on the specified manually-resettable token to be signaled.
+ *
+ * Use this method to stop the current work and release the worker thread (to pick up on new messages, perhaps).
+ * @param token Token to wait on.
+ * @param timeout Maximum time to wait on the token. Don't specify a value to wait indefinitely.
+ * @returns `'ok'` when the waiting is over because the token signaled while waiting on it, `'timed-out'` when the
+ * specified timeout elapsed and the token did not signal, or `'not-equal'` if no wait took place.
+ */
+ static waitAsync(token: Token, timeout?: number) {
+ return waitAsync(manualResetEventIdentityData, token, timeout);
+ }
+};
diff --git a/src/sync/Mutex.ts b/src/sync/Mutex.ts
new file mode 100644
index 0000000..554c195
--- /dev/null
+++ b/src/sync/Mutex.ts
@@ -0,0 +1,63 @@
+import { Token } from "../workers";
+import { mutexIdentityData } from "./identifiers";
+import { acquire, acquireAsync, SemaphoreInternal, type Releaser } from "./Semaphore";
+
+/**
+ * Synchronization object that can be used to grant a single thread exclusive access to a resource or critical section.
+ */
+export class Mutex extends SemaphoreInternal {
+ constructor(createDisabled: boolean = false) {
+ super(mutexIdentityData[0], 1, createDisabled);
+ }
+
+ /**
+ * Acquires exclusivity from the specified mutex's token.
+ *
+ * **IMPORTANT**: Acquiring a mutex halts other work. Releasing the mutex by invoking the releaser function that
+ * this method returns is *imperative*. Use a `try..finally` construct to make sure that the releaser function is
+ * always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = await Mutex.acquireAsync(myMutexToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Mutex token to be acquired.
+ * @returns A releaser object that can and should be used for releasing the mutex.
+ */
+ static acquire(token: Token): Releaser;
+ /**
+ * Acquires exclusivity from the specified mutex's token.
+ *
+ * **IMPORTANT**: Acquiring a mutex halts other work. Releasing the mutex by invoking the releaser function that
+ * this method returns is *imperative*. Use a `try..finally` construct to make sure that the releaser function is
+ * always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = await Mutex.acquireAsync(myMutexToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Mutex token to be acquired.
+ * @param timeout Timeout value in milliseconds to wait for acquisition.
+ * @returns A releaser object that can and should be used for releasing the mutex, or the value `'timed-out'` if
+ * the mutex could not be acquired before the specified timeout time elapsed.
+ */
+ static acquire(token: Token, timeout: number): Releaser | "timed-out";
+ static acquire(token: Token, timeout?: number) {
+ return acquire(mutexIdentityData, token, timeout);
+ }
+ static acquireAsync(token: Token, timeout?: number) {
+ return acquireAsync(mutexIdentityData, token, timeout);
+ }
+};
diff --git a/src/sync/Semaphore.ts b/src/sync/Semaphore.ts
new file mode 100644
index 0000000..ff894be
--- /dev/null
+++ b/src/sync/Semaphore.ts
@@ -0,0 +1,191 @@
+import type { Token } from "../workers.js";
+import { checkToken, semaphoreIdentityData, type IdentifierData } from "./identifiers.js";
+import { SyncObject } from "./SyncObject.js";
+
+/**
+ * Function used to release an acquired synchronization object.
+ */
+export type Releaser = () => void;
+
+export class SemaphoreInternal extends SyncObject {
+ #capacity;
+ #disabled;
+ constructor(identifier: number, capacity: number, createDisabled: boolean) {
+ if (capacity <= 0 || !Number.isInteger(capacity)) {
+ throw new Error("A semaphore's capacity can only be a postive integer.");
+ }
+ super(identifier, createDisabled ? 0 : capacity);
+ this.#capacity = capacity;
+ this.#disabled = createDisabled;
+ }
+
+ /**
+ * Enables the synchronization object if it was created in a disabled state and hasn't been enabled yet.
+ * @returns `true` if the synchronization object was disabled and got enabled, or `false` otherwise.
+ */
+ enable() {
+ if (!this.#disabled) {
+ return false;
+ }
+ this.#disabled = false;
+ Atomics.store(this.token, 0, this.#capacity);
+ Atomics.notify(this.token, 0);
+ return true;
+ }
+};
+
+function buildReleaser(token: Token) {
+ let released = false;
+ return (() => {
+ if (released) {
+ throw new Error('The semaphore has already been released and cannot be released again.');
+ }
+ Atomics.add(token, 0, 1);
+ Atomics.notify(token, 0, 1);
+ released = true;
+ }) as Releaser;
+}
+
+export function acquire(identifierData: IdentifierData, token: Token, timeout?: number) {
+ checkToken(token, ...identifierData);
+ while (true) {
+ const available = Atomics.load(token, 0);
+ if (available === 0) {
+ const waitResult = Atomics.wait(token, 0, 0, timeout);
+ if (waitResult === 'timed-out') {
+ return waitResult;
+ }
+ }
+ else {
+ if (Atomics.compareExchange(token, 0, available, available - 1) === available) {
+ return buildReleaser(token);
+ }
+ }
+ }
+}
+
+export async function acquireAsync(identifierData: IdentifierData, token: Token, timeout?: number) {
+ checkToken(token, ...identifierData);
+ while (true) {
+ const available = Atomics.load(token, 0);
+ if (available === 0) {
+ const waitResult = Atomics.waitAsync(token, 0, 0, timeout);
+ let finalResult = waitResult.async ? await waitResult.value : waitResult.value;
+ if (finalResult === 'timed-out') {
+ return finalResult;
+ }
+ }
+ else {
+ if (Atomics.compareExchange(token, 0, available, available - 1) === available) {
+ return buildReleaser(token);
+ }
+ }
+ }
+}
+
+/**
+ * Synchronization object that defines a maximum concurrency value (capacity).
+ *
+ * Useful to throttle operations or similar work, such as limiting the number of simultaneous HTTP requests.
+ */
+export class Semaphore extends SemaphoreInternal {
+ constructor(capacity: number, createDisabled: boolean = false) {
+ super(semaphoreIdentityData[0], capacity, createDisabled);
+ const x = Semaphore.acquire(this.token, 1000);
+ }
+ /**
+ * Acquires a slot from the specified semaphore's token.
+ *
+ * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to
+ * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to
+ * make sure that the releaser function is always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = Semaphore.acquire(mySemaphoreToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Semaphore token to be acquired.
+ * @returns A releaser object that can and should be used for releasing the semaphore.
+ */
+ static acquire(token: Token): Releaser;
+ /**
+ * Acquires a slot from the specified semaphore's token.
+ *
+ * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to
+ * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to
+ * make sure that the releaser function is always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = Semaphore.acquire(mySemaphoreToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Semaphore token to be acquired.
+ * @param timeout Optional timeout value in milliseconds to wait for acquisition.
+ * @returns A releaser object that can and should be used for releasing the semaphore, or the value `'timed-out'`
+ * if the sempahore could not be acquired before the specified timeout time elapsed.
+ */
+ static acquire(token: Token, timeout: number): 'timed-out' | Releaser;
+ static acquire(token: Token, timeout?: number) {
+ return acquire(semaphoreIdentityData, token, timeout);
+ }
+
+ /**
+ * Asynchronously acquires a slot from the specified semaphore's token.
+ *
+ * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to
+ * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to
+ * make sure that the releaser function is always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = await Semaphore.acquireAsync(mySemaphoreToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Semaphore token to be acquired.
+ * @returns A releaser object that can and should be used for releasing the semaphore.
+ */
+ static acquireAsync(token: Token): Promise;
+ /**
+ * Asynchronously acquires a slot from the specified semaphore's token.
+ *
+ * **IMPORTANT**: Acquiring reduces the semaphore's capacity. It is *imperative* that the capacity is returned to
+ * the semaphore by invoking the releaser function that this method returns. Use a `try..finally` construct to
+ * make sure that the releaser function is always executed, even in the event of an error.
+ *
+ * @example
+ * ```typescript
+ * const releaser = await Semaphore.acquireAsync(mySemaphoreToken);
+ * try {
+ * ...
+ * }
+ * finally {
+ * releaser();
+ * }
+ * ```
+ * @param token Semaphore token to be acquired.
+ * @param timeout Timeout value in milliseconds to wait for acquisition.
+ * @returns A releaser object that can and should be used for releasing the semaphore, or the value `'timed-out'`
+ * if the sempahore could not be acquired before the specified timeout time elapsed.
+ */
+ static acquireAsync(token: Token, timeout: number): Promise<"timed-out" | Releaser>;
+ static acquireAsync(token: Token, timeout?: number) {
+ return acquireAsync(semaphoreIdentityData, token, timeout);
+ }
+}
diff --git a/src/sync/SyncObject.ts b/src/sync/SyncObject.ts
new file mode 100644
index 0000000..acafe52
--- /dev/null
+++ b/src/sync/SyncObject.ts
@@ -0,0 +1,30 @@
+import { Token } from "../workers";
+
+/**
+ * Base class for synchronization objects.
+ */
+export class SyncObject {
+ #buffer;
+ #array;
+ /**
+ * Initializes a new instance of this class.
+ * @param identifier Token identifier value. It is used to ensure that actions taken on the token are compatible
+ * with the synchronization object that created it.
+ * @param initialValue Initial value to store in the shared array buffer's first slot.
+ */
+ constructor(identifier: number, initialValue: number = 0) {
+ if (globalThis.crossOriginIsolated !== undefined && !crossOriginIsolated) {
+ throw new Error('Cannot operate: Cross origin is not isolated. See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer#security_requirements for details.');
+ }
+ this.#buffer = new SharedArrayBuffer(8);
+ this.#array = new Int32Array(this.#buffer);
+ Atomics.store(this.#array, 1, identifier);
+ Atomics.store(this.#array, 0, initialValue);
+ }
+ /**
+ * Gets the synchronization object's token.
+ */
+ get token(): Token {
+ return this.#array;
+ }
+};
diff --git a/src/sync/identifiers.ts b/src/sync/identifiers.ts
new file mode 100644
index 0000000..c2891b7
--- /dev/null
+++ b/src/sync/identifiers.ts
@@ -0,0 +1,50 @@
+import { type Token } from "../workers";
+
+export type IdentifierData = [
+ number,
+ string,
+ string
+];
+
+export const manualResetEventIdentityData = [
+ 1,
+ 'manually-resettable event',
+ 'a'
+] as const satisfies IdentifierData;
+
+export const autoResetEventIdentityData = [
+ 2,
+ 'automatically-resettable event',
+ 'an'
+] as const satisfies IdentifierData;
+
+export const cancellationSourceIdentityData = [
+ 3,
+ 'cancellation source',
+ 'a'
+] as const satisfies IdentifierData;
+
+export const semaphoreIdentityData = [
+ 4,
+ 'semaphore',
+ 'a'
+] as const satisfies IdentifierData;
+
+export const mutexIdentityData = [
+ 5,
+ 'mutex',
+ 'a'
+] as const satisfies IdentifierData;
+
+/**
+ * Ensures the given token is of the expected type by throwing an error if this is not the case.
+ * @param token Token to check for.
+ * @param identifier Token type identifier.
+ * @param objectName Object name, used for constructing the error's message.
+ * @param article Article for the object name, so the error's message is written in proper English.
+ */
+export function checkToken(token: Token, identifier: number, objectName: string, article: string) {
+ if (Atomics.load(token, 1) !== identifier) {
+ throw new Error(`The provided token is not that of ${article} ${objectName}.`);
+ }
+}
diff --git a/src/workers/InternalSharedWorker.ts b/src/workers/InternalSharedWorker.ts
index 10f946a..1b860e0 100644
--- a/src/workers/InternalSharedWorker.ts
+++ b/src/workers/InternalSharedWorker.ts
@@ -20,7 +20,6 @@ export class InternalSharedWorker implements IWorker {
#listenerFactory(id: number, processMessage: ProcessMessageFn, resolve: (data:any) => void) {
return (ev: MessageEvent) => {
if (isTaskCancelledMessage(ev.data) && ev.data.workItemId === id) {
- console.log('Received a cancellation: %o', ev.data);
this.#rejectMap.get(id)?.(new CancelledMessage(false));
} else if (isAsyncResponse(ev.data) && (ev.data.workItemId === id)) {
if (processMessage(ev.data.payload)) {
diff --git a/src/workers/InternalWorker.ts b/src/workers/InternalWorker.ts
index 533caaf..b814bba 100644
--- a/src/workers/InternalWorker.ts
+++ b/src/workers/InternalWorker.ts
@@ -23,7 +23,6 @@ export class InternalWorker implements IWorker {
#listenerFactory(id: number, processMessage: ProcessMessageFn, resolve: (data:any) => void) {
return (ev: MessageEvent) => {
if (isTaskCancelledMessage(ev.data) && ev.data.workItemId === id) {
- console.log('Received a cancellation: %o', ev.data);
this.#rejectMap.get(id)?.(new CancelledMessage(false));
} else if (isAsyncResponse(ev.data) && (ev.data.workItemId === id)) {
if (processMessage(ev.data.payload)) {