Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
bf1e4b4
allow multiple clients on the same browser instance
coder11 Jan 4, 2021
2a68ed9
test for multiple clients at the same time
coder11 Jan 5, 2021
7d0c9c5
Merge branch 'multiple-clients' into big-refactoring
coder11 Jan 5, 2021
ae43a03
Split particle queue processing from particle handling logic. Hide im…
coder11 Jan 6, 2021
7e3ceb2
Merge remote-tracking branch 'origin/master' into big-refactoring
coder11 Jan 6, 2021
239280c
FluenceClient implementation
coder11 Jan 7, 2021
01d63d7
Fix tests (WIP)
coder11 Jan 7, 2021
52044ff
Fix tests (contd.)
coder11 Jan 7, 2021
8f8388d
Fix gitignore, remove unused scripts
coder11 Jan 8, 2021
d551a13
fix particle removal for fetch requests
coder11 Jan 8, 2021
0eeba71
trying to fix exports
coder11 Jan 8, 2021
cf8474b
trying to fix exports contd.
coder11 Jan 8, 2021
be3cca6
trying to fix exports contd. 2
coder11 Jan 8, 2021
92ee65f
trying to fix exports contd. 3
coder11 Jan 8, 2021
48352dd
trying to fix exports contd. 4
coder11 Jan 8, 2021
c84a3ba
fix entrypoint
coder11 Jan 8, 2021
f180de1
brought back magical data injection into particles
coder11 Jan 8, 2021
369be6c
fix tests
coder11 Jan 9, 2021
76b4e9d
Add more convenience to writing scripts for fetch and fire and forget…
coder11 Jan 9, 2021
0826413
Tests for new clients methods
coder11 Jan 9, 2021
5ff257d
Fix copy-paste issue
coder11 Jan 11, 2021
a5e7d56
more test fixes
coder11 Jan 11, 2021
ac2377b
Additional test for event registration
coder11 Jan 11, 2021
8ea7afe
hoping to fix package
coder11 Jan 11, 2021
649ba85
hoping to fix package contd.
coder11 Jan 11, 2021
974d73d
Minor typings adjustments
coder11 Jan 13, 2021
f12fee1
updated public api
coder11 Jan 14, 2021
9915323
Add method for fetch calls to api.ts
coder11 Jan 14, 2021
3602f5e
allow get sendParticleAsFetch to use arbitrary air
coder11 Jan 14, 2021
a1510e5
exposing peer ids as PeerIdB58 type
coder11 Jan 17, 2021
731a6a2
fix tests
coder11 Jan 19, 2021
cf0cee1
comment out integrtion test
coder11 Jan 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ bundle/

