Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
nox-image:
description: "nox image tag"
type: string
default: "fluencelabs/nox:unstable_minimal"
default: "fluencelabs/nox:minimal_0.2.5"
avm-version:
description: "@fluencelabs/avm version"
type: string
Expand Down
6 changes: 6 additions & 0 deletions packages/@tests/aqua/_aqua/finalize_particle.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import "@fluencelabs/aqua-lib/builtin.aqua"
export test

func test():
on HOST_PEER_ID:
Op.noop()
78 changes: 78 additions & 0 deletions packages/@tests/aqua/src/_aqua/finalize_particle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* eslint-disable */
// @ts-nocheck
/**
*
* This file is auto-generated. Do not edit manually: changes may be erased.
* Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
* If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
* Aqua version: 0.12.0
*
*/
import type { IFluenceClient as IFluenceClient$$, CallParams as CallParams$$ } from '@fluencelabs/js-client';
import {
v5_callFunction as callFunction$$,
v5_registerService as registerService$$,
} from '@fluencelabs/js-client';



// Services

// Functions
export const test_script = `
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(xor
(xor
(call -relay- ("op" "noop") [])
(fail %last_error%)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 0])
)
)
`


export function test(
config?: {ttl?: number}
): Promise<void>;

export function test(
peer: IFluenceClient$$,
config?: {ttl?: number}
): Promise<void>;

export function test(...args: any) {


return callFunction$$(
args,
{
"functionName" : "test",
"arrow" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {

}
},
"codomain" : {
"tag" : "nil"
}
},
"names" : {
"relay" : "-relay-",
"getDataSrv" : "getDataSrv",
"callbackSrv" : "callbackSrv",
"responseSrv" : "callbackSrv",
"responseFnName" : "response",
"errorHandlingSrv" : "errorHandlingSrv",
"errorFnName" : "error"
}
},
test_script
)
}

/* eslint-enable */
408 changes: 204 additions & 204 deletions packages/@tests/aqua/src/_aqua/smoke_test.ts

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions packages/@tests/aqua/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { fromByteArray } from 'base64-js';
import { Fluence } from '@fluencelabs/js-client';
import type { ClientConfig } from '@fluencelabs/js-client';
import { registerHelloWorld, helloTest, marineTest, resourceTest } from './_aqua/smoke_test.js';
import { test as particleTest } from './_aqua/finalize_particle.js';
import { wasm } from './wasmb64.js';

