Skip to content

Commit

Permalink
feat(tracing): Add tracing to understand particle processing (#1935)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu committed Dec 15, 2023
1 parent 16215a7 commit c800495
Show file tree
Hide file tree
Showing 26 changed files with 361 additions and 148 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 83 additions & 37 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,19 @@
* limitations under the License.
*/

use futures::future::BoxFuture;
use futures::FutureExt;
use std::sync::Arc;
use std::task::Context;
use std::{
collections::VecDeque,
task::{Poll, Waker},
task::{Context, Poll, Waker},
};
use tracing::{instrument, Instrument, Span};

use fluence_keypair::KeyPair;
use futures::future::BoxFuture;
use futures::FutureExt;
use tracing::{Instrument, Span};

use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::{ExtendedParticle, Particle};

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
Expand All @@ -42,12 +40,20 @@ struct Reusables<RT> {
}

type AVMCallResult<RT> = FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>;
type AVMTask<RT> = BoxFuture<'static, (Reusables<RT>, ParticleEffects, InterpretationStats)>;
type AVMTask<RT> = BoxFuture<
'static,
(
Reusables<RT>,
ParticleEffects,
InterpretationStats,
Arc<Span>,
),
>;
pub struct Actor<RT, F> {
/// The particle handled by this actor expires after this deadline
deadline: Deadline,
future: Option<AVMTask<RT>>,
mailbox: VecDeque<Particle>,
mailbox: VecDeque<ExtendedParticle>,
waker: Option<Waker>,
functions: Functions<F>,
/// Particle that's memoized on the actor creation.
Expand All @@ -58,8 +64,8 @@ pub struct Actor<RT, F> {
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
}

impl<RT, F> Actor<RT, F>
Expand All @@ -72,9 +78,10 @@ where
functions: Functions<F>,
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
) -> Self {
let particle = particle;
Self {
deadline: Deadline::from(particle),
functions,
Expand All @@ -83,18 +90,13 @@ where
waker: None,
// Clone particle without data
particle: Particle {
id: particle.id.clone(),
init_peer_id: particle.init_peer_id,
timestamp: particle.timestamp,
ttl: particle.ttl,
script: particle.script.clone(),
signature: particle.signature.clone(),
data: vec![],
..particle.clone()
},
current_peer_id,
key_pair,
span,
data_store,
deal_id,
}
}

Expand All @@ -119,7 +121,8 @@ where
self.functions.set_function(function)
}

pub fn ingest(&mut self, particle: Particle) {
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn ingest(&mut self, particle: ExtendedParticle) {
self.mailbox.push_back(particle);
self.wake();
}
Expand All @@ -143,21 +146,34 @@ where

fn poll_avm_future(&mut self, cx: &mut Context<'_>) -> Option<Poll<AVMCallResult<RT>>> {
if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) {
let (reusables, effects, stats) = res;
let _entered = self.span.enter();
let (reusables, effects, stats, parent_span) = res;
let span = tracing::info_span!(
parent: parent_span.as_ref(),
"Actor::poll_avm_future::future_ready",
particle_id= self.particle.id,
deal_id = self.deal_id
);
let _span_guard = span.enter();

self.future.take();

let waker = cx.waker().clone();
// Schedule execution of functions
self.functions
.execute(self.particle.id.clone(), effects.call_requests, waker);
self.functions.execute(
self.particle.id.clone(),
effects.call_requests,
waker,
parent_span.clone(),
);

let effects = RoutingEffects {
particle: Particle {
data: effects.new_data,
..self.particle.clone()
},
particle: ExtendedParticle::linked(
Particle {
data: effects.new_data,
..self.particle.clone()
},
parent_span,
),
next_peers: effects.next_peers,
};
return Some(Poll::Ready(FutResult {
Expand All @@ -184,30 +200,39 @@ where
}

// Gather CallResults
let (calls, stats) = self.functions.drain();
let (calls, stats, call_spans) = self.functions.drain();

// Take the next particle
let particle = self.mailbox.pop_front();
let ext_particle = self.mailbox.pop_front();

if particle.is_none() && calls.is_empty() {
if ext_particle.is_none() && calls.is_empty() {
debug_assert!(stats.is_empty(), "stats must be empty if calls are empty");
// Nothing to execute, return vm
return ActorPoll::Vm(vm_id, vm);
}

let particle = particle.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});
let particle = ext_particle
.as_ref()
.map(|p| p.particle.clone())
.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});

let waker = cx.waker().clone();
let data_store = self.data_store.clone();
let key_pair = self.key_pair.clone();
let peer_id = self.current_peer_id;

let (async_span, linking_span) =
self.create_spans(call_spans, ext_particle, particle.id.as_str());

self.future = Some(
async move {
let res = vm
.execute(data_store, (particle.clone(), calls), peer_id, key_pair)
.in_current_span()
.await;

waker.wake();
Expand All @@ -217,15 +242,36 @@ where
vm: res.runtime,
};

(reusables, res.effects, res.stats)
(reusables, res.effects, res.stats, linking_span)
}
.instrument(self.span.clone())
.instrument(async_span)
.boxed(),
);
self.wake();

