Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Improve janus related errors logging #224

Merged
merged 1 commit into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/backend/janus/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::thread;
use std::time::Duration as StdDuration;
Expand Down Expand Up @@ -27,6 +28,18 @@ struct RequestInfo {
payload: JsonValue,
}

impl fmt::Display for RequestInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let method = self
.payload
.get("method")
.and_then(|m| m.as_str())
.unwrap_or("");

write!(f, "to = '{}', method = '{}'", self.to, method)
}
}

enum TransactionWatchdogMessage {
Halt,
Insert(String, RequestInfo),
Expand Down Expand Up @@ -67,12 +80,11 @@ impl Client {

state = state
.into_iter()
.filter(|(corr_data, info)| {
.filter(|(_corr_data, info)| {
if info.start_timestamp + info.timeout < Utc::now() {
let err =
anyhow!("Janus request timed out ({}): {:?}", corr_data, info);

let err = anyhow!("Janus request timed out; {}", info);
error!(crate::LOG, "{}", err);

if let Some(ref writer) = timeouts_writer {
writer.record_janus_timeout(info.to.clone());
}
Expand Down
18 changes: 13 additions & 5 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ async fn handle_response_impl<C: Context>(
inresp
.plugin()
.data()
.ok_or_else(|| anyhow!("Missing 'data' in the response"))
.error(AppErrorKind::MessageParsingFailed)?
.get("status")
.ok_or_else(|| anyhow!("Missing 'status' in the response"))
.error(AppErrorKind::MessageParsingFailed)
Expand Down Expand Up @@ -260,6 +262,8 @@ async fn handle_response_impl<C: Context>(
inresp
.plugin()
.data()
.ok_or_else(|| anyhow!("Missing 'data' in the response"))
.error(AppErrorKind::MessageParsingFailed)?
.get("status")
.ok_or_else(|| anyhow!("Missing 'status' in the response"))
.error(AppErrorKind::MessageParsingFailed)
Expand Down Expand Up @@ -305,7 +309,11 @@ async fn handle_response_impl<C: Context>(
));

// TODO: improve error handling
let plugin_data = inresp.plugin().data();
let plugin_data = inresp
.plugin()
.data()
.ok_or_else(|| anyhow!("Missing 'data' in the response"))
.error(AppErrorKind::MessageParsingFailed)?;

plugin_data
.get("status")
Expand Down Expand Up @@ -453,15 +461,15 @@ async fn handle_response_impl<C: Context>(
}
IncomingResponse::Error(ErrorResponse::Session(ref inresp)) => {
let err = anyhow!(
"received an unexpected Error message (session): {:?}",
inresp
"received an unexpected Error message (session): {}",
inresp.error()
);
Err(err).error(AppErrorKind::MessageParsingFailed)
}
IncomingResponse::Error(ErrorResponse::Handle(ref inresp)) => {
let err = anyhow!(
"received an unexpected Error message (handle): {:?}",
inresp
"received an unexpected Error message (handle): {}",
inresp.error()
);
Err(err).error(AppErrorKind::MessageParsingFailed)
}
Expand Down
26 changes: 23 additions & 3 deletions src/backend/janus/responses.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt;

use serde_derive::Deserialize;
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -26,18 +28,36 @@ pub(crate) struct HandleErrorResponse {
error: ErrorResponseData,
}

impl HandleErrorResponse {
pub(crate) fn error(&self) -> &ErrorResponseData {
&self.error
}
}

#[derive(Debug, Deserialize)]
pub(crate) struct SessionErrorResponse {
transaction: String,
error: ErrorResponseData,
}

impl SessionErrorResponse {
pub(crate) fn error(&self) -> &ErrorResponseData {
&self.error
}
}

#[derive(Debug, Deserialize)]
pub(crate) struct ErrorResponseData {
code: i32,
reason: String,
}

impl fmt::Display for ErrorResponseData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} ({})", self.reason, self.code)
}
}

// A request to a plugin handle was received.
#[derive(Debug, Deserialize)]
pub(crate) struct AckResponse {
Expand Down Expand Up @@ -77,13 +97,13 @@ impl EventResponse {

#[derive(Debug, Deserialize)]
pub(crate) struct EventResponsePluginData {
data: JsonValue,
data: Option<JsonValue>,
plugin: String,
}

impl EventResponsePluginData {
pub(crate) fn data(&self) -> &JsonValue {
&self.data
pub(crate) fn data(&self) -> Option<&JsonValue> {
self.data.as_ref()
}
}

Expand Down