Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 124 additions & 8 deletions server/src/agent/agent.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::PartialEq;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::ops::Drop;
Expand All @@ -24,22 +25,71 @@ use super::types::{AgentGetInfoResponse, CodeChainCallRPCResponse};
use crate::common_rpc_types::HardwareUsage;
use crate::noti::Noti;

#[derive(Clone, PartialEq, Debug)]
#[derive(Clone, Debug)]
pub enum State {
Initializing,
Normal {
name: NodeName,
address: Option<SocketAddr>,
status: NodeStatus,
recent_update_result: Option<UpdateResult>,
maximum_memory_usage: Option<HardwareUsage>,
},
Stop {
name: NodeName,
address: Option<SocketAddr>,
status: NodeStatus,
cause: StopCause,
maximum_memory_usage: Option<HardwareUsage>,
},
}

impl PartialEq for State {
fn eq(&self, other: &State) -> bool {
match (self, other) {
(State::Initializing, State::Initializing) => true,
(
State::Normal {
name: self_name,
address: self_address,
status: self_status,
recent_update_result: _self_recent_update_result,
maximum_memory_usage: _self_maximum_memory_usage,
},
State::Normal {
name: other_name,
address: other_address,
status: other_status,
recent_update_result: _other_recent_update_result,
maximum_memory_usage: _other_maximum_memory_usage,
},
) => self_name == other_name && self_address == other_address && self_status == other_status,
(
State::Stop {
name: self_name,
address: self_address,
status: self_status,
cause: self_cause,
maximum_memory_usage: _self_maximum_memory_usage,
},
State::Stop {
name: other_name,
address: other_address,
status: other_status,
cause: other_cause,
maximum_memory_usage: _other_maximum_memory_usage,
},
) => {
self_name == other_name
&& self_address == other_address
&& self_status == other_status
&& self_cause == other_cause
}
_ => false,
}
}
}

#[derive(Copy, Clone, PartialEq, Debug)]
pub enum StopCause {
AlreadyConnected,
Expand All @@ -62,6 +112,56 @@ impl State {
} => Some(name),
}
}

pub fn update_recent_update_result(&mut self, update_result: UpdateResult) {
match self {
State::Normal {
recent_update_result,
maximum_memory_usage,
..
} => {
match *maximum_memory_usage {
None => *maximum_memory_usage = Some(update_result.memory_usage),
Some(prev_memory_usage) => {
if prev_memory_usage.available > update_result.memory_usage.available {
*maximum_memory_usage = Some(update_result.memory_usage)
}
}
}
*recent_update_result = Some(update_result);
}
State::Initializing => {}
State::Stop {
maximum_memory_usage,
..
} => match *maximum_memory_usage {
None => *maximum_memory_usage = Some(update_result.memory_usage),
Some(prev_memory_usage) => {
if prev_memory_usage.available > update_result.memory_usage.available {
*maximum_memory_usage = Some(update_result.memory_usage)
}
}
},
}
}

pub fn reset_maximum_memory_usage(&mut self) {
match self {
State::Normal {
maximum_memory_usage,
..
} => {
*maximum_memory_usage = None;
}
State::Stop {
maximum_memory_usage,
..
} => {
*maximum_memory_usage = None;
}
State::Initializing => {}
}
}
}

#[derive(Clone)]
Expand All @@ -81,6 +181,10 @@ impl AgentSender {
pub fn read_state(&self) -> RwLockReadGuard<State> {
self.state.read()
}

pub fn reset_maximum_memory_usage(&self) {
self.state.write().reset_maximum_memory_usage();
}
}

