Skip to content

Commit

Permalink
feat(logging): add structured fields in logging + new log format (#1590)
Browse files Browse the repository at this point in the history
* Improve logging

* Review fixes

* Update particle-node/src/connectivity.rs

Co-authored-by: folex <0xdxdy@gmail.com>

* Update particle-node/src/connectivity.rs

Co-authored-by: folex <0xdxdy@gmail.com>

* Update particle-node/src/connectivity.rs

Co-authored-by: folex <0xdxdy@gmail.com>

* Apply suggestions from code review

---------

Co-authored-by: folex <0xdxdy@gmail.com>
  • Loading branch information
gurinderu and folex committed Apr 27, 2023
1 parent 430df1e commit 0c06f1b
Show file tree
Hide file tree
Showing 30 changed files with 210 additions and 98 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 21 additions & 7 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl AquaRuntime for AVM<DataStoreError> {
match parse_outcome(outcome) {
Ok((data, peers, calls)) if !peers.is_empty() || !calls.is_empty() => {
#[rustfmt::skip]
log::debug!("Particle {} executed: {} call requests, {} next peers", p.id, calls.len(), peers.len());
tracing::debug!(particle_id = p.id, "Particle executed: {} call requests, {} next peers", calls.len(), peers.len());

ParticleEffects {
next_peers: peers,
Expand All @@ -122,22 +122,36 @@ impl AquaRuntime for AVM<DataStoreError> {
}
}
Ok((data, ..)) => {
log::warn!(
"Executed particle {}, next_peer_pks is empty, no call requests. Nothing to do.",
p.id
tracing::warn!(
particle_id = p.id,
"Executed particle, next_peer_pks is empty, no call requests. Nothing to do.",
);
if log::max_level() >= LevelFilter::Debug {
let data = String::from_utf8_lossy(data.as_slice());
log::debug!("particle {} next_peer_pks = [], data: {}", p.id, data);
tracing::debug!(
particle_id = p.id,
"particle next_peer_pks = [], data: {}",
data
);
}
ParticleEffects::empty(Particle { data, ..p })
}
Err(ExecutionError::AquamarineError(err)) => {
log::warn!("Error executing particle {:#?}: {}", p, err);
tracing::warn!(
particle_id = p.id,
"Error executing particle {:#?}: {}",
p,
err
);
ParticleEffects::empty(p)
}
Err(err @ ExecutionError::InvalidResultField { .. }) => {
log::warn!("Error parsing outcome for particle {:#?}: {}", p, err);
tracing::warn!(
particle_id = p.id,
"Error parsing outcome for particle {:#?}: {}",
p,
err
);
ParticleEffects::empty(p)
}
}
Expand Down
4 changes: 2 additions & 2 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl DataStore for ParticleDataStore {
}

fn store_data(&mut self, data: &[u8], particle_id: &str, current_peer_id: &str) -> Result<()> {
log::debug!(target: "particle_reap", "Storing data for particle {}", particle_id);
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Storing data for particle");
let data_path = self.data_file(particle_id, current_peer_id);
std::fs::write(&data_path, data).map_err(|err| StoreData(err, data_path))?;

Expand All @@ -97,7 +97,7 @@ impl DataStore for ParticleDataStore {
}

fn cleanup_data(&mut self, particle_id: &str, current_peer_id: &str) -> Result<()> {
log::debug!(target: "particle_reap", "Cleaning up particle data for particle {}", particle_id);
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle");
remove_file(&self.data_file(particle_id, current_peer_id)).map_err(CleanupData)?;
self.vault.cleanup(particle_id)?;

Expand Down
13 changes: 8 additions & 5 deletions aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
let cloned_particle = particle.clone();
let task = tokio::task::Builder::new().name(&format!("Particle {}", particle.id)).spawn_blocking(move || {
let now = Instant::now();
log::info!("Executing particle {}", particle.id);
tracing::info!(particle_id = particle.id, "Executing particle");

let particle_params = ParticleParameters {
current_peer_id: Cow::Owned(current_peer_id.to_string()),
Expand All @@ -71,10 +71,10 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
let stats = InterpretationStats { interpretation_time, new_data_len, success: result.is_ok() };

if let Err(err) = &result {
log::warn!("Error executing particle {:#?}: {}", particle, err)
tracing::warn!(particle_id = particle.id, "Error executing particle {:#?}: {}", particle, err)
} else {
let len = new_data_len.map(|l| l as i32).unwrap_or(-1);
log::trace!(target: "execution", "Particle {} interpreted in {} [{} bytes => {} bytes]", particle.id, pretty(interpretation_time), particle.data.len(), len);
tracing::trace!(target: "execution", particle_id = particle.id, "Particle interpreted in {} [{} bytes => {} bytes]", pretty(interpretation_time), particle.data.len(), len);
}
let effects = Self::into_effects(result, particle);

Expand All @@ -93,9 +93,12 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
Ok(res) => res,
Err(err) => {
if err.is_cancelled() {
log::warn!("Particle task {} was cancelled", cloned_particle.id);
tracing::warn!(
particle_id = cloned_particle.id,
"Particle task was cancelled"
);
} else {
log::error!("Particle task {} panic", cloned_particle.id);
tracing::error!(particle_id = cloned_particle.id, "Particle task panic");
}
let stats = InterpretationStats {
interpretation_time: Duration::ZERO,
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ impl<F: ParticleFunctionStatic> Functions<F> {
};

if let Err(err) = &result {
log::warn!(
"Failed host call {} ({}) [{}]: {}",
tracing::warn!(
particle_id = particle_id,
"Failed host call {} ({}): {}",
log_args,
pretty(elapsed),
particle_id,
err
)
} else {
Expand Down
13 changes: 7 additions & 6 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {

let deadline = Deadline::from(&particle);
if deadline.is_expired(now_ms()) {
log::info!("Particle {} is expired, ignoring", particle.id);
tracing::info!(particle_id = particle.id, "Particle is expired, ignoring");
self.events
.push_back(Err(AquamarineApiError::ParticleExpired {
particle_id: particle.id,
Expand Down Expand Up @@ -196,16 +196,17 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
if !actor.is_expired(now) || actor.is_executing() {
return true; // keep actor
}
log::debug!(
tracing::debug!(
target: "particle_reap",
"Reaping particle's actor particle_id={particle_id}, peer_id={peer_id})"
particle_id = particle_id,
"Reaping particle's actor peer_id={peer_id})"
);
// cleanup files and dirs after particle processing (vault & prev_data)
// TODO: do not pass vm https://github.com/fluencelabs/fluence/issues/1216
if let Err(err) = actor.cleanup(particle_id, &peer_id.to_string(), &mut vm) {
log::warn!(
"Error cleaning up after particle {}: {:?}",
particle_id,
tracing::warn!(
particle_id = particle_id,
"Error cleaning up after particle {:?}",
err
)
}
Expand Down
1 change: 1 addition & 0 deletions connection-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libp2p = { workspace = true }

futures = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
18 changes: 11 additions & 7 deletions connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,17 @@ impl ConnectionPoolBehaviour {
outlet.send(SendStatus::Ok).ok();
self.wake();
} else if self.contacts.contains_key(&to.peer_id) {
log::debug!(target: "network", "{}: Sending particle {} to {}", self.peer_id, particle.id, to.peer_id);
tracing::debug!(target: "network",particle_id = particle.id , "{}: Sending particle to {}", self.peer_id, to.peer_id);
// Send particle to remote peer
self.push_event(ToSwarm::NotifyHandler {
peer_id: to.peer_id,
handler: NotifyHandler::Any,
event: HandlerMessage::OutParticle(particle, CompletionChannel::Oneshot(outlet)),
});
} else {
log::warn!(
"Won't send particle {} to contact {}: not connected",
particle.id,
tracing::warn!(
particle_id = particle.id,
"Won't send particle to contact {}: not connected",
to.peer_id
);
outlet.send(SendStatus::NotConnected).ok();
Expand Down Expand Up @@ -562,7 +562,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
) {
match event {
HandlerMessage::InParticle(particle) => {
log::trace!(target: "network", "{}: received particle {} from {}; queue {}", self.peer_id, particle.id, from, self.queue.len());
tracing::trace!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len());
self.meter(|m| {
m.incoming_particle(
&particle.id,
Expand Down Expand Up @@ -595,9 +595,13 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
if let Some(particle) = self.queue.pop_front() {
let particle_id = particle.id.clone();
if let Err(err) = outlet.start_send(particle) {
log::error!("Failed to send particle to outlet: {}", err)
tracing::error!(
particle_id = particle_id,
"Failed to send particle to outlet: {}",
err
)
} else {
log::trace!(target: "execution", "Sent particle {} to execution", particle_id);
tracing::trace!(target: "execution",particle_id = particle_id, "Sent particle to execution");
}
} else {
break;
Expand Down
2 changes: 1 addition & 1 deletion crates/builtins-deployer/src/builtins_deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl BuiltinsDeployer {
};

let finished = sent.elapsed();
log::debug!(target: "execution", "sending particle took {}", pretty(finished));
tracing::debug!(target: "execution", particle_id = particle.id, "sending particle took {}", pretty(finished));
result
}

Expand Down
1 change: 1 addition & 0 deletions crates/local-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ serde_json = { workspace = true }
parking_lot = { workspace = true }
maplit = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
2 changes: 1 addition & 1 deletion crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub fn make_particle(
}
}

log::info!("Made a particle {}", id);
tracing::info!(particle_id = id, "Made a particle");

Particle {
id,
Expand Down
1 change: 1 addition & 0 deletions crates/log-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ log = { workspace = true }
tracing = { workspace = true, features = ["async-await", "log"] }
tracing-subscriber.workspace = true
console-subscriber = { version = "0.1.8", features = ["parking_lot"] }
tracing-logfmt = "0.3.1"
4 changes: 2 additions & 2 deletions crates/log-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ pub fn enable_logs() {
.add_directive("libp2p_kad::iterlog=info".parse().unwrap())
.add_directive("libp2p_plaintext=info".parse().unwrap())
.add_directive("libp2p_identify::protocol=info".parse().unwrap())
.add_directive("cranelift_codegen=info".parse().unwrap())
.add_directive("cranelift_codegen=off".parse().unwrap())
.add_directive("cranelift_codegen::context=off".parse().unwrap())
.add_directive("wasmer_wasi=info".parse().unwrap())
.add_directive("wasmer_interface_types_fl=info".parse().unwrap())
.add_directive("polling=info".parse().unwrap())
.add_directive("cranelift_codegen=info".parse().unwrap())
.add_directive("walrus=info".parse().unwrap()),
)
.try_init()
Expand Down
1 change: 0 additions & 1 deletion crates/server-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ libp2p-metrics = { workspace = true }
serde = { version = "1.0.160", features = ["derive"] }
humantime-serde = { workspace = true }
serde_json = "1.0.96"
log = { workspace = true }
rand = "0.8.5"
clap = { version = "4.2.4", features = ["derive", "string"] }
bs58 = { workspace = true }
Expand Down
25 changes: 25 additions & 0 deletions crates/server-config/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use crate::LogFormat;
use clap::{Args, Parser};
use figment::error::Kind::InvalidType;
use figment::value::{Dict, Map, Value};
Expand Down Expand Up @@ -107,6 +108,17 @@ impl Serialize for RootKeyPairArgs {
}
}

#[derive(Args, Debug, Clone, Serialize)]
pub struct LogArgs {
#[arg(
long("log-format"),
id = "LOG_FORMAT",
help_heading = "Node configuration",
display_order = 24
)]
pub(crate) format: Option<LogFormat>,
}

#[derive(Parser, Debug, Serialize, Clone)]
pub(crate) struct DerivedArgs {
#[arg(
Expand Down Expand Up @@ -276,6 +288,19 @@ pub(crate) struct DerivedArgs {
action = clap::ArgAction::SetTrue
)]
pub(crate) print_config: Option<bool>,
#[arg(
long,
value_parser = clap::value_parser!(bool),
id = "NO_BANNER",
help = "Disable banner",
help_heading = "Node configuration",
display_order = 23,
action = clap::ArgAction::SetTrue
)]
pub(crate) no_banner: Option<bool>,

#[command(flatten)]
log: Option<LogArgs>,
}

impl figment::Provider for DerivedArgs {
Expand Down
2 changes: 1 addition & 1 deletion crates/server-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub fn default_management_peer_id() -> PeerId {
let public_key = PublicKey::Ed25519(kp.public());
let peer_id = PeerId::from(public_key);

log::warn!(
println!(
"New management key generated. ed25519 private key in base64 = {}",
base64.encode(kp.secret()),
);
Expand Down
2 changes: 1 addition & 1 deletion crates/server-config/src/kademlia_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl KademliaConfig {
if let Some(replication_factor) = std::num::NonZeroUsize::new(replication_factor) {
cfg.set_replication_factor(replication_factor);
} else {
log::warn!(
println!(
"Invalid config value: replication_factor must be > 0, was {:?}",
self.replication_factor
)
Expand Down
3 changes: 1 addition & 2 deletions crates/server-config/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{
use base64::{engine::general_purpose::STANDARD as base64, Engine};
use eyre::eyre;
use fluence_keypair::{key_pair::KeyFormat, KeyPair};
use log::info;

use fs_utils::create_dirs;

Expand Down Expand Up @@ -145,7 +144,7 @@ pub fn load_key(
) -> eyre::Result<KeyPair> {
if !key_path.exists() {
return if generate_on_absence {
info!("Generating a new key to {:?}", key_path);
println!("Generating a new key to {:?}", key_path);
Ok(create_new_key_pair(
&key_path,
KeyFormat::from_str(&key_format)?,
Expand Down
2 changes: 2 additions & 0 deletions crates/server-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ pub use bootstrap_config::BootstrapConfig;
pub use kademlia_config::KademliaConfig;
pub use network_config::NetworkConfig;
pub use node_config::{NodeConfig, TransportConfig};
pub use resolved_config::LogConfig;
pub use resolved_config::LogFormat;
pub use resolved_config::{ResolvedConfig, UnresolvedConfig};
pub use services_config::ServicesConfig;
Loading

0 comments on commit 0c06f1b

Please sign in to comment.