Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/__test__/integration/builtins.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe('Builtins usage suite', () => {

let base64 = 'MjNy';

await uploadModule(client, 'test_broken_module', base64, config);
await uploadModule(client, 'test_broken_module', base64, config, 10000);
});

it('add_blueprint', async function () {
Expand Down Expand Up @@ -101,9 +101,9 @@ describe('Builtins usage suite', () => {
let buf = Buffer.from(key);

let r = Math.random().toString(36).substring(7);
await addProvider(client, buf, dev2peerId, r);
await addProvider(client, buf, dev2peerId, r, undefined, 10000);

let pr = await getProviders(client, buf);
let pr = await getProviders(client, buf, undefined, 10000);
console.log(pr);
console.log(r);
expect(r).toEqual(pr[0][0].service_id);
Expand Down
24 changes: 24 additions & 0 deletions src/__test__/unit/air.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ describe('== AIR suite', () => {
expect(res).toEqual(arg);
});

it('call broken script', async function () {
const client = await createLocalClient();

const script = `(htyth)`;

await expect(client.sendScript(script)).rejects.toContain("aqua script can't be parsed");
});

it('call script without ttl', async function () {
const client = await createLocalClient();

const script = `(call %init_peer_id% ("" "") [""])`;

await expect(client.sendScript(script, undefined, 1)).rejects.toContain("Particle expired");
});

it.skip('call broken script by fetch', async function () {
const client = await createLocalClient();

const script = `(htyth)`;

await expect(client.fetch(script, ['result'])).rejects.toContain("aqua script can't be parsed");
});

it('check particle arguments', async function () {
// arrange
const serviceId = 'test_service';
Expand Down
2 changes: 1 addition & 1 deletion src/internal/FluenceClientBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export abstract class FluenceClientBase {

async sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string> {
const particle = await build(this.selfPeerIdFull, script, data, ttl);
this.processor.executeLocalParticle(particle);
await this.processor.executeLocalParticle(particle);
return particle.id;
}
}
5 changes: 3 additions & 2 deletions src/internal/FluenceClientImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ export class FluenceClientImpl extends FluenceClientBase implements FluenceClien
script = wrapFetchCall(script, callBackId, resultArgNames);
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);

return new Promise<T>((resolve, reject) => {
const prFetch = new Promise<T>(async (resolve, reject) => {
this.fetchParticles.set(callBackId, { resolve, reject });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can drop this fetch line completely. Public fetch api works differently (see api.ts file)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just a little bit of legacy left after FOSDEM preparations

this.processor.executeLocalParticle(particle);
});
const prExec = this.processor.executeLocalParticle(particle);
return prExec.then(() => prFetch);
}

// TODO:: better naming probably?
Expand Down
36 changes: 33 additions & 3 deletions src/internal/ParticleProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,15 @@ export class ParticleProcessor {

async executeLocalParticle(particle: ParticleDto) {
this.strategy?.onLocalParticleRecieved(particle);
await this.handleParticle(particle).catch((err) => {
log.error('particle processing failed: ' + err);
return new Promise((resolve, reject) => {
const resolveCallback = function () {
resolve()
}
const rejectCallback = function (err: any) {
reject(err)
}
// we check by callbacks that the script passed through the interpreter without errors
this.handleParticle(particle, resolveCallback, rejectCallback)
});
}

Expand Down Expand Up @@ -143,8 +150,10 @@ export class ParticleProcessor {

/**
* Pass a particle to a interpreter and send a result to other services.
* `resolve` will be completed if ret_code equals 0
* `reject` will be completed if ret_code not equals 0
*/
private async handleParticle(particle: ParticleDto): Promise<void> {
private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise<void> {
// if a current particle is processing, add new particle to the queue
if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) {
this.enqueueParticle(particle);
Expand All @@ -160,6 +169,7 @@ export class ParticleProcessor {
let actualTtl = particle.timestamp + particle.ttl - now;
if (actualTtl <= 0) {
this.strategy?.onParticleTimeout(particle, now);
if (reject) reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`)
} else {
// if there is no subscription yet, previous data is empty
let prevData: Uint8Array = Buffer.from([]);
Expand Down Expand Up @@ -191,6 +201,26 @@ export class ParticleProcessor {
if (stepperOutcome.next_peer_pks.length > 0) {
this.strategy.sendParticleFurther(newParticle);
}

if (stepperOutcome.ret_code == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably write something like this stepperOutcome.ret_code == 0 && resolve

if (resolve) {
resolve()
}
} else {
const error = stepperOutcome.error_message;
if (reject) {
reject(error);
} else {
log.error("Unhandled error: ", error);
}
}
}
} catch (e) {
if (reject) {
reject(e);
} else {
log.error("Unhandled error: ", e)
throw e;
}
} finally {
// get last particle from the queue
Expand Down
1 change: 1 addition & 0 deletions src/internal/commonTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface StepperOutcome {
ret_code: number;
data: Uint8Array;
next_peer_pks: string[];
error_message: string;
}

export interface ResolvedTriplet {
Expand Down