Skip to content

Commit

Permalink
Added support for Svelte & maxThread config option
Browse files Browse the repository at this point in the history
  • Loading branch information
W4G1 committed Dec 19, 2023
1 parent 0149b6b commit 9fb4ddd
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 33 deletions.
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

# multithreading

Multithreading is a tiny runtime that allows you to execute functions on separate threads. It is designed to be as simple and fast as possible, and to be used in a similar way to regular functions.
Multithreading is a tiny runtime that allows you to execute JavaScript functions on separate threads. It is designed to be as simple and fast as possible, and to be used in a similar way to regular functions.

With a minified size of only 3.8kb, it has first class support for [Node.js](https://nodejs.org/), [Deno](https://deno.com/) and the [browser](https://caniuse.com/?search=webworkers). It can also be used with any framework or library such as [React](https://react.dev/), [Vue](https://vuejs.org/) or [Svelte](https://svelte.dev/).
With a minified size of only 4.5kb, it has first class support for [Node.js](https://nodejs.org/), [Deno](https://deno.com/) and the [browser](https://caniuse.com/?search=webworkers). It can also be used with any framework or library such as [React](https://react.dev/), [Vue](https://vuejs.org/) or [Svelte](https://svelte.dev/).

Depending on the environment, it uses [Worker Threads](https://nodejs.org/api/worker_threads.html) or [Web Workers](https://developer.mozilla.org/en-US/docs/Web/API/Worker). In addition to [ES6 generators](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function*) to make multithreading as simple as possible.

Expand Down Expand Up @@ -110,3 +110,35 @@ In this example, the `add` function is used within the multithreaded `addBalance
As with previous examples, the shared state is managed using `$claim` and `$unclaim` to guarantee proper synchronization and prevent data conflicts.

> External functions like `add` cannot have external dependencies themselves. All variables and functions used by an external function must be declared within the function itself.
### Using imports from external packages

When using external modules, you can dynamically import them by using the `import()` statement. This is useful when you want to use other packages within a threaded function.

```js
import { threaded } from "multithreading";

const getId = threaded(async function* () {
yield {};

const uuid = await import("uuid"); // Import other package

return uuid.v4();
}

console.log(await getId()); // 1a107623-3052-4f61-aca9-9d9388fb2d81
```
### Usage with Svelte
Svelte disallows imports whose name start with a `$`. To use multithreading with Svelte, you can also retrieve `$claim` and `$unclaim` directly from the `yield` statement.
```js
import { threaded } from "multithreading";

const fn = threaded(function* () {
const { $claim, $unclaim } = yield {};

// ...
}
```
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "multithreading",
"version": "0.1.14",
"version": "0.1.15",
"description": "⚡ Multithreading functions in JavaScript, designed to be as simple and fast as possible.",
"author": "Walter van der Giessen <waltervdgiessen@gmail.com>",
"homepage": "https://multithreading.io",
Expand Down Expand Up @@ -68,4 +68,4 @@
"optionalDependencies": {
"@rollup/rollup-linux-x64-gnu": "^4.9.1"
}
}
}
2 changes: 1 addition & 1 deletion rollup.config.dev.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default ["cjs"].flatMap((type) => {
],
output: [
{
file: `dist/${type}/index.${ext}`,
file: `dist/index.${ext}`,
format: type,
sourcemap: false,
name: "multithreading",
Expand Down
51 changes: 43 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,60 @@ type CommonGenerator<T, TReturn, TNext> =

type UserFunction<T extends Array<unknown> = [], TReturn = void> = (
...args: T
) => CommonGenerator<any, TReturn, void>;
) => CommonGenerator<
any,
TReturn,
{
$claim: typeof $claim;
$unclaim: typeof $unclaim;
}
>;

interface ThreadedConfig {
debug: boolean;
maxThreads: number;
}

export function threaded<T extends Array<unknown>, TReturn>(
fn: UserFunction<T, TReturn>
): ((...args: T) => Promise<TReturn>) & { dispose: () => void };

export function threaded<T extends Array<unknown>, TReturn>(
config: Partial<ThreadedConfig>,
fn: UserFunction<T, TReturn>
): ((...args: T) => Promise<TReturn>) & { dispose: () => void };

export function threaded<T extends Array<unknown>, TReturn>(
configOrFn: Partial<ThreadedConfig> | UserFunction<T, TReturn>,
maybeFn?: UserFunction<T, TReturn>
): ((...args: T) => Promise<TReturn>) & { dispose: () => void } {
const config: ThreadedConfig = {
debug: false,
maxThreads:
typeof navigator !== "undefined" ? navigator.hardwareConcurrency : 4,
};
let fn: UserFunction<T, TReturn>;

if (typeof configOrFn === "function") {
fn = configOrFn as UserFunction<T, TReturn>;
} else {
Object.assign(config, configOrFn);
fn = maybeFn as UserFunction<T, TReturn>;
}

let context: Record<string, any> = {};
const workerPool: Worker[] = [];
const invocationQueue = new Map<number, PromiseWithResolvers<TReturn>>();

workerPools.set(fn, workerPool);

const workerCount =
typeof navigator !== "undefined" ? navigator.hardwareConcurrency : 4;
let invocationCount = 0;

const init = (async () => {
let fnStr = fn.toString();
const hasDependencies = fnStr.includes("yield");
const hasYield = fnStr.includes("yield");

if (hasDependencies) {
if (hasYield) {
// @ts-ignore - Call function without arguments
const gen = fn();
const result = await gen.next();
Expand Down Expand Up @@ -68,7 +102,7 @@ export function threaded<T extends Array<unknown>, TReturn>(
// Polyfill for Node.js
globalThis.Worker ??= (await import("web-worker")).default;

for (let i = 0; i < workerCount; i++) {
for (let i = 0; i < config.maxThreads; i++) {
const worker = new Worker(
"data:text/javascript;charset=utf-8," +
encodeURIComponent(workerCode.join("\n")),
Expand All @@ -91,8 +125,9 @@ export function threaded<T extends Array<unknown>, TReturn>(
[$.EventType]: $.Init,
[$.EventValue]: {
[$.ProcessId]: i,
[$.HasYield]: hasDependencies,
[$.HasYield]: hasYield,
[$.Variables]: serializedVariables,
[$.DebugEnabled]: config.debug,
},
} satisfies MainEvent);
}
Expand All @@ -101,7 +136,7 @@ export function threaded<T extends Array<unknown>, TReturn>(
const wrapper = async (...args: T) => {
await init;

const worker = workerPool[invocationCount % workerCount];
const worker = workerPool[invocationCount % config.maxThreads];

const pwr = Promise.withResolvers<TReturn>();
invocationQueue.set(invocationCount, pwr);
Expand Down
2 changes: 2 additions & 0 deletions src/lib/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const HasYield = "g";
export const InvocationId = "h";
export const Value = "i";
export const ProcessId = "j";
export const DebugEnabled = "k";

export declare type Function = typeof Function;
export declare type Other = typeof Other;
Expand All @@ -41,3 +42,4 @@ export declare type HasYield = typeof HasYield;
export declare type InvocationId = typeof InvocationId;
export declare type Value = typeof Value;
export declare type ProcessId = typeof ProcessId;
export declare type DebugEnabled = typeof DebugEnabled;
1 change: 1 addition & 0 deletions src/lib/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ interface InitEvent {
[$.ProcessId]: number;
[$.HasYield]: boolean;
[$.Variables]: Record<string, any>;
[$.DebugEnabled]: boolean;
};
}

Expand Down
43 changes: 23 additions & 20 deletions src/lib/worker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,21 @@ import { deserialize } from "./serialize.ts";
import * as $ from "./keys.ts";
import {
ClaimAcceptanceEvent,
ClaimEvent,
InitEvent,
InvocationEvent,
MainEvent,
ReturnEvent,
SynchronizationEvent,
ThreadEvent,
UnclaimEvent,
} from "./types";
import { replaceContents } from "./replaceContents.ts";

declare var pid: number;

const cyanStart = "\x1b[36m";
const cyanEnd = "\x1b[39m";

const originalLog = console.log;
console.log = (...args) => {
originalLog(`${cyanStart}[Thread_${pid}]${cyanEnd}`, ...args);
};
declare global {
var pid: number;
function $claim(value: Object): Promise<void>;
function $unclaim(value: Object): void;
}

globalThis.onmessage = async (e: MessageEvent<MainEvent>) => {
switch (e.data[$.EventType]) {
Expand All @@ -34,20 +29,25 @@ globalThis.onmessage = async (e: MessageEvent<MainEvent>) => {
Thread.handleInvocation(e.data[$.EventValue]);
break;
case $.ClaimAcceptance:
// console.log("Claimed", e.data[$.EventValue][$.Name]);
Thread.handleClaimAcceptance(e.data[$.EventValue]);
break;
case $.Synchronization:
}
};

globalThis.$claim = async function $claim(value: Object) {
const cyanStart = "\x1b[36m";
const cyanEnd = "\x1b[39m";

const originalLog = console.log;
console.log = (...args) => {
originalLog(`${cyanStart}[Thread_${pid}]${cyanEnd}`, ...args);
};

const $claim = async function $claim(value: Object) {
const valueName = Thread.shareableNameMap.get(value)!;

Thread.valueInUseCount[valueName]++;

// console.log(valueName, "claim");

// First check if the variable is already (being) claimed
if (Thread.valueClaimMap.has(valueName)) {
return Thread.valueClaimMap.get(valueName)!.promise;
Expand All @@ -63,13 +63,11 @@ globalThis.$claim = async function $claim(value: Object) {
return Thread.valueClaimMap.get(valueName)!.promise;
};

globalThis.$unclaim = function $unclaim(value: Object) {
const $unclaim = function $unclaim(value: Object) {
const valueName = Thread.shareableNameMap.get(value)!;

if (--Thread.valueInUseCount[valueName] > 0) return;

// console.log("Unclaimed", valueName);

Thread.valueClaimMap.delete(valueName);
globalThis.postMessage({
[$.EventType]: $.Unclaim,
Expand All @@ -80,6 +78,10 @@ globalThis.$unclaim = function $unclaim(value: Object) {
} satisfies ThreadEvent);
};

// Make globally available
globalThis.$claim = $claim;
globalThis.$unclaim = $unclaim;

// Separate namespace to avoid polluting the global namespace
// and avoid name collisions with the user defined function
namespace Thread {
Expand Down Expand Up @@ -124,9 +126,10 @@ namespace Thread {
const gen = globalThis[GLOBAL_FUNCTION_NAME](...data[$.Args]);

hasYield && gen.next();
const returnValue = await gen.next();

// console.log("Returned", returnValue.value);
const returnValue = await gen.next({
$claim,
$unclaim,
});

globalThis.postMessage({
[$.EventType]: $.Return,
Expand Down

0 comments on commit 9fb4ddd

Please sign in to comment.