From 2cdfd13affbe1c011850585084dbe0c474b2ab63 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Wed, 29 May 2019 14:32:29 +0900 Subject: [PATCH 1/5] Add `Noti::info` method --- server/src/noti/mod.rs | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/server/src/noti/mod.rs b/server/src/noti/mod.rs index c540f56..204625c 100644 --- a/server/src/noti/mod.rs +++ b/server/src/noti/mod.rs @@ -41,13 +41,7 @@ pub struct Noti { impl Noti { pub fn warn(&self, network_id: &str, message: &str) { - let mut targets = Vec::with_capacity(2); - if self.slack.is_some() { - targets.push("slack"); - } - if self.sendgrid.is_some() { - targets.push("sendgrid"); - } + let targets = self.targets(); if targets.is_empty() { cinfo!("No targets to send warning: {}", message); return @@ -68,4 +62,38 @@ impl Noti { } } } + + pub fn info(&self, network_id: &str, title: &str, message: &str) { + let targets = self.targets(); + if targets.is_empty() { + cinfo!("No targets to send info: {}", message); + return + } + cinfo!("Send a info to {}: {}", targets.join(", "), message); + + if let Some(slack) = self.slack.as_ref() { + if let Err(err) = slack.send(format!("{}: {}", network_id, message)) { + cwarn!("Cannot send a slack message({}): {}", message, err); + } + } + if let Some(sendgrid) = self.sendgrid.as_ref() { + if let Err(err) = sendgrid.send( + format!("[info][{}][dashboard-server] {} at {}", network_id, title, Utc::now().to_rfc3339()), + message, + ) { + cwarn!("Cannot send an email({}): {}", message, err); + } + } + } + + fn targets(&self) -> Vec<&str> { + let mut targets = Vec::with_capacity(2); + if self.slack.is_some() { + targets.push("slack"); + } + if self.sendgrid.is_some() { + targets.push("sendgrid"); + } + targets + } } From 4f6c2961bef5821e47a544b083ab8caf8defc675 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Wed, 29 May 2019 16:26:58 +0900 Subject: [PATCH 2/5] Send daily report in the Agent Server --- server/src/daily_reporter.rs | 30 ++++++++++++++++++++++++++++++ server/src/main.rs | 6 +++++- 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 server/src/daily_reporter.rs diff --git a/server/src/daily_reporter.rs b/server/src/daily_reporter.rs new file mode 100644 index 0000000..125d487 --- /dev/null +++ b/server/src/daily_reporter.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; +use std::thread; + +use chrono; + +use super::noti::Noti; + +pub fn start(noti: Arc) -> thread::JoinHandle<()> { + let network_id = std::env::var("NETWORK_ID").unwrap(); + + thread::Builder::new() + .name("daily reporter".to_string()) + .spawn(move || { + let mut current_date = chrono::Utc::now().date(); + + loop { + let new_date = chrono::Utc::now().date(); + if new_date != current_date { + send_daily_report(&network_id, Arc::clone(¬i)); + } + current_date = new_date; + thread::sleep(std::time::Duration::from_secs(1000)); + } + }) + .unwrap() +} + +pub fn send_daily_report(network_id: &str, noti: Arc) { + noti.info(network_id, "Daily report", "CodeChain Server is running") +} diff --git a/server/src/main.rs b/server/src/main.rs index 61aec42..ba0936c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -23,6 +23,7 @@ extern crate ws; mod logger; mod agent; mod common_rpc_types; +mod daily_reporter; mod db; mod event_propagator; mod frontend; @@ -76,7 +77,7 @@ fn main() { db_user: db_user.to_string(), db_password: db_password.to_string(), }); - let agent_service_sender = agent::Service::run_thread(db_service_sender.clone(), noti); + let agent_service_sender = agent::Service::run_thread(db_service_sender.clone(), Arc::clone(¬i)); let agent_service_for_frontend = agent_service_sender.clone(); let frontend_join = thread::Builder::new() @@ -106,6 +107,9 @@ fn main() { }) .expect("Should success listening agent"); + let daily_reporter_join = daily_reporter::start(noti); + frontend_join.join().expect("Join frontend listener"); agent_join.join().expect("Join agent listener"); + daily_reporter_join.join().expect("Join daily reporter"); } From 83ba6aa1a1325e775004ad243f63f785b0e03a29 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Thu, 30 May 2019 16:17:39 +0900 Subject: [PATCH 3/5] Show database connection status in the daily report --- server/src/daily_reporter.rs | 15 +++++++++++---- server/src/db/service.rs | 17 +++++++++++++++++ server/src/main.rs | 5 +++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/server/src/daily_reporter.rs b/server/src/daily_reporter.rs index 125d487..88bebd9 100644 --- a/server/src/daily_reporter.rs +++ b/server/src/daily_reporter.rs @@ -3,9 +3,10 @@ use std::thread; use chrono; +use super::db::ServiceSender as DBServiceSender; use super::noti::Noti; -pub fn start(noti: Arc) -> thread::JoinHandle<()> { +pub fn start(noti: Arc, db_service: DBServiceSender) -> thread::JoinHandle<()> { let network_id = std::env::var("NETWORK_ID").unwrap(); thread::Builder::new() @@ -16,7 +17,7 @@ pub fn start(noti: Arc) -> thread::JoinHandle<()> { loop { let new_date = chrono::Utc::now().date(); if new_date != current_date { - send_daily_report(&network_id, Arc::clone(¬i)); + send_daily_report(&network_id, Arc::clone(¬i), db_service.clone()); } current_date = new_date; thread::sleep(std::time::Duration::from_secs(1000)); @@ -25,6 +26,12 @@ pub fn start(noti: Arc) -> thread::JoinHandle<()> { .unwrap() } -pub fn send_daily_report(network_id: &str, noti: Arc) { - noti.info(network_id, "Daily report", "CodeChain Server is running") +pub fn send_daily_report(network_id: &str, noti: Arc, db_service: DBServiceSender) { + let result = db_service.check_connection(); + let db_status = match result { + Ok(_) => "DB is connected".to_string(), + Err(err) => format!("DB connection has an error : {:?}", err), + }; + let messages = ["CodeChain Server is running".to_string(), db_status]; + noti.info(network_id, "Daily report", &messages.join("\n")) } diff --git a/server/src/db/service.rs b/server/src/db/service.rs index f5a8cc9..ffd1377 100644 --- a/server/src/db/service.rs +++ b/server/src/db/service.rs @@ -21,6 +21,7 @@ use util; #[derive(Debug, Clone)] pub enum Message { + CheckConnection(Sender>), InitializeAgent(Box, Sender), UpdateAgent(Box), GetAgent(NodeName, Sender>), @@ -98,6 +99,9 @@ impl Service { .spawn(move || { for message in rx { match message { + Message::CheckConnection(callback) => { + service.check_connection(callback); + } Message::InitializeAgent(agent_query_result, callback) => { service.initialize_agent(&agent_query_result, callback); } @@ -183,6 +187,13 @@ impl Service { service_sender } + fn check_connection(&self, callback: Sender>) { + let result = self.db_conn.execute(&"SELECT 1", &[]).map_err(|err| DBError::Internal(err.to_string())); + if let Err(err) = callback.send(result.map(|_| ())) { + cerror!("Cannot send callback : {}", err); + } + } + fn initialize_agent(&mut self, state: &AgentQueryResult, callback: Sender) { let name = state.name.clone(); let before = match self.state.agent_query_result.entry(name) { @@ -392,6 +403,12 @@ impl ServiceSender { } } + pub fn check_connection(&self) -> Result<(), DBError> { + let (tx, rx) = channel(); + self.sender.send(Message::CheckConnection(tx)).expect("Should success check connection"); + rx.recv()? + } + pub fn initialize_agent_query_result(&self, agent_query_result: AgentQueryResult) -> Result { let (tx, rx) = channel(); self.sender.send(Message::InitializeAgent(agent_query_result.into(), tx)).expect("Should success update agent"); diff --git a/server/src/main.rs b/server/src/main.rs index ba0936c..9ab804a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -80,6 +80,7 @@ fn main() { let agent_service_sender = agent::Service::run_thread(db_service_sender.clone(), Arc::clone(¬i)); let agent_service_for_frontend = agent_service_sender.clone(); + let db_service_sender_for_frontend = db_service_sender.clone(); let frontend_join = thread::Builder::new() .name("frontend listen".to_string()) .spawn(move || { @@ -87,7 +88,7 @@ fn main() { frontend::add_routing(Arc::get_mut(&mut frontend_router).unwrap()); let frontend_context = frontend::Context { agent_service: agent_service_for_frontend, - db_service: db_service_sender.clone(), + db_service: db_service_sender_for_frontend.clone(), passphrase: std::env::var("PASSPHRASE").unwrap_or_else(|_| "passphrase".to_string()), }; listen("0.0.0.0:3012", move |out| frontend::WebSocketHandler { @@ -107,7 +108,7 @@ fn main() { }) .expect("Should success listening agent"); - let daily_reporter_join = daily_reporter::start(noti); + let daily_reporter_join = daily_reporter::start(noti, db_service_sender.clone()); frontend_join.join().expect("Join frontend listener"); agent_join.join().expect("Join agent listener"); From d8c706f5152462d7d2780157d58caf16e0c43a18 Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Thu, 30 May 2019 19:44:45 +0900 Subject: [PATCH 4/5] Show agent's information in the daily report --- server/src/agent/agent.rs | 81 ++++++++++++++++++++++++++++++++---- server/src/agent/service.rs | 13 +++++- server/src/daily_reporter.rs | 52 +++++++++++++++++++++-- server/src/main.rs | 5 ++- 4 files changed, 136 insertions(+), 15 deletions(-) diff --git a/server/src/agent/agent.rs b/server/src/agent/agent.rs index ef27961..95591ff 100644 --- a/server/src/agent/agent.rs +++ b/server/src/agent/agent.rs @@ -1,3 +1,4 @@ +use std::cmp::PartialEq; use std::convert::TryFrom; use std::net::SocketAddr; use std::ops::Drop; @@ -24,13 +25,14 @@ use super::types::{AgentGetInfoResponse, CodeChainCallRPCResponse}; use crate::common_rpc_types::HardwareUsage; use crate::noti::Noti; -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, Debug)] pub enum State { Initializing, Normal { name: NodeName, address: Option, status: NodeStatus, + recent_update_result: Option, }, Stop { name: NodeName, @@ -40,6 +42,48 @@ pub enum State { }, } +impl PartialEq for State { + fn eq(&self, other: &State) -> bool { + match (self, other) { + (State::Initializing, State::Initializing) => true, + ( + State::Normal { + name: self_name, + address: self_address, + status: self_status, + recent_update_result: _self_recent_update_result, + }, + State::Normal { + name: other_name, + address: other_address, + status: other_status, + recent_update_result: _other_recent_update_result, + }, + ) => self_name == other_name && self_address == other_address && self_status == other_status, + ( + State::Stop { + name: self_name, + address: self_address, + status: self_status, + cause: self_cause, + }, + State::Stop { + name: other_name, + address: other_address, + status: other_status, + cause: other_cause, + }, + ) => { + self_name == other_name + && self_address == other_address + && self_status == other_status + && self_cause == other_cause + } + _ => false, + } + } +} + #[derive(Copy, Clone, PartialEq, Debug)] pub enum StopCause { AlreadyConnected, @@ -62,6 +106,21 @@ impl State { } => Some(name), } } + + pub fn update_recent_update_result(&mut self, update_result: UpdateResult) { + match self { + State::Normal { + recent_update_result, + .. + } => { + *recent_update_result = Some(update_result); + } + State::Initializing => {} + State::Stop { + .. + } => {} + } + } } #[derive(Clone)] @@ -262,6 +321,7 @@ impl Agent { name: info.name.clone(), address: info.address, status: info.status, + recent_update_result: None, }; if let State::Initializing = *state { @@ -350,12 +410,16 @@ impl Agent { let logs = self.codechain_rpc.get_logs(info.status)?; self.db_service.write_logs(info.name, logs); - Ok(Some(UpdateResult { + let update_result = UpdateResult { network_id: network_id.unwrap_or_default(), number_of_peers, best_block_number: best_block_id.map(|id| id.block_number as u64), disk_usage, - })) + }; + + state.update_recent_update_result(update_result.clone()); + + Ok(Some(update_result)) } fn clean_up(&mut self, reason: AgentCleanupReason) { @@ -415,11 +479,12 @@ impl Agent { } } -struct UpdateResult { - network_id: String, - number_of_peers: usize, - best_block_number: Option, - disk_usage: HardwareUsage, +#[derive(Clone, Debug)] +pub struct UpdateResult { + pub network_id: String, + pub number_of_peers: usize, + pub best_block_number: Option, + pub disk_usage: HardwareUsage, } impl Drop for Agent { diff --git a/server/src/agent/service.rs b/server/src/agent/service.rs index a4365c1..c0d0738 100644 --- a/server/src/agent/service.rs +++ b/server/src/agent/service.rs @@ -7,7 +7,7 @@ use parking_lot::RwLock; use super::super::db; use super::super::jsonrpc; -use super::agent::{Agent, AgentSender}; +use super::agent::{Agent, AgentSender, State as AgentState}; use crate::noti::Noti; #[derive(Default)] @@ -38,6 +38,17 @@ impl ServiceSender { find_result.map(|(_, agent)| agent.clone()) } + + pub fn get_agents_states(&self) -> Vec { + let state = self.state.read(); + let mut result = Vec::new(); + for (_, agent) in state.agents.iter() { + let state = agent.read_state().clone(); + result.push(state); + } + + result + } } pub struct Service { diff --git a/server/src/daily_reporter.rs b/server/src/daily_reporter.rs index 88bebd9..b19550f 100644 --- a/server/src/daily_reporter.rs +++ b/server/src/daily_reporter.rs @@ -3,10 +3,16 @@ use std::thread; use chrono; +use super::agent::ServiceSender as AgentServiceSender; +use super::agent::State as AgentState; use super::db::ServiceSender as DBServiceSender; use super::noti::Noti; -pub fn start(noti: Arc, db_service: DBServiceSender) -> thread::JoinHandle<()> { +pub fn start( + noti: Arc, + db_service: DBServiceSender, + agent_service: AgentServiceSender, +) -> thread::JoinHandle<()> { let network_id = std::env::var("NETWORK_ID").unwrap(); thread::Builder::new() @@ -17,7 +23,7 @@ pub fn start(noti: Arc, db_service: DBServiceSender) -> thread::JoinHandle loop { let new_date = chrono::Utc::now().date(); if new_date != current_date { - send_daily_report(&network_id, Arc::clone(¬i), db_service.clone()); + send_daily_report(&network_id, Arc::clone(¬i), db_service.clone(), agent_service.clone()); } current_date = new_date; thread::sleep(std::time::Duration::from_secs(1000)); @@ -26,12 +32,50 @@ pub fn start(noti: Arc, db_service: DBServiceSender) -> thread::JoinHandle .unwrap() } -pub fn send_daily_report(network_id: &str, noti: Arc, db_service: DBServiceSender) { +pub fn send_daily_report( + network_id: &str, + noti: Arc, + db_service: DBServiceSender, + agent_service: AgentServiceSender, +) { let result = db_service.check_connection(); let db_status = match result { Ok(_) => "DB is connected".to_string(), Err(err) => format!("DB connection has an error : {:?}", err), }; - let messages = ["CodeChain Server is running".to_string(), db_status]; + let mut messages = vec!["CodeChain Server is running".to_string(), db_status]; + + let agent_states = agent_service.get_agents_states(); + for agent_state in agent_states { + match agent_state { + AgentState::Initializing => {} + AgentState::Normal { + name, + address, + status, + recent_update_result, + } => { + messages.push(format!("Agent: {}", name)); + messages.push(format!(" address: {:?}", address)); + messages.push(format!(" status: {:?}", status)); + if let Some(update_result) = recent_update_result { + messages.push(format!(" peer count: {}", update_result.number_of_peers)); + messages.push(format!(" best block number: {:?}", update_result.best_block_number)); + messages.push(format!(" available disk: {} MB", update_result.disk_usage.available / 1_000_000)); + } + } + AgentState::Stop { + name, + address, + status, + .. + } => { + messages.push(format!("Agent: {}", name)); + messages.push(format!(" address: {:?}", address)); + messages.push(format!(" status: {:?}", status)); + } + }; + } + noti.info(network_id, "Daily report", &messages.join("\n")) } diff --git a/server/src/main.rs b/server/src/main.rs index 9ab804a..515020d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -101,14 +101,15 @@ fn main() { }) .expect("Should success listening frontend"); + let agent_service_for_agent = agent_service_sender.clone(); let agent_join = thread::Builder::new() .name("agent listen".to_string()) .spawn(move || { - listen("0.0.0.0:4012", |out| agent::WebSocketHandler::new(out, agent_service_sender.clone())).unwrap(); + listen("0.0.0.0:4012", |out| agent::WebSocketHandler::new(out, agent_service_for_agent.clone())).unwrap(); }) .expect("Should success listening agent"); - let daily_reporter_join = daily_reporter::start(noti, db_service_sender.clone()); + let daily_reporter_join = daily_reporter::start(noti, db_service_sender, agent_service_sender); frontend_join.join().expect("Join frontend listener"); agent_join.join().expect("Join agent listener"); From 18709769d6eb18f7ce9b1ae95f310b9d5906fdcb Mon Sep 17 00:00:00 2001 From: Park Juhyung Date: Thu, 30 May 2019 20:50:06 +0900 Subject: [PATCH 5/5] Add maximum memory usage of the day in the daily report --- server/src/agent/agent.rs | 53 +++++++++++++++++++++++++++++++++++- server/src/agent/service.rs | 7 +++++ server/src/daily_reporter.rs | 14 ++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/server/src/agent/agent.rs b/server/src/agent/agent.rs index 95591ff..a0247f3 100644 --- a/server/src/agent/agent.rs +++ b/server/src/agent/agent.rs @@ -33,12 +33,14 @@ pub enum State { address: Option, status: NodeStatus, recent_update_result: Option, + maximum_memory_usage: Option, }, Stop { name: NodeName, address: Option, status: NodeStatus, cause: StopCause, + maximum_memory_usage: Option, }, } @@ -52,12 +54,14 @@ impl PartialEq for State { address: self_address, status: self_status, recent_update_result: _self_recent_update_result, + maximum_memory_usage: _self_maximum_memory_usage, }, State::Normal { name: other_name, address: other_address, status: other_status, recent_update_result: _other_recent_update_result, + maximum_memory_usage: _other_maximum_memory_usage, }, ) => self_name == other_name && self_address == other_address && self_status == other_status, ( @@ -66,12 +70,14 @@ impl PartialEq for State { address: self_address, status: self_status, cause: self_cause, + maximum_memory_usage: _self_maximum_memory_usage, }, State::Stop { name: other_name, address: other_address, status: other_status, cause: other_cause, + maximum_memory_usage: _other_maximum_memory_usage, }, ) => { self_name == other_name @@ -111,14 +117,49 @@ impl State { match self { State::Normal { recent_update_result, + maximum_memory_usage, .. } => { + match *maximum_memory_usage { + None => *maximum_memory_usage = Some(update_result.memory_usage), + Some(prev_memory_usage) => { + if prev_memory_usage.available > update_result.memory_usage.available { + *maximum_memory_usage = Some(update_result.memory_usage) + } + } + } *recent_update_result = Some(update_result); } State::Initializing => {} State::Stop { + maximum_memory_usage, .. - } => {} + } => match *maximum_memory_usage { + None => *maximum_memory_usage = Some(update_result.memory_usage), + Some(prev_memory_usage) => { + if prev_memory_usage.available > update_result.memory_usage.available { + *maximum_memory_usage = Some(update_result.memory_usage) + } + } + }, + } + } + + pub fn reset_maximum_memory_usage(&mut self) { + match self { + State::Normal { + maximum_memory_usage, + .. + } => { + *maximum_memory_usage = None; + } + State::Stop { + maximum_memory_usage, + .. + } => { + *maximum_memory_usage = None; + } + State::Initializing => {} } } } @@ -140,6 +181,10 @@ impl AgentSender { pub fn read_state(&self) -> RwLockReadGuard { self.state.read() } + + pub fn reset_maximum_memory_usage(&self) { + self.state.write().reset_maximum_memory_usage(); + } } pub struct Agent { @@ -268,6 +313,7 @@ impl Agent { number_of_peers, best_block_number, disk_usage, + .. }) = update_result { let node_name = node_name.expect("Updated"); @@ -322,6 +368,7 @@ impl Agent { address: info.address, status: info.status, recent_update_result: None, + maximum_memory_usage: None, }; if let State::Initializing = *state { @@ -346,6 +393,7 @@ impl Agent { address: info.address, status: info.status, cause: StopCause::AlreadyConnected, + maximum_memory_usage: None, }; return Ok(None) } @@ -383,6 +431,7 @@ impl Agent { ctrace!("Update state from {:?} to {:?}", *state, new_state); let number_of_peers = peers.len(); let disk_usage = hardware.disk_usage; + let memory_usage = hardware.memory_usage; self.db_service.update_agent_query_result(db::AgentQueryResult { name: info.name.clone(), status: info.status, @@ -415,6 +464,7 @@ impl Agent { number_of_peers, best_block_number: best_block_id.map(|id| id.block_number as u64), disk_usage, + memory_usage, }; state.update_recent_update_result(update_result.clone()); @@ -485,6 +535,7 @@ pub struct UpdateResult { pub number_of_peers: usize, pub best_block_number: Option, pub disk_usage: HardwareUsage, + pub memory_usage: HardwareUsage, } impl Drop for Agent { diff --git a/server/src/agent/service.rs b/server/src/agent/service.rs index c0d0738..feb689a 100644 --- a/server/src/agent/service.rs +++ b/server/src/agent/service.rs @@ -49,6 +49,13 @@ impl ServiceSender { result } + + pub fn reset_maximum_memory_usages(&self) { + let state = self.state.write(); + for (_, agent) in state.agents.iter() { + agent.reset_maximum_memory_usage(); + } + } } pub struct Service { diff --git a/server/src/daily_reporter.rs b/server/src/daily_reporter.rs index b19550f..0d7bd06 100644 --- a/server/src/daily_reporter.rs +++ b/server/src/daily_reporter.rs @@ -46,6 +46,7 @@ pub fn send_daily_report( let mut messages = vec!["CodeChain Server is running".to_string(), db_status]; let agent_states = agent_service.get_agents_states(); + agent_service.reset_maximum_memory_usages(); for agent_state in agent_states { match agent_state { AgentState::Initializing => {} @@ -54,6 +55,7 @@ pub fn send_daily_report( address, status, recent_update_result, + maximum_memory_usage, } => { messages.push(format!("Agent: {}", name)); messages.push(format!(" address: {:?}", address)); @@ -63,16 +65,28 @@ pub fn send_daily_report( messages.push(format!(" best block number: {:?}", update_result.best_block_number)); messages.push(format!(" available disk: {} MB", update_result.disk_usage.available / 1_000_000)); } + if let Some(maximum_memory_usage) = maximum_memory_usage { + let total_mb = maximum_memory_usage.total / 1_000_000; + let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000; + messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb)); + } } AgentState::Stop { name, address, status, + maximum_memory_usage, .. } => { messages.push(format!("Agent: {}", name)); messages.push(format!(" address: {:?}", address)); messages.push(format!(" status: {:?}", status)); + + if let Some(maximum_memory_usage) = maximum_memory_usage { + let total_mb = maximum_memory_usage.total / 1_000_000; + let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000; + messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb)); + } } }; }