const relay = {
Expand Down Expand Up @@ -67,6 +68,10 @@ export const runTest = async (): Promise<TestResult> => {

console.log('running marine test...');
const marine = await marineTest(wasm);

console.log('running particle test...');
await particleTest();

console.log('marine test finished, result: ', marine);

const returnVal = {
Expand Down
3 changes: 2 additions & 1 deletion packages/@tests/aqua/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"outDir": "./dist"
"outDir": "./dist",
"module": "NodeNext"
},
"exclude": ["node_modules", "dist"]
}
2 changes: 1 addition & 1 deletion packages/core/js-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
"license": "Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "13.0.0",
"@chainsafe/libp2p-yamux": "5.0.0",
"@fluencelabs/interfaces": "workspace:*",
"@libp2p/crypto": "2.0.3",
"@libp2p/interface": "0.1.2",
"@libp2p/mplex": "9.0.4",
"@libp2p/peer-id": "3.0.2",
"@libp2p/peer-id-factory": "3.0.3",
"@libp2p/websockets": "7.0.4",
Expand Down
13 changes: 2 additions & 11 deletions packages/core/js-client/src/clientPeer/ClientPeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,13 @@ export const makeClientPeerConfig = async (
};

export class ClientPeer extends FluencePeer implements IFluenceClient {
private relayPeerId: PeerIdB58;
private relayConnection: RelayConnection;

constructor(
peerConfig: PeerConfig,
relayConfig: RelayConnectionConfig,
keyPair: KeyPair,
marine: IMarineHost,
) {
const relayConnection = new RelayConnection(relayConfig);

super(peerConfig, keyPair, marine, new JsServiceHost(), relayConnection);
this.relayPeerId = relayConnection.getRelayPeerId();
this.relayConnection = relayConnection;
super(peerConfig, keyPair, marine, new JsServiceHost(), new RelayConnection(relayConfig));
}

getPeerId(): string {
Expand All @@ -83,7 +76,7 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
connectionStateChangeHandler: (state: ConnectionState) => void = () => {};

getRelayPeerId(): string {
return this.relayPeerId;
return this.internals.getRelayPeerId();
}

onConnectionStateChange(handler: (state: ConnectionState) => void): ConnectionState {
Expand Down Expand Up @@ -115,7 +108,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
log.trace('connecting to Fluence network');
this.changeConnectionState('connecting');
await super.start();
await this.relayConnection.start();
// TODO: check connection (`checkConnection` function) here
this.changeConnectionState('connected');
log.trace('connected');
Expand All @@ -124,7 +116,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
async stop(): Promise<void> {
log.trace('disconnecting from Fluence network');
this.changeConnectionState('disconnecting');
await this.relayConnection.stop();
await super.stop();
this.changeConnectionState('disconnected');
log.trace('disconnected');
Expand Down
24 changes: 17 additions & 7 deletions packages/core/js-client/src/connection/RelayConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type { PeerId } from '@libp2p/interface/peer-id';
import { createLibp2p, Libp2p } from 'libp2p';

import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import { yamux } from '@chainsafe/libp2p-yamux';
import { webSockets } from '@libp2p/websockets';
import { all } from '@libp2p/websockets/filters';
import { multiaddr } from '@multiformats/multiaddr';
Expand All @@ -36,7 +36,8 @@ import { throwIfHasNoPeerId } from '../util/libp2pUtils.js';
import { IConnection } from './interfaces.js';
import { IParticle } from '../particle/interfaces.js';
import { Particle, serializeToString } from '../particle/Particle.js';
import { IStartable } from '../util/commonTypes.js';
import { identifyService } from 'libp2p/identify';
import { pingService } from 'libp2p/ping';

const log = logger('connection');

Expand Down Expand Up @@ -77,7 +78,7 @@ export interface RelayConnectionConfig {
/**
* Implementation for JS peers which connects to Fluence through relay node
*/
export class RelayConnection implements IStartable, IConnection {
export class RelayConnection implements IConnection {
private relayAddress: Multiaddr;
private lib2p2Peer: Libp2p | null = null;

Expand Down Expand Up @@ -110,14 +111,20 @@ export class RelayConnection implements IStartable, IConnection {
filter: all,
}),
],
streamMuxers: [mplex()],
streamMuxers: [yamux()],
connectionEncryption: [noise()],
connectionManager: {
dialTimeout: this.config.dialTimeoutMs,
},
connectionGater: {
// By default, this function forbids connections to private peers. For example multiaddr with ip 127.0.0.1 isn't allowed
denyDialMultiaddr: () => Promise.resolve(false)
},
services: {
identify: identifyService({
runOnConnectionOpen: false,
}),
ping: pingService()
}
});

Expand Down Expand Up @@ -158,23 +165,25 @@ export class RelayConnection implements IStartable, IConnection {
const sink = this._connection.streams[0].sink;
*/

log.trace('sending particle...');
const stream = await this.lib2p2Peer.dialProtocol(this.relayAddress, PROTOCOL_NAME);
log.trace('created stream with id ', stream.id);
const sink = stream.sink;

pipe(
await pipe(
[fromString(serializeToString(particle))],
// @ts-ignore
encode(),
sink,
);
log.trace('data written to sink');
}

private async connect() {
if (this.lib2p2Peer === null) {
throw new Error('Relay connection is not started');
}

this.lib2p2Peer.handle(
await this.lib2p2Peer.handle(
[PROTOCOL_NAME],
async ({ connection, stream }) => {
pipe(
Expand All @@ -188,6 +197,7 @@ export class RelayConnection implements IStartable, IConnection {
for await (const msg of source) {
try {
const particle = Particle.fromString(msg);
log.trace('got particle from stream with id %s and particle id %s', stream.id, particle.id);
this.particleSource.next(particle);
} catch (e) {
log.error('error on handling a new incoming message: %j', e);
Expand Down
3 changes: 2 additions & 1 deletion packages/core/js-client/src/connection/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
import type { PeerIdB58 } from '@fluencelabs/interfaces';
import type { Subscribable } from 'rxjs';
import { IParticle } from '../particle/interfaces.js';
import { IStartable } from '../util/commonTypes.js';

/**
* Interface for connection used in Fluence Peer.
*/
export interface IConnection {
export interface IConnection extends IStartable {
/**
* Observable that emits particles received from the connection.
*/
Expand Down
10 changes: 9 additions & 1 deletion packages/core/js-client/src/ephemeral/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,21 @@ export interface IEphemeralConnection extends IConnection {
receiveParticle(particle: Particle): void;
}

export class EphemeralConnection implements IConnection, IEphemeralConnection {
export class EphemeralConnection implements IEphemeralConnection {
readonly selfPeerId: PeerIdB58;
readonly connections: Map<PeerIdB58, IEphemeralConnection> = new Map();

constructor(selfPeerId: PeerIdB58) {
this.selfPeerId = selfPeerId;
}

start(): Promise<void> {
return Promise.resolve();
}

stop(): Promise<void> {
return Promise.resolve();
}

connectToOther(other: IEphemeralConnection) {
if (other.selfPeerId === this.selfPeerId) {
Expand Down
Loading