pub struct Agent {
Expand Down Expand Up @@ -209,6 +313,7 @@ impl Agent {
number_of_peers,
best_block_number,
disk_usage,
..
}) = update_result
{
let node_name = node_name.expect("Updated");
Expand Down Expand Up @@ -262,6 +367,8 @@ impl Agent {
name: info.name.clone(),
address: info.address,
status: info.status,
recent_update_result: None,
maximum_memory_usage: None,
};

if let State::Initializing = *state {
Expand All @@ -286,6 +393,7 @@ impl Agent {
address: info.address,
status: info.status,
cause: StopCause::AlreadyConnected,
maximum_memory_usage: None,
};
return Ok(None)
}
Expand Down Expand Up @@ -323,6 +431,7 @@ impl Agent {
ctrace!("Update state from {:?} to {:?}", *state, new_state);
let number_of_peers = peers.len();
let disk_usage = hardware.disk_usage;
let memory_usage = hardware.memory_usage;
self.db_service.update_agent_query_result(db::AgentQueryResult {
name: info.name.clone(),
status: info.status,
Expand Down Expand Up @@ -350,12 +459,17 @@ impl Agent {
let logs = self.codechain_rpc.get_logs(info.status)?;
self.db_service.write_logs(info.name, logs);

Ok(Some(UpdateResult {
let update_result = UpdateResult {
network_id: network_id.unwrap_or_default(),
number_of_peers,
best_block_number: best_block_id.map(|id| id.block_number as u64),
disk_usage,
}))
memory_usage,
};

state.update_recent_update_result(update_result.clone());

Ok(Some(update_result))
}

fn clean_up(&mut self, reason: AgentCleanupReason) {
Expand Down Expand Up @@ -415,11 +529,13 @@ impl Agent {
}
}

struct UpdateResult {
network_id: String,
number_of_peers: usize,
best_block_number: Option<u64>,
disk_usage: HardwareUsage,
#[derive(Clone, Debug)]
pub struct UpdateResult {
pub network_id: String,
pub number_of_peers: usize,
pub best_block_number: Option<u64>,
pub disk_usage: HardwareUsage,
pub memory_usage: HardwareUsage,
}

impl Drop for Agent {
Expand Down
20 changes: 19 additions & 1 deletion server/src/agent/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use parking_lot::RwLock;

use super::super::db;
use super::super::jsonrpc;
use super::agent::{Agent, AgentSender};
use super::agent::{Agent, AgentSender, State as AgentState};
use crate::noti::Noti;

#[derive(Default)]
Expand Down Expand Up @@ -38,6 +38,24 @@ impl ServiceSender {

find_result.map(|(_, agent)| agent.clone())
}

pub fn get_agents_states(&self) -> Vec<AgentState> {
let state = self.state.read();
let mut result = Vec::new();
for (_, agent) in state.agents.iter() {
let state = agent.read_state().clone();
result.push(state);
}

result
}

pub fn reset_maximum_memory_usages(&self) {
let state = self.state.write();
for (_, agent) in state.agents.iter() {
agent.reset_maximum_memory_usage();
}
}
}

pub struct Service {
Expand Down
95 changes: 95 additions & 0 deletions server/src/daily_reporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::sync::Arc;
use std::thread;

use chrono;

use super::agent::ServiceSender as AgentServiceSender;
use super::agent::State as AgentState;
use super::db::ServiceSender as DBServiceSender;
use super::noti::Noti;

pub fn start(
noti: Arc<Noti>,
db_service: DBServiceSender,
agent_service: AgentServiceSender,
) -> thread::JoinHandle<()> {
let network_id = std::env::var("NETWORK_ID").unwrap();

thread::Builder::new()
.name("daily reporter".to_string())
.spawn(move || {
let mut current_date = chrono::Utc::now().date();

loop {
let new_date = chrono::Utc::now().date();
if new_date != current_date {
send_daily_report(&network_id, Arc::clone(&noti), db_service.clone(), agent_service.clone());
}
current_date = new_date;
thread::sleep(std::time::Duration::from_secs(1000));
}
})
.unwrap()
}

pub fn send_daily_report(
network_id: &str,
noti: Arc<Noti>,
db_service: DBServiceSender,
agent_service: AgentServiceSender,
) {
let result = db_service.check_connection();
let db_status = match result {
Ok(_) => "DB is connected".to_string(),
Err(err) => format!("DB connection has an error : {:?}", err),
};
let mut messages = vec!["CodeChain Server is running".to_string(), db_status];

let agent_states = agent_service.get_agents_states();
agent_service.reset_maximum_memory_usages();
for agent_state in agent_states {
match agent_state {
AgentState::Initializing => {}
AgentState::Normal {
name,
address,
status,
recent_update_result,
maximum_memory_usage,
} => {
messages.push(format!("Agent: {}", name));
messages.push(format!(" address: {:?}", address));
messages.push(format!(" status: {:?}", status));
if let Some(update_result) = recent_update_result {
messages.push(format!(" peer count: {}", update_result.number_of_peers));
messages.push(format!(" best block number: {:?}", update_result.best_block_number));
messages.push(format!(" available disk: {} MB", update_result.disk_usage.available / 1_000_000));
}
if let Some(maximum_memory_usage) = maximum_memory_usage {
let total_mb = maximum_memory_usage.total / 1_000_000;
let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000;
messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb));
}
}
AgentState::Stop {
name,
address,
status,
maximum_memory_usage,
..
} => {
messages.push(format!("Agent: {}", name));
messages.push(format!(" address: {:?}", address));
messages.push(format!(" status: {:?}", status));

if let Some(maximum_memory_usage) = maximum_memory_usage {
let total_mb = maximum_memory_usage.total / 1_000_000;
let used_mb = (maximum_memory_usage.total - maximum_memory_usage.available) / 1_000_000;
messages.push(format!(" memory usage: {} MB / {} MB", used_mb, total_mb));
}
}
};
}

noti.info(network_id, "Daily report", &messages.join("\n"))
}
17 changes: 17 additions & 0 deletions server/src/db/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use util;

#[derive(Debug, Clone)]
pub enum Message {
CheckConnection(Sender<Result<(), DBError>>),
InitializeAgent(Box<AgentQueryResult>, Sender<bool>),
UpdateAgent(Box<AgentQueryResult>),
GetAgent(NodeName, Sender<Option<AgentQueryResult>>),
Expand Down Expand Up @@ -98,6 +99,9 @@ impl Service {
.spawn(move || {
for message in rx {
match message {
Message::CheckConnection(callback) => {
service.check_connection(callback);
}
Message::InitializeAgent(agent_query_result, callback) => {
service.initialize_agent(&agent_query_result, callback);
}
Expand Down Expand Up @@ -183,6 +187,13 @@ impl Service {
service_sender
}

fn check_connection(&self, callback: Sender<Result<(), DBError>>) {
let result = self.db_conn.execute(&"SELECT 1", &[]).map_err(|err| DBError::Internal(err.to_string()));
if let Err(err) = callback.send(result.map(|_| ())) {
cerror!("Cannot send callback : {}", err);
}
}

fn initialize_agent(&mut self, state: &AgentQueryResult, callback: Sender<bool>) {
let name = state.name.clone();
let before = match self.state.agent_query_result.entry(name) {
Expand Down Expand Up @@ -392,6 +403,12 @@ impl ServiceSender {
}
}

pub fn check_connection(&self) -> Result<(), DBError> {
let (tx, rx) = channel();
self.sender.send(Message::CheckConnection(tx)).expect("Should success check connection");
rx.recv()?
}

pub fn initialize_agent_query_result(&self, agent_query_result: AgentQueryResult) -> Result<bool, DBError> {
let (tx, rx) = channel();
self.sender.send(Message::InitializeAgent(agent_query_result.into(), tx)).expect("Should success update agent");
Expand Down
Loading