Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/core/js-client-isomorphic/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
* limitations under the License.
*/

import { Worker } from "@fluencelabs/threads/master";
import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
import { ModuleThread } from "@fluencelabs/threads/master";

import versions from "./versions.js";

Expand All @@ -23,7 +24,7 @@ type VersionedPackage = { name: string; version: string };
export type GetWorkerFn = (
pkg: FetchedPackages,
CDNUrl: string,
) => Promise<Worker>;
) => Promise<ModuleThread<MarineBackgroundInterface>>;

export const getVersionedPackage = (pkg: FetchedPackages): VersionedPackage => {
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
* limitations under the License.
*/

import { BlobWorker } from "@fluencelabs/threads/master";
import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
import { BlobWorker, ModuleThread, spawn } from "@fluencelabs/threads/master";

import { fetchResource } from "../fetchers/browser.js";
import type { FetchedPackages, GetWorkerFn } from "../types.js";
Expand All @@ -34,5 +35,9 @@ export const getWorker: GetWorkerFn = async (
};

const workerCode = await fetchWorkerCode();
return BlobWorker.fromText(workerCode);

const workerThread: ModuleThread<MarineBackgroundInterface> =
await spawn<MarineBackgroundInterface>(BlobWorker.fromText(workerCode));

return workerThread;
};
10 changes: 7 additions & 3 deletions packages/core/js-client-isomorphic/src/worker-resolvers/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import { createRequire } from "module";
import { dirname, relative } from "path";
import { fileURLToPath } from "url";

import { Worker } from "@fluencelabs/threads/master";
import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
import { ModuleThread, spawn, Worker } from "@fluencelabs/threads/master";

import type { FetchedPackages, GetWorkerFn } from "../types.js";
import { getVersionedPackage } from "../types.js";

export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => {
export const getWorker: GetWorkerFn = async (pkg: FetchedPackages) => {
const require = createRequire(import.meta.url);

const pathToThisFile = dirname(fileURLToPath(import.meta.url));
Expand All @@ -33,5 +34,8 @@ export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => {

const relativePathToWorker = relative(pathToThisFile, pathToWorker);

return Promise.resolve(new Worker(relativePathToWorker));
const workerThread: ModuleThread<MarineBackgroundInterface> =
await spawn<MarineBackgroundInterface>(new Worker(relativePathToWorker));

return workerThread;
};
3 changes: 1 addition & 2 deletions packages/core/js-client/src/jsPeer/FluencePeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ export abstract class FluencePeer {
await this.connection.sendParticle(item.result.nextPeerPks, newParticle);
log_particle.trace("id %s. send successful", newParticle.id);
} catch (e) {
log_particle.error("id %s. send failed %j", newParticle.id, e);

const message = getErrorMessage(e);
log_particle.error("id %s. send failed %s", newParticle.id, message);

item.onError(
new SendError(
Expand Down
11 changes: 5 additions & 6 deletions packages/core/js-client/src/marine/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

import { fetchResource } from "@fluencelabs/js-client-isomorphic/fetcher";
import { getWorker } from "@fluencelabs/js-client-isomorphic/worker-resolver";
import { Worker } from "@fluencelabs/threads/master";
import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
import type { ModuleThread } from "@fluencelabs/threads/master";

type StrategyReturnType = [
marineJsWasm: ArrayBuffer,
avmWasm: ArrayBuffer,
worker: Worker,
worker: ModuleThread<MarineBackgroundInterface>,
];

export const loadMarineDeps = async (
CDNUrl: string,
): Promise<StrategyReturnType> => {
const [marineJsWasm, avmWasm] = await Promise.all([
const [marineJsWasm, avmWasm, worker] = await Promise.all([
fetchResource(
"@fluencelabs/marine-js",
"/dist/marine-js.wasm",
Expand All @@ -38,10 +39,8 @@ export const loadMarineDeps = async (
fetchResource("@fluencelabs/avm", "/dist/avm.wasm", CDNUrl).then((res) => {
return res.arrayBuffer();
}),
getWorker("@fluencelabs/marine-worker", CDNUrl),
]);

// TODO: load worker in parallel with avm and marine, test that it works
const worker = await getWorker("@fluencelabs/marine-worker", CDNUrl);

return [marineJsWasm, avmWasm, worker];
};
43 changes: 4 additions & 39 deletions packages/core/js-client/src/marine/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,29 @@ import type {
JSONValueNonNullable,
CallParameters,
} from "@fluencelabs/marine-worker";
import {
ModuleThread,
Thread,
spawn,
Worker,
} from "@fluencelabs/threads/master";
import { ModuleThread, Thread } from "@fluencelabs/threads/master";

import { MarineLogger, marineLogger } from "../../util/logger.js";
import { IMarineHost } from "../interfaces.js";

export class MarineBackgroundRunner implements IMarineHost {
private workerThread?: ModuleThread<MarineBackgroundInterface>;

private loggers = new Map<string, MarineLogger>();

constructor(
private marineJsWasm: ArrayBuffer,
private avmWasm: ArrayBuffer,
private worker: Worker,
private workerThread: ModuleThread<MarineBackgroundInterface>,
) {}

async hasService(serviceId: string) {
if (this.workerThread === undefined) {
throw new Error("Worker is not initialized");
}

return this.workerThread.hasService(serviceId);
}

async removeService(serviceId: string) {
if (this.workerThread === undefined) {
throw new Error("Worker is not initialized");
}

await this.workerThread.removeService(serviceId);
}

async start(): Promise<void> {
if (this.workerThread !== undefined) {
throw new Error("Worker thread already initialized");
}

const workerThread: ModuleThread<MarineBackgroundInterface> =
await spawn<MarineBackgroundInterface>(this.worker);

const logfn: LogFunction = (message) => {
const serviceLogger = this.loggers.get(message.service);

Expand All @@ -76,20 +54,15 @@ export class MarineBackgroundRunner implements IMarineHost {
serviceLogger[message.level](message.message);
};

workerThread.onLogMessage().subscribe(logfn);
await workerThread.init(this.marineJsWasm);
this.workerThread = workerThread;
this.workerThread.onLogMessage().subscribe(logfn);
await this.workerThread.init(this.marineJsWasm);
await this.createService(this.avmWasm, "avm");
}

async createService(
serviceModule: ArrayBuffer | SharedArrayBuffer,
serviceId: string,
): Promise<void> {
if (this.workerThread === undefined) {
throw new Error("Worker is not initialized");
}

this.loggers.set(serviceId, marineLogger(serviceId));
await this.workerThread.createService(serviceModule, serviceId);
}
Expand All @@ -100,10 +73,6 @@ export class MarineBackgroundRunner implements IMarineHost {
args: Array<JSONValueNonNullable> | Record<string, JSONValueNonNullable>,
callParams?: CallParameters,
): Promise<JSONValue> {
if (this.workerThread === undefined) {
throw new Error("Worker is not initialized");
}

return this.workerThread.callService(
serviceId,
functionName,
Expand All @@ -113,10 +82,6 @@ export class MarineBackgroundRunner implements IMarineHost {
}

async stop(): Promise<void> {
if (this.workerThread === undefined) {
return;
}

await this.workerThread.terminate();
await Thread.terminate(this.workerThread);
}
Expand Down