diff --git a/libindy/src/api/pool.rs b/libindy/src/api/pool.rs index ad87251cff..02d27d4138 100755 --- a/libindy/src/api/pool.rs +++ b/libindy/src/api/pool.rs @@ -79,6 +79,7 @@ pub extern fn indy_create_pool_ledger_config(command_handle: CommandHandle, /// "number_read_nodes": int (optional) - the number of nodes to send read requests (2 by default) /// By default Libindy sends a read requests to 2 nodes in the pool. /// If response isn't received or `state proof` is invalid Libindy sends the request again but to 2 (`number_read_nodes`) * 2 = 4 nodes and so far until completion. +/// "socks_proxy": string (optional) - ZMQ socks proxy host name and port (example: proxy1.intranet.company.com:1080) /// } /// /// #Returns diff --git a/libindy/src/domain/pool.rs b/libindy/src/domain/pool.rs index c2dfd24841..b66a225ba5 100644 --- a/libindy/src/domain/pool.rs +++ b/libindy/src/domain/pool.rs @@ -33,6 +33,8 @@ pub struct PoolOpenConfig { pub preordered_nodes: Vec, #[serde(default = "PoolOpenConfig::default_number_read_nodes")] pub number_read_nodes: u8, + #[serde(default = "PoolOpenConfig::default_socks_proxy")] + pub socks_proxy: String, } impl Validatable for PoolOpenConfig { @@ -65,6 +67,7 @@ impl Default for PoolOpenConfig { conn_active_timeout: PoolOpenConfig::default_conn_active_timeout(), preordered_nodes: PoolOpenConfig::default_preordered_nodes(), number_read_nodes: PoolOpenConfig::default_number_read_nodes(), + socks_proxy: PoolOpenConfig::default_socks_proxy(), } } } @@ -91,4 +94,6 @@ impl PoolOpenConfig { } fn default_number_read_nodes() -> u8 { NUMBER_READ_NODES } + + fn default_socks_proxy() -> String { String::new() } } diff --git a/libindy/src/services/pool/networker.rs b/libindy/src/services/pool/networker.rs index 74f6278bf5..5a9e8b604c 100644 --- a/libindy/src/services/pool/networker.rs +++ b/libindy/src/services/pool/networker.rs @@ -17,7 +17,7 @@ use super::zmq::PollItem; use super::zmq::Socket as ZSocket; pub trait Networker { - fn new(active_timeout: i64, conn_limit: usize, preordered_nodes: Vec) -> Self; + fn new(active_timeout: i64, conn_limit: usize, preordered_nodes: Vec, socks_proxy: String) -> Self; fn fetch_events(&self, poll_items: &[PollItem]) -> Vec; fn process_event(&mut self, pe: Option) -> Option; fn get_timeout(&self) -> ((String, String), i64); @@ -31,10 +31,11 @@ pub struct ZMQNetworker { active_timeout: i64, conn_limit: usize, preordered_nodes: Vec, + socks_proxy: String, } impl Networker for ZMQNetworker { - fn new(active_timeout: i64, conn_limit: usize, preordered_nodes: Vec) -> Self { + fn new(active_timeout: i64, conn_limit: usize, preordered_nodes: Vec, socks_proxy: String) -> Self { ZMQNetworker { req_id_mappings: HashMap::new(), pool_connections: BTreeMap::new(), @@ -42,6 +43,7 @@ impl Networker for ZMQNetworker { active_timeout, conn_limit, preordered_nodes, + socks_proxy, } } @@ -84,7 +86,7 @@ impl Networker for ZMQNetworker { None => { trace!("send request in new conn"); let pc_id = sequence::get_next_id(); - let mut pc = PoolConnection::new(self.nodes.clone(), self.active_timeout, self.preordered_nodes.clone()); + let mut pc = PoolConnection::new(self.nodes.clone(), self.active_timeout, self.preordered_nodes.clone(), self.socks_proxy.clone()); pc.send_request(pe).expect("FIXME"); self.pool_connections.insert(pc_id, pc); self.req_id_mappings.insert(req_id.clone(), pc_id); @@ -175,10 +177,11 @@ pub struct PoolConnection { time_created: time::Tm, req_cnt: usize, active_timeout: i64, + socks_proxy: String, } impl PoolConnection { - fn new(mut nodes: Vec, active_timeout: i64, preordered_nodes: Vec) -> Self { + fn new(mut nodes: Vec, active_timeout: i64, preordered_nodes: Vec, socks_proxy: String) -> Self { trace!("PoolConnection::new: from nodes {:?}", nodes); nodes.shuffle(&mut thread_rng()); @@ -205,6 +208,7 @@ impl PoolConnection { timeouts: RefCell::new(HashMap::new()), req_cnt: 0, active_timeout, + socks_proxy } } @@ -331,7 +335,7 @@ impl PoolConnection { fn _get_socket(&mut self, idx: usize) -> IndyResult<&ZSocket> { if self.sockets[idx].is_none() { debug!("_get_socket: open new socket for node {}", idx); - let s: ZSocket = self.nodes[idx].connect(&self.ctx, &self.key_pair)?; + let s: ZSocket = self.nodes[idx].connect(&self.ctx, &self.key_pair, self.socks_proxy.clone())?; self.sockets[idx] = Some(s) } Ok(self.sockets[idx].as_ref().unwrap()) @@ -339,7 +343,7 @@ impl PoolConnection { } impl RemoteNode { - fn connect(&self, ctx: &zmq::Context, key_pair: &zmq::CurveKeyPair) -> IndyResult { + fn connect(&self, ctx: &zmq::Context, key_pair: &zmq::CurveKeyPair, socks_proxy: String) -> IndyResult { let s = ctx.socket(zmq::SocketType::DEALER)?; s.set_identity(base64::encode(&key_pair.public_key).as_bytes())?; s.set_curve_secretkey(&key_pair.secret_key)?; @@ -348,6 +352,17 @@ impl RemoteNode { .to_indy(IndyErrorKind::InvalidStructure, "Can't encode server key as z85")? // FIXME: review kind .as_bytes())?; s.set_linger(0)?; //TODO set correct timeout + + if !socks_proxy.is_empty() { + // let proxy = socks_proxy.unwrap(); + debug!("Use socks proxy: {}", socks_proxy); + let result = s.set_socks_proxy(Some(&socks_proxy)); + if result.is_err() { + error!("socks error: {}", result.unwrap_err()) + } + } else { + debug!("Socks proxy is not configured"); + } s.connect(&self.zaddr)?; Ok(s) } @@ -360,7 +375,7 @@ pub struct MockNetworker { #[cfg(test)] impl Networker for MockNetworker { - fn new(_active_timeout: i64, _conn_limit: usize, _preordered_nodes: Vec) -> Self { + fn new(_active_timeout: i64, _conn_limit: usize, _preordered_nodes: Vec, _socks_proxy: String) -> Self { MockNetworker { events: Vec::new(), } @@ -418,12 +433,12 @@ pub mod networker_tests { #[test] pub fn networker_new_works() { - ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); } #[test] pub fn networker_process_event_works() { - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(None); } @@ -432,7 +447,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); assert_eq!(0, networker.nodes.len()); @@ -447,7 +462,7 @@ pub mod networker_tests { let handle = nodes_emulator::start(&mut txn); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); assert!(networker.pool_connections.is_empty()); @@ -473,7 +488,7 @@ pub mod networker_tests { let handle_2 = nodes_emulator::start(&mut txn_2); let rn_2 = _remote_node(&txn_2); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn_1, rn_2]))); networker.process_event(Some(NetworkerEvent::SendAllRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT, None))); @@ -496,7 +511,7 @@ pub mod networker_tests { let send_cnt = 2; - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec!["n2".to_string(), "n1".to_string()]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec!["n2".to_string(), "n1".to_string()], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn_1, rn_2]))); @@ -526,7 +541,7 @@ pub mod networker_tests { let handle_2 = nodes_emulator::start(&mut txn_2); let rn_2 = _remote_node(&txn_2); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn_1, rn_2]))); networker.process_event(Some(NetworkerEvent::SendAllRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT, Some(vec![NODE_NAME.to_string()])))); @@ -542,7 +557,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); @@ -565,7 +580,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); @@ -588,7 +603,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); networker.process_event(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))); @@ -615,9 +630,9 @@ pub mod networker_tests { fn networker_process_timeout_event_works() { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let conn = PoolConnection::new(vec![rn.clone()], POOL_CON_ACTIVE_TO, vec![]); + let conn = PoolConnection::new(vec![rn.clone()], POOL_CON_ACTIVE_TO, vec![], String::new()); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); networker.pool_connections.insert(1, conn); @@ -634,7 +649,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); networker.process_event(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))); @@ -650,7 +665,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); networker.process_event(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))); @@ -668,7 +683,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); networker.process_event(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))); @@ -687,7 +702,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![]); + let mut networker = ZMQNetworker::new(POOL_CON_ACTIVE_TO, MAX_REQ_PER_POOL_CON, vec![], String::new()); networker.process_event(Some(NetworkerEvent::NodesStateUpdated(vec![rn]))); @@ -712,7 +727,9 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let _socket = rn.connect(&zmq::Context::new(), &zmq::CurveKeyPair::new().unwrap()).unwrap(); + let _socket: ZSocket = rn.connect(&zmq::Context::new(), &zmq::CurveKeyPair::new().unwrap(), String::new()).unwrap(); + let socks_proxy = _socket.get_socks_proxy().unwrap().unwrap(); + assert_eq!("", socks_proxy); } #[test] @@ -721,9 +738,19 @@ pub mod networker_tests { let mut rn = _remote_node(&txn); rn.zaddr = "invalid_address".to_string(); - let res = rn.connect(&zmq::Context::new(), &zmq::CurveKeyPair::new().unwrap()); + let res = rn.connect(&zmq::Context::new(), &zmq::CurveKeyPair::new().unwrap(),String::new()); assert_kind!(IndyErrorKind::IOError, res); } + + #[test] + fn remote_node_connect_via_socks_proxy() { + let txn = nodes_emulator::node(); + let rn = _remote_node(&txn); + + let _socket: ZSocket = rn.connect(&zmq::Context::new(), &zmq::CurveKeyPair::new().unwrap(), "proxy.internal.company:1080".to_string()).unwrap(); + let socks_proxy = _socket.get_socks_proxy().unwrap().unwrap(); + assert_eq!("proxy.internal.company:1080", socks_proxy); + } } #[cfg(test)] @@ -737,7 +764,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); } #[test] @@ -753,7 +780,7 @@ pub mod networker_tests { nodes.push(_remote_node(&txn)); } - let pc = PoolConnection::new(nodes, POOL_CON_ACTIVE_TO, vec![]); + let pc = PoolConnection::new(nodes, POOL_CON_ACTIVE_TO, vec![], String::new()); let act_names: Vec = pc.nodes.iter().map(|n| n.name.to_string()).collect(); @@ -781,7 +808,8 @@ pub mod networker_tests { let pc = PoolConnection::new(vec![rn_1.clone(), rn_2.clone(), rn_3.clone(), rn_4.clone(), rn_5.clone()], POOL_CON_ACTIVE_TO, - vec![rn_2.name.clone(), rn_1.name.clone(), rn_5.name.clone()]); + vec![rn_2.name.clone(), rn_1.name.clone(), rn_5.name.clone()], + String::new()); assert_eq!(rn_2.name, pc.nodes[0].name); assert_eq!(rn_1.name, pc.nodes[1].name); @@ -793,7 +821,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); assert!(conn.is_active()); @@ -807,7 +835,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); assert!(!conn.has_active_requests()); @@ -821,7 +849,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); let ((req_id, node_alias), timeout) = conn.get_timeout(); assert_eq!(req_id, "".to_string()); @@ -842,7 +870,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -862,7 +890,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -878,7 +906,7 @@ pub mod networker_tests { let txn = nodes_emulator::node(); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); let _socket = conn._get_socket(0).unwrap(); } @@ -889,7 +917,7 @@ pub mod networker_tests { let mut rn = _remote_node(&txn); rn.zaddr = "invalid_address".to_string(); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); let res = conn._get_socket(0); assert_kind!(IndyErrorKind::IOError, res); @@ -901,7 +929,7 @@ pub mod networker_tests { let handle = nodes_emulator::start(&mut txn); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); conn.send_request(Some(NetworkerEvent::SendOneRequest("msg2".to_string(), "12".to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -921,7 +949,7 @@ pub mod networker_tests { let handle_2 = nodes_emulator::start(&mut txn_2); let rn_2 = _remote_node(&txn_2); - let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec!["n1".to_string(), "n2".to_string()]); + let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec!["n1".to_string(), "n2".to_string()], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -941,7 +969,7 @@ pub mod networker_tests { let handle_2 = nodes_emulator::start(&mut txn_2); let rn_2 = _remote_node(&txn_2); - let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendAllRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT, None))).unwrap(); @@ -957,7 +985,7 @@ pub mod networker_tests { let handle = nodes_emulator::start(&mut txn); let rn = _remote_node(&txn); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -978,7 +1006,7 @@ pub mod networker_tests { let handle_2 = nodes_emulator::start(&mut txn_2); let rn_2 = _remote_node(&txn_2); - let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn_1, rn_2], POOL_CON_ACTIVE_TO, vec![], String::new()); conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))).unwrap(); @@ -996,7 +1024,7 @@ pub mod networker_tests { let mut rn = _remote_node(&txn); rn.zaddr = "invalid_address".to_string(); - let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![]); + let mut conn = PoolConnection::new(vec![rn], POOL_CON_ACTIVE_TO, vec![], String::new()); let res = conn.send_request(Some(NetworkerEvent::SendOneRequest(MESSAGE.to_string(), REQ_ID.to_string(), POOL_ACK_TIMEOUT))); assert_kind!(IndyErrorKind::IOError, res); diff --git a/libindy/src/services/pool/pool.rs b/libindy/src/services/pool/pool.rs index 9ebdb857bf..137f9cc912 100644 --- a/libindy/src/services/pool/pool.rs +++ b/libindy/src/services/pool/pool.rs @@ -475,6 +475,7 @@ pub struct Pool> { conn_limit: usize, preordered_nodes: Vec, number_read_nodes: u8, + socks_proxy: String, } impl> Pool { @@ -491,6 +492,7 @@ impl> Pool { conn_limit: config.conn_limit, preordered_nodes: config.preordered_nodes, number_read_nodes: config.number_read_nodes, + socks_proxy: config.socks_proxy, } } @@ -503,12 +505,14 @@ impl> Pool { let conn_limit = self.conn_limit; let preordered_nodes = self.preordered_nodes.clone(); let number_read_nodes = self.number_read_nodes; + let socks_proxy = self.socks_proxy.clone(); self.worker = Some(thread::spawn(move || { let mut pool_thread: PoolThread = PoolThread::new(cmd_socket, name, id, timeout, extended_timeout, active_timeout, conn_limit, preordered_nodes, - number_read_nodes); + number_read_nodes, + socks_proxy); pool_thread.work(); })); } @@ -530,8 +534,9 @@ struct PoolThread> { } impl> PoolThread { - pub fn new(cmd_socket: zmq::Socket, name: String, id: PoolHandle, timeout: i64, extended_timeout: i64, active_timeout: i64, conn_limit: usize, preordered_nodes: Vec, number_read_nodes: u8) -> Self { - let networker = Rc::new(RefCell::new(S::new(active_timeout, conn_limit, preordered_nodes))); + pub fn new(cmd_socket: zmq::Socket, name: String, id: PoolHandle, timeout: i64, extended_timeout: i64, active_timeout: i64, conn_limit: usize, + preordered_nodes: Vec, number_read_nodes: u8, socks_proxy: String) -> Self { + let networker = Rc::new(RefCell::new(S::new(active_timeout, conn_limit, preordered_nodes, socks_proxy))); PoolThread { pool_sm: Some(PoolSM::new(networker.clone(), &name, id, timeout, extended_timeout, number_read_nodes)), events: VecDeque::new(), @@ -809,7 +814,7 @@ mod tests { #[test] pub fn pool_wrapper_new_initialization_works() { - let _p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "name", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + let _p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "name", next_pool_handle(), 0, 0, NUMBER_READ_NODES); } #[test] @@ -819,7 +824,7 @@ mod tests { ProtocolVersion::set(2); _write_genesis_txns("pool_wrapper_check_cache_works"); - let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_check_cache_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_check_cache_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); assert_match!(PoolState::GettingCatchupTarget(_), p.state); @@ -830,7 +835,7 @@ mod tests { #[test] pub fn pool_wrapper_check_cache_works_for_no_pool_created() { let p: PoolSM = - PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), + PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_check_cache_works_for_no_pool_created", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); @@ -839,7 +844,7 @@ mod tests { #[test] pub fn pool_wrapper_terminated_close_works() { - let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_terminated_close_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_terminated_close_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); let cmd_id: CommandHandle = next_command_handle(); @@ -850,7 +855,7 @@ mod tests { #[test] pub fn pool_wrapper_terminated_refresh_works() { test::cleanup_pool("pool_wrapper_terminated_refresh_works"); - let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_terminated_refresh_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_terminated_refresh_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); @@ -869,7 +874,7 @@ mod tests { pool_name: "pool_wrapper_terminated_timeout_works".to_string(), id: next_pool_handle(), state: PoolState::Terminated(TerminatedState { - networker: Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), + networker: Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), }), timeout: 0, extended_timeout: 0, @@ -890,7 +895,7 @@ mod tests { #[test] pub fn pool_wrapper_cloe_works_from_initialization() { - let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_cloe_works_from_initialization", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + let p: PoolSM = PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_cloe_works_from_initialization", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::Close(cmd_id)); assert_match!(PoolState::Closed(_), p.state); @@ -904,7 +909,7 @@ mod tests { _write_genesis_txns("pool_wrapper_close_works_from_getting_catchup_target"); let p: PoolSM = - PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_close_works_from_getting_catchup_target", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_close_works_from_getting_catchup_target", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); let cmd_id: CommandHandle = next_command_handle(); @@ -922,7 +927,7 @@ mod tests { _write_genesis_txns("pool_wrapper_catchup_target_not_found_works"); let p: PoolSM = - PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_catchup_target_not_found_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_catchup_target_not_found_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); let p = p.handle_event(PoolEvent::CatchupTargetNotFound(err_msg(IndyErrorKind::PoolTimeout, "Pool timeout"))); @@ -939,7 +944,7 @@ mod tests { _write_genesis_txns("pool_wrapper_getting_catchup_target_synced_works"); let p: PoolSM = - PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))), "pool_wrapper_getting_catchup_target_synced_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); + PoolSM::new(Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_getting_catchup_target_synced_works", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); let p = p.handle_event(PoolEvent::CheckCache(cmd_id)); let p = p.handle_event(PoolEvent::Synced(MerkleTree::from_vec(vec![]).unwrap())); @@ -958,7 +963,7 @@ mod tests { let p: PoolSM = PoolSM::new( Rc::new(RefCell::new( MockNetworker::new(0, - 0, vec![]))), + 0, vec![], String::new()))), "pool_wrapper_getting_catchup_target_synced_works_for_node_state_error", next_pool_handle(), 0, @@ -985,7 +990,8 @@ mod tests { Rc::new(RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_getting_catchup_target_catchup_target_found_works", next_pool_handle(), 0, @@ -1009,7 +1015,7 @@ mod tests { let p: PoolSM = PoolSM::new(Rc::new(RefCell::new( - MockNetworker::new(0, 0, vec![]))), + MockNetworker::new(0, 0, vec![], String::new()))), "pool_wrapper_getting_catchup_target_catchup_target_found_works_for_node_state_error", next_pool_handle(), 0, @@ -1037,7 +1043,8 @@ mod tests { RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_sync_catchup_close_works", next_pool_handle(), 0, @@ -1065,7 +1072,8 @@ mod tests { Rc::new(RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_sync_catchup_synced_works", next_pool_handle(), 0, @@ -1092,7 +1100,8 @@ mod tests { Rc::new(RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_sync_catchup_synced_works_for_node_state_error", next_pool_handle(), 0, @@ -1124,7 +1133,8 @@ mod tests { let p: PoolSM = PoolSM::new(Rc::new( RefCell::new(MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_active_send_request_works", next_pool_handle(), 0, @@ -1164,7 +1174,8 @@ mod tests { MockNetworker::new( 0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_active_send_request_works_for_no_req_id", next_pool_handle(), 0, @@ -1217,7 +1228,8 @@ mod tests { Rc::new(RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_active_node_reply_works", next_pool_handle(), 0, @@ -1257,7 +1269,8 @@ mod tests { PoolSM::new(Rc::new(RefCell::new( MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_sends_requests_to_two_nodes", next_pool_handle(), 0, 0, NUMBER_READ_NODES); let cmd_id: CommandHandle = next_command_handle(); @@ -1307,7 +1320,8 @@ mod tests { let p: PoolSM = PoolSM::new(Rc::new( RefCell::new(MockNetworker::new(0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_active_node_reply_works_for_no_request", next_pool_handle(), 0, @@ -1350,7 +1364,8 @@ mod tests { PoolSM::new(Rc::new(RefCell::new(MockNetworker::new( 0, 0, - vec![]))), + vec![], + String::new()))), "pool_wrapper_active_node_reply_works_for_invalid_reply", next_pool_handle(), 0, diff --git a/libindy/src/services/pool/request_handler.rs b/libindy/src/services/pool/request_handler.rs index dea84c4f94..143c93ea42 100644 --- a/libindy/src/services/pool/request_handler.rs +++ b/libindy/src/services/pool/request_handler.rs @@ -924,7 +924,7 @@ pub mod tests { } fn _request_handler(pool_name: &str, f: usize, nodes_cnt: usize) -> RequestHandlerImpl { - let networker = Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![]))); + let networker = Rc::new(RefCell::new(MockNetworker::new(0, 0, vec![], String::new()))); let mut default_nodes: Nodes = HashMap::new(); default_nodes.insert(NODE.to_string(), None);