From 979282099051ff8890a02cfb4c668fdc3f7c57e9 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Thu, 21 Oct 2021 15:26:22 +0300 Subject: [PATCH 1/2] FluencePeer: add option to specify default TTL for all new particles --- src/__test__/integration/peer.spec.ts | 12 ++++++++++++ src/internal/FluencePeer.ts | 12 +++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index cde1859f0..152baaf6d 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -275,6 +275,18 @@ describe('Typescript usage suite', () => { // assert expect(isConnected).toBeTruthy; }); + + it('With connection options: defaultTTL', async () => { + // arrange + const addr = nodes[0]; + + // act + await anotherPeer.start({ connectTo: addr, defaultTtlMs: 1000 }); + const isConnected = await checkConnection(anotherPeer); + + // assert + expect(isConnected).toBeTruthy; + }); }); it('Should successfully call identity on local peer', async function () { diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 71035b2ff..e5108385f 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -94,6 +94,13 @@ export interface PeerConfig { * The dialing timeout in milliseconds */ dialTimeoutMs?: number; + + /** + * Sets the default TTL for all particles originating from the peer with no TTL specified. + * If the originating particle's TTL is defined then that value will be used + * If the options is not set default TTL will be 7000 + */ + defaultTtlMs?: number; } /** @@ -169,6 +176,8 @@ export class FluencePeer { this._keyPair = await KeyPair.randomEd25519(); } + this._defaultTTL = config?.defaultTtlMs | DEFAULT_TTL; + this._interpreter = await createInterpreter(config?.avmLogLevel || 'off'); if (config?.connectTo) { @@ -234,7 +243,7 @@ export class FluencePeer { } if (particle.ttl === undefined) { - particle.ttl = DEFAULT_TTL; + particle.ttl = this._defaultTTL; } this._incomingParticles.next(particle); @@ -333,6 +342,7 @@ export class FluencePeer { // Internal peer state + private _defaultTTL: number; private _relayPeerId: PeerIdB58 | null = null; private _keyPair: KeyPair; private _connection: FluenceConnection; From 88371c2382b1c49437725ccd26194e8f1f311f0d Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Thu, 21 Oct 2021 17:17:58 +0300 Subject: [PATCH 2/2] fix PR comments --- src/__test__/integration/peer.spec.ts | 23 ++++++-------- src/internal/FluencePeer.ts | 45 +++++++++++++++------------ src/internal/Particle.ts | 4 +-- src/internal/utils.ts | 8 +++-- 4 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index 152baaf6d..69a62f29b 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -189,7 +189,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('address as multiaddr', async () => { @@ -201,7 +201,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('address as node', async () => { @@ -213,7 +213,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('peerid as peer id', async () => { @@ -225,7 +225,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('peerid as seed', async () => { @@ -237,7 +237,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: dialTimeout', async () => { @@ -249,7 +249,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: skipCheckConnection', async () => { @@ -261,7 +261,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: checkConnectionTTL', async () => { @@ -273,7 +273,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: defaultTTL', async () => { @@ -281,11 +281,11 @@ describe('Typescript usage suite', () => { const addr = nodes[0]; // act - await anotherPeer.start({ connectTo: addr, defaultTtlMs: 1000 }); + await anotherPeer.start({ connectTo: addr, defaultTtlMs: 1 }); const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeFalsy(); }); }); @@ -309,9 +309,6 @@ describe('Typescript usage suite', () => { resolve(res); }, }, - // op: { - // identity: (req) => {}, - // }, _timeout: reject, }); diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index e5108385f..2f8717dac 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -98,7 +98,8 @@ export interface PeerConfig { /** * Sets the default TTL for all particles originating from the peer with no TTL specified. * If the originating particle's TTL is defined then that value will be used - * If the options is not set default TTL will be 7000 + * If the option is not set default TTL will be 7000 + * Value 0 (zero) is treated as if the option was not set */ defaultTtlMs?: number; } @@ -176,7 +177,7 @@ export class FluencePeer { this._keyPair = await KeyPair.randomEd25519(); } - this._defaultTTL = config?.defaultTtlMs | DEFAULT_TTL; + this._defaultTTL = config?.defaultTtlMs || DEFAULT_TTL; this._interpreter = await createInterpreter(config?.avmLogLevel || 'off'); @@ -348,32 +349,23 @@ export class FluencePeer { private _connection: FluenceConnection; private _interpreter: AirInterpreter; private _timeouts: Array = []; + private _particleQueues = new Map>(); private _startParticleProcessing() { - const particleQueues = new Map>(); - this._incomingParticles .pipe( tap((x) => x.logTo('debug', 'particle received:')), - filterExpiredParticles(), + filterExpiredParticles(this._expireParticle.bind(this)), ) .subscribe((p) => { - let particlesQueue = particleQueues.get(p.id); + let particlesQueue = this._particleQueues.get(p.id); if (!particlesQueue) { particlesQueue = this._createParticlesProcessingQueue(); - particleQueues.set(p.id, particlesQueue); + this._particleQueues.set(p.id, particlesQueue); const timeout = setTimeout(() => { - log.debug(`particle ${p.id} has expired. Deleting particle-related queues and handlers`); - - particleQueues.delete(p.id); - const timeoutHandler = this._timeoutHandlers.get(p.id); - if (timeoutHandler) { - timeoutHandler(); - } - this._particleSpecificHandlers.delete(p.id); - this._timeoutHandlers.delete(p.id); + this._expireParticle(p.id); }, p.actualTtl()); this._timeouts.push(timeout); @@ -387,6 +379,18 @@ export class FluencePeer { }); } + private _expireParticle(particleId: string) { + log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); + + this._particleQueues.delete(particleId); + const timeoutHandler = this._timeoutHandlers.get(particleId); + if (timeoutHandler) { + timeoutHandler(); + } + this._particleSpecificHandlers.delete(particleId); + this._timeoutHandlers.delete(particleId); + } + private _createParticlesProcessingQueue() { let particlesQueue = new Subject(); let prevData: Uint8Array = Buffer.from([]); @@ -394,7 +398,7 @@ export class FluencePeer { particlesQueue .pipe( // force new line - filterExpiredParticles(), + filterExpiredParticles(this._expireParticle.bind(this)), ) .subscribe((x) => { const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData); @@ -520,6 +524,7 @@ export class FluencePeer { for (let item of this._timeouts) { clearTimeout(item); } + this._particleQueues.clear(); } /** @@ -579,11 +584,11 @@ function runInterpreter( return interpreterResult; } -function filterExpiredParticles() { +function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) { return pipe( tap((p: Particle) => { - if (p.hasExpired) { - log.debug(`particle ${p.id} has expired`); + if (p.hasExpired()) { + onParticleExpiration(p.id); } }), filter((x: Particle) => !x.hasExpired()), diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index c472b6a7d..519ec66b1 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -21,8 +21,6 @@ import log from 'loglevel'; import { ParticleContext } from './commonTypes'; import { dataToString } from './utils'; -const DefaultTTL = 7000; - export class Particle { id: string; initPeerId: string; @@ -37,7 +35,7 @@ export class Particle { const res = new Particle(); res.id = genUUID(); res.script = script; - res.ttl = ttlMs || DefaultTTL; + res.ttl = ttlMs; res.data = Buffer.from([]); res.timestamp = Date.now(); diff --git a/src/internal/utils.ts b/src/internal/utils.ts index 3f8173d2e..aa3b198b5 100644 --- a/src/internal/utils.ts +++ b/src/internal/utils.ts @@ -54,7 +54,7 @@ export const MakeServiceCall = (fn: (args: any[]) => CallServiceResultType) => { }; /** - * Checks the network connection by sending a ping-like request to relat node + * Checks the network connection by sending a ping-like request to relay node * @param { FluenceClient } peer - The Fluence Client instance. */ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise => { @@ -127,11 +127,15 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise< }), ); + peer.internals.regHandler.timeout(particle.id, () => { + reject('particle timed out'); + }); + peer.internals.initiateParticle(particle); }); try { - const [result] = await promise; + const result = await promise; if (result != msg) { log.warn("unexpected behavior. 'identity' must return the passed arguments."); }