diff --git a/relay/sources/relayd/Cargo.lock b/relay/sources/relayd/Cargo.lock index d7cf9169d6b..0a7ab2df6f3 100644 --- a/relay/sources/relayd/Cargo.lock +++ b/relay/sources/relayd/Cargo.lock @@ -1317,7 +1317,7 @@ dependencies = [ "tokio-signal 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-subscriber 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1915,18 +1915,18 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-attributes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-core 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "tracing-attributes" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2374,8 +2374,8 @@ dependencies = [ "checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" "checksum toml 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c7aabe75941d914b72bf3e5d3932ed92ce0664d49d8432305a8b547c37227724" -"checksum tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c21ff9457accc293386c20e8f754d0b059e67e325edf2284f04230d125d7e5ff" -"checksum tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3ff978fd9c9afe2cc9c671e247713421c6406b3422305cbdce5de695d3ab4c3c" +"checksum tracing 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ff4e4f59e752cb3beb5b61c6d5e11191c7946231ba84faec2902c9efdd8691c5" +"checksum tracing-attributes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a4263b12c3d3c403274493eb805966093b53214124796552d674ca1dd5d27c2b" "checksum tracing-core 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "bc913647c520c959b6d21e35ed8fa6984971deca9f0a2fcb8c51207e0c56af1d" "checksum tracing-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "652bc99e1286541d6ccc42d5fb37213d1cdde544f88b19fac3d94e3117b55163" "checksum tracing-subscriber 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "63e6bb522a99f10b3b0b3b894f936bccc2516cc8761e5506de132b06bf8d2e4d" diff --git a/relay/sources/relayd/fuzz/Cargo.lock b/relay/sources/relayd/fuzz/Cargo.lock index 3b59411a896..62e5f576520 100644 --- a/relay/sources/relayd/fuzz/Cargo.lock +++ b/relay/sources/relayd/fuzz/Cargo.lock @@ -1189,7 +1189,7 @@ dependencies = [ "tokio-signal 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-subscriber 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1778,18 +1778,18 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing-attributes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-core 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "tracing-attributes" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2206,8 +2206,8 @@ dependencies = [ "checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" "checksum toml 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c7aabe75941d914b72bf3e5d3932ed92ce0664d49d8432305a8b547c37227724" -"checksum tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c21ff9457accc293386c20e8f754d0b059e67e325edf2284f04230d125d7e5ff" -"checksum tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3ff978fd9c9afe2cc9c671e247713421c6406b3422305cbdce5de695d3ab4c3c" +"checksum tracing 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ff4e4f59e752cb3beb5b61c6d5e11191c7946231ba84faec2902c9efdd8691c5" +"checksum tracing-attributes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a4263b12c3d3c403274493eb805966093b53214124796552d674ca1dd5d27c2b" "checksum tracing-core 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "bc913647c520c959b6d21e35ed8fa6984971deca9f0a2fcb8c51207e0c56af1d" "checksum tracing-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "652bc99e1286541d6ccc42d5fb37213d1cdde544f88b19fac3d94e3117b55163" "checksum tracing-subscriber 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "63e6bb522a99f10b3b0b3b894f936bccc2516cc8761e5506de132b06bf8d2e4d" diff --git a/relay/sources/relayd/src/api/remote_run.rs b/relay/sources/relayd/src/api/remote_run.rs index d5fdf78c6be..8c7055de79a 100644 --- a/relay/sources/relayd/src/api/remote_run.rs +++ b/relay/sources/relayd/src/api/remote_run.rs @@ -147,15 +147,19 @@ impl RemoteRun { self.target .next_hops(job_config.clone()) .iter() - .map(|relay| self.forward_call(job_config.clone(), relay.clone())), + .map(|(relay, target)| { + self.forward_call(job_config.clone(), relay.clone(), target.clone()) + }), )), ))), // Async and no output -> spawn in background and return early (true, false) => { - for relay in self.target.next_hops(job_config.clone()) { - tokio::spawn(RemoteRun::consume( - self.forward_call(job_config.clone(), relay), - )); + for (relay, target) in self.target.next_hops(job_config.clone()) { + tokio::spawn(RemoteRun::consume(self.forward_call( + job_config.clone(), + relay, + target, + ))); } tokio::spawn(RemoteRun::consume(self.run_parameters.remote_run( &job_config.cfg.remote_run, @@ -178,7 +182,9 @@ impl RemoteRun { self.target .next_hops(job_config.clone()) .iter() - .map(|relay| self.forward_call(job_config.clone(), relay.clone())), + .map(|(relay, target)| { + self.forward_call(job_config.clone(), relay.clone(), target.clone()) + }), )), ))), // Sync and output -> wait until the end and return output @@ -193,7 +199,9 @@ impl RemoteRun { self.target .next_hops(job_config.clone()) .iter() - .map(|relay| self.forward_call(job_config.clone(), relay.clone())), + .map(|(relay, target)| { + self.forward_call(job_config.clone(), relay.clone(), target.clone()) + }), )), ))), } @@ -203,10 +211,14 @@ impl RemoteRun { &self, job_config: Arc, node: Host, + // Target for the sub relay + target: RemoteRunTarget, ) -> impl Stream + Send + 'static { let report_span = span!(Level::TRACE, "upstream"); let _report_enter = report_span.enter(); + debug!("Forwarding remote-run to {} for {:#?}", node, self.target); + // We cannot simply deserialize if using `.form()` as we // need specific formatting let mut form = Form::new() @@ -221,7 +233,7 @@ impl RemoteRun { .collect::>() .join(","), ); - if let RemoteRunTarget::Nodes(nodes) = &self.target { + if let RemoteRunTarget::Nodes(nodes) = &target { form = form.text("nodes", nodes.join(",")) } @@ -231,7 +243,7 @@ impl RemoteRun { .post(&format!( "{}/rudder/relay-api/{}", node, - match &self.target { + match target { RemoteRunTarget::All => "all", RemoteRunTarget::Nodes(_) => "nodes", }, @@ -240,7 +252,13 @@ impl RemoteRun { .send() .map(|response| response.into_body()) .flatten_stream() - .map_err(|e| e.into()) + .map_err(|e| { + error!("{}", e); + e.into() + }) + // Don't fail if a relay is not available, + // just log it + .or_else(|_: Error| futures::future::empty()) .map(|c| c.into()) } } @@ -254,18 +272,30 @@ pub enum RemoteRunTarget { impl RemoteRunTarget { pub fn neighbors(&self, job_config: Arc) -> Vec { let nodes = job_config.nodes.read().expect("Cannot read nodes list"); - match self { - RemoteRunTarget::All => nodes.neighbors(), - RemoteRunTarget::Nodes(nodeslist) => nodes.neighbors_from(nodeslist), - } + let neighbors = match self { + RemoteRunTarget::All => nodes.my_neighbors(), + RemoteRunTarget::Nodes(nodeslist) => nodes.my_neighbors_from(nodeslist), + }; + debug!("Neighbors: {:#?}", neighbors); + neighbors } - pub fn next_hops(&self, job_config: Arc) -> Vec { + pub fn next_hops(&self, job_config: Arc) -> Vec<(Host, RemoteRunTarget)> { let nodes = job_config.nodes.read().expect("Cannot read nodes list"); - match self { - RemoteRunTarget::All => nodes.sub_relays(), - RemoteRunTarget::Nodes(nodeslist) => nodes.sub_relays_from(nodeslist), - } + let next_hops = match self { + RemoteRunTarget::All => nodes + .my_sub_relays() + .into_iter() + .map(|r| (r, RemoteRunTarget::All)) + .collect(), + RemoteRunTarget::Nodes(nodeslist) => nodes + .my_sub_relays_from(nodeslist) + .into_iter() + .map(|(relay, nodes)| (relay, RemoteRunTarget::Nodes(nodes))) + .collect(), + }; + debug!("Next-hops: {:#?}", next_hops); + next_hops } } @@ -415,6 +445,10 @@ impl RunParameters { tokio_io::io::lines(BufReader::new(stdout)) .map_err(Error::from) .inspect(|line| debug!("output: {}", line)) + .map(|mut l| { + l.push('\n'); + l + }) .map(Chunk::from) } } diff --git a/relay/sources/relayd/src/data/node.rs b/relay/sources/relayd/src/data/node.rs index 08563f9f9c1..dcd91153c7f 100644 --- a/relay/sources/relayd/src/data/node.rs +++ b/relay/sources/relayd/src/data/node.rs @@ -38,7 +38,7 @@ use std::{ path::Path, str::FromStr, }; -use tracing::{info, trace, warn}; +use tracing::{error, info, trace, warn}; pub type NodeId = String; pub type Host = String; @@ -136,7 +136,7 @@ impl NodesList { pub fn counts(&self) -> NodeCounts { NodeCounts { sub_nodes: self.list.data.len(), - managed_nodes: self.neighbors().len(), + managed_nodes: self.my_neighbors().len(), } } @@ -146,6 +146,14 @@ impl NodesList { self.list.data.get(id).is_some() } + pub fn is_my_neighbor(&self, id: &str) -> Result { + self.list + .data + .get(id) + .ok_or(()) + .map(|n| n.policy_server == self.my_id) + } + pub fn key_hash(&self, id: &str) -> Option { self.list.data.get(id).map(|s| s.key_hash.clone()) } @@ -173,22 +181,27 @@ impl NodesList { .to_string()) } - fn next_hop(&self, node_id: &str) -> Option { + /// Some(Next hop) if any, None if directly connected, error if not found + fn next_hop(&self, node_id: &str) -> Result, ()> { // nodeslist should not contain loops but just in case // 20 levels of relays should be more than enough const MAX_RELAY_LEVELS: u8 = 20; + if self.is_my_neighbor(node_id)? { + return Ok(None); + } + let mut current_id = node_id; - let mut current = self.list.data.get(current_id)?; - let mut next_hop: Option = None; + let mut current = self.list.data.get(current_id).ok_or(())?; + let mut next_hop = Err(()); for level in 0..MAX_RELAY_LEVELS { if current.policy_server == self.my_id { - next_hop = Some(current_id.to_string()); + next_hop = Ok(Some(current_id.to_string())); break; } current_id = ¤t.policy_server; - current = self.list.data.get(current_id)?; + current = self.list.data.get(current_id).ok_or(())?; if level == MAX_RELAY_LEVELS { warn!( @@ -203,7 +216,7 @@ impl NodesList { // NOTE: Following methods could be made faster by pre-computing a graph in cache - pub fn neighbors(&self) -> Vec { + pub fn my_neighbors(&self) -> Vec { self.list .data .values() @@ -212,16 +225,20 @@ impl NodesList { .collect() } - pub fn neighbors_from(&self, nodes: &[String]) -> Vec { + pub fn neighbors_from(&self, server: &NodeId, nodes: &[NodeId]) -> Vec { nodes .iter() .filter_map(|n| self.list.data.get::(n)) - .filter(|n| n.policy_server == self.my_id) + .filter(|n| &n.policy_server == server) .map(|n| n.hostname.clone()) .collect() } - pub fn sub_relays(&self) -> Vec { + pub fn my_neighbors_from(&self, nodes: &[NodeId]) -> Vec { + self.neighbors_from(&self.my_id, nodes) + } + + pub fn my_sub_relays(&self) -> Vec { let mut relays = HashSet::new(); for policy_server in self .list @@ -236,21 +253,33 @@ impl NodesList { relays.into_iter().collect() } - pub fn sub_relays_from(&self, nodes: &[String]) -> Vec { - let mut relays = HashSet::new(); - for relay in nodes - .iter() - .filter_map(|n| self.next_hop(n)) - .filter(|n| n != &self.my_id) - { - let _ = relays.insert( - self.list + /// Relays to contact to trigger given nodes, with the matching nodes + /// Logs and ignores unknown nodes + pub fn my_sub_relays_from(&self, nodes: &[NodeId]) -> Vec<(Host, Vec)> { + let mut relays: HashMap> = HashMap::new(); + for node in nodes.iter() { + let hostname = match self.next_hop(node) { + Ok(Some(ref next_hop)) => self + .list .data - .get::(&relay) + .get::(next_hop) .map(|n| n.hostname.clone()) + // We are sure it is there at this point .unwrap(), - ); + Ok(None) => continue, + Err(()) => { + error!("Unknown node {}", node); + continue; + } + }; + + if let Some(nodes) = relays.get_mut(&hostname) { + nodes.push(node.clone()); + } else { + relays.insert(hostname, vec![node.clone()]); + } } + relays.into_iter().collect() } } @@ -330,6 +359,42 @@ mod tests { ); } + #[test] + fn if_gets_subrelays() { + assert!( + NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) + .unwrap() + .is_subnode("37817c4d-fbf7-4850-a985-50021f4e8f41") + ); + assert!( + !NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) + .unwrap() + .is_subnode("unknown") + ); + } + + #[test] + fn if_gets_my_neighbors() { + assert!( + NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) + .unwrap() + .is_my_neighbor("37817c4d-fbf7-4850-a985-50021f4e8f41") + .unwrap() + ); + assert!( + !NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) + .unwrap() + .is_my_neighbor("b745a140-40bc-4b86-b6dc-084488fc906b") + .unwrap() + ); + assert!( + NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) + .unwrap() + .is_my_neighbor("unknown") + .is_err() + ); + } + #[test] fn it_filters_neighbors() { let mut reference = vec![ @@ -341,7 +406,7 @@ mod tests { let mut actual = NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) .unwrap() - .neighbors(); + .my_neighbors(); actual.sort(); assert_eq!(reference, actual); @@ -358,7 +423,7 @@ mod tests { let mut actual = NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) .unwrap() - .neighbors(); + .my_neighbors(); actual.sort(); assert_eq!(reference, actual); @@ -375,7 +440,7 @@ mod tests { let mut actual = NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) .unwrap() - .sub_relays(); + .my_sub_relays(); actual.sort(); assert_eq!(reference, actual); @@ -383,12 +448,18 @@ mod tests { #[test] fn it_filters_sub_relays() { - let mut reference = vec!["node1.rudder.local", "node2.rudder.local"]; + let mut reference = vec![( + "node1.rudder.local".to_string(), + vec![ + "b745a140-40bc-4b86-b6dc-084488fc906b".to_string(), + "a745a140-40bc-4b86-b6dc-084488fc906b".to_string(), + ], + )]; reference.sort(); let mut actual = NodesList::new("root".to_string(), "tests/files/nodeslist.json", None) .unwrap() - .sub_relays_from(&[ + .my_sub_relays_from(&[ "b745a140-40bc-4b86-b6dc-084488fc906b".to_string(), "a745a140-40bc-4b86-b6dc-084488fc906b".to_string(), "root".to_string(), diff --git a/relay/sources/relayd/tests/fake_agent.sh b/relay/sources/relayd/tests/fake_agent.sh index d5e6f144bb9..f27b7237a6b 100755 --- a/relay/sources/relayd/tests/fake_agent.sh +++ b/relay/sources/relayd/tests/fake_agent.sh @@ -2,3 +2,5 @@ echo -n "$@" > ./target/tmp/api_test.txt sleep 0.5 echo "OK" +sleep 0.3 +echo "END" diff --git a/relay/sources/relayd/tests/remote_run.rs b/relay/sources/relayd/tests/remote_run.rs index 18cd38e7960..26d4158db1c 100644 --- a/relay/sources/relayd/tests/remote_run.rs +++ b/relay/sources/relayd/tests/remote_run.rs @@ -36,12 +36,49 @@ mod tests { .send() .unwrap(); assert_eq!(response.status(), hyper::StatusCode::OK); - assert_eq!(response.text().unwrap(), "OK".to_string()); + assert_eq!(response.text().unwrap(), "OK\nEND\n".to_string()); assert_eq!( "remote run -D class2,class3 server.rudder.local".to_string(), read_to_string("target/tmp/api_test.txt").unwrap() ); + let _ = remove_file("target/tmp/api_test.txt"); + let params_async = [ + ("asynchronous", "true"), + ("keep_output", "true"), + ("classes", "class2,class45"), + ("nodes", "e745a140-40bc-4b86-b6dc-084488fc906b"), + ]; + let mut response = client + .post("http://localhost:3030/rudder/relay-api/1/remote-run/nodes") + .form(¶ms_async) + .send() + .unwrap(); + assert_eq!(response.status(), hyper::StatusCode::OK); + assert_eq!(response.text().unwrap(), "OK\nEND\n".to_string()); + assert_eq!( + "remote run -D class2,class45 node1.rudder.local".to_string(), + read_to_string("target/tmp/api_test.txt").unwrap() + ); + + let _ = remove_file("target/tmp/api_test.txt"); + let params_async = [ + ("asynchronous", "true"), + ("keep_output", "true"), + ("classes", "class2,class46"), + ]; + let mut response = client + .post("http://localhost:3030/rudder/relay-api/1/remote-run/nodes/e745a140-40bc-4b86-b6dc-084488fc906b") + .form(¶ms_async) + .send() + .unwrap(); + assert_eq!(response.status(), hyper::StatusCode::OK); + assert_eq!(response.text().unwrap(), "OK\nEND\n".to_string()); + assert_eq!( + "remote run -D class2,class46 node1.rudder.local".to_string(), + read_to_string("target/tmp/api_test.txt").unwrap() + ); + // Async & no keep let _ = remove_file("target/tmp/api_test.txt"); @@ -80,7 +117,7 @@ mod tests { .send() .unwrap(); assert_eq!(response.status(), hyper::StatusCode::OK); - assert_eq!(response.text().unwrap(), "OK\n".to_string()); + assert_eq!(response.text().unwrap(), "OK\nEND\n".to_string()); assert_eq!( "remote run -D class2,class5 server.rudder.local".to_string(), read_to_string("target/tmp/api_test.txt").unwrap()