Skip to content

Commit

Permalink
Extra logs (#253)
Browse files (browse the repository at this point in the history)
* Added X-Request-ID to some error responses + const cleanups

* New log entries, improved existing logs, loglevel is now configurable
  • Loading branch information
bmuddha committed Mar 23, 2022
1 parent 8488b5a commit 41f10b0
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 111 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cache-rpc"
version = "0.2.14"
version = "0.2.15"
authors = ["Alexander Polakov <a.polakov@iconic.vc>"]
edition = "2018"
license = "MIT"
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ pub mod types;

// Crate-wide default tuning values.
// NOTE(review): "GAI" / "GPA" presumably abbreviate the getAccountInfo and
// getProgramAccounts RPC methods — confirm against the use sites.

// Default queue sizes: `2 << 19` is 1_048_576 and `2 << 18` is 524_288.
const DEFAULT_GAI_QUEUE_SIZE: usize = 2 << 19;
const DEFAULT_GPA_QUEUE_SIZE: usize = 2 << 18;
// Default timeouts. NOTE(review): the unit is not visible here (presumably
// seconds) — confirm where these are converted into a duration.
const DEFAULT_GAI_TIMEOUT: u64 = 30;
const DEFAULT_GPA_TIMEOUT: u64 = 60;
// Default backoff values, same numeric values as the timeouts above.
// NOTE(review): unit likewise not visible here — confirm at the use sites.
const DEFAULT_GAI_BACKOFF: u64 = 30;
const DEFAULT_GPA_BACKOFF: u64 = 60;
// Backoff value for the passthrough path.
// NOTE(review): exact semantics are defined by the consumer — confirm.
const PASSTHROUGH_BACKOFF: u64 = 30;
/// Name of the environment variable that `main` reads to select the maximum
/// tracing log level. Matching is case-insensitive: "debug", "warn" and
/// "error" are recognised; any other value, or an unset variable, yields INFO.
pub const CACHER_LOG_LEVEL: &str = "CACHER_LOG_LEVEL";
21 changes: 19 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::http::header;
use cache_rpc::control::{handle_command, run_control_interface, ControlState, RpcConfigSender};
use cache_rpc::CACHER_LOG_LEVEL;
use either::Either;
use std::cell::RefCell;
use std::path::PathBuf;
Expand Down Expand Up @@ -50,26 +51,42 @@ async fn main() -> Result<()> {
};
let (writer, _guard) = tracing_appender::non_blocking(writer);

let level = std::env::var(CACHER_LOG_LEVEL)
.ok()
.map(|level| {
use tracing::Level;
match level.to_lowercase().as_str() {
"debug" => Level::DEBUG,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
}
})
.unwrap_or(tracing::Level::INFO);
match options.log_format {
cli::LogFormat::Json => {
let subscriber = fmt::Subscriber::builder()
.with_thread_names(true)
.with_writer(writer)
.with_max_level(level)
.with_timer(fmt::time::ChronoLocal::rfc3339())
.json()
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
cli::LogFormat::Plain => {
let subscriber = fmt::Subscriber::builder().with_writer(writer).finish();
let subscriber = fmt::Subscriber::builder()
.with_max_level(level)
.with_writer(writer)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
};

let span = tracing::span!(tracing::Level::INFO, "global", version = %metrics::version());
let _enter = span.enter();

info!(?options, "configuration options");
info!(?options, "started cache-rpc with configuration");

run(options).await?;
Ok(())
Expand Down
36 changes: 26 additions & 10 deletions src/pubsub/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,18 @@ impl AccountUpdateManager {
.send(awc::ws::Message::Ping(b"hello?".as_ref().into()))
.is_err()
{
warn!(actor_id = actor.actor_id, "failed to send ping");
warn!(
actor = actor.actor_id,
"failed to send websocket healthcheck ping, will terminate actor"
);
ctx.stop();
}

let elapsed = actor.last_received_at.elapsed();
if elapsed > WEBSOCKET_PING_TIMEOUT {
warn!(
actor_id = actor.actor_id,
"no messages received in {:?}, assume connection lost ({:?})",
actor=actor.actor_id,
"no messages received in {:?}, assume connection lost ({:?}), terminating actor",
elapsed,
actor.last_received_at
);
Expand Down Expand Up @@ -226,7 +229,7 @@ impl AccountUpdateManager {
.set(actor.inflight.len() as i64);
if dead_requests > DEAD_REQUEST_LIMIT {
warn!(
message = "too many dead requests, disconnecting",
message = "too many dead requests, disconnecting from websocket server",
count = dead_requests
);
ctx.stop();
Expand All @@ -248,7 +251,10 @@ impl AccountUpdateManager {
.map(|diff| diff > self.config.slot_distance as u64)
.unwrap_or(false);
if behind {
error!("websocket slot behind rpc, stopping");
error!(
actor=%self.actor_id,
"websocket slot behind rpc, terminating websocket connection"
);
ctx.stop()
}
}
Expand Down Expand Up @@ -334,11 +340,14 @@ impl AccountUpdateManager {
.write(actixws::Message::Ping(b"check connection".as_ref().into()))
.is_err()
{
error!(actor_id, "failed to send check msg");
error!(
actor = actor_id,
"failed to send healthcheck message to websocket server, terminating actor"
);
ctx.stop();
return;
};
info!(actor_id, message = "websocket ping sent");
debug!(actor = actor_id, "websocket ping sent");

ctx.add_stream(stream);

Expand All @@ -351,15 +360,21 @@ impl AccountUpdateManager {
}
}

info!(actor_id, message = "websocket stream added");
info!(
actor = actor_id,
"websocket data stream has been added to actor"
);
metrics()
.websocket_connected
.with_label_values(&[&actor.actor_name])
.set(1);
actor.last_received_at = Instant::now();
});
ctx.wait(fut);
info!(self.actor_id, message = "connection future complete");
debug!(
actor = self.actor_id,
message = "websocket connection future complete"
);
self.update_status();
}

Expand Down Expand Up @@ -520,7 +535,7 @@ impl AccountUpdateManager {
ctx.cancel_future(handle);
}
None => {
debug!(key = %sub.key(), "filter added");
debug!(pubkey=%sub.key(), "new filter added for program");
metrics()
.filters
.with_label_values(&[&self.actor_name])
Expand Down Expand Up @@ -687,6 +702,7 @@ impl Handler<ForceReconnect> for AccountUpdateManager {

fn handle(&mut self, _: ForceReconnect, ctx: &mut Self::Context) -> Self::Result {
// will restart actor, and reestablish connection along with resetting cache
tracing::warn!(actor=%self.actor_id, "forcing websocket reconnection on actor");
ctx.stop();
}
}
Expand Down

0 comments on commit 41f10b0

Please sign in to comment.