Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ colored = "1.6"
env_logger = "0.5.7"
iron = "*"
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
lazy_static = "1.3.0"
log = "0.4.1"
parking_lot = "0.7.1"
postgres = { version = "0.15", features = ["with-chrono"] }
primitives = { git = "https://github.com/CodeChain-io/rust-codechain-primitives.git", version = "0.4.0" }
rand = "0.5.5"
regex = "1"
sendgrid = "0.8.1"
serde = "1.0"
serde_derive = "1.0"
Expand Down
12 changes: 12 additions & 0 deletions server/src/agent/agent.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::ops::Drop;
use std::sync::Arc;
Expand Down Expand Up @@ -316,6 +317,7 @@ impl Agent {
let network_id = self.codechain_rpc.get_network_id(info.status)?;
let whitelist = self.codechain_rpc.get_whitelist(info.status)?;
let blacklist = self.codechain_rpc.get_blacklist(info.status)?;
let network_usage = self.codechain_rpc.get_network_usage(info.status)?;
let hardware = self.sender.hardware_get().map_err(|err| format!("Agent Update {}", err))?;

ctrace!("Update state from {:?} to {:?}", *state, new_state);
Expand All @@ -335,6 +337,16 @@ impl Agent {
});
*state = new_state;

let now = chrono::Utc::now();
if let Some(network_usage) = network_usage {
self.db_service.write_network_usage(info.name.clone(), network_usage, now);
self.db_service.write_peer_count(
info.name.clone(),
i32::try_from(number_of_peers).map_err(|err| err.to_string())?,
now,
);
}

let logs = self.codechain_rpc.get_logs(info.status)?;
self.db_service.write_logs(info.name, logs);

Expand Down
8 changes: 7 additions & 1 deletion server/src/agent/codechain_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use serde::de::DeserializeOwned;
use serde_json;
use serde_json::Value;

use super::super::common_rpc_types::{BlackList, BlockId, NodeStatus, PendingTransaction, StructuredLog, WhiteList};
use super::super::common_rpc_types::{
BlackList, BlockId, NetworkUsage, NodeStatus, PendingTransaction, StructuredLog, WhiteList,
};
use super::agent::{AgentSender, SendAgentRPC};
use super::types::ChainGetBestBlockIdResponse;

Expand Down Expand Up @@ -59,6 +61,10 @@ impl CodeChainRPC {
self.call_rpc(status, "net_getBlacklist", Vec::new())
}

pub fn get_network_usage(&self, status: NodeStatus) -> Result<Option<NetworkUsage>, String> {
self.call_rpc(status, "net_recentNetworkUsage", Vec::new())
}

