Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions src/__test__/integration/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('address as multiaddr', async () => {
Expand All @@ -201,7 +201,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('address as node', async () => {
Expand All @@ -213,7 +213,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('peerid as peer id', async () => {
Expand All @@ -225,7 +225,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('peerid as seed', async () => {
Expand All @@ -237,7 +237,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('With connection options: dialTimeout', async () => {
Expand All @@ -249,7 +249,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('With connection options: skipCheckConnection', async () => {
Expand All @@ -261,7 +261,7 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('With connection options: checkConnectionTTL', async () => {
Expand All @@ -273,7 +273,19 @@ describe('Typescript usage suite', () => {
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeTruthy;
expect(isConnected).toBeTruthy();
});

it('With connection options: defaultTTL', async () => {
// arrange
const addr = nodes[0];

// act
await anotherPeer.start({ connectTo: addr, defaultTtlMs: 1 });
const isConnected = await checkConnection(anotherPeer);

// assert
expect(isConnected).toBeFalsy();
});
});

Expand All @@ -297,9 +309,6 @@ describe('Typescript usage suite', () => {
resolve(res);
},
},
// op: {
// identity: (req) => {},
// },
_timeout: reject,
});

Expand Down
53 changes: 34 additions & 19 deletions src/internal/FluencePeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ export interface PeerConfig {
* The dialing timeout in milliseconds
*/
dialTimeoutMs?: number;

/**
* Sets the default TTL for all particles originating from the peer with no TTL specified.
* If the originating particle's TTL is defined then that value will be used
* If the option is not set default TTL will be 7000
* Value 0 (zero) is treated as if the option was not set
*/
defaultTtlMs?: number;
}

/**
Expand Down Expand Up @@ -169,6 +177,8 @@ export class FluencePeer {
this._keyPair = await KeyPair.randomEd25519();
}

this._defaultTTL = config?.defaultTtlMs || DEFAULT_TTL;

this._interpreter = await createInterpreter(config?.avmLogLevel || 'off');

if (config?.connectTo) {
Expand Down Expand Up @@ -234,7 +244,7 @@ export class FluencePeer {
}

if (particle.ttl === undefined) {
particle.ttl = DEFAULT_TTL;
particle.ttl = this._defaultTTL;
}

this._incomingParticles.next(particle);
Expand Down Expand Up @@ -333,37 +343,29 @@ export class FluencePeer {

// Internal peer state

private _defaultTTL: number;
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair;
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<Particle>>();

private _startParticleProcessing() {
const particleQueues = new Map<string, Subject<Particle>>();

this._incomingParticles
.pipe(
tap((x) => x.logTo('debug', 'particle received:')),
filterExpiredParticles(),
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((p) => {
let particlesQueue = particleQueues.get(p.id);
let particlesQueue = this._particleQueues.get(p.id);

if (!particlesQueue) {
particlesQueue = this._createParticlesProcessingQueue();
particleQueues.set(p.id, particlesQueue);
this._particleQueues.set(p.id, particlesQueue);

const timeout = setTimeout(() => {
log.debug(`particle ${p.id} has expired. Deleting particle-related queues and handlers`);

particleQueues.delete(p.id);
const timeoutHandler = this._timeoutHandlers.get(p.id);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(p.id);
this._timeoutHandlers.delete(p.id);
this._expireParticle(p.id);
}, p.actualTtl());

this._timeouts.push(timeout);
Expand All @@ -377,14 +379,26 @@ export class FluencePeer {
});
}

private _expireParticle(particleId: string) {
log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`);

this._particleQueues.delete(particleId);
const timeoutHandler = this._timeoutHandlers.get(particleId);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(particleId);
this._timeoutHandlers.delete(particleId);
}

private _createParticlesProcessingQueue() {
let particlesQueue = new Subject<Particle>();
let prevData: Uint8Array = Buffer.from([]);

particlesQueue
.pipe(
// force new line
filterExpiredParticles(),
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((x) => {
const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData);
Expand Down Expand Up @@ -510,6 +524,7 @@ export class FluencePeer {
for (let item of this._timeouts) {
clearTimeout(item);
}
this._particleQueues.clear();
}

/**
Expand Down Expand Up @@ -569,11 +584,11 @@ function runInterpreter(
return interpreterResult;
}

function filterExpiredParticles() {
function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) {
return pipe(
tap((p: Particle) => {
if (p.hasExpired) {
log.debug(`particle ${p.id} has expired`);
if (p.hasExpired()) {
onParticleExpiration(p.id);
}
}),
filter((x: Particle) => !x.hasExpired()),
Expand Down
4 changes: 1 addition & 3 deletions src/internal/Particle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import log from 'loglevel';
import { ParticleContext } from './commonTypes';
import { dataToString } from './utils';

const DefaultTTL = 7000;

export class Particle {
id: string;
initPeerId: string;
Expand All @@ -37,7 +35,7 @@ export class Particle {
const res = new Particle();
res.id = genUUID();
res.script = script;
res.ttl = ttlMs || DefaultTTL;
res.ttl = ttlMs;
res.data = Buffer.from([]);
res.timestamp = Date.now();

Expand Down
8 changes: 6 additions & 2 deletions src/internal/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export const MakeServiceCall = (fn: (args: any[]) => CallServiceResultType) => {
};

/**
* Checks the network connection by sending a ping-like request to relat node
* Checks the network connection by sending a ping-like request to relay node
* @param { FluenceClient } peer - The Fluence Client instance.
*/
export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise<boolean> => {
Expand Down Expand Up @@ -127,11 +127,15 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise<
}),
);

peer.internals.regHandler.timeout(particle.id, () => {
reject('particle timed out');
});

peer.internals.initiateParticle(particle);
});

try {
const [result] = await promise;
const result = await promise;
if (result != msg) {
log.warn("unexpected behavior. 'identity' must return the passed arguments.");
}
Expand Down