Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 105 additions & 31 deletions server/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ serde_json = "1.0"
slack-hook = "0.8.0"
time = "0.1"
ws = "*"
r2d2_postgres = "0.14.0"
r2d2 = "0.8.6"
7 changes: 3 additions & 4 deletions server/src/db/queries/client_extra.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use postgres;

use super::super::types::DBConnection;
use super::super::ClientExtra;

pub fn get(conn: &postgres::Connection, node_name: &str) -> postgres::Result<Option<ClientExtra>> {
pub fn get(conn: &DBConnection, node_name: &str) -> postgres::Result<Option<ClientExtra>> {
ctrace!("Query client extra by name {}", node_name);

let rows = conn.query("SELECT * FROM client_extra WHERE name=$1;", &[&node_name])?;
Expand All @@ -16,7 +15,7 @@ pub fn get(conn: &postgres::Connection, node_name: &str) -> postgres::Result<Opt
}))
}

pub fn upsert(conn: &postgres::Connection, node_name: &str, client_extra: &ClientExtra) -> postgres::Result<()> {
pub fn upsert(conn: &DBConnection, node_name: &str, client_extra: &ClientExtra) -> postgres::Result<()> {
ctrace!("Upsert client extra {:?}", client_extra);
let result = conn.execute(
"INSERT INTO client_extra (name, prev_env, prev_args) VALUES ($1, $2, $3) \
Expand Down
4 changes: 2 additions & 2 deletions server/src/db/queries/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use postgres;
use super::super::types::DBConnection;

pub fn set_query_timeout(conn: &postgres::Connection) -> postgres::Result<()> {
pub fn set_query_timeout(conn: &DBConnection) -> postgres::Result<()> {
conn.execute("SET SESSION statement_timeout TO 2000", &[])?;
Ok(())
}
7 changes: 4 additions & 3 deletions server/src/db/queries/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use postgres;
use postgres::types::ToSql;

use super::super::super::common_rpc_types::StructuredLog;
use super::super::types::DBConnection;
use super::super::types::OrderBy;
use super::super::types::{Log, LogQueryParams};

pub fn insert(conn: &postgres::Connection, node_name: &str, logs: Vec<StructuredLog>) -> postgres::Result<()> {
pub fn insert(conn: &DBConnection, node_name: &str, logs: Vec<StructuredLog>) -> postgres::Result<()> {
ctrace!("Add log {} : {:?}", node_name, logs);

if logs.is_empty() {
Expand Down Expand Up @@ -53,7 +54,7 @@ pub fn insert(conn: &postgres::Connection, node_name: &str, logs: Vec<Structured
Ok(())
}

pub fn search(conn: &postgres::Connection, params: LogQueryParams) -> postgres::Result<Vec<Log>> {
pub fn search(conn: &DBConnection, params: LogQueryParams) -> postgres::Result<Vec<Log>> {
ctrace!("Search log with {:?}", params);
let mut parameters = Parameters::default();
let mut where_conditions = Vec::new();
Expand Down Expand Up @@ -145,7 +146,7 @@ impl Parameters {
}
}

pub fn get_targets(conn: &postgres::Connection) -> postgres::Result<Vec<String>> {
pub fn get_targets(conn: &DBConnection) -> postgres::Result<Vec<String>> {
ctrace!("Query targets");

// let rows = conn.query("SELECT DISTINCT target FROM logs", &[])?;
Expand Down
3 changes: 2 additions & 1 deletion server/src/db/queries/network_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use chrono;
use postgres;
use regex::{Captures, Regex};

use super::super::types::DBConnection;
use common_rpc_types::NetworkUsage;
use util::{floor_to_5min, start_of_day, start_of_hour};

pub fn insert(
conn: &postgres::Connection,
conn: &DBConnection,
node_name: &str,
network_usage: NetworkUsage,
time: chrono::DateTime<chrono::Utc>,
Expand Down
10 changes: 6 additions & 4 deletions server/src/db/queries/network_usage_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use common_rpc_types::{
};
use postgres;

use super::super::types::DBConnection;

pub fn query_network_out_all(
conn: &postgres::Connection,
conn: &DBConnection,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutAllRow>> {
let time_column_name = get_sql_column_name_by_period(graph_args.period);
Expand Down Expand Up @@ -38,7 +40,7 @@ fn get_sql_column_name_by_period(period: GraphPeriod) -> &'static str {
}

pub fn query_network_out_all_avg(
conn: &postgres::Connection,
conn: &DBConnection,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutAllRow>> {
let time_column_name = get_sql_column_name_by_period(graph_args.period);
Expand All @@ -63,7 +65,7 @@ pub fn query_network_out_all_avg(
}

pub fn query_network_out_node_extension(
conn: &postgres::Connection,
conn: &DBConnection,
node_name: NodeName,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutNodeExtensionRow>> {
Expand All @@ -90,7 +92,7 @@ pub fn query_network_out_node_extension(
}

pub fn query_network_out_node_peer(
conn: &postgres::Connection,
conn: &DBConnection,
node_name: NodeName,
graph_args: GraphCommonArgs,
) -> postgres::Result<Vec<GraphNetworkOutNodePeerRow>> {
Expand Down
4 changes: 3 additions & 1 deletion server/src/db/queries/peer_count.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono;
use postgres;

use super::super::types::DBConnection;

pub fn insert(
conn: &postgres::Connection,
conn: &DBConnection,
node_name: &str,
peer_count: i32,
time: chrono::DateTime<chrono::Utc>,
Expand Down
56 changes: 36 additions & 20 deletions server/src/db/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::net::SocketAddr;
use std::sync::mpsc::{channel, Sender};
use std::thread;

use postgres;
use postgres::TlsMode;
use r2d2_postgres::PostgresConnectionManager;

use super::super::common_rpc_types as rpc_type;
use super::super::common_rpc_types::{NodeName, NodeStatus, StructuredLog};
Expand All @@ -17,6 +16,7 @@ use common_rpc_types::{
GraphCommonArgs, GraphNetworkOutAllAVGRow, GraphNetworkOutAllRow, GraphNetworkOutNodeExtensionRow,
GraphNetworkOutNodePeerRow, NetworkUsage,
};
use db::types::DBConnection;
use util;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -58,7 +58,7 @@ struct State {
pub struct Service {
state: State,
event_subscriber: Box<dyn EventSubscriber>,
db_conn: postgres::Connection,
pool: r2d2::Pool<PostgresConnectionManager>,
}

pub struct ServiceNewArg {
Expand All @@ -75,15 +75,20 @@ impl Service {
db_password,
}: ServiceNewArg,
) -> Self {
let conn_uri = format!("postgres://{}:{}@localhost", db_user, db_password);
let manager = PostgresConnectionManager::new(
format!("postgres://{}:{}@localhost", db_user, db_password),
r2d2_postgres::TlsMode::None,
)
.expect("Create connection manager");
let pool = r2d2::Pool::new(manager).expect("Create connection pool");

let conn = postgres::Connection::connect(conn_uri, TlsMode::None).unwrap();
queries::config::set_query_timeout(&conn).unwrap();
let connection = pool.get().expect("Get connection");
queries::config::set_query_timeout(&connection).unwrap();

Self {
state: State::default(),
event_subscriber,
db_conn: conn,
pool,
}
}

Expand Down Expand Up @@ -187,8 +192,19 @@ impl Service {
service_sender
}

fn db_conn(&self) -> Result<DBConnection, DBError> {
self.pool.get().map_err(|err| DBError::Internal(err.to_string()))
}

fn check_connection(&self, callback: Sender<Result<(), DBError>>) {
let result = self.db_conn.execute(&"SELECT 1", &[]).map_err(|err| DBError::Internal(err.to_string()));
let conn = match self.db_conn() {
Ok(conn) => conn,
Err(err) => {
cerror!("check_connection: {:?}", err);
return
}
};
let result = conn.execute(&"SELECT 1", &[]).map_err(|err| DBError::Internal(err.to_string()));
if let Err(err) = callback.send(result.map(|_| ())) {
cerror!("Cannot send callback : {}", err);
}
Expand Down Expand Up @@ -299,13 +315,13 @@ impl Service {
}

fn save_start_option(&mut self, node_name: NodeName, env: &str, args: &str) -> Result<(), Box<dyn error::Error>> {
let before_extra = queries::client_extra::get(&self.db_conn, &node_name)?;
let before_extra = queries::client_extra::get(&self.db_conn()?, &node_name)?;
let mut extra = before_extra.clone().unwrap_or_default();

extra.prev_env = env.to_string();
extra.prev_args = args.to_string();

queries::client_extra::upsert(&self.db_conn, &node_name, &extra)?;
queries::client_extra::upsert(&self.db_conn()?, &node_name, &extra)?;

self.event_subscriber.on_event(Event::ClientExtraUpdated {
name: node_name,
Expand All @@ -321,26 +337,26 @@ impl Service {
node_name: &str,
callback: Sender<Option<ClientExtra>>,
) -> Result<(), Box<dyn error::Error>> {
let extra = queries::client_extra::get(&self.db_conn, node_name)?;
let extra = queries::client_extra::get(&self.db_conn()?, node_name)?;
if let Err(err) = callback.send(extra) {
cerror!("Callback error {}", err);
}
Ok(())
}

fn get_logs(&self, params: LogQueryParams, callback: Sender<Vec<Log>>) -> Result<(), Box<dyn error::Error>> {
let logs = queries::logs::search(&self.db_conn, params)?;
let logs = queries::logs::search(&self.db_conn()?, params)?;
callback.send(logs)?;
Ok(())
}

fn write_logs(&self, node_name: &str, logs: Vec<StructuredLog>) -> Result<(), Box<dyn error::Error>> {
queries::logs::insert(&self.db_conn, node_name, logs)?;
queries::logs::insert(&self.db_conn()?, node_name, logs)?;
Ok(())
}

fn get_log_targets(&self, callback: Sender<Vec<String>>) -> Result<(), Box<dyn error::Error>> {
let targets = queries::logs::get_targets(&self.db_conn)?;
let targets = queries::logs::get_targets(&self.db_conn()?)?;
callback.send(targets)?;
Ok(())
}
Expand All @@ -351,7 +367,7 @@ impl Service {
network_usage: NetworkUsage,
time: chrono::DateTime<chrono::Utc>,
) -> Result<(), Box<dyn error::Error>> {
queries::network_usage::insert(&self.db_conn, node_name, network_usage, time)?;
queries::network_usage::insert(&self.db_conn()?, node_name, network_usage, time)?;
Ok(())
}

Expand All @@ -361,23 +377,23 @@ impl Service {
peer_count: i32,
time: chrono::DateTime<chrono::Utc>,
) -> Result<(), Box<dyn error::Error>> {
queries::peer_count::insert(&self.db_conn, node_name, peer_count, time)?;
queries::peer_count::insert(&self.db_conn()?, node_name, peer_count, time)?;
Ok(())
}

fn get_network_out_all_graph(
&self,
args: GraphCommonArgs,
) -> Result<Vec<GraphNetworkOutAllRow>, Box<dyn error::Error>> {
let rows = queries::network_usage_graph::query_network_out_all(&self.db_conn, args)?;
let rows = queries::network_usage_graph::query_network_out_all(&self.db_conn()?, args)?;
Ok(rows)
}

fn get_network_out_all_avg_graph(
&self,
args: GraphCommonArgs,
) -> Result<Vec<GraphNetworkOutAllRow>, Box<dyn error::Error>> {
let rows = queries::network_usage_graph::query_network_out_all_avg(&self.db_conn, args)?;
let rows = queries::network_usage_graph::query_network_out_all_avg(&self.db_conn()?, args)?;
Ok(rows)
}

Expand All @@ -386,7 +402,7 @@ impl Service {
node_name: NodeName,
args: GraphCommonArgs,
) -> Result<Vec<GraphNetworkOutNodeExtensionRow>, Box<dyn error::Error>> {
let rows = queries::network_usage_graph::query_network_out_node_extension(&self.db_conn, node_name, args)?;
let rows = queries::network_usage_graph::query_network_out_node_extension(&self.db_conn()?, node_name, args)?;
Ok(rows)
}

Expand All @@ -395,7 +411,7 @@ impl Service {
node_name: NodeName,
args: GraphCommonArgs,
) -> Result<Vec<GraphNetworkOutNodePeerRow>, Box<dyn error::Error>> {
let rows = queries::network_usage_graph::query_network_out_node_peer(&self.db_conn, node_name, args)?;
let rows = queries::network_usage_graph::query_network_out_node_peer(&self.db_conn()?, node_name, args)?;
Ok(rows)
}
}
Expand Down
13 changes: 13 additions & 0 deletions server/src/db/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use std::sync::mpsc::RecvError;
use super::super::common_rpc_types::{
BlackList, BlockId, HardwareInfo, NodeName, NodeStatus, NodeVersion, PendingTransaction, WhiteList,
};
use serde::export::Formatter;

pub type DBConnection = r2d2::PooledConnection<r2d2_postgres::PostgresConnectionManager>;

#[derive(PartialEq, Clone, Debug, Default)]
pub struct ClientQueryResult {
Expand Down Expand Up @@ -184,3 +187,13 @@ impl From<RecvError> for Error {
Error::Internal(error.to_string())
}
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
match self {
Error::Internal(log) => write!(f, "{}", log),
}
}
}
1 change: 1 addition & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extern crate serde;
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
extern crate r2d2_postgres;
extern crate slack_hook;
extern crate ws;

Expand Down