pub fn get_logs(&self, status: NodeStatus) -> Result<Vec<StructuredLog>, String> {
if status != NodeStatus::Run {
return Ok(Default::default())
Expand Down
38 changes: 38 additions & 0 deletions server/src/bin/generate-schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ fn main() {

create_agent_extra_schema(&conn);
create_logs_schema(&conn);
create_network_usage_schema(&conn);
create_peer_count_schema(&conn)
}

fn create_agent_extra_schema(conn: &Connection) {
Expand Down Expand Up @@ -56,3 +58,39 @@ fn create_logs_schema(conn: &Connection) {
cinfo!("Create logs_target index");
conn.execute("CREATE INDEX IF NOT EXISTS logs_targets ON logs (target)", &[]).unwrap();
}

fn create_network_usage_schema(conn: &Connection) {
cinfo!("Create network_usage table");
conn.execute(
"CREATE TABLE IF NOT EXISTS network_usage (
id BIGSERIAL PRIMARY KEY,
time TIMESTAMP WITH TIME ZONE NOT NULL,
name VARCHAR NOT NULL,
extension VARCHAR NOT NULL,
target_ip VARCHAR NOT NULL,
bytes INTEGER NOT NULL
)",
&[],
)
.unwrap();

cinfo!("Create network_usage_time_index");
conn.execute("CREATE INDEX IF NOT EXISTS network_usage_time_index ON network_usage (time)", &[]).unwrap();
}

fn create_peer_count_schema(conn: &Connection) {
cinfo!("Create peer_count table");
conn.execute(
"CREATE TABLE IF NOT EXISTS peer_count (
id BIGSERIAL PRIMARY KEY,
time TIMESTAMP WITH TIME ZONE NOT NULL,
name VARCHAR NOT NULL,
peer_count INTEGER NOT NULL
)",
&[],
)
.unwrap();

cinfo!("Create peer_count_time_index");
conn.execute("CREATE INDEX IF NOT EXISTS peer_count_time_index ON peer_count (time)", &[]).unwrap();
}
52 changes: 52 additions & 0 deletions server/src/common_rpc_types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::net::IpAddr;

use chrono::{DateTime, Utc};
use cprimitives::H256;
use serde_json;

Expand Down Expand Up @@ -60,6 +62,8 @@ pub struct WhiteList {

pub type BlackList = WhiteList;

pub type NetworkUsage = HashMap<String, i32>;

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
#[serde(rename_all = "camelCase")]
pub struct HardwareUsage {
Expand Down Expand Up @@ -101,3 +105,51 @@ pub enum UpdateCodeChainRequest {
binary_checksum: String,
},
}


#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum GraphPeriod {
Minutes5,
Hour,
Day,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GraphCommonArgs {
pub from: DateTime<Utc>,
pub to: DateTime<Utc>,
pub period: GraphPeriod,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GraphNetworkOutAllRow {
pub node_name: String,
pub time: DateTime<Utc>,
pub value: f32,
}

pub type GraphNetworkOutAllAVGRow = GraphNetworkOutAllRow;

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn serialize_day() {
let period = GraphPeriod::Day;
assert_eq!("\"day\"", &serde_json::to_string(&period).unwrap());
}
#[test]
fn serialize_minutes5() {
let period = GraphPeriod::Minutes5;
assert_eq!("\"minutes5\"", &serde_json::to_string(&period).unwrap());
}
#[test]
fn serialize_hour() {
let period = GraphPeriod::Hour;
assert_eq!("\"hour\"", &serde_json::to_string(&period).unwrap());
}
}
3 changes: 3 additions & 0 deletions server/src/db/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod agent_extra;
pub mod config;
pub mod logs;
pub mod network_usage;
pub mod network_usage_graph;
pub mod peer_count;
47 changes: 47 additions & 0 deletions server/src/db/queries/network_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use chrono;
use postgres;
use regex::{Captures, Regex};

use common_rpc_types::NetworkUsage;

pub fn insert(
conn: &postgres::Connection,
node_name: &str,
network_usage: NetworkUsage,
time: chrono::DateTime<chrono::Utc>,
) -> postgres::Result<()> {
ctrace!("Add network usage of {}", node_name);

if network_usage.is_empty() {
return Ok(())
}

let stmt = conn
.prepare("INSERT INTO network_usage (time, name, extension, target_ip, bytes) VALUES ($1, $2, $3, $4, $5)")?;
for key in network_usage.keys() {
let parse_result = parse_network_usage_key(key);
let (extension, ip) = match parse_result {
Ok((extension, ip)) => (extension, ip),
Err(err) => {
cerror!("Network Usage Parse Failed {:?}", err);
// FIXME: propagate the error
return Ok(())
}
};
let bytes = network_usage[key];
stmt.execute(&[&time, &node_name, &extension, &ip, &bytes])?;
}

Ok(())
}

fn parse_network_usage_key(key: &str) -> Result<(String, String), String> {
// Ex) ::block-propagation@54.180.74.243:3485
lazy_static! {
static ref KEY_REGEX: Regex = Regex::new(r"::(?P<extension>[a-zA-Z\-]*)@(?P<ip>[0-9\.]*)").unwrap();
}

let reg_result: Captures = KEY_REGEX.captures(key).ok_or_else(|| "Parse Error".to_string())?;

Ok((reg_result["extension"].to_string(), reg_result["ip"].to_string()))
}
71 changes: 71 additions & 0 deletions server/src/db/queries/network_usage_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use common_rpc_types::{GraphCommonArgs, GraphNetworkOutAllRow, GraphPeriod};
use postgres;

pub fn query_network_out_all(
conn: &postgres::Connection,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutAllRow>> {
let query_stmt = format!(
"\
SELECT \
name, \
{}, \
CAST (AVG(bytes) AS REAL) as value \
FROM \"network_usage\" \
WHERE \"time\"<$1 and \"time\">$2 \
GROUP BY \"name\", \"rounded_time\" \
ORDER BY \"name\", \"rounded_time\" ASC",
get_sql_round_period_expression(graph_args.period)
);

let rows = conn.query(&query_stmt, &[&graph_args.to, &graph_args.from])?;

Ok(rows
.into_iter()
.map(|row| GraphNetworkOutAllRow {
node_name: row.get("name"),
time: row.get("rounded_time"),
value: row.get("value"),
})
.collect())
}

fn get_sql_round_period_expression(period: GraphPeriod) -> &'static str {
match period {
GraphPeriod::Minutes5 => {
"date_trunc('hour', \"network_usage\".time) + INTERVAL '5 min' * ROUND(date_part('minute', \"network_usage\".time) / 5.0) as \"rounded_time\""
}
GraphPeriod::Hour => "date_trunc('hour', \"network_usage\".time) as \"rounded_time\"",
GraphPeriod::Day => "date_trunc('day', \"network_usage\".time) as \"rounded_time\"",
}
}

pub fn query_network_out_all_avg(
conn: &postgres::Connection,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutAllRow>> {
let query_stmt = format!(
"\
SELECT \
\"network_usage\".name, \
{}, \
CAST (AVG(bytes/\"peer_count\".\"peer_count\") AS REAL) as value \
FROM \"network_usage\" \
LEFT JOIN peer_count ON (\"network_usage\".\"time\"=\"peer_count\".\"time\") \
WHERE \"network_usage\".\"time\"<$1 and \"network_usage\".\"time\">$2 \
GROUP BY \"network_usage\".\"name\", \"rounded_time\" \
ORDER BY \"network_usage\".\"name\", \"rounded_time\" ASC",
get_sql_round_period_expression(graph_args.period)
);

let rows = conn.query(&query_stmt, &[&graph_args.to, &graph_args.from])?;

Ok(rows
.into_iter()
.map(|row| GraphNetworkOutAllRow {
node_name: row.get("name"),
time: row.get("rounded_time"),
value: row.get("value"),
})
.collect())
}
18 changes: 18 additions & 0 deletions server/src/db/queries/peer_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use chrono;
use postgres;

pub fn insert(
conn: &postgres::Connection,
node_name: &str,
peer_count: i32,
time: chrono::DateTime<chrono::Utc>,
) -> postgres::Result<()> {
ctrace!("Add peer count of {}", node_name);

conn.execute("INSERT INTO peer_count (time, name, peer_count) VALUES ($1, $2, $3)", &[
&time,
&node_name,
&peer_count,
])?;
Ok(())
}
Loading