Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/fluence-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fluencelabs/fluence",
"version": "0.24.1",
"version": "0.25.0",
"description": "TypeScript implementation of Fluence Peer",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
Expand All @@ -25,11 +25,11 @@
"copy-marine": "dist/tools/copyMarine.js"
},
"dependencies": {
"@fluencelabs/avm": "0.27.8",
"@fluencelabs/avm": "0.28.8",
"@fluencelabs/connection": "workspace:0.2.0",
"@fluencelabs/interfaces": "workspace:0.1.0",
"@fluencelabs/keypair": "workspace:0.2.0",
"@fluencelabs/marine-js": "0.3.10",
"@fluencelabs/marine-js": "0.3.16",
"async": "3.2.3",
"base64-js": "^1.5.1",
"browser-or-node": "^2.0.0",
Expand Down
148 changes: 56 additions & 92 deletions packages/fluence-js/src/internal/FluencePeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ import type { MultiaddrInput } from 'multiaddr';
import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes';
import { PeerIdB58 } from './commonTypes';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { throwIfNotSupported, dataToString, jsonify, MarineLoglevel, marineLogLevelToEnvs } from './utils';
import { throwIfNotSupported, dataToString, jsonify, MarineLoglevel, marineLogLevelToEnvs, isString } from './utils';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs';
import log from 'loglevel';
import { builtInServices } from './builtins/common';
import { defaultSigGuard, Sig } from './builtins/Sig';
import { registerSig } from './_aqua/services';
import Buffer from './Buffer';

import { AVM, AvmRunner } from './avm';
import { isBrowser, isNode } from 'browser-or-node';
import { InterpreterResult, LogLevel } from '@fluencelabs/avm';
import { deserializeAvmResult, InterpreterResult, JSONValue, LogLevel, serializeAvmArgs } from '@fluencelabs/avm';

/**
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
Expand Down Expand Up @@ -100,12 +99,6 @@ export interface PeerConfig {
*/
defaultTtlMs?: number;

/**
* @deprecated. AVM run through marine-js infrastructure.
* @see marineJS option to configure AVM
*/
avmRunner?: AvmRunner;

/**
* This option allows to specify the location of various dependencies needed for marine-js.
* Each key specifies the location of the corresponding dependency.
Expand Down Expand Up @@ -306,9 +299,7 @@ export class FluencePeer {
this._keyPair = undefined; // This will set peer to non-initialized state and stop particle processing
this._stopParticleProcessing();
await this.disconnect();
await this._avmRunner?.terminate();
await this._fluenceAppService?.terminate();
this._avmRunner = undefined;
this._fluenceAppService = undefined;
this._classServices = undefined;

Expand All @@ -331,12 +322,12 @@ export class FluencePeer {
new Error("Can't use avm: peer is not initialized");
}

const args = JSON.stringify([air]);
const rawRes = await this._fluenceAppService!.callService('avm', 'ast', args, undefined);
let res;
const res = await this._fluenceAppService!.callService('avm', 'ast', [air], undefined);
if (!isString(res)) {
throw new Error(`Call to avm:ast expected to return string. Actual return: ${res}`);
}

try {
res = JSON.parse(rawRes);
res = res.result as string;
if (res.startsWith('error')) {
return {
success: false,
Expand All @@ -349,7 +340,7 @@ export class FluencePeer {
};
}
} catch (err) {
throw new Error('Failed to call avm. Raw result: ' + rawRes + '. Error: ' + err);
throw new Error('Failed to call avm. Result: ' + res + '. Error: ' + err);
}
},
createNewParticle: (script: string, ttl: number = this._defaultTTL) => {
Expand Down Expand Up @@ -454,8 +445,6 @@ export class FluencePeer {
undefined,
marineLogLevelToEnvs(this._marineLogLevel),
);
this._avmRunner = config?.avmRunner || new AVM(this._fluenceAppService);
await this._avmRunner.init(config?.avmLogLevel || 'off');

registerDefaultServices(this);

Expand Down Expand Up @@ -520,11 +509,6 @@ export class FluencePeer {
private _defaultTTL: number = DEFAULT_TTL;
private _keyPair: KeyPair | undefined;
private _connection?: FluenceConnection;

/**
* @deprecated. AVM run through marine-js infrastructure. This field is needed for backward compatibility with the previous API
*/
private _avmRunner?: AvmRunner;
private _fluenceAppService?: FluenceAppService;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
Expand Down Expand Up @@ -576,7 +560,7 @@ export class FluencePeer {
() => {
item.onStageChange({ stage: 'sent' });
},
(e) => {
(e: any) => {
log.error(e);
},
);
Expand Down Expand Up @@ -605,7 +589,7 @@ export class FluencePeer {

concatMap(async (item) => {
const status = this.getStatus();
if (!status.isInitialized || this._avmRunner === undefined) {
if (!status.isInitialized || this._fluenceAppService === undefined) {
// If `.stop()` was called return null to stop particle processing immediately
return null;
}
Expand All @@ -615,14 +599,37 @@ export class FluencePeer {
// MUST happen sequentially (in a critical section).
// Otherwise the race between runner might occur corrupting the prevData

const result = await runAvmRunner(status.peerId, this._avmRunner, item.particle, prevData);
const newData = Buffer.from(result.data);
prevData = newData;
const args = serializeAvmArgs(
{
initPeerId: item.particle.initPeerId,
currentPeerId: status.peerId,
timestamp: item.particle.timestamp,
ttl: item.particle.ttl,
},
item.particle.script,
prevData,
item.particle.data,
item.particle.callResults,
);

item.particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
let avmCallResult: InterpreterResult | Error;
try {
const res = await this._fluenceAppService.callService('avm', 'invoke', args, undefined);
avmCallResult = deserializeAvmResult(res);
} catch (e) {
avmCallResult = e instanceof Error ? e : new Error((e as any).toString());
}
Comment on lines +622 to +623
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not critical of course, but I think there is no need for a separate newData variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was left intentionally to show what is being swapped with what. Otherwise is hard to track what is data, prevdata and newdata

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. kind of agree


if (!(avmCallResult instanceof Error) && avmCallResult.retCode === 0) {
const newData = Buffer.from(avmCallResult.data);
prevData = newData;
}

return {
...item,
result: result,
newData: newData,
result: avmCallResult,
};
}),
)
Expand All @@ -633,19 +640,30 @@ export class FluencePeer {
}

// Do not continue if there was an error in particle interpretation
if (!isInterpretationSuccessful(item.result)) {
if (item.result instanceof Error) {
log.error('Interpreter failed: ', jsonify(item.result.message));
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.message });
return;
}

const toLog = { ...item.result, data: dataToString(item.result.data) };
if (item.result.retCode !== 0) {
log.error('Interpreter failed: ', jsonify(toLog));
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
return;
}

log.debug('Interpreter result: ', jsonify(toLog));

setTimeout(() => {
item.onStageChange({ stage: 'interpreted' });
}, 0);

// send particle further if requested
if (item.result.nextPeerPks.length > 0) {
const newParticle = item.particle.clone();
newParticle.data = item.newData;
const newData = Buffer.from(item.result.data);
newParticle.data = newData;
this._outgoingParticles.next({
...item,
particle: newParticle,
Expand Down Expand Up @@ -700,31 +718,12 @@ export class FluencePeer {
const particleId = req.particleContext.particleId;

if (this._fluenceAppService && this._marineServices.has(req.serviceId)) {
const args = JSON.stringify(req.args);
const rawResult = await this._fluenceAppService.callService(req.serviceId, req.fnName, args, undefined);
const result = await this._fluenceAppService.callService(req.serviceId, req.fnName, req.args, undefined);

try {
const result = JSON.parse(rawResult);
if (typeof result.error === 'string' && result.error.length > 0) {
return {
retCode: ResultCodes.error,
result: result.error,
};
}

if (result.result === undefined) {
throw new Error(
`Call to marine-js returned no error and empty result. Original request: ${jsonify(req)}`,
);
}

return {
retCode: ResultCodes.success,
result: result.result,
};
} catch (e) {
throw new Error(`Call to marine-js. Result parsing error: ${e}, original text: ${rawResult}`);
}
return {
retCode: ResultCodes.success,
result: result as JSONValue,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the furture we should get rid of the typecasts because who knows what data can be sent by bad actors - I think everything should be checked and acted apon appropriatly instead of just failing

}

const key = serviceFnKey(req.serviceId, req.fnName);
Expand Down Expand Up @@ -805,10 +804,6 @@ async function configToConnection(
return res;
}

function isInterpretationSuccessful(result: InterpreterResult) {
return result.retCode === 0;
}

function serviceFnKey(serviceId: string, fnName: string) {
return `${serviceId}/${fnName}`;
}
Expand All @@ -821,37 +816,6 @@ function registerDefaultServices(peer: FluencePeer) {
});
}

async function runAvmRunner(
currentPeerId: PeerIdB58,
runner: AvmRunner,
particle: Particle,
prevData: Uint8Array,
): Promise<InterpreterResult> {
particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
const interpreterResult = await runner.run(
particle.script,
prevData,
particle.data,
{
initPeerId: particle.initPeerId,
currentPeerId: currentPeerId,
timestamp: particle.timestamp,
ttl: particle.ttl,
},
particle.callResults,
);

const toLog = { ...interpreterResult, data: dataToString(interpreterResult.data) };

if (isInterpretationSuccessful(interpreterResult)) {
log.debug('Interpreter result: ', jsonify(toLog));
} else {
log.error('Interpreter failed: ', jsonify(toLog));
}
return interpreterResult;
}

function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) {
return pipe(
tap((item: ParticleQueueItem) => {
Expand Down
65 changes: 0 additions & 65 deletions packages/fluence-js/src/internal/avm.ts

This file was deleted.

6 changes: 5 additions & 1 deletion packages/fluence-js/src/internal/commonTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export interface CallServiceData {
/**
* Type for all the possible objects that can be returned to the AVM
*/
export type CallServiceResultType = object | boolean | number | string | null;
export type CallServiceResultType = JSONValue;

/**
* Generic call service handler
Expand All @@ -146,3 +146,7 @@ export interface CallServiceResult {
*/
result: CallServiceResultType;
}

export type JSONValue = string | number | boolean | null | { [x: string]: JSONValue } | Array<JSONValue>;
export type JSONArray = Array<JSONValue>;
export type JSONObject = { [x: string]: JSONValue };
4 changes: 4 additions & 0 deletions packages/fluence-js/src/internal/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ export type MarineLoglevel = LogLevel;

export const marineLogLevelToEnvs = (marineLogLevel: MarineLoglevel | undefined) =>
marineLogLevel ? { WASM_LOG: marineLogLevel } : undefined;

export const isString = (x: unknown): x is string => {
return x !== null && typeof x === 'string';
};
Loading