# Dependency directories
node_modules/
jspm_packages/
jspm_packages/
/dist/
13 changes: 5 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
"name": "@fluencelabs/fluence",
"version": "0.8.0",
"description": "JS SDK for the Fluence network",
"main": "./dist/fluence.js",
"typings": "./dist/fluence.d.ts",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
"scripts": {
"test": "mocha --timeout 10000 -r esm -r ts-node/register src/**/*.spec.ts",
"test-ts": "ts-mocha --timeout 10000 -r esm -p tsconfig.json src/**/*.spec.ts",
"package:build": "NODE_ENV=production webpack && npm run package",
"package": "tsc && rsync -r src/aqua/*.js dist/aqua",
"start": "webpack-dev-server -p",
"build": "webpack --mode production"
"build": "tsc && rsync -r src/internal/aqua/*.js dist/internal/aqua",
"build:webpack": "webpack --mode production"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",
Expand Down Expand Up @@ -48,7 +45,7 @@
"text-encoding": "^0.7.0",
"ts-loader": "7.0.5",
"ts-mocha": "8.0.0",
"typescript": "3.9.5",
"typescript": "^3.9.5",
"webpack": "4.43.0",
"webpack-cli": "3.3.11",
"webpack-dev-server": "3.11.0"
Expand Down
252 changes: 252 additions & 0 deletions src/FluenceClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import log from 'loglevel';
import PeerId from 'peer-id';
import { SecurityTetraplet, StepperOutcome } from './internal/commonTypes';
import { FluenceClientBase } from './internal/FluenceClientBase';
import { build, genUUID, ParticleDto } from './internal/particle';
import { ParticleProcessor } from './internal/ParticleProcessor';
import { ParticleProcessorStrategy } from './internal/ParticleProcessorStrategy';

const fetchCallbackServiceName = '__callback';
const selfRelayVarName = '__relay';

const wrapRelayBasedCall = (script: string) => {
return `
(seq
(call ${selfRelayVarName} ("op" "identity") [])
${script}
)
`;
};

const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => {
// TODO: sanitize
const resultTogether = resultArgNames.join(' ');
let res = `
(seq
${script}
(seq
(call ${selfRelayVarName} ("op" "identity") [])
(call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}])
)
)`;
return wrapRelayBasedCall(res);
};

export interface FluenceEvent {
type: string;
args: any[];
}

export type FluenceEventHandler = (event: FluenceEvent) => void;

export class FluenceClient extends FluenceClientBase {
private eventSubscribers: Map<string, FluenceEventHandler[]> = new Map();
private eventValidators: Map<string, Function> = new Map();
private callbacks: Map<string, Function> = new Map();
private fetchParticles: Map<string, { resolve: Function; reject: Function }> = new Map();

constructor(selfPeerId: PeerId) {
super(selfPeerId);
this.processor = new ParticleProcessor(this.strategy, selfPeerId);
}

async fetch<T>(script: string, resultArgNames: string[], data?: Map<string, any>, ttl?: number): Promise<T> {
data = this.addRelayToArgs(data);
const callBackId = genUUID();
script = wrapFetchCall(script, callBackId, resultArgNames);
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);

return new Promise<T>((resolve, reject) => {
this.fetchParticles.set(callBackId, { resolve, reject });
this.processor.executeLocalParticle(particle);
});
}

// TODO:: better naming probably?
async fireAndForget(script: string, data?: Map<string, any>, ttl?: number) {
data = this.addRelayToArgs(data);
script = wrapRelayBasedCall(script);

await this.sendScript(script, data, ttl);
}

registerEvent(
channel: string,
eventName: string,
validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean,
) {
if (!validate) {
validate = (c, e, a, t) => true;
}

this.eventValidators.set(`${channel}/${eventName}`, validate);
}

unregisterEvent(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}

registerCallback(
serviceId: string,
fnName: string,
callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
) {
this.callbacks.set(`${serviceId}/${fnName}`, callback);
}

unregisterCallback(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}

subscribe(channel: string, handler: FluenceEventHandler) {
if (!this.eventSubscribers.get(channel)) {
this.eventSubscribers.set(channel, []);
}

this.eventSubscribers.get(channel).push(handler);
}

protected strategy: ParticleProcessorStrategy = {
particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
// missing built-in op
if (serviceId === 'op' && fnName === 'identity') {
return {
ret_code: 0,
result: JSON.stringify(args),
};
}

// async fetch model handling
if (serviceId === fetchCallbackServiceName) {
const executingParticlePromiseFns = this.fetchParticles.get(fnName);
if (executingParticlePromiseFns) {
// don't block
setImmediate(() => {
this.fetchParticles.delete(fnName);
executingParticlePromiseFns.resolve(args);
});
}

return {
ret_code: 0,
result: JSON.stringify({}),
};
}

// event model handling
const eventPair = `${serviceId}/${fnName}`;
const eventValidator = this.eventValidators.get(eventPair);
if (eventValidator) {
try {
if (!eventValidator(serviceId, fnName, args, tetraplets)) {
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}
} catch (e) {
log.error('error running validation function: ' + e);
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}

// don't block
setImmediate(() => {
this.pushEvent(serviceId, {
type: fnName,
args: args,
});
});

return {
ret_code: 0,
result: JSON.stringify({}),
};
}

// callback model handling
const callback = this.callbacks.get(eventPair);
if (callback) {
try {
const res = callback(args, tetraplets);
return {
ret_code: 0,
result: JSON.stringify(res),
};
} catch (e) {
return {
ret_code: 1, // TODO:: error codes
result: JSON.stringify(e),
};
}
}

return {
ret_code: 1,
result: `Error. There is no service: ${serviceId}`,
};
},

sendParticleFurther: async (particle: ParticleDto) => {
try {
await this.connection.sendParticle(particle);
} catch (reason) {
log.error(`Error on sending particle with id ${particle.id}: ${reason}`);
}
},

onParticleTimeout: (particle: ParticleDto, now: number) => {
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
const executingParticle = this.fetchParticles.get(particle.id);
if (executingParticle) {
executingParticle.reject(new Error(`particle ${particle.id} timed out`));
}
},
onLocalParticleRecieved: (particle: ParticleDto) => {},
onExternalParticleRecieved: (particle: ParticleDto) => {},
onStepperExecuting: (particle: ParticleDto) => {},
onStepperExecuted: (stepperOutcome: StepperOutcome) => {
log.info('inner interpreter outcome:');
log.info(stepperOutcome);
},
};

private pushEvent(channel: string, event: FluenceEvent) {
const subs = this.eventSubscribers.get(channel);
if (subs) {
for (let sub of subs) {
sub(event);
}
}
}

private addRelayToArgs(data: Map<string, any>) {
if (data === undefined) {
data = new Map();
}

if (!data.has(selfRelayVarName)) {
data.set(selfRelayVarName, this.relayPeerId);
}

return data;
}
}
Loading