Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ jobs:
"@fluencelabs/marine-js": "${{ inputs.marine-js-version }}"
}

- uses: browser-actions/setup-chrome@v1

- run: pnpm -r --no-frozen-lockfile i
- run: pnpm -r build
- run: pnpm -r test
Expand Down
3 changes: 2 additions & 1 deletion .npmrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
auto-install-peers=true
save-exact=true
save-exact=true
side-effects-cache=false
16 changes: 8 additions & 8 deletions packages/core/js-client/src/clientPeer/__test__/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ describe('FluenceClient usage test suite', () => {
await withClient(RELAY, {}, async (peer) => {
// arrange

const result = await new Promise<string[]>((resolve, reject) => {
const script = `
const script = `
(xor
(seq
(call %init_peer_id% ("load" "relay") [] init_relay)
Expand All @@ -26,8 +25,10 @@ describe('FluenceClient usage test suite', () => {
(call %init_peer_id% ("callback" "error") [%last_error%])
)
)`;
const particle = peer.internals.createNewParticle(script);

const particle = await peer.internals.createNewParticle(script);

const result = await new Promise<string>((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand Down Expand Up @@ -92,7 +93,7 @@ describe('FluenceClient usage test suite', () => {
(call "${peer2.getPeerId()}" ("test" "test") ["test"])
)
`;
const particle = peer1.internals.createNewParticle(script);
const particle = await peer1.internals.createNewParticle(script);

if (particle instanceof Error) {
throw particle;
Expand Down Expand Up @@ -149,14 +150,13 @@ describe('FluenceClient usage test suite', () => {

it.skip('Should throw correct error when the client tries to send a particle not to the relay', async () => {
await withClient(RELAY, {}, async (peer) => {
const promise = new Promise((resolve, reject) => {
const script = `
const script = `
(xor
(call "incorrect_peer_id" ("any" "service") [])
(call %init_peer_id% ("callback" "error") [%last_error%])
)`;
const particle = peer.internals.createNewParticle(script);

const particle = await peer.internals.createNewParticle(script);
const promise = new Promise((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand Down
6 changes: 3 additions & 3 deletions packages/core/js-client/src/clientPeer/checkConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ const log = logger('connection');
export const checkConnection = async (peer: ClientPeer, ttl?: number): Promise<boolean> => {
const msg = Math.random().toString(36).substring(7);

const promise = new Promise<string>((resolve, reject) => {
const script = `
const script = `
(xor
(seq
(call %init_peer_id% ("load" "relay") [] init_relay)
Expand All @@ -46,8 +45,9 @@ export const checkConnection = async (peer: ClientPeer, ttl?: number): Promise<b
(call %init_peer_id% ("callback" "error") [%last_error%])
)
)`;
const particle = peer.internals.createNewParticle(script, ttl);
const particle = await peer.internals.createNewParticle(script, ttl);

const promise = new Promise<string>((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand Down
17 changes: 8 additions & 9 deletions packages/core/js-client/src/compilerSupport/callFunction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { getArgumentTypes, isReturnTypeVoid, CallAquaFunctionType } from '@fluencelabs/interfaces';
import { CallAquaFunctionType, getArgumentTypes, isReturnTypeVoid } from '@fluencelabs/interfaces';

import {
errorHandlingService,
injectRelayService,
injectValueService,
registerParticleScopeService,
responseService,
errorHandlingService,
ServiceDescription,
userHandlerService,
injectValueService,
} from './services.js';

import { logger } from '../util/logger.js';
import { IParticle } from '../particle/interfaces.js';

const log = logger('aqua');

Expand All @@ -40,13 +41,13 @@ const log = logger('aqua');
* @param args - args in the form of JSON where each key corresponds to the name of the argument
* @returns
*/
export const callAquaFunction: CallAquaFunctionType = ({ def, script, config, peer, args }) => {
export const callAquaFunction: CallAquaFunctionType = async ({ def, script, config, peer, args }) => {
log.trace('calling aqua function %j', { def, script, config, args });
const argumentTypes = getArgumentTypes(def);

const promise = new Promise((resolve, reject) => {
const particle = peer.internals.createNewParticle(script, config?.ttl);
const particle = await peer.internals.createNewParticle(script, config?.ttl);

return new Promise((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand Down Expand Up @@ -92,7 +93,5 @@ export const callAquaFunction: CallAquaFunctionType = ({ def, script, config, pe
);
}
});
});

return promise;
})
};
2 changes: 2 additions & 0 deletions packages/core/js-client/src/compilerSupport/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
IFluenceInternalApi,
} from '@fluencelabs/interfaces';
import { CallServiceData, GenericCallServiceHandler, ResultCodes } from '../jsServiceHost/interfaces.js';
import { fromUint8Array } from 'js-base64';

export interface ServiceDescription {
serviceId: string;
Expand Down Expand Up @@ -177,6 +178,7 @@ const extractCallParams = (req: CallServiceData, arrow: ArrowWithoutCallbacks):

const callParams = {
...req.particleContext,
signature: fromUint8Array(req.particleContext.signature),
tetraplets,
};

Expand Down
72 changes: 45 additions & 27 deletions packages/core/js-client/src/connection/RelayConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*/
import { PeerIdB58 } from '@fluencelabs/interfaces';
import { pipe } from 'it-pipe';
import { encode, decode } from 'it-length-prefixed';
import { decode, encode } from 'it-length-prefixed';
import type { PeerId } from '@libp2p/interface/peer-id';
import { createLibp2p, Libp2p } from 'libp2p';

import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { webSockets } from '@libp2p/websockets';
import { all } from '@libp2p/websockets/filters';
import { multiaddr } from '@multiformats/multiaddr';
import type { Multiaddr } from '@multiformats/multiaddr';
import { multiaddr, type Multiaddr } from '@multiformats/multiaddr';

import map from 'it-map';
import { fromString } from 'uint8arrays/from-string';
Expand All @@ -35,9 +34,13 @@ import { Subject } from 'rxjs';
import { throwIfHasNoPeerId } from '../util/libp2pUtils.js';
import { IConnection } from './interfaces.js';
import { IParticle } from '../particle/interfaces.js';
import { Particle, serializeToString } from '../particle/Particle.js';
import { Particle, serializeToString, verifySignature } from '../particle/Particle.js';
import { identifyService } from 'libp2p/identify';
import { pingService } from 'libp2p/ping';
import { unmarshalPublicKey } from '@libp2p/crypto/keys';
import { peerIdFromString } from '@libp2p/peer-id';
import { Stream } from '@libp2p/interface/connection';
import { KeyPair } from '../keypair/index.js';

const log = logger('connection');

Expand Down Expand Up @@ -170,6 +173,31 @@ export class RelayConnection implements IConnection {
);
log.trace('data written to sink');
}

private async processIncomingMessage(msg: string, stream: Stream) {
let particle: Particle | undefined;
try {
particle = Particle.fromString(msg);
log.trace('got particle from stream with id %s and particle id %s', stream.id, particle.id);
const initPeerId = peerIdFromString(particle.initPeerId);

if (initPeerId.publicKey === undefined) {
log.error('cannot retrieve public key from init_peer_id. particle id: %s. init_peer_id: %s', particle.id, particle.initPeerId);
return;
}

const isVerified = await verifySignature(particle, initPeerId.publicKey);
if (isVerified) {
this.particleSource.next(particle);
} else {
log.trace('particle signature is incorrect. rejecting particle with id: %s', particle.id);
}
} catch (e) {
const particleId = particle?.id;
const particleIdMessage = typeof particleId === 'string' ? `. particle id: ${particleId}` : '';
log.error(`error on handling an incoming message: %O%s`, e, particleIdMessage);
}
}

private async connect() {
if (this.lib2p2Peer === null) {
Expand All @@ -178,30 +206,20 @@ export class RelayConnection implements IConnection {

await this.lib2p2Peer.handle(
[PROTOCOL_NAME],
async ({ connection, stream }) => {
pipe(
stream.source,
// @ts-ignore
decode(),
// @ts-ignore
(source) => map(source, (buf) => toString(buf.subarray())),
async (source) => {
try {
for await (const msg of source) {
try {
const particle = Particle.fromString(msg);
log.trace('got particle from stream with id %s and particle id %s', stream.id, particle.id);
this.particleSource.next(particle);
} catch (e) {
log.error('error on handling a new incoming message: %j', e);
}
}
} catch (e) {
log.error('connection closed: %j', e);
async ({ connection, stream }) => pipe(
stream.source,
decode(),
(source) => map(source, (buf) => toString(buf.subarray())),
async (source) => {
try {
for await (const msg of source) {
await this.processIncomingMessage(msg, stream);
}
},
);
},
} catch (e) {
log.error('connection closed: %j', e);
}
},
),
{
maxInboundStreams: this.config.maxInboundStreams,
maxOutboundStreams: this.config.maxOutboundStreams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe.skip('Ephemeral networks tests', () => {
)
`;

const particle = client.internals.createNewParticle(script);
const particle = await client.internals.createNewParticle(script);

const promise = new Promise<string>((resolve) => {
client.internals.regHandler.forParticle(particle.id, 'test', 'test', (req: CallServiceData) => {
Expand Down
7 changes: 4 additions & 3 deletions packages/core/js-client/src/jsPeer/FluencePeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import {
ResultCodes,
} from '../jsServiceHost/interfaces.js';
import { JSONValue } from '../util/commonTypes.js';
import { fromUint8Array } from 'js-base64';

const log_particle = logger('particle');
const log_peer = logger('peer');
Expand Down Expand Up @@ -217,8 +218,8 @@ export abstract class FluencePeer {
}
},

createNewParticle: (script: string, ttl: number = this.config.defaultTtlMs): IParticle => {
return Particle.createNew(script, this.keyPair.getPeerId(), ttl);
createNewParticle: (script: string, ttl: number = this.config.defaultTtlMs): Promise<IParticle> => {
return Particle.createNew(script, this.keyPair.getPeerId(), ttl, this.keyPair);
},

/**
Expand Down Expand Up @@ -317,7 +318,7 @@ export abstract class FluencePeer {
log_particle.trace('id %s. call results: %j', item.particle.id, item.callResults);
}),
filterExpiredParticles(this._expireParticle.bind(this)),
groupBy(item => item.particle.id),
groupBy(item => fromUint8Array(item.particle.signature)),
mergeMap(group$ => {
let prevData: Uint8Array = Buffer.from([]);
let firstRun = true;
Expand Down
35 changes: 18 additions & 17 deletions packages/core/js-client/src/jsPeer/__test__/avm.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import { handleTimeout } from '../../particle/Particle.js';
describe('Basic AVM functionality in Fluence Peer tests', () => {
it('Simple call', async () => {
await withPeer(async (peer) => {
const res = await new Promise<string[]>((resolve, reject) => {
const script = `
const script = `
(call %init_peer_id% ("print" "print") ["1"])
`;
const particle = peer.internals.createNewParticle(script);

const particle = await peer.internals.createNewParticle(script);

const res = await new Promise<string>((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}

registerHandlersHelper(peer, particle, {
print: {
print: (args: Array<Array<string>>) => {
print: (args: Array<string>) => {
const [res] = args;
resolve(res);
},
Expand All @@ -33,9 +33,7 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {

it('Par call', async () => {
await withPeer(async (peer) => {
const res = await new Promise<string[]>((resolve, reject) => {
const res: any[] = [];
const script = `
const script = `
(seq
(par
(call %init_peer_id% ("print" "print") ["1"])
Expand All @@ -44,7 +42,10 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {
(call %init_peer_id% ("print" "print") ["2"])
)
`;
const particle = peer.internals.createNewParticle(script);
const particle = await peer.internals.createNewParticle(script);

const res = await new Promise<string[]>((resolve, reject) => {
const res: any[] = [];

if (particle instanceof Error) {
return reject(particle.message);
Expand All @@ -70,8 +71,7 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {

it('Timeout in par call: race', async () => {
await withPeer(async (peer) => {
const res = await new Promise((resolve, reject) => {
const script = `
const script = `
(seq
(call %init_peer_id% ("op" "identity") ["slow_result"] arg)
(seq
Expand All @@ -86,8 +86,9 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {
)
)
`;
const particle = peer.internals.createNewParticle(script);

const particle = await peer.internals.createNewParticle(script);

const res = await new Promise((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand All @@ -109,8 +110,7 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {

it('Timeout in par call: wait', async () => {
await withPeer(async (peer) => {
const res = await new Promise((resolve, reject) => {
const script = `
const script = `
(seq
(call %init_peer_id% ("op" "identity") ["timeout_msg"] arg)
(seq
Expand All @@ -136,8 +136,9 @@ describe('Basic AVM functionality in Fluence Peer tests', () => {
)
)
`;
const particle = peer.internals.createNewParticle(script);

const particle = await peer.internals.createNewParticle(script);

const res = await new Promise((resolve, reject) => {
if (particle instanceof Error) {
return reject(particle.message);
}
Expand Down
Loading