From 981ba2859d18b11fca64651b90a97c1ae7d96c11 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Wed, 27 Oct 2021 17:46:43 +0300 Subject: [PATCH 01/14] Additional checks when initiating particles --- package-lock.json | 14 ++++++------ package.json | 2 +- src/__test__/integration/peer.spec.ts | 33 ++++++++++++++++++++++++++- src/internal/FluencePeer.ts | 9 ++++++++ 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/package-lock.json b/package-lock.json index 20a34f121..480dec9a0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.15.4", + "@fluencelabs/avm": "0.16.0-expose-parse-ast.0", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", @@ -646,9 +646,9 @@ } }, "node_modules/@fluencelabs/avm": { - "version": "0.15.4", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.15.4.tgz", - "integrity": "sha512-NLZDq83ocJ1Helm0D8kPMSSkjxH0y+Tujg0px773zjIShbh3jgiJOjAW1xCYgTt9K0LqepjP0bWX4/8nUZfr7g==", + "version": "0.16.0-expose-parse-ast.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.0-expose-parse-ast.0.tgz", + "integrity": "sha512-gJXu9Fp2iaZf178g79wYRpj7VorylMuSYlVEoCdZb/4TkRVI11uiGgdL/T9FYJvpD3/76vnZhl7Y4EcyKREVrw==", "dependencies": { "base64-js": "1.5.1" } @@ -8689,9 +8689,9 @@ } }, "@fluencelabs/avm": { - "version": "0.15.4", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.15.4.tgz", - "integrity": "sha512-NLZDq83ocJ1Helm0D8kPMSSkjxH0y+Tujg0px773zjIShbh3jgiJOjAW1xCYgTt9K0LqepjP0bWX4/8nUZfr7g==", + "version": "0.16.0-expose-parse-ast.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.0-expose-parse-ast.0.tgz", + "integrity": "sha512-gJXu9Fp2iaZf178g79wYRpj7VorylMuSYlVEoCdZb/4TkRVI11uiGgdL/T9FYJvpD3/76vnZhl7Y4EcyKREVrw==", "requires": { "base64-js": "1.5.1" } diff --git a/package.json b/package.json index 59c65c56a..46454bc28 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.15.4", + "@fluencelabs/avm": "0.16.0-expose-parse-ast.0", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index 69a62f29b..9923ae5d9 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -381,7 +381,6 @@ describe('Typescript usage suite', () => { it('Should not crash if an error ocurred in user-defined handler', async () => { // arrange; - setLogLevel('trace'); await anotherPeer.start(); // act @@ -417,6 +416,38 @@ describe('Typescript usage suite', () => { }); }); + it('Should throw error if initiating particle for non-initiated peer', async () => { + // arrange; + const nonInitiatedPeer = new FluencePeer(); + + // act + const action = () => { + const script = `(null)`; + const particle = Particle.createNew(script); + + nonInitiatedPeer.internals.initiateParticle(particle); + }; + + // assert + await expect(action).toThrow('Cannon initiate new particle: peer is no initialized'); + }); + + it('Should throw error if particle with incorrect AIR script is initiated', async () => { + // arrange; + await anotherPeer.start(); + + // act + const action = () => { + const script = `incorrect air script`; + const particle = Particle.createNew(script); + + anotherPeer.internals.initiateParticle(particle); + }; + + // assert + await expect(action).toThrow(/incorrect air script/); + }); + it.skip('Should throw correct error when the client tries to send a particle not to the relay', async () => { // arrange; await anotherPeer.start({ connectTo: nodes[0] }); diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 2f8717dac..751b81ee5 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -239,6 +239,10 @@ export class FluencePeer { * @param particle - particle to start execution of */ initiateParticle: (particle: Particle): void => { + if (!this.getStatus().isInitialized) { + throw 'Cannon initiate new particle: peer is no initialized'; + } + if (particle.initPeerId === undefined) { particle.initPeerId = this.getStatus().peerId; } @@ -247,6 +251,11 @@ export class FluencePeer { particle.ttl = this._defaultTTL; } + const res: string = this._interpreter.parseAirScript(particle.script); + if (res.startsWith('error:')) { + throw res; + } + this._incomingParticles.next(particle); }, /** From 739ced313759fb484a6543533de743ce5c6914f2 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Wed, 27 Oct 2021 19:50:28 +0300 Subject: [PATCH 02/14] wip --- src/internal/compilerSupport/v2.ts | 258 ++++++++++++++++------------- 1 file changed, 146 insertions(+), 112 deletions(-) diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index f45cddaee..a0e682a31 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -203,128 +203,162 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script throw new Error('Incorrect number of arguments. Expecting ${def.argDefs.length}'); } - const promise = new Promise((resolve, reject) => { + const isVoid = def.returnType.tag === 'void'; + + // if the function has void type we should resolve immediately for API symmetry with non-void types + // to help with debugging we are returning a promise which can be used to track particle errors + // we cannot return a bare promise because JS will lift it, so returning an array with the promise + // TODO:: + const outerPromise = new Promise((outerResolve, outerReject) => { const particle = Particle.createNew(script, config?.ttl); - for (let i = 0; i < def.argDefs.length; i++) { - const argDef = def.argDefs[i]; - const arg = args[i]; - - const [serviceId, fnName, cb] = match(argDef.argType) - // for callback arguments we are registering particle-specific callback which executes the passed function - .with({ tag: 'callback' }, (callbackDef) => { - const fn = async (req: CallServiceData): Promise => { - const args = convertArgsFromReqToUserCall(req, callbackDef.callback.argDefs); - // arg is function at this point - const result = await arg.apply(null, args); - let res; - switch (callbackDef.callback.returnType.tag) { - case 'void': - res = {}; - break; - case 'primitive': - res = result; - break; - case 'optional': - res = tsToAquaOpt(result); - break; - } - return { - retCode: ResultCodes.success, - result: res, + const innerPromise = new Promise((_innerResolve, innerReject) => { + for (let i = 0; i < def.argDefs.length; i++) { + const argDef = def.argDefs[i]; + const arg = args[i]; + + const [serviceId, fnName, cb] = match(argDef.argType) + // for callback arguments we are registering particle-specific callback which executes the passed function + .with({ tag: 'callback' }, (callbackDef) => { + const fn = async (req: CallServiceData): Promise => { + const args = convertArgsFromReqToUserCall(req, callbackDef.callback.argDefs); + // arg is function at this point + const result = await arg.apply(null, args); + let res; + switch (callbackDef.callback.returnType.tag) { + case 'void': + res = {}; + break; + case 'primitive': + res = result; + break; + case 'optional': + res = tsToAquaOpt(result); + break; + } + return { + retCode: ResultCodes.success, + result: res, + }; }; - }; - return [def.names.callbackSrv, argDef.name, fn] as const; - }) - // for optional types we are converting value to array representation in air - .with({ tag: 'optional' }, () => { - const fn = (req: CallServiceData): CallServiceResult => { - // arg is optional at this point - const res = tsToAquaOpt(arg); - return { - retCode: ResultCodes.success, - result: res, + return [def.names.callbackSrv, argDef.name, fn] as const; + }) + // for optional types we are converting value to array representation in air + .with({ tag: 'optional' }, () => { + const fn = (req: CallServiceData): CallServiceResult => { + // arg is optional at this point + const res = tsToAquaOpt(arg); + return { + retCode: ResultCodes.success, + result: res, + }; }; + return [def.names.getDataSrv, argDef.name, fn] as const; + }) + // for primitive types wre are simply passing the value + .with({ tag: 'primitive' }, () => { + // arg is primitive at this point + const fn = (req: CallServiceData): CallServiceResult => ({ + retCode: ResultCodes.success, + result: arg, + }); + return [def.names.getDataSrv, argDef.name, fn] as const; + }) + .exhaustive(); + + // registering handlers for every argument of the function + peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, cb); + } + + // registering handler for function response + peer.internals.regHandler.forParticle( + particle.id, + def.names.responseSrv, + def.names.responseFnName, + (req) => { + const userFunctionReturn = match(def.returnType) + .with({ tag: 'primitive' }, () => req.args[0]) + .with({ tag: 'optional' }, () => aquaOptToTs(req.args[0])) + .with({ tag: 'void' }, () => undefined) + .with({ tag: 'multiReturn' }, (mr) => { + return mr.returnItems.map((x, index) => { + return match(x) + .with({ tag: 'optional' }, () => aquaOptToTs(req.args[index])) + .with({ tag: 'primitive' }, () => req.args[index]) + .exhaustive(); + }); + }) + .exhaustive(); + + setTimeout(() => { + outerResolve(userFunctionReturn); + }, 0); + + return { + retCode: ResultCodes.success, + result: {}, }; - return [def.names.getDataSrv, argDef.name, fn] as const; - }) - // for primitive types wre are simply passing the value - .with({ tag: 'primitive' }, () => { - // arg is primitive at this point - const fn = (req: CallServiceData): CallServiceResult => ({ + }, + ); + + // registering handler for injecting relay variable + peer.internals.regHandler.forParticle(particle.id, def.names.getDataSrv, def.names.relay, (req) => { + return { + retCode: ResultCodes.success, + result: peer.getStatus().relayPeerId, + }; + }); + + // registering handler for error reporting + peer.internals.regHandler.forParticle( + particle.id, + def.names.errorHandlingSrv, + def.names.errorFnName, + (req) => { + const [err, _] = req.args; + setTimeout(() => { + // + if (isVoid) { + innerReject(err); + } else { + outerReject(err); + } + }, 0); + return { retCode: ResultCodes.success, - result: arg, - }); - return [def.names.getDataSrv, argDef.name, fn] as const; - }) - .exhaustive(); - - // registering handlers for every argument of the function - peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, cb); - } - - // registering handler for function response - peer.internals.regHandler.forParticle(particle.id, def.names.responseSrv, def.names.responseFnName, (req) => { - const userFunctionReturn = match(def.returnType) - .with({ tag: 'primitive' }, () => req.args[0]) - .with({ tag: 'optional' }, () => aquaOptToTs(req.args[0])) - .with({ tag: 'void' }, () => undefined) - .with({ tag: 'multiReturn' }, (mr) => { - return mr.returnItems.map((x, index) => { - return match(x) - .with({ tag: 'optional' }, () => aquaOptToTs(req.args[index])) - .with({ tag: 'primitive' }, () => req.args[index]) - .exhaustive(); - }); - }) - .exhaustive(); - - setTimeout(() => { - resolve(userFunctionReturn); - }, 0); - - return { - retCode: ResultCodes.success, - result: {}, - }; - }); - - // registering handler for injecting relay variable - peer.internals.regHandler.forParticle(particle.id, def.names.getDataSrv, def.names.relay, (req) => { - return { - retCode: ResultCodes.success, - result: peer.getStatus().relayPeerId, - }; - }); - - // registering handler for error reporting - peer.internals.regHandler.forParticle(particle.id, def.names.errorHandlingSrv, def.names.errorFnName, (req) => { - const [err, _] = req.args; - setTimeout(() => { - reject(err); - }, 0); - return { - retCode: ResultCodes.success, - result: {}, - }; - }); - - // registering handler for particle timeout - peer.internals.regHandler.timeout(particle.id, () => { - reject(`Request timed out for ${def.functionName}`); + result: {}, + }; + }, + ); + + // registering handler for particle timeout + peer.internals.regHandler.timeout(particle.id, () => { + // failing after timeout only makes sense for functions with return value + // as they have a clear indication of completion + if (!isVoid) { + outerReject(`Request timed out for ${def.functionName}`); + } + }); + + try { + peer.internals.initiateParticle(particle); + } catch (err) { + if (isVoid) { + outerReject(err); + } else { + innerReject(err); + } + } }); - peer.internals.initiateParticle(particle); + // if the return type is void, that we resolve immediately + // returning the inner promise which will handle further execution errors + if (isVoid) { + outerResolve([innerPromise]); + } }); - // if the function has void type we should resolve immediately for API symmetry with non-void types - // to help with debugging we are returning a promise which can be used to track particle errors - // we cannot return a bare promise because JS will lift it, so returning an array with the promise - if (def.returnType.tag === 'void') { - return Promise.resolve([promise]); - } else { - return promise; - } + return outerPromise; } /** From 8e50f071acc34a0a883828ad7135047503763ff5 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Thu, 4 Nov 2021 17:05:16 +0300 Subject: [PATCH 03/14] wip --- src/internal/FluencePeer.ts | 73 +++++++++++++++++++++---------------- src/internal/Particle.ts | 13 +++++++ 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 751b81ee5..70750dfe4 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -27,13 +27,14 @@ import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCo import { CallServiceHandler as LegacyCallServiceHandler } from './compilerSupport/LegacyCallServiceHandler'; import { PeerIdB58 } from './commonTypes'; import { FluenceConnection } from './FluenceConnection'; -import { Particle } from './Particle'; +import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { KeyPair } from './KeyPair'; import { createInterpreter, dataToString } from './utils'; import { filter, pipe, Subject, tap } from 'rxjs'; import { RequestFlow } from './compilerSupport/v1'; import log from 'loglevel'; import { defaultServices } from './defaultServices'; +import { instanceOf } from 'ts-pattern'; /** * Node of the Fluence network specified as a pair of node's multiaddr and it's peer id @@ -200,7 +201,7 @@ export class FluencePeer { peerId: this._keyPair.Libp2pPeerId, relayAddress: connectToMultiAddr, dialTimeoutMs: config.dialTimeoutMs, - onIncomingParticle: (p) => this._incomingParticles.next(p), + onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStateChange: () => {} }), }); await this._connect(); @@ -251,12 +252,10 @@ export class FluencePeer { particle.ttl = this._defaultTTL; } - const res: string = this._interpreter.parseAirScript(particle.script); - if (res.startsWith('error:')) { - throw res; - } - - this._incomingParticles.next(particle); + this._incomingParticles.next({ + particle: particle, + onStateChange: () => {}, + }); }, /** * Register Call Service handler functions @@ -341,8 +340,8 @@ export class FluencePeer { // Queues for incoming and outgoing particles - private _incomingParticles = new Subject(); - private _outgoingParticles = new Subject(); + private _incomingParticles = new Subject(); + private _outgoingParticles = new Subject(); // Call service handler @@ -358,15 +357,18 @@ export class FluencePeer { private _connection: FluenceConnection; private _interpreter: AirInterpreter; private _timeouts: Array = []; - private _particleQueues = new Map>(); + private _particleQueues = new Map>(); private _startParticleProcessing() { this._incomingParticles .pipe( - tap((x) => x.logTo('debug', 'particle received:')), + tap((x) => { + x.particle.logTo('debug', 'particle received:'); + }), filterExpiredParticles(this._expireParticle.bind(this)), ) - .subscribe((p) => { + .subscribe((item) => { + const p = item.particle; let particlesQueue = this._particleQueues.get(p.id); if (!particlesQueue) { @@ -374,21 +376,23 @@ export class FluencePeer { this._particleQueues.set(p.id, particlesQueue); const timeout = setTimeout(() => { - this._expireParticle(p.id); + this._expireParticle(item); }, p.actualTtl()); this._timeouts.push(timeout); } - particlesQueue.next(p); + particlesQueue.next(item); }); - this._outgoingParticles.subscribe((p) => { - this._connection.sendParticle(p); + this._outgoingParticles.subscribe(async (item) => { + await this._connection.sendParticle(item.particle); + item.onStateChange(ParticleExecutionStage.sent); }); } - private _expireParticle(particleId: string) { + private _expireParticle(item: ParticleQueueItem) { + const particleId = item.particle.id; log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); this._particleQueues.delete(particleId); @@ -398,10 +402,11 @@ export class FluencePeer { } this._particleSpecificHandlers.delete(particleId); this._timeoutHandlers.delete(particleId); + item.onStateChange(ParticleExecutionStage.expired); } private _createParticlesProcessingQueue() { - let particlesQueue = new Subject(); + let particlesQueue = new Subject(); let prevData: Uint8Array = Buffer.from([]); particlesQueue @@ -409,28 +414,34 @@ export class FluencePeer { // force new line filterExpiredParticles(this._expireParticle.bind(this)), ) - .subscribe((x) => { - const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData); + .subscribe((item) => { + const particle = item.particle; + const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); + setTimeout(() => { + item.onStateChange(ParticleExecutionStage.interpreted); + }, 0); prevData = Buffer.from(result.data); // send particle further if requested if (result.nextPeerPks.length > 0) { - const newParticle = x.clone(); + const newParticle = particle.clone(); newParticle.data = prevData; - this._outgoingParticles.next(newParticle); + this._outgoingParticles.next({ ...item, particle: newParticle }); } // execute call requests if needed // and put particle with the results back to queue if (result.callRequests.length > 0) { - this._execCallRequests(x, result.callRequests).then((callResults) => { - const newParticle = x.clone(); + this._execCallRequests(particle, result.callRequests).then((callResults) => { + const newParticle = particle.clone(); newParticle.callResults = callResults; newParticle.data = Buffer.from([]); - particlesQueue.next(newParticle); + particlesQueue.next({ ...item, particle: newParticle }); }); + } else { + item.onStateChange(ParticleExecutionStage.localWorkDone); } }); @@ -593,13 +604,13 @@ function runInterpreter( return interpreterResult; } -function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) { +function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) { return pipe( - tap((p: Particle) => { - if (p.hasExpired()) { - onParticleExpiration(p.id); + tap((item: ParticleQueueItem) => { + if (item.particle.hasExpired()) { + onParticleExpiration(item); } }), - filter((x: Particle) => !x.hasExpired()), + filter((x: ParticleQueueItem) => !x.particle.hasExpired()), ); } diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index 519ec66b1..69e5f5f06 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -20,6 +20,7 @@ import { CallResultsArray, LogLevel } from '@fluencelabs/avm'; import log from 'loglevel'; import { ParticleContext } from './commonTypes'; import { dataToString } from './utils'; +import { Action } from 'rxjs/internal/scheduler/Action'; export class Particle { id: string; @@ -140,6 +141,18 @@ export class Particle { } } +export enum ParticleExecutionStage { + 'received' = 1, + 'interpreted' = 2, + 'localWorkDone' = 3, + 'sent' = 4, + 'expired' = 5, +} +export interface ParticleQueueItem { + particle: Particle; + onStateChange: (state: ParticleExecutionStage) => void; +} + function genUUID() { return uuidv4(); } From ab1055c33ad11a5ee1db56854d7dcffe73593471 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Fri, 5 Nov 2021 16:56:42 +0300 Subject: [PATCH 04/14] WIP --- .../integration/compiler/compiler.spec.ts | 32 +++++++++++++++++++ src/__test__/integration/peer.spec.ts | 10 +++--- src/index.ts | 2 +- src/internal/FluencePeer.ts | 31 ++++++++++++------ src/internal/Particle.ts | 18 +++++------ 5 files changed, 69 insertions(+), 24 deletions(-) diff --git a/src/__test__/integration/compiler/compiler.spec.ts b/src/__test__/integration/compiler/compiler.spec.ts index cfa6a1395..997fb4dd9 100644 --- a/src/__test__/integration/compiler/compiler.spec.ts +++ b/src/__test__/integration/compiler/compiler.spec.ts @@ -2,6 +2,7 @@ import { Fluence, FluencePeer } from '../../..'; import { Particle } from '../../../internal/Particle'; import { registerHandlersHelper } from '../../util'; import { callMeBack, registerHelloWorld } from './gen1'; +import { callFunction } from '../../../internal/compilerSupport/v2'; describe('Compiler support infrastructure tests', () => { it('Compiled code for function should work', async () => { @@ -177,4 +178,35 @@ describe('Compiler support infrastructure tests', () => { await anotherPeer.stop(); }); + + it('Should throw error if particle with incorrect AIR script is initiated', async () => { + // arrange; + const anotherPeer = new FluencePeer(); + await anotherPeer.start(); + + // act + const action = () => { + callFunction( + [anotherPeer], + { + functionName: 'dontcare', + argDefs: [], + returnType: { tag: 'void' }, + names: { + relay: '-relay-', + getDataSrv: 'getDataSrv', + callbackSrv: 'callbackSrv', + responseSrv: 'callbackSrv', + responseFnName: 'response', + errorHandlingSrv: 'errorHandlingSrv', + errorFnName: 'error', + }, + }, + 'incorrect air script', + ); + }; + + // assert + await expect(action).toThrow(/incorrect air script/); + }); }); diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index 9923ae5d9..e9c0c2353 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -432,20 +432,20 @@ describe('Typescript usage suite', () => { await expect(action).toThrow('Cannon initiate new particle: peer is no initialized'); }); - it('Should throw error if particle with incorrect AIR script is initiated', async () => { + it('Should throw error if particle is initiated on a stopped peer', async () => { // arrange; - await anotherPeer.start(); + const stoppedPeer = new FluencePeer(); // act const action = () => { - const script = `incorrect air script`; + const script = `(null)`; const particle = Particle.createNew(script); - anotherPeer.internals.initiateParticle(particle); + stoppedPeer.internals.initiateParticle(particle); }; // assert - await expect(action).toThrow(/incorrect air script/); + await expect(action).toThrow('Cannon initiate new particle: peer is no initialized'); }); it.skip('Should throw correct error when the client tries to send a particle not to the relay', async () => { diff --git a/src/index.ts b/src/index.ts index f75af0687..92a0b7ab1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -44,7 +44,7 @@ export const Fluence = { }, /** - * Uninitializes the default peer: stops all the underltying workflows, stops the Aqua VM + * Un-initializes the default peer: stops all the underlying workflows, stops the Aqua VM * and disconnects from the Fluence network */ stop: (): Promise => { diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 70750dfe4..1ca63f4a2 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -201,7 +201,7 @@ export class FluencePeer { peerId: this._keyPair.Libp2pPeerId, relayAddress: connectToMultiAddr, dialTimeoutMs: config.dialTimeoutMs, - onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStateChange: () => {} }), + onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStageChange: () => {} }), }); await this._connect(); @@ -239,7 +239,7 @@ export class FluencePeer { * Initiates a new particle execution starting from local peer * @param particle - particle to start execution of */ - initiateParticle: (particle: Particle): void => { + initiateParticle: (particle: Particle, onStageChange?: (stage: ParticleExecutionStage) => void): void => { if (!this.getStatus().isInitialized) { throw 'Cannon initiate new particle: peer is no initialized'; } @@ -254,9 +254,10 @@ export class FluencePeer { this._incomingParticles.next({ particle: particle, - onStateChange: () => {}, + onStageChange: onStageChange || (() => {}), }); }, + /** * Register Call Service handler functions */ @@ -309,7 +310,11 @@ export class FluencePeer { timeout: request.timeout, }); - this.internals.initiateParticle(particle); + this.internals.initiateParticle(particle, (stage) => { + if (stage.stage === 'interpreterError') { + request?.error(stage.errorMessage); + } + }); }, /** @@ -387,7 +392,7 @@ export class FluencePeer { this._outgoingParticles.subscribe(async (item) => { await this._connection.sendParticle(item.particle); - item.onStateChange(ParticleExecutionStage.sent); + item.onStageChange({ stage: 'sent' }); }); } @@ -402,7 +407,7 @@ export class FluencePeer { } this._particleSpecificHandlers.delete(particleId); this._timeoutHandlers.delete(particleId); - item.onStateChange(ParticleExecutionStage.expired); + item.onStageChange({ stage: 'expired' }); } private _createParticlesProcessingQueue() { @@ -418,7 +423,11 @@ export class FluencePeer { const particle = item.particle; const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); setTimeout(() => { - item.onStateChange(ParticleExecutionStage.interpreted); + if (result.retCode === 0) { + item.onStageChange({ stage: 'interpreted' }); + } else { + item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); + } }, 0); prevData = Buffer.from(result.data); @@ -441,7 +450,7 @@ export class FluencePeer { particlesQueue.next({ ...item, particle: newParticle }); }); } else { - item.onStateChange(ParticleExecutionStage.localWorkDone); + item.onStageChange({ stage: 'localWorkDone' }); } }); @@ -600,7 +609,11 @@ function runInterpreter( const toLog: any = { ...interpreterResult }; toLog.data = dataToString(toLog.data); - log.debug('Interpreter result: ', toLog); + if (interpreterResult.retCode === 0) { + log.debug('Interpreter result: ', toLog); + } else { + log.error('Interpreter failed: ', toLog); + } return interpreterResult; } diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index 69e5f5f06..f22c4e37f 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -20,7 +20,6 @@ import { CallResultsArray, LogLevel } from '@fluencelabs/avm'; import log from 'loglevel'; import { ParticleContext } from './commonTypes'; import { dataToString } from './utils'; -import { Action } from 'rxjs/internal/scheduler/Action'; export class Particle { id: string; @@ -141,16 +140,17 @@ export class Particle { } } -export enum ParticleExecutionStage { - 'received' = 1, - 'interpreted' = 2, - 'localWorkDone' = 3, - 'sent' = 4, - 'expired' = 5, -} +export type ParticleExecutionStage = + | { stage: 'received' } + | { stage: 'interpreted' } + | { stage: 'interpreterError'; errorMessage: string } + | { stage: 'localWorkDone' } + | { stage: 'sent' } + | { stage: 'expired' }; + export interface ParticleQueueItem { particle: Particle; - onStateChange: (state: ParticleExecutionStage) => void; + onStageChange: (state: ParticleExecutionStage) => void; } function genUUID() { From ca00a930ae439311c553a0564220a17022229d3a Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Fri, 5 Nov 2021 19:20:19 +0300 Subject: [PATCH 05/14] Everything works --- src/__test__/integration/avm.spec.ts | 7 +- .../integration/compiler/compiler.spec.ts | 45 ++- src/__test__/integration/peer.spec.ts | 25 +- src/__test__/util.ts | 10 +- src/internal/FluencePeer.ts | 34 +-- src/internal/commonTypes.ts | 18 +- src/internal/compilerSupport/v2.ts | 258 ++++++++---------- src/internal/utils.ts | 21 +- 8 files changed, 195 insertions(+), 223 deletions(-) diff --git a/src/__test__/integration/avm.spec.ts b/src/__test__/integration/avm.spec.ts index 825327111..a9b90e272 100644 --- a/src/__test__/integration/avm.spec.ts +++ b/src/__test__/integration/avm.spec.ts @@ -1,5 +1,6 @@ import { FluencePeer } from '../../index'; import { Particle } from '../../internal/Particle'; +import { handleTimeout } from '../../internal/utils'; import { registerHandlersHelper } from '../util'; describe('Avm spec', () => { @@ -21,10 +22,9 @@ describe('Avm spec', () => { resolve(res); }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -61,10 +61,9 @@ describe('Avm spec', () => { } }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert diff --git a/src/__test__/integration/compiler/compiler.spec.ts b/src/__test__/integration/compiler/compiler.spec.ts index 997fb4dd9..771867cae 100644 --- a/src/__test__/integration/compiler/compiler.spec.ts +++ b/src/__test__/integration/compiler/compiler.spec.ts @@ -3,6 +3,7 @@ import { Particle } from '../../../internal/Particle'; import { registerHandlersHelper } from '../../util'; import { callMeBack, registerHelloWorld } from './gen1'; import { callFunction } from '../../../internal/compilerSupport/v2'; +import { handleTimeout } from '../../../internal/utils'; describe('Compiler support infrastructure tests', () => { it('Compiled code for function should work', async () => { @@ -79,10 +80,9 @@ describe('Compiler support infrastructure tests', () => { resolve(val); }, }, - _timeout: reject, }); - Fluence.getPeer().internals.initiateParticle(particle); + Fluence.getPeer().internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -167,9 +167,8 @@ describe('Compiler support infrastructure tests', () => { resolve(val); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -185,28 +184,26 @@ describe('Compiler support infrastructure tests', () => { await anotherPeer.start(); // act - const action = () => { - callFunction( - [anotherPeer], - { - functionName: 'dontcare', - argDefs: [], - returnType: { tag: 'void' }, - names: { - relay: '-relay-', - getDataSrv: 'getDataSrv', - callbackSrv: 'callbackSrv', - responseSrv: 'callbackSrv', - responseFnName: 'response', - errorHandlingSrv: 'errorHandlingSrv', - errorFnName: 'error', - }, + const action = callFunction( + [anotherPeer], + { + functionName: 'dontcare', + argDefs: [], + returnType: { tag: 'void' }, + names: { + relay: '-relay-', + getDataSrv: 'getDataSrv', + callbackSrv: 'callbackSrv', + responseSrv: 'callbackSrv', + responseFnName: 'response', + errorHandlingSrv: 'errorHandlingSrv', + errorFnName: 'error', }, - 'incorrect air script', - ); - }; + }, + 'incorrect air script', + ); // assert - await expect(action).toThrow(/incorrect air script/); + await expect(action).rejects.toMatch(/incorrect air script/); }); }); diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index e9c0c2353..945525d1e 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -1,7 +1,7 @@ import { Multiaddr } from 'multiaddr'; import { nodes } from '../connection'; import { Fluence, FluencePeer, setLogLevel } from '../../index'; -import { checkConnection } from '../../internal/utils'; +import { checkConnection, doNothing, handleTimeout } from '../../internal/utils'; import { Particle } from '../../internal/Particle'; import { registerHandlersHelper } from '../util'; @@ -121,10 +121,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -169,7 +168,7 @@ describe('Typescript usage suite', () => { ) `; const particle = Particle.createNew(script); - await peer1.internals.initiateParticle(particle); + await peer1.internals.initiateParticle(particle, doNothing); // assert const res = await resMakingPromise; @@ -309,10 +308,9 @@ describe('Typescript usage suite', () => { resolve(res); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -368,10 +366,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -404,10 +401,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -425,7 +421,7 @@ describe('Typescript usage suite', () => { const script = `(null)`; const particle = Particle.createNew(script); - nonInitiatedPeer.internals.initiateParticle(particle); + nonInitiatedPeer.internals.initiateParticle(particle, doNothing); }; // assert @@ -441,7 +437,7 @@ describe('Typescript usage suite', () => { const script = `(null)`; const particle = Particle.createNew(script); - stoppedPeer.internals.initiateParticle(particle); + stoppedPeer.internals.initiateParticle(particle, doNothing); }; // assert @@ -470,7 +466,7 @@ describe('Typescript usage suite', () => { }, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, doNothing); }); // assert @@ -499,10 +495,9 @@ async function callIncorrectService(peer: FluencePeer): Promise { reject(error); }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); return promise; diff --git a/src/__test__/util.ts b/src/__test__/util.ts index 770152dc6..d7e3e895d 100644 --- a/src/__test__/util.ts +++ b/src/__test__/util.ts @@ -3,14 +3,10 @@ import { Particle } from '../internal/Particle'; import { MakeServiceCall } from '../internal/utils'; export const registerHandlersHelper = (peer: FluencePeer, particle: Particle, handlers) => { - const { _timeout, ...rest } = handlers; - if (_timeout) { - peer.internals.regHandler.timeout(particle.id, _timeout); - } - for (let serviceId in rest) { - for (let fnName in rest[serviceId]) { + for (let serviceId in handlers) { + for (let fnName in handlers[serviceId]) { // of type [args] => result - const h = rest[serviceId][fnName]; + const h = handlers[serviceId][fnName]; peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, MakeServiceCall(h)); } } diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 1ca63f4a2..c40235601 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -225,7 +225,6 @@ export class FluencePeer { this._particleSpecificHandlers.clear(); this._commonHandlers.clear(); - this._timeoutHandlers.clear(); } // internal api @@ -239,7 +238,7 @@ export class FluencePeer { * Initiates a new particle execution starting from local peer * @param particle - particle to start execution of */ - initiateParticle: (particle: Particle, onStageChange?: (stage: ParticleExecutionStage) => void): void => { + initiateParticle: (particle: Particle, onStageChange: (stage: ParticleExecutionStage) => void): void => { if (!this.getStatus().isInitialized) { throw 'Cannon initiate new particle: peer is no initialized'; } @@ -254,7 +253,7 @@ export class FluencePeer { this._incomingParticles.next({ particle: particle, - onStageChange: onStageChange || (() => {}), + onStageChange: onStageChange, }); }, @@ -290,12 +289,6 @@ export class FluencePeer { psh.set(serviceFnKey(serviceId, fnName), handler); }, - /** - * Register handler which will be called upon particle timeout - */ - timeout: (particleId: string, handler: () => void) => { - this._timeoutHandlers.set(particleId, handler); - }, }, /** @@ -314,6 +307,10 @@ export class FluencePeer { if (stage.stage === 'interpreterError') { request?.error(stage.errorMessage); } + + if (stage.stage === 'expired') { + request?.timeout(); + } }); }, @@ -352,7 +349,6 @@ export class FluencePeer { private _particleSpecificHandlers = new Map>(); private _commonHandlers = new Map(); - private _timeoutHandlers = new Map void>(); // Internal peer state @@ -401,12 +397,8 @@ export class FluencePeer { log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); this._particleQueues.delete(particleId); - const timeoutHandler = this._timeoutHandlers.get(particleId); - if (timeoutHandler) { - timeoutHandler(); - } this._particleSpecificHandlers.delete(particleId); - this._timeoutHandlers.delete(particleId); + item.onStageChange({ stage: 'expired' }); } @@ -422,12 +414,14 @@ export class FluencePeer { .subscribe((item) => { const particle = item.particle; const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); + + if (result.retCode !== 0 || result?.errorMessage?.length > 0) { + item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); + return; + } + setTimeout(() => { - if (result.retCode === 0) { - item.onStageChange({ stage: 'interpreted' }); - } else { - item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); - } + item.onStageChange({ stage: 'interpreted' }); }, 0); prevData = Buffer.from(result.data); diff --git a/src/internal/commonTypes.ts b/src/internal/commonTypes.ts index 34ba66efc..02d2e92d5 100644 --- a/src/internal/commonTypes.ts +++ b/src/internal/commonTypes.ts @@ -68,12 +68,28 @@ export enum ResultCodes { */ export interface ParticleContext { /** - * The particle ID + * The identifier of particle which triggered the call */ particleId: string; + + /** + * The peer id which created the particle + */ initPeerId: PeerIdB58; + + /** + * Particle's timestamp when it was created + */ timestamp: number; + + /** + * Time to live in milliseconds. The time after the particle should be expired + */ ttl: number; + + /** + * Particle's signature + */ signature: string; } diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index a0e682a31..30a64235c 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -203,162 +203,128 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script throw new Error('Incorrect number of arguments. Expecting ${def.argDefs.length}'); } - const isVoid = def.returnType.tag === 'void'; - - // if the function has void type we should resolve immediately for API symmetry with non-void types - // to help with debugging we are returning a promise which can be used to track particle errors - // we cannot return a bare promise because JS will lift it, so returning an array with the promise - // TODO:: - const outerPromise = new Promise((outerResolve, outerReject) => { + const promise = new Promise((resolve, reject) => { const particle = Particle.createNew(script, config?.ttl); - const innerPromise = new Promise((_innerResolve, innerReject) => { - for (let i = 0; i < def.argDefs.length; i++) { - const argDef = def.argDefs[i]; - const arg = args[i]; - - const [serviceId, fnName, cb] = match(argDef.argType) - // for callback arguments we are registering particle-specific callback which executes the passed function - .with({ tag: 'callback' }, (callbackDef) => { - const fn = async (req: CallServiceData): Promise => { - const args = convertArgsFromReqToUserCall(req, callbackDef.callback.argDefs); - // arg is function at this point - const result = await arg.apply(null, args); - let res; - switch (callbackDef.callback.returnType.tag) { - case 'void': - res = {}; - break; - case 'primitive': - res = result; - break; - case 'optional': - res = tsToAquaOpt(result); - break; - } - return { - retCode: ResultCodes.success, - result: res, - }; - }; - return [def.names.callbackSrv, argDef.name, fn] as const; - }) - // for optional types we are converting value to array representation in air - .with({ tag: 'optional' }, () => { - const fn = (req: CallServiceData): CallServiceResult => { - // arg is optional at this point - const res = tsToAquaOpt(arg); - return { - retCode: ResultCodes.success, - result: res, - }; + for (let i = 0; i < def.argDefs.length; i++) { + const argDef = def.argDefs[i]; + const arg = args[i]; + + const [serviceId, fnName, cb] = match(argDef.argType) + // for callback arguments we are registering particle-specific callback which executes the passed function + .with({ tag: 'callback' }, (callbackDef) => { + const fn = async (req: CallServiceData): Promise => { + const args = convertArgsFromReqToUserCall(req, callbackDef.callback.argDefs); + // arg is function at this point + const result = await arg.apply(null, args); + let res; + switch (callbackDef.callback.returnType.tag) { + case 'void': + res = {}; + break; + case 'primitive': + res = result; + break; + case 'optional': + res = tsToAquaOpt(result); + break; + } + return { + retCode: ResultCodes.success, + result: res, }; - return [def.names.getDataSrv, argDef.name, fn] as const; - }) - // for primitive types wre are simply passing the value - .with({ tag: 'primitive' }, () => { - // arg is primitive at this point - const fn = (req: CallServiceData): CallServiceResult => ({ + }; + return [def.names.callbackSrv, argDef.name, fn] as const; + }) + // for optional types we are converting value to array representation in air + .with({ tag: 'optional' }, () => { + const fn = (req: CallServiceData): CallServiceResult => { + // arg is optional at this point + const res = tsToAquaOpt(arg); + return { retCode: ResultCodes.success, - result: arg, - }); - return [def.names.getDataSrv, argDef.name, fn] as const; - }) - .exhaustive(); - - // registering handlers for every argument of the function - peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, cb); - } - - // registering handler for function response - peer.internals.regHandler.forParticle( - particle.id, - def.names.responseSrv, - def.names.responseFnName, - (req) => { - const userFunctionReturn = match(def.returnType) - .with({ tag: 'primitive' }, () => req.args[0]) - .with({ tag: 'optional' }, () => aquaOptToTs(req.args[0])) - .with({ tag: 'void' }, () => undefined) - .with({ tag: 'multiReturn' }, (mr) => { - return mr.returnItems.map((x, index) => { - return match(x) - .with({ tag: 'optional' }, () => aquaOptToTs(req.args[index])) - .with({ tag: 'primitive' }, () => req.args[index]) - .exhaustive(); - }); - }) - .exhaustive(); - - setTimeout(() => { - outerResolve(userFunctionReturn); - }, 0); - - return { - retCode: ResultCodes.success, - result: {}, + result: res, + }; }; - }, - ); - - // registering handler for injecting relay variable - peer.internals.regHandler.forParticle(particle.id, def.names.getDataSrv, def.names.relay, (req) => { - return { - retCode: ResultCodes.success, - result: peer.getStatus().relayPeerId, - }; - }); - - // registering handler for error reporting - peer.internals.regHandler.forParticle( - particle.id, - def.names.errorHandlingSrv, - def.names.errorFnName, - (req) => { - const [err, _] = req.args; - setTimeout(() => { - // - if (isVoid) { - innerReject(err); - } else { - outerReject(err); - } - }, 0); - return { + return [def.names.getDataSrv, argDef.name, fn] as const; + }) + // for primitive types wre are simply passing the value + .with({ tag: 'primitive' }, () => { + // arg is primitive at this point + const fn = (req: CallServiceData): CallServiceResult => ({ retCode: ResultCodes.success, - result: {}, - }; - }, - ); - - // registering handler for particle timeout - peer.internals.regHandler.timeout(particle.id, () => { - // failing after timeout only makes sense for functions with return value - // as they have a clear indication of completion - if (!isVoid) { - outerReject(`Request timed out for ${def.functionName}`); - } - }); - - try { - peer.internals.initiateParticle(particle); - } catch (err) { - if (isVoid) { - outerReject(err); - } else { - innerReject(err); - } - } - }); + result: arg, + }); + return [def.names.getDataSrv, argDef.name, fn] as const; + }) + .exhaustive(); - // if the return type is void, that we resolve immediately - // returning the inner promise which will handle further execution errors - if (isVoid) { - outerResolve([innerPromise]); + // registering handlers for every argument of the function + peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, cb); } + + // registering handler for function response + peer.internals.regHandler.forParticle(particle.id, def.names.responseSrv, def.names.responseFnName, (req) => { + const userFunctionReturn = match(def.returnType) + .with({ tag: 'primitive' }, () => req.args[0]) + .with({ tag: 'optional' }, () => aquaOptToTs(req.args[0])) + .with({ tag: 'void' }, () => undefined) + .with({ tag: 'multiReturn' }, (mr) => { + return mr.returnItems.map((x, index) => { + return match(x) + .with({ tag: 'optional' }, () => aquaOptToTs(req.args[index])) + .with({ tag: 'primitive' }, () => req.args[index]) + .exhaustive(); + }); + }) + .exhaustive(); + + setTimeout(() => { + resolve(userFunctionReturn); + }, 0); + + return { + retCode: ResultCodes.success, + result: {}, + }; + }); + + // registering handler for injecting relay variable + peer.internals.regHandler.forParticle(particle.id, def.names.getDataSrv, def.names.relay, (req) => { + return { + retCode: ResultCodes.success, + result: peer.getStatus().relayPeerId, + }; + }); + + // registering handler for error reporting + peer.internals.regHandler.forParticle(particle.id, def.names.errorHandlingSrv, def.names.errorFnName, (req) => { + const [err, _] = req.args; + setTimeout(() => { + reject(err); + }, 0); + return { + retCode: ResultCodes.success, + result: {}, + }; + }); + + peer.internals.initiateParticle(particle, (stage) => { + if (def.returnType.tag === 'void' && (stage.stage === 'interpreted' || stage.stage === 'localWorkDone')) { + resolve(undefined); + } + + if (stage.stage === 'expired') { + reject(`Request timed out for ${def.functionName}`); + } + + if (stage.stage === 'interpreterError') { + reject(stage.errorMessage); + } + }); }); - return outerPromise; + return promise; } /** diff --git a/src/internal/utils.ts b/src/internal/utils.ts index aa3b198b5..2146dc6bd 100644 --- a/src/internal/utils.ts +++ b/src/internal/utils.ts @@ -18,7 +18,7 @@ import { AirInterpreter, LogLevel as AvmLogLevel } from '@fluencelabs/avm'; import log from 'loglevel'; import { CallServiceData, CallServiceResult, CallServiceResultType, ResultCodes } from './commonTypes'; import { AvmLoglevel, FluencePeer } from './FluencePeer'; -import { Particle } from './Particle'; +import { Particle, ParticleExecutionStage } from './Particle'; export const createInterpreter = (logLevel: AvmLoglevel): Promise => { const logFn = (level: AvmLogLevel, msg: string) => { @@ -53,6 +53,14 @@ export const MakeServiceCall = (fn: (args: any[]) => CallServiceResultType) => { }; }; +export const handleTimeout = (fn: Function) => (stage: ParticleExecutionStage) => { + if (stage.stage === 'expired') { + fn(); + } +}; + +export const doNothing = (stage: ParticleExecutionStage) => {}; + /** * Checks the network connection by sending a ping-like request to relay node * @param { FluenceClient } peer - The Fluence Client instance. @@ -127,11 +135,12 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise< }), ); - peer.internals.regHandler.timeout(particle.id, () => { - reject('particle timed out'); - }); - - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle( + particle, + handleTimeout(() => { + reject('particle timed out'); + }), + ); }); try { From c018603daa9ec5ccd8fb00476c944ce60ffe275d Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Mon, 8 Nov 2021 16:02:17 +0300 Subject: [PATCH 06/14] Fix AVM version --- package-lock.json | 14 +++++++------- package.json | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/package-lock.json b/package-lock.json index 480dec9a0..6730dfc11 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.16.0-expose-parse-ast.0", + "@fluencelabs/avm": "0.16.6", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", @@ -646,9 +646,9 @@ } }, "node_modules/@fluencelabs/avm": { - "version": "0.16.0-expose-parse-ast.0", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.0-expose-parse-ast.0.tgz", - "integrity": "sha512-gJXu9Fp2iaZf178g79wYRpj7VorylMuSYlVEoCdZb/4TkRVI11uiGgdL/T9FYJvpD3/76vnZhl7Y4EcyKREVrw==", + "version": "0.16.6", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.6.tgz", + "integrity": "sha512-RDNXW/VYAXh+E7B7+S4pTTc/1IcvtlID2xyBs/3TDlxkjbVxM7+vMcFL6cJZOzZZl+3oAWXL3ibDhE5Elcq6ug==", "dependencies": { "base64-js": "1.5.1" } @@ -8689,9 +8689,9 @@ } }, "@fluencelabs/avm": { - "version": "0.16.0-expose-parse-ast.0", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.0-expose-parse-ast.0.tgz", - "integrity": "sha512-gJXu9Fp2iaZf178g79wYRpj7VorylMuSYlVEoCdZb/4TkRVI11uiGgdL/T9FYJvpD3/76vnZhl7Y4EcyKREVrw==", + "version": "0.16.6", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.6.tgz", + "integrity": "sha512-RDNXW/VYAXh+E7B7+S4pTTc/1IcvtlID2xyBs/3TDlxkjbVxM7+vMcFL6cJZOzZZl+3oAWXL3ibDhE5Elcq6ug==", "requires": { "base64-js": "1.5.1" } diff --git a/package.json b/package.json index 46454bc28..080a12ad7 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.16.0-expose-parse-ast.0", + "@fluencelabs/avm": "0.16.6", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", From 3f6344153e8ad92795f494960d749b61c4e45918 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Mon, 8 Nov 2021 16:15:14 +0300 Subject: [PATCH 07/14] Fix misc issues --- src/__test__/integration/peer.spec.ts | 16 ---------------- src/internal/FluencePeer.ts | 10 ++++++++-- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index 945525d1e..a9f40c8a0 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -412,22 +412,6 @@ describe('Typescript usage suite', () => { }); }); - it('Should throw error if initiating particle for non-initiated peer', async () => { - // arrange; - const nonInitiatedPeer = new FluencePeer(); - - // act - const action = () => { - const script = `(null)`; - const particle = Particle.createNew(script); - - nonInitiatedPeer.internals.initiateParticle(particle, doNothing); - }; - - // assert - await expect(action).toThrow('Cannon initiate new particle: peer is no initialized'); - }); - it('Should throw error if particle is initiated on a stopped peer', async () => { // arrange; const stoppedPeer = new FluencePeer(); diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index c40235601..403a47ffc 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -415,7 +415,8 @@ export class FluencePeer { const particle = item.particle; const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); - if (result.retCode !== 0 || result?.errorMessage?.length > 0) { + // Do not continue if there was an error in particle interpretation + if (isInterpretationSuccessful(result)) { item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); return; } @@ -568,6 +569,10 @@ export class FluencePeer { private _legacyCallServiceHandler: LegacyCallServiceHandler; } +function isInterpretationSuccessful(result: InterpreterResult) { + return result.retCode !== 0 || result?.errorMessage?.length > 0; +} + function serviceFnKey(serviceId: string, fnName: string) { return `${serviceId}/${fnName}`; } @@ -603,7 +608,8 @@ function runInterpreter( const toLog: any = { ...interpreterResult }; toLog.data = dataToString(toLog.data); - if (interpreterResult.retCode === 0) { + + if (isInterpretationSuccessful(interpreterResult)) { log.debug('Interpreter result: ', toLog); } else { log.error('Interpreter failed: ', toLog); From 2b7d437f7760e37b1d0aabddef581cd0fae7e623 Mon Sep 17 00:00:00 2001 From: Pavel Date: Mon, 8 Nov 2021 16:19:55 +0300 Subject: [PATCH 08/14] Update src/internal/FluencePeer.ts Co-authored-by: folex <0xdxdy@gmail.com> --- src/internal/FluencePeer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 403a47ffc..12882df0b 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -240,7 +240,7 @@ export class FluencePeer { */ initiateParticle: (particle: Particle, onStageChange: (stage: ParticleExecutionStage) => void): void => { if (!this.getStatus().isInitialized) { - throw 'Cannon initiate new particle: peer is no initialized'; + throw 'Can't initiate new particle: peer is not initialized'; } if (particle.initPeerId === undefined) { From 990f5ba49409b4bd16a9d7e9a269e3a6af5a37be Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Mon, 8 Nov 2021 16:35:25 +0300 Subject: [PATCH 09/14] Fix PR comments --- src/internal/FluencePeer.ts | 10 +++++----- src/internal/compilerSupport/v2.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 12882df0b..33b03d0a4 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -240,7 +240,7 @@ export class FluencePeer { */ initiateParticle: (particle: Particle, onStageChange: (stage: ParticleExecutionStage) => void): void => { if (!this.getStatus().isInitialized) { - throw 'Can't initiate new particle: peer is not initialized'; + throw 'Cannot initiate new particle: peer is not initialized'; } if (particle.initPeerId === undefined) { @@ -404,7 +404,7 @@ export class FluencePeer { private _createParticlesProcessingQueue() { let particlesQueue = new Subject(); - let prevData: Uint8Array = Buffer.from([]); + let newData: Uint8Array = Buffer.from([]); particlesQueue .pipe( @@ -413,7 +413,7 @@ export class FluencePeer { ) .subscribe((item) => { const particle = item.particle; - const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); + const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, newData); // Do not continue if there was an error in particle interpretation if (isInterpretationSuccessful(result)) { @@ -425,12 +425,12 @@ export class FluencePeer { item.onStageChange({ stage: 'interpreted' }); }, 0); - prevData = Buffer.from(result.data); + newData = Buffer.from(result.data); // send particle further if requested if (result.nextPeerPks.length > 0) { const newParticle = particle.clone(); - newParticle.data = prevData; + newParticle.data = newData; this._outgoingParticles.next({ ...item, particle: newParticle }); } diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index 30a64235c..0e505b3a5 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -310,7 +310,7 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script }); peer.internals.initiateParticle(particle, (stage) => { - if (def.returnType.tag === 'void' && (stage.stage === 'interpreted' || stage.stage === 'localWorkDone')) { + if (def.returnType.tag === 'void' && (stage.stage === 'sent' || stage.stage === 'localWorkDone')) { resolve(undefined); } From 73906ff3231c580ba63f02b6ce18f6147da1efc4 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Mon, 8 Nov 2021 16:45:32 +0300 Subject: [PATCH 10/14] fix tests --- src/__test__/integration/peer.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index a9f40c8a0..fe5e100f6 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -425,7 +425,7 @@ describe('Typescript usage suite', () => { }; // assert - await expect(action).toThrow('Cannon initiate new particle: peer is no initialized'); + await expect(action).toThrow('Cannot initiate new particle: peer is not initialized'); }); it.skip('Should throw correct error when the client tries to send a particle not to the relay', async () => { From d30011a53cb52396b1c13f3a46574dddfbc55838 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Tue, 9 Nov 2021 13:52:32 +0300 Subject: [PATCH 11/14] Fix PR comments --- src/internal/FluencePeer.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index c5fba1063..8141d382c 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -406,7 +406,7 @@ export class FluencePeer { private _createParticlesProcessingQueue() { let particlesQueue = new Subject(); - let newData: Uint8Array = Buffer.from([]); + let prevData: Uint8Array = Buffer.from([]); particlesQueue .pipe( @@ -415,7 +415,7 @@ export class FluencePeer { ) .subscribe((item) => { const particle = item.particle; - const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, newData); + const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); // Do not continue if there was an error in particle interpretation if (isInterpretationSuccessful(result)) { @@ -427,7 +427,8 @@ export class FluencePeer { item.onStageChange({ stage: 'interpreted' }); }, 0); - newData = Buffer.from(result.data); + const newData = Buffer.from(result.data); + prevData = newData; // send particle further if requested if (result.nextPeerPks.length > 0) { From 00214fcb682a937ff3a0dad5e6172da10e7fd85c Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 9 Nov 2021 14:17:19 +0300 Subject: [PATCH 12/14] Update src/internal/compilerSupport/v2.ts Co-authored-by: folex <0xdxdy@gmail.com> --- src/internal/compilerSupport/v2.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index 19d909238..f3dd64ceb 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -330,7 +330,7 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script } if (stage.stage === 'interpreterError') { - reject(stage.errorMessage); + reject(`Script interpretation failed for ${def.functionName}: ${stage.errorMessage}`); } }); }); From e42ff813f2649f63a4c31d1741dddb59a61c66c2 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 9 Nov 2021 14:17:25 +0300 Subject: [PATCH 13/14] Update src/internal/compilerSupport/v2.ts Co-authored-by: folex <0xdxdy@gmail.com> --- src/internal/compilerSupport/v2.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index f3dd64ceb..36d49e24b 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -321,6 +321,9 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script }); peer.internals.initiateParticle(particle, (stage) => { + // If function is void, then it's completed when one of the two conditions is met: + // 1. The particle is sent to the network (state 'sent') + // 2. All CallRequests are executed, e.g., all variable loading and local function calls are completed (state 'localWorkDone') if (def.returnType.tag === 'void' && (stage.stage === 'sent' || stage.stage === 'localWorkDone')) { resolve(undefined); } From 3017f93f59803435dcaf8c58378cdf3a69b72c01 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Tue, 9 Nov 2021 14:35:27 +0300 Subject: [PATCH 14/14] fix PR comments --- src/internal/FluencePeer.ts | 4 +++- src/internal/compilerSupport/v2.ts | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 8141d382c..f2bc58e9e 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -396,7 +396,9 @@ export class FluencePeer { private _expireParticle(item: ParticleQueueItem) { const particleId = item.particle.id; - log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); + log.debug( + `particle ${particleId} has expired after ${item.particle.ttl}. Deleting particle-related queues and handlers`, + ); this._particleQueues.delete(particleId); this._particleSpecificHandlers.delete(particleId); diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index 36d49e24b..5c2fccd91 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -329,7 +329,7 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script } if (stage.stage === 'expired') { - reject(`Request timed out for ${def.functionName}`); + reject(`Request timed out after ${particle.ttl} for ${def.functionName}`); } if (stage.stage === 'interpreterError') {