Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(avm): update to avm 0.39.1 #1627

Merged
merged 9 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 36 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ marine-utils = "0.5.0"
marine-it-parser = "0.12.1"

# avm
avm-server = "0.31.0"
air-interpreter-wasm = "=0.38.0"
avm-server = { version = "=0.39.1-feat-VM-276-aquavm-keypair-775a729-1587-1.0", registry = "fluence" }
air-interpreter-wasm = { version = "=0.39.1-feat-VM-276-aquavm-keypair-775a729-1587-1.0", registry = "fluence" }

# libp2p
libp2p = { version = "0.51.3", features = ["noise", "tcp", "dns", "websocket", "yamux", "mplex", "tokio", "kad", "ping", "identify", "macros"] }
Expand Down
75 changes: 52 additions & 23 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
task::{Context, Poll, Waker},
};

use avm_server::CallResults;
use fluence_keypair::KeyPair;
use futures::future::BoxFuture;
use futures::FutureExt;

use fluence_libp2p::PeerId;
Expand All @@ -28,14 +29,19 @@ use particle_protocol::Particle;

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
use crate::particle_executor::{Fut, FutResult, ParticleExecutor};
use crate::particle_executor::{FutResult, ParticleExecutor};
use crate::particle_functions::{Functions, SingleCallStat};
use crate::{AquaRuntime, InterpretationStats};
use crate::{AquaRuntime, InterpretationStats, ParticleEffects};

struct Reusables<RT> {
vm_id: usize,
vm: Option<RT>,
}

pub struct Actor<RT, F> {
/// Particle of that actor is expired after that deadline
deadline: Deadline,
future: Option<(usize, Fut<RT>)>,
future: Option<BoxFuture<'static, (Reusables<RT>, ParticleEffects, InterpretationStats)>>,
mailbox: VecDeque<Particle>,
waker: Option<Waker>,
functions: Functions<F>,
Expand All @@ -46,14 +52,20 @@ pub struct Actor<RT, F> {
/// Particles and call results will be processed in the security scope of this peer id
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
key_pair: KeyPair,
}

impl<RT, F> Actor<RT, F>
where
RT: AquaRuntime + ParticleExecutor<Particle = (Particle, CallResults), Future = Fut<RT>>,
RT: AquaRuntime,
F: ParticleFunctionStatic,
{
pub fn new(particle: &Particle, functions: Functions<F>, current_peer_id: PeerId) -> Self {
pub fn new(
particle: &Particle,
functions: Functions<F>,
current_peer_id: PeerId,
key_pair: KeyPair,
) -> Self {
Self {
deadline: Deadline::from(particle),
functions,
Expand All @@ -71,6 +83,7 @@ where
data: vec![],
},
current_peer_id,
key_pair,
}
}

Expand Down Expand Up @@ -110,33 +123,35 @@ where
pub fn poll_completed(
&mut self,
cx: &mut Context<'_>,
) -> Poll<FutResult<(usize, RT), RoutingEffects, InterpretationStats>> {
) -> Poll<FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>> {
use Poll::Ready;

self.waker = Some(cx.waker().clone());

self.functions.poll(cx);

// Poll AquaVM future
if let Some((vm_id, Ready(r))) = self.future.as_mut().map(|(i, f)| (*i, f.poll_unpin(cx))) {
if let Some(Ready((reusables, effects, stats))) =
self.future.as_mut().map(|f| f.poll_unpin(cx))
{
self.future.take();

let waker = cx.waker().clone();
// Schedule execution of functions
self.functions.execute(
r.effects.particle.id.clone(),
r.effects.call_requests,
waker,
);
self.functions
.execute(self.particle.id.clone(), effects.call_requests, waker);

let effects = RoutingEffects {
particle: r.effects.particle,
next_peers: r.effects.next_peers,
particle: Particle {
data: effects.new_data,
..self.particle.clone()
},
next_peers: effects.next_peers,
};
return Poll::Ready(FutResult {
vm: r.vm.map(|vm| (vm_id, vm)),
return Ready(FutResult {
runtime: (reusables.vm_id, reusables.vm),
effects,
stats: r.stats,
stats,
});
}

Expand Down Expand Up @@ -175,12 +190,26 @@ where
self.particle.clone()
});
let waker = cx.waker().clone();
// TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212
// Take ownership of vm to process particle
self.future = Some((
vm_id,
vm.execute((particle, calls), waker, self.current_peer_id),
));
let peer_id = self.current_peer_id;
// TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement)
let key_pair = self.key_pair.clone();
// TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212
self.future = Some(
async move {
let res = vm
.execute((particle, calls), waker, peer_id, key_pair)
.await;

let reusables = Reusables {
vm_id,
vm: res.runtime,
};
(reusables, res.effects, res.stats)
}
.boxed(),
);
self.wake();

ActorPoll::Executing(stats)
}
Expand Down
Loading
Loading