allow streaming input from long-running cmds #2

Closed
eignnx opened this issue Nov 21, 2019 · 8 comments
Labels
enhancement New feature or request

Comments

eignnx (Owner) commented Nov 21, 2019

Right now, the server must wait until the command sent from the web client (e.g. cargo run) has finished. It captures the command's output and sends it all back to the client at once. This means a long-running command (like a web server that runs forever and prints occasionally) can't be monitored with cargo-gui.

Could this streaming ability be added via a websocket connection?
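
For reference, the blocking shape described above looks roughly like this (a sketch, not the actual cargo-gui handler; run_and_collect is a made-up name):

use std::process::Command;

// Runs the requested command to completion and only then returns its captured
// stdout, which is exactly what prevents monitoring a long-running process.
fn run_and_collect(program: &str, args: &[&str]) -> std::io::Result<String> {
    // output() blocks until the child exits and collects everything it printed.
    let output = Command::new(program).args(args).output()?;
    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}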

huxi commented Nov 23, 2019

This could lead to other problems, like a command never finishing at all because it's blocked on a stdout/stderr write call once the buffer fills up. Essentially the problem described in this SO question.

So regardless of how you deliver the output to the user in the end, you should drain the streams somewhere if you aren't doing that already.
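
To make the failure mode concrete, here is a minimal sketch (hypothetical, not taken from cargo-gui) of the pattern that can deadlock: reading stdout to completion before touching stderr.

use std::io::Read;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    let mut child = Command::new("cargo")
        .arg("run")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Anti-pattern: drain stdout completely before looking at stderr.
    // If the child fills the stderr pipe buffer while we are still here,
    // its next write to stderr blocks, so it never finishes and never
    // closes stdout, and both sides end up waiting on each other forever.
    let mut out = String::new();
    child.stdout.take().expect("stdout piped").read_to_string(&mut out)?;

    let mut err = String::new();
    child.stderr.take().expect("stderr piped").read_to_string(&mut err)?;

    child.wait()?;
    Ok(())
}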

eignnx (Owner, Author) commented Nov 23, 2019

Thanks for the input @huxi! This is not a problem I'm familiar with. By "drain the streams" do you mean call (the Rust equivalent of) fflush on the command subprocess's stdout/stderr streams?

I was thinking of doing something like this:

// (pseudocode)
let client_ws = Websocket::new(...);
let cmd = Command::new("cargo").arg("run");
thread::spawn(move || {
    let child_process = cmd.spawn();
    while let Some(line) = child_process.stdout.read_line() {
        client_ws.send(line);
        // child_process.stdout.flush();  ???
    }
});

Is that approximately where you'd put the drain?

huxi commented Nov 24, 2019

Sorry for being unclear. I have since learned that this is less of an issue in Rust than in Java or .NET, because Rust captures the output in a reasonable way in the cmd.output() case.

What I meant is that if you work with Stdio::piped(), then you need to make sure that the buffers of stdout and stderr don't fill up. That means reading from both constantly (which is what I meant by "drain the streams") instead of one after the other; otherwise the child process will block writing to the full buffer.

I only mention it because it's easy to miss, and it won't cause a problem at first if the buffers are big enough or the output happens to favor the best case (mostly stdout).

use std::io::BufRead;
use std::io::BufReader;
use std::process::{Command, Stdio};
use std::thread;

#[derive(Debug)]
enum Output {
    Out,
    Err,
}

fn main() {
    let mut child = Command::new("ping")
        .arg("1.1.1.1")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("failed to spawn");

    // Take both pipes before spawning the reader threads; draining them
    // concurrently keeps either buffer from filling up and blocking the child.
    let stdout = child.stdout.take();
    let stderr = child.stderr.take();

    let reader = BufReader::new(stdout.expect("failed to capture stdout"));
    let stdout_thread = thread::Builder::new()
        .name("stdout-thread".to_string())
        .spawn(|| {
            reader
                .lines()
                .filter_map(|s| s.ok())
                .for_each(|line| do_something(Output::Out, &line));
            println!("OUT DONE!");
        });

    let reader = BufReader::new(stderr.expect("failed to capture stderr"));
    let stderr_thread = thread::Builder::new()
        .name("stderr-thread".to_string())
        .spawn(|| {
            reader
                .lines()
                .filter_map(|s| s.ok())
                .for_each(|line| do_something(Output::Err, &line));
            println!("ERR DONE!");
        });

    stdout_thread
        .expect("stdout_thread join handle")
        .join()
        .expect("stdout_thread join failed");
    stderr_thread
        .expect("stderr_thread join handle")
        .join()
        .expect("stderr_thread join failed");

    // See https://doc.rust-lang.org/std/process/struct.Child.html#warning
    let exit_status = child.wait().expect("retrieving ExitStatus failed");
    println!("ExitStatus of process: {}", exit_status);
}

fn do_something(output: Output, line: &str) {
    println!("{:?}: {}", output, line);
}

eignnx (Owner, Author) commented Nov 24, 2019

Ohh, so we just want to keep reading data from both stdout and stderr simultaneously; otherwise data could build up in one of the buffers. I think I understand now, thanks so much!

eignnx added the enhancement label Nov 26, 2019
huxi commented Nov 26, 2019

Quick follow-up:
It bugged me that my example didn't clean up the child process as required. So I tried to fix the problem myself, stumbled and asked on the forums.

I updated the code above accordingly.

eignnx (Owner, Author) commented Nov 26, 2019

@huxi great! Would you be interested in taking ownership of this issue and submitting a PR?

Currently we're using actix and actix_web for the server architecture, so this code would likely need to be adjusted to fit into the actor design pattern.

Alternatively, since I've been having trouble understanding actix, I've been considering "jumping ship" and switching to something simpler like tide while the server code is still pretty small.
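
Whichever framework ends up hosting it, one framework-agnostic way to adapt the code above would be to have the reader threads push lines into a channel and let the actix actor (or tide handler) consume the receiving end. A rough sketch under that assumption; spawn_streaming and Line are made-up names, not existing cargo-gui code:

use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::sync::mpsc;
use std::thread;

// Hypothetical message type carrying one line of child output.
enum Line {
    Out(String),
    Err(String),
}

// Spawns the command and drains both pipes on background threads, forwarding
// each line through a channel that the web layer can consume at its own pace.
fn spawn_streaming(program: &str, args: &[&str]) -> std::io::Result<(Child, mpsc::Receiver<Line>)> {
    let mut child = Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    let (tx, rx) = mpsc::channel();
    let tx_err = tx.clone();

    let stdout = BufReader::new(child.stdout.take().expect("stdout piped"));
    let stderr = BufReader::new(child.stderr.take().expect("stderr piped"));

    // One thread per stream keeps both pipes drained, as in the example above.
    thread::spawn(move || {
        for line in stdout.lines().filter_map(Result::ok) {
            if tx.send(Line::Out(line)).is_err() {
                break; // receiver dropped, stop forwarding
            }
        }
    });
    thread::spawn(move || {
        for line in stderr.lines().filter_map(Result::ok) {
            if tx_err.send(Line::Err(line)).is_err() {
                break;
            }
        }
    });

    // The caller remains responsible for calling child.wait() once both
    // streams have closed, per the Child docs warning referenced above.
    Ok((child, rx))
}

On the consuming side, rx.recv() (or try_recv() inside an actor's polling loop) yields lines as they arrive.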

eignnx (Owner, Author) commented Nov 26, 2019

To summarize an offline conversation with @huxi :

  • I'm assigning us both to the issue, though @huxi is unsure if he'll have the time to work on this.
  • I'll start working (on a different branch) on integrating @huxi's code into the actix server in the meantime.

eignnx (Owner, Author) commented Dec 17, 2019

Closing as of v0.3.0. More discussion needs to happen about generalizing the (kinda hacky) approach I ended up going with, which was to set up server endpoints (/api/stdout_line and /api/stderr_line) that read one line at a time, using mutexes to protect the async_std::stream::Streams of lines coming from the commands.
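
Roughly, the shape described above looks like this (a sketch of the idea only, not the actual v0.3.0 code; the type names and state layout are assumptions):

use async_std::prelude::*; // brings the stream combinators (e.g. next) into scope
use async_std::stream::Stream;
use async_std::sync::Mutex;
use std::sync::Arc;

// Each command's output is exposed as a stream of lines; the mutex keeps
// concurrent requests from interleaving reads. (Sketch only: in practice the
// streams would be built from the child's piped stdout/stderr.)
type LineStream = Box<dyn Stream<Item = String> + Unpin + Send>;

struct CmdStreams {
    stdout_lines: Arc<Mutex<LineStream>>,
    stderr_lines: Arc<Mutex<LineStream>>,
}

// Hypothetical handler body for an endpoint like /api/stdout_line: lock the
// stream, pull at most one line, and return it to the web client (None once
// the command's stdout is exhausted). An analogous handler backs /api/stderr_line.
async fn next_stdout_line(state: &CmdStreams) -> Option<String> {
    let mut lines = state.stdout_lines.lock().await;
    lines.next().await
}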

eignnx closed this as completed Dec 17, 2019