ActorPoll::Executing(stats)
}

/// Creates the tracing span for one asynchronous AVM run and a shareable handle to it.
///
/// An info-level span is opened carrying `particle_id` and this actor's `deal_id`
/// as fields. It is then marked as following from:
/// 1. the span attached to `ext_particle`, when a particle is supplied, and
/// 2. every span in `call_spans`, in the order given.
///
/// Returns the span itself together with an `Arc`-wrapped clone of it.
fn create_spans(
    &self,
    call_spans: Vec<Arc<Span>>,
    ext_particle: Option<ExtendedParticle>,
    particle_id: &str,
) -> (Span, Arc<Span>) {
    let async_span = tracing::info_span!(
        "Actor: async AVM process particle & call results",
        particle_id = particle_id,
        deal_id = self.deal_id
    );

    // Link to the incoming particle first, so its span precedes the call spans.
    if let Some(incoming) = &ext_particle {
        async_span.follows_from(incoming.span.as_ref());
    }

    // Then link every span produced by the drained call results.
    call_spans.into_iter().for_each(|call_span| {
        async_span.follows_from(call_span.as_ref());
    });

    let linking_span = Arc::new(async_span.clone());
    (async_span, linking_span)
}
fn wake(&self) {
if let Some(waker) = &self.waker {
waker.wake_by_ref();
Expand Down
12 changes: 8 additions & 4 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use std::time::Duration;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{instrument, Instrument};

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};

use crate::aqua_runtime::AquaRuntime;
Expand Down Expand Up @@ -105,6 +105,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
match self.inlet.poll_recv(cx) {
Poll::Ready(Some(Ingest { particle, function })) => {
wake = true;
let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine::poll::ingest");
let _guard = span.entered();
// set new particle to be executed
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
Expand Down Expand Up @@ -179,12 +181,13 @@ impl AquamarineApi {
}

/// Send particle to the interpreters pool
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn execute(
self,
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
) -> impl Future<Output = Result<(), AquamarineApiError>> {
let particle_id = particle.id.clone();
let particle_id = particle.particle.id.clone();
self.send_command(Ingest { particle, function }, Some(particle_id))
}

Expand Down Expand Up @@ -227,5 +230,6 @@ impl AquamarineApi {
AquamarineDied { particle_id }
})
}
.in_current_span()
}
}
7 changes: 4 additions & 3 deletions aquamarine/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* limitations under the License.
*/

use particle_execution::ServiceFunction;
use particle_protocol::Particle;
use std::collections::HashMap;

use particle_execution::ServiceFunction;
use particle_protocol::ExtendedParticle;

pub enum Command {
Ingest {
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
},
AddService {
Expand Down
4 changes: 4 additions & 0 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use avm_server::avm_runner::RawAVMOutcome;
use avm_server::{AnomalyData, CallResults, ParticleParameters};
use fluence_libp2p::PeerId;
use thiserror::Error;
use tracing::instrument;

use crate::DataStoreError::SerializeAnomaly;
use now_millis::now_ms;
Expand Down Expand Up @@ -82,6 +83,7 @@ impl ParticleDataStore {
Ok(())
}

#[instrument(level = tracing::Level::INFO, skip_all)]
pub async fn store_data(
&self,
data: &[u8],
Expand All @@ -97,6 +99,7 @@ impl ParticleDataStore {
Ok(())
}

#[instrument(level = tracing::Level::INFO)]
pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result<Vec<u8>> {
let data_path = self.data_file(particle_id, current_peer_id);
let data = tokio::fs::read(&data_path).await.unwrap_or_default();
Expand Down Expand Up @@ -151,6 +154,7 @@ impl ParticleDataStore {
}

#[allow(clippy::too_many_arguments)]
#[instrument(level = tracing::Level::INFO, skip_all)]
pub async fn save_anomaly_data(
&self,
air_script: &str,
Expand Down
4 changes: 2 additions & 2 deletions aquamarine/src/particle_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use avm_server::CallRequests;
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use std::time::Duration;

use libp2p::PeerId;
Expand Down Expand Up @@ -65,6 +65,6 @@ impl InterpretationStats {
/// Instruct to send particle to either virtual or remote peers.
#[derive(Clone, Debug)]
pub struct RoutingEffects {
pub particle: Particle,
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerId>,
}
Loading

0 comments on commit c800495

Please sign in to comment.