diff --git a/server/src/agent/agent.rs b/server/src/agent/agent.rs index ef27961..a0247f3 100644 --- a/server/src/agent/agent.rs +++ b/server/src/agent/agent.rs @@ -1,3 +1,4 @@ +use std::cmp::PartialEq; use std::convert::TryFrom; use std::net::SocketAddr; use std::ops::Drop; @@ -24,22 +25,71 @@ use super::types::{AgentGetInfoResponse, CodeChainCallRPCResponse}; use crate::common_rpc_types::HardwareUsage; use crate::noti::Noti; -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, Debug)] pub enum State { Initializing, Normal { name: NodeName, address: Option, status: NodeStatus, + recent_update_result: Option, + maximum_memory_usage: Option, }, Stop { name: NodeName, address: Option, status: NodeStatus, cause: StopCause, + maximum_memory_usage: Option, }, } +impl PartialEq for State { + fn eq(&self, other: &State) -> bool { + match (self, other) { + (State::Initializing, State::Initializing) => true, + ( + State::Normal { + name: self_name, + address: self_address, + status: self_status, + recent_update_result: _self_recent_update_result, + maximum_memory_usage: _self_maximum_memory_usage, + }, + State::Normal { + name: other_name, + address: other_address, + status: other_status, + recent_update_result: _other_recent_update_result, + maximum_memory_usage: _other_maximum_memory_usage, + }, + ) => self_name == other_name && self_address == other_address && self_status == other_status, + ( + State::Stop { + name: self_name, + address: self_address, + status: self_status, + cause: self_cause, + maximum_memory_usage: _self_maximum_memory_usage, + }, + State::Stop { + name: other_name, + address: other_address, + status: other_status, + cause: other_cause, + maximum_memory_usage: _other_maximum_memory_usage, + }, + ) => { + self_name == other_name + && self_address == other_address + && self_status == other_status + && self_cause == other_cause + } + _ => false, + } + } +} + #[derive(Copy, Clone, PartialEq, Debug)] pub enum StopCause { AlreadyConnected, @@ -62,6 +112,56 @@ impl State { } => Some(name), } } + + pub fn update_recent_update_result(&mut self, update_result: UpdateResult) { + match self { + State::Normal { + recent_update_result, + maximum_memory_usage, + .. + } => { + match *maximum_memory_usage { + None => *maximum_memory_usage = Some(update_result.memory_usage), + Some(prev_memory_usage) => { + if prev_memory_usage.available > update_result.memory_usage.available { + *maximum_memory_usage = Some(update_result.memory_usage) + } + } + } + *recent_update_result = Some(update_result); + } + State::Initializing => {} + State::Stop { + maximum_memory_usage, + .. + } => match *maximum_memory_usage { + None => *maximum_memory_usage = Some(update_result.memory_usage), + Some(prev_memory_usage) => { + if prev_memory_usage.available > update_result.memory_usage.available { + *maximum_memory_usage = Some(update_result.memory_usage) + } + } + }, + } + } + + pub fn reset_maximum_memory_usage(&mut self) { + match self { + State::Normal { + maximum_memory_usage, + .. + } => { + *maximum_memory_usage = None; + } + State::Stop { + maximum_memory_usage, + .. + } => { + *maximum_memory_usage = None; + } + State::Initializing => {} + } + } } #[derive(Clone)] @@ -81,6 +181,10 @@ impl AgentSender { pub fn read_state(&self) -> RwLockReadGuard { self.state.read() } + + pub fn reset_maximum_memory_usage(&self) { + self.state.write().reset_maximum_memory_usage(); + } } pub struct Agent { @@ -209,6 +313,7 @@ impl Agent { number_of_peers, best_block_number, disk_usage, + .. }) = update_result { let node_name = node_name.expect("Updated"); @@ -262,6 +367,8 @@ impl Agent { name: info.name.clone(), address: info.address, status: info.status, + recent_update_result: None, + maximum_memory_usage: None, }; if let State::Initializing = *state { @@ -286,6 +393,7 @@ impl Agent { address: info.address, status: info.status, cause: StopCause::AlreadyConnected, + maximum_memory_usage: None, }; return Ok(None) } @@ -323,6 +431,7 @@ impl Agent { ctrace!("Update state from {:?} to {:?}", *state, new_state); let number_of_peers = peers.len(); let disk_usage = hardware.disk_usage; + let memory_usage = hardware.memory_usage; self.db_service.update_agent_query_result(db::AgentQueryResult { name: info.name.clone(), status: info.status, @@ -350,12 +459,17 @@ impl Agent { let logs = self.codechain_rpc.get_logs(info.status)?; self.db_service.write_logs(info.name, logs); - Ok(Some(UpdateResult { + let update_result = UpdateResult { network_id: network_id.unwrap_or_default(), number_of_peers, best_block_number: best_block_id.map(|id| id.block_number as u64), disk_usage, - })) + memory_usage, + }; + + state.update_recent_update_result(update_result.clone()); + + Ok(Some(update_result)) } fn clean_up(&mut self, reason: AgentCleanupReason) { @@ -415,11 +529,13 @@ impl Agent { } } -struct UpdateResult { - network_id: String, - number_of_peers: usize, - best_block_number: Option, - disk_usage: HardwareUsage, +#[derive(Clone, Debug)] +pub struct UpdateResult { + pub network_id: String, + pub number_of_peers: usize, + pub best_block_number: Option, + pub disk_usage: HardwareUsage, + pub memory_usage: HardwareUsage, } impl Drop for Agent { diff --git a/server/src/agent/service.rs b/server/src/agent/service.rs index a4365c1..feb689a 100644 --- a/server/src/agent/service.rs +++ b/server/src/agent/service.rs @@ -7,7 +7,7 @@ use parking_lot::RwLock; use super::super::db; use super::super::jsonrpc; -use super::agent::{Agent, AgentSender}; +use super::agent::{Agent, AgentSender, State as AgentState}; use crate::noti::Noti; #[derive(Default)] @@ -38,6 +38,24 @@ impl ServiceSender { find_result.map(|(_, agent)| agent.clone()) } + + pub fn get_agents_states(&self) -> Vec { + let state = self.state.read(); + let mut result = Vec::new(); + for (_, agent) in state.agents.iter() { + let state = agent.read_state().clone(); + result.push(state); + } + + result + } + + pub fn reset_maximum_memory_usages(&self) { + let state = self.state.write(); + for (_, agent) in state.agents.iter() { + agent.reset_maximum_memory_usage(); + } + } } pub struct Service { diff --git a/server/src/daily_reporter.rs b/server/src/daily_reporter.rs new file mode 100644 index 0000000..0d7bd06 --- /dev/null +++ b/server/src/daily_reporter.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; +use std::thread; + +use chrono; + +use super::agent::ServiceSender as AgentServiceSender; +use super::agent::State as AgentState; +use super::db::ServiceSender as DBServiceSender; +use super::noti::Noti; + +pub fn start( + noti: Arc, + db_service: DBServiceSender, + agent_service: AgentServiceSender, +) -> thread::JoinHandle<()> { + let network_id = std::env::var("NETWORK_ID").unwrap(); + + thread::Builder::new() + .name("daily reporter".to_string()) + .spawn(move || { + let mut current_date = chrono::Utc::now().date(); + + loop { + let new_date = chrono::Utc::now().date(); + if new_date != current_date { + send_daily_report(&network_id, Arc::clone(¬i), db_service.clone(), agent_service.clone()); + } + current_date = new_date; + thread::sleep(std::time::Duration::from_secs(1000)); + } + }) + .unwrap() +} + +pub fn send_daily_report( + network_id: &str, + noti: Arc, + db_service: DBServiceSender, + agent_service: AgentServiceSender, +) { + let result = db_service.check_connection(); + let db_status = match result { + Ok(_) => "DB is connected".to_string(), + Err(err) => format!("DB connection has an error : {:?}", err), + }; + let mut messages = vec!["CodeChain Server is running".to_string(), db_status]; + + let agent_states = agent_service.get_agents_states(); + agent_service.reset_maximum_memory_usages(); + for agent_state in agent_states { + match agent_state { + AgentState::Initializing => {} + AgentState::Normal { + name, + address, + status, + recent_update_result, + maximum_memory_usage, + } => { + messages.push(format!("Agent: {}", name)); + messages.push(format!(" address: {:?}", address)); + messages.push(format!(" status: {:?}", status)); + if let Some(update_result) = recent_update_result { + messages.push(format!(" peer count: {}", update_result.number_of_peers)); + messages.push(format!(" best block number: {:?}", update_result.best_block_number)); + messages.push(format!(" available disk: {} MB", update_result.disk_usage.available / 1_000_000)); + } + if let Some(maximum_memory_usage) = maximum_memory_usage { + let total_mb = maximum_memory_usage.total / 1_000_000; + let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000; + messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb)); + } + } + AgentState::Stop { + name, + address, + status, + maximum_memory_usage, + .. + } => { + messages.push(format!("Agent: {}", name)); + messages.push(format!(" address: {:?}", address)); + messages.push(format!(" status: {:?}", status)); + + if let Some(maximum_memory_usage) = maximum_memory_usage { + let total_mb = maximum_memory_usage.total / 1_000_000; + let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000; + messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb)); + } + } + }; + } + + noti.info(network_id, "Daily report", &messages.join("\n")) +} diff --git a/server/src/db/service.rs b/server/src/db/service.rs index f5a8cc9..ffd1377 100644 --- a/server/src/db/service.rs +++ b/server/src/db/service.rs @@ -21,6 +21,7 @@ use util; #[derive(Debug, Clone)] pub enum Message { + CheckConnection(Sender>), InitializeAgent(Box, Sender), UpdateAgent(Box), GetAgent(NodeName, Sender>), @@ -98,6 +99,9 @@ impl Service { .spawn(move || { for message in rx { match message { + Message::CheckConnection(callback) => { + service.check_connection(callback); + } Message::InitializeAgent(agent_query_result, callback) => { service.initialize_agent(&agent_query_result, callback); } @@ -183,6 +187,13 @@ impl Service { service_sender } + fn check_connection(&self, callback: Sender>) { + let result = self.db_conn.execute(&"SELECT 1", &[]).map_err(|err| DBError::Internal(err.to_string())); + if let Err(err) = callback.send(result.map(|_| ())) { + cerror!("Cannot send callback : {}", err); + } + } + fn initialize_agent(&mut self, state: &AgentQueryResult, callback: Sender) { let name = state.name.clone(); let before = match self.state.agent_query_result.entry(name) { @@ -392,6 +403,12 @@ impl ServiceSender { } } + pub fn check_connection(&self) -> Result<(), DBError> { + let (tx, rx) = channel(); + self.sender.send(Message::CheckConnection(tx)).expect("Should success check connection"); + rx.recv()? + } + pub fn initialize_agent_query_result(&self, agent_query_result: AgentQueryResult) -> Result { let (tx, rx) = channel(); self.sender.send(Message::InitializeAgent(agent_query_result.into(), tx)).expect("Should success update agent"); diff --git a/server/src/main.rs b/server/src/main.rs index 61aec42..515020d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -23,6 +23,7 @@ extern crate ws; mod logger; mod agent; mod common_rpc_types; +mod daily_reporter; mod db; mod event_propagator; mod frontend; @@ -76,9 +77,10 @@ fn main() { db_user: db_user.to_string(), db_password: db_password.to_string(), }); - let agent_service_sender = agent::Service::run_thread(db_service_sender.clone(), noti); + let agent_service_sender = agent::Service::run_thread(db_service_sender.clone(), Arc::clone(¬i)); let agent_service_for_frontend = agent_service_sender.clone(); + let db_service_sender_for_frontend = db_service_sender.clone(); let frontend_join = thread::Builder::new() .name("frontend listen".to_string()) .spawn(move || { @@ -86,7 +88,7 @@ fn main() { frontend::add_routing(Arc::get_mut(&mut frontend_router).unwrap()); let frontend_context = frontend::Context { agent_service: agent_service_for_frontend, - db_service: db_service_sender.clone(), + db_service: db_service_sender_for_frontend.clone(), passphrase: std::env::var("PASSPHRASE").unwrap_or_else(|_| "passphrase".to_string()), }; listen("0.0.0.0:3012", move |out| frontend::WebSocketHandler { @@ -99,13 +101,17 @@ fn main() { }) .expect("Should success listening frontend"); + let agent_service_for_agent = agent_service_sender.clone(); let agent_join = thread::Builder::new() .name("agent listen".to_string()) .spawn(move || { - listen("0.0.0.0:4012", |out| agent::WebSocketHandler::new(out, agent_service_sender.clone())).unwrap(); + listen("0.0.0.0:4012", |out| agent::WebSocketHandler::new(out, agent_service_for_agent.clone())).unwrap(); }) .expect("Should success listening agent"); + let daily_reporter_join = daily_reporter::start(noti, db_service_sender, agent_service_sender); + frontend_join.join().expect("Join frontend listener"); agent_join.join().expect("Join agent listener"); + daily_reporter_join.join().expect("Join daily reporter"); } diff --git a/server/src/noti/mod.rs b/server/src/noti/mod.rs index c540f56..204625c 100644 --- a/server/src/noti/mod.rs +++ b/server/src/noti/mod.rs @@ -41,13 +41,7 @@ pub struct Noti { impl Noti { pub fn warn(&self, network_id: &str, message: &str) { - let mut targets = Vec::with_capacity(2); - if self.slack.is_some() { - targets.push("slack"); - } - if self.sendgrid.is_some() { - targets.push("sendgrid"); - } + let targets = self.targets(); if targets.is_empty() { cinfo!("No targets to send warning: {}", message); return @@ -68,4 +62,38 @@ impl Noti { } } } + + pub fn info(&self, network_id: &str, title: &str, message: &str) { + let targets = self.targets(); + if targets.is_empty() { + cinfo!("No targets to send info: {}", message); + return + } + cinfo!("Send a info to {}: {}", targets.join(", "), message); + + if let Some(slack) = self.slack.as_ref() { + if let Err(err) = slack.send(format!("{}: {}", network_id, message)) { + cwarn!("Cannot send a slack message({}): {}", message, err); + } + } + if let Some(sendgrid) = self.sendgrid.as_ref() { + if let Err(err) = sendgrid.send( + format!("[info][{}][dashboard-server] {} at {}", network_id, title, Utc::now().to_rfc3339()), + message, + ) { + cwarn!("Cannot send an email({}): {}", message, err); + } + } + } + + fn targets(&self) -> Vec<&str> { + let mut targets = Vec::with_capacity(2); + if self.slack.is_some() { + targets.push("slack"); + } + if self.sendgrid.is_some() { + targets.push("sendgrid"); + } + targets + } }