Skip to content

Commit

Permalink
Fixes #16047: Remote-run sometimes returns empty outpout or output wi…
Browse files Browse the repository at this point in the history
…th missing newlines
  • Loading branch information
amousset authored and Jenkins CI committed Oct 24, 2019
1 parent 5a7f957 commit 50bf22b
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 60 deletions.
12 changes: 6 additions & 6 deletions relay/sources/relayd/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions relay/sources/relayd/fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 53 additions & 19 deletions relay/sources/relayd/src/api/remote_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,19 @@ impl RemoteRun {
self.target
.next_hops(job_config.clone())
.iter()
.map(|relay| self.forward_call(job_config.clone(), relay.clone())),
.map(|(relay, target)| {
self.forward_call(job_config.clone(), relay.clone(), target.clone())
}),
)),
))),
// Async and no output -> spawn in background and return early
(true, false) => {
for relay in self.target.next_hops(job_config.clone()) {
tokio::spawn(RemoteRun::consume(
self.forward_call(job_config.clone(), relay),
));
for (relay, target) in self.target.next_hops(job_config.clone()) {
tokio::spawn(RemoteRun::consume(self.forward_call(
job_config.clone(),
relay,
target,
)));
}
tokio::spawn(RemoteRun::consume(self.run_parameters.remote_run(
&job_config.cfg.remote_run,
Expand All @@ -178,7 +182,9 @@ impl RemoteRun {
self.target
.next_hops(job_config.clone())
.iter()
.map(|relay| self.forward_call(job_config.clone(), relay.clone())),
.map(|(relay, target)| {
self.forward_call(job_config.clone(), relay.clone(), target.clone())
}),
)),
))),
// Sync and output -> wait until the end and return output
Expand All @@ -193,7 +199,9 @@ impl RemoteRun {
self.target
.next_hops(job_config.clone())
.iter()
.map(|relay| self.forward_call(job_config.clone(), relay.clone())),
.map(|(relay, target)| {
self.forward_call(job_config.clone(), relay.clone(), target.clone())
}),
)),
))),
}
Expand All @@ -203,10 +211,14 @@ impl RemoteRun {
&self,
job_config: Arc<JobConfig>,
node: Host,
// Target for the sub relay
target: RemoteRunTarget,
) -> impl Stream<Item = Chunk, Error = Error> + Send + 'static {
let report_span = span!(Level::TRACE, "upstream");
let _report_enter = report_span.enter();

debug!("Forwarding remote-run to {} for {:#?}", node, self.target);

// We cannot simply deserialize if using `.form()` as we
// need specific formatting
let mut form = Form::new()
Expand All @@ -221,7 +233,7 @@ impl RemoteRun {
.collect::<Vec<&str>>()
.join(","),
);
if let RemoteRunTarget::Nodes(nodes) = &self.target {
if let RemoteRunTarget::Nodes(nodes) = &target {
form = form.text("nodes", nodes.join(","))
}

Expand All @@ -231,7 +243,7 @@ impl RemoteRun {
.post(&format!(
"{}/rudder/relay-api/{}",
node,
match &self.target {
match target {
RemoteRunTarget::All => "all",
RemoteRunTarget::Nodes(_) => "nodes",
},
Expand All @@ -240,7 +252,13 @@ impl RemoteRun {
.send()
.map(|response| response.into_body())
.flatten_stream()
.map_err(|e| e.into())
.map_err(|e| {
error!("{}", e);
e.into()
})
// Don't fail if a relay is not available,
// just log it
.or_else(|_: Error| futures::future::empty())
.map(|c| c.into())
}
}
Expand All @@ -254,18 +272,30 @@ pub enum RemoteRunTarget {
impl RemoteRunTarget {
pub fn neighbors(&self, job_config: Arc<JobConfig>) -> Vec<Host> {
let nodes = job_config.nodes.read().expect("Cannot read nodes list");
match self {
RemoteRunTarget::All => nodes.neighbors(),
RemoteRunTarget::Nodes(nodeslist) => nodes.neighbors_from(nodeslist),
}
let neighbors = match self {
RemoteRunTarget::All => nodes.my_neighbors(),
RemoteRunTarget::Nodes(nodeslist) => nodes.my_neighbors_from(nodeslist),
};
debug!("Neighbors: {:#?}", neighbors);
neighbors
}

pub fn next_hops(&self, job_config: Arc<JobConfig>) -> Vec<Host> {
pub fn next_hops(&self, job_config: Arc<JobConfig>) -> Vec<(Host, RemoteRunTarget)> {
let nodes = job_config.nodes.read().expect("Cannot read nodes list");
match self {
RemoteRunTarget::All => nodes.sub_relays(),
RemoteRunTarget::Nodes(nodeslist) => nodes.sub_relays_from(nodeslist),
}
let next_hops = match self {
RemoteRunTarget::All => nodes
.my_sub_relays()
.into_iter()
.map(|r| (r, RemoteRunTarget::All))
.collect(),
RemoteRunTarget::Nodes(nodeslist) => nodes
.my_sub_relays_from(nodeslist)
.into_iter()
.map(|(relay, nodes)| (relay, RemoteRunTarget::Nodes(nodes)))
.collect(),
};
debug!("Next-hops: {:#?}", next_hops);
next_hops
}
}

Expand Down Expand Up @@ -415,6 +445,10 @@ impl RunParameters {
tokio_io::io::lines(BufReader::new(stdout))
.map_err(Error::from)
.inspect(|line| debug!("output: {}", line))
.map(|mut l| {
l.push('\n');
l
})
.map(Chunk::from)
}
}
Expand Down
Loading

0 comments on commit 50bf22b

Please sign in to comment.