Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to forward stdin, stdout *and* stderr #189

Closed
Timmmm opened this issue Jun 6, 2020 · 10 comments
Closed

How to forward stdin, stdout *and* stderr #189

Timmmm opened this issue Jun 6, 2020 · 10 comments

Comments

@Timmmm
Copy link
Contributor

Timmmm commented Jun 6, 2020

I want to execute a command remotely and then forward all three streams - stdin, stdout and stderr. This is closely related to #128, but none of the solutions there seem very good. I don't want a busy loop, and this solution looks like it will delay stdin until stdout data is available.

My theory is that the basic idea is to start a thread for each channel:

fn execute(session: &Session, command: &str) -> Result<()> {
  let mut channel = session.channel_session()?;
  channel.exec(command)?;
 
  let ssh_stderr = channel.stderr();
  let ssh_stdout = channel.stdout();
  let ssh_stdin = channel.stdin();

  crossbeam::scope(|s| {
    s.spawn(|_| {
      let stdin = io::stdin();
      let mut stdin = stdin.lock();
      io::copy(&mut stdin, &mut channel).unwrap(); // TODO: Don't unwrap
    });

     // Repeat for stdout/stderr. One of them could go on the main thread.
  }).unwrap();

  channel.wait_close()?;
  println!("{}", channel.exit_status()?);
  Ok(())
}

This almost works. Because channel.stderr() exists and returns a Stream that is Send and Sync. However there doesn't seem to be a way to do the same thing for stdin and stdout - there is no Channel::stdout(). The channel itself implements Read/Write, but I can't mutably borrow it twice in different threads.

There is stream() which looks like it might be useful, but I'm not sure what to pass it to get stdin/out.

Any hints?

@Timmmm
Copy link
Contributor Author

Timmmm commented Jun 6, 2020

Ah actually I just looked at the source and it looks like I should pass 0 for both. I will try that.

Might be worth adding a named constant, or even a Channel::stdinout() function?

@Timmmm Timmmm closed this as completed Jun 6, 2020
@Timmmm
Copy link
Contributor Author

Timmmm commented Jun 7, 2020

This worked well for me. I haven't thought much about exactly what happens when the various channels are closed, e.g. should it end when stdin closes etc.

fn execute_and_forward(session: &Session, command: &str) -> Result<i32> {
  let mut channel = session.channel_session()?;
  channel.exec(command)?;

  let mut ssh_stdin = channel.stream(0);
  let mut ssh_stdout = channel.stream(0);
  let mut ssh_stderr = channel.stderr();

  let all_result: Result<u64, io::Error> = crossbeam::scope(|s| {
    let stdin_handle = s.spawn(|_| {
      let stdin = io::stdin();
      let mut stdin = stdin.lock();
      io::copy(&mut stdin, &mut ssh_stdin)
    });

    let stdout_handle = s.spawn(|_| {
      let stdout = io::stdout();
      let mut stdout = stdout.lock();
      io::copy(&mut ssh_stdout, &mut stdout)
    });

    let stderr_handle = s.spawn(|_| {
      let stderr = io::stderr();
      let mut stderr = stderr.lock();
      io::copy(&mut ssh_stderr, &mut stderr)
    });

    // The unwrap() means the main thread will panic if the inner threads panicked.
    let stdin_result: Result<u64, io::Error> = stdin_handle.join().unwrap();
    let stdout_result: Result<u64, io::Error> = stdout_handle.join().unwrap();
    let stderr_result: Result<u64, io::Error> = stderr_handle.join().unwrap();

    stdin_result.and(stdout_result).and(stderr_result)
  })
  .unwrap(); // Should never panic because all scoped threads have been joined.

  // Return Err if any of the threads errored.
  all_result?;

  // Wait for SSH channel to close.
  channel.wait_close()?;
  let exit_status = channel.exit_status()?;
  Ok(exit_status)
}

@wez
Copy link
Collaborator

wez commented Jun 7, 2020

Note that even though the streams, channels etc. are Send and are safe to use concurrently from multiple threads, if you perform a blocking read or write in one of the threads on one of the related objects, it will block the other instances that share the same underlying ssh channel. This is because the underlying library is only safe to access from one thread at a time and those operations are protected by a mutex.

@Timmmm
Copy link
Contributor Author

Timmmm commented Jun 10, 2020

Oh, damn. That's unfortunate. Although I did try this and it actually seemed to work for me - I wrote a trivial program that reads from stdin and writes that to stdout lowercased and stderr uppercased. Then I executed it on a remote server using the above code and fed some data in via stdout. It seemed to work.

I can't see how it would work at all if the first io::copy blocks the other two. Or did I just get lucky with the unlocking of mutexes? Writing to stdin makes its io::copy temporarily unlock the mutexes so the other two can run.

Is there no way to do this with this library then?

@wez
Copy link
Collaborator

wez commented Jun 10, 2020

What I do in wezterm is poll the underlying socket for read/write events and then perform the corresponding read/write in non-blocking mode; that way none of the threads is blocking the others while they are waiting.
It's not perfect.

@Timmmm
Copy link
Contributor Author

Timmmm commented Jun 10, 2020

Hmm yeah that doesn't sound ideal. Quite a limitation! Incidentally this seems to be quite an old request (#22).

@Timmmm
Copy link
Contributor Author

Timmmm commented Jun 13, 2020

I ended up reimplementing this part of my program (which is a completely standalone executable) in Go - it has a really great SSH library, and makes forwarding stdin/out/err completely trivial:

session.Stdin = os.Stdin
session.Stdout = os.Stdout
session.Stderr = os.Stderr

err := session.Run(target)

Hopefully we'll get a great native SSH & SFTP library for Rust one day! Maybe Thrussh, but at the moment it still has C dependencies, is completely undocumented and doesn't include SFTP support.

@cococolanosugar
Copy link

I change some of your code, and work well, it just like your go code (i had try it too) !

Except that it can not hanle control code like Tab key.

No delay, No block, but three busy loop( as you know three io::copy is three busy loop too).

@Timmmm

fn main() {
    let tcp = StdTcpStream::connect("192.168.33.30:22").unwrap();
    let mut sess = Session::new().unwrap();
    sess.set_tcp_stream(tcp);
    sess.handshake().unwrap();
    sess.userauth_password("root", "vagrant").unwrap();

    let mut channel = sess.channel_session().unwrap();

    channel.request_pty("xterm", None, None).unwrap();

    channel.shell().unwrap();

    sess.set_blocking(false);


    let mut ssh_stdin = channel.stream(0);
    let mut ssh_stdout = channel.stream(0);
    let mut ssh_stderr = channel.stderr();

    let all_result: Result<u64, io::Error> = crossbeam::scope(|s| {
        let stdin_handle = s.spawn(|_| {
            let stdin = io::stdin();
            let mut stdin = stdin.lock();

            loop {
                let mut line = String::new();
                stdin.read_line(&mut line).unwrap();
                channel.write(line.as_bytes()).unwrap();
                channel.flush().unwrap();
            }

        });

        let stdout_handle = s.spawn(|_| {
            let stdout = io::stdout();
            let mut stdout = stdout.lock();

            loop {
                let mut buf = vec![0; 4096];
                match ssh_stdout.read(&mut buf) {
                    Ok(_) => {
                        let s = String::from_utf8(buf).unwrap();
                        stdout.write(s.as_bytes()).unwrap();
                    }
                    Err(e) => {
                        if e.kind() != std::io::ErrorKind::WouldBlock {
                            println!("{}", e);
                        }
                    }
                }
            }

        });

        let stderr_handle = s.spawn(|_| {
            let stderr = io::stderr();
            let mut stderr = stderr.lock();

            loop {
                let mut buf = vec![0; 4096];
                match ssh_stderr.read(&mut buf) {
                    Ok(_) => {
                        let s = String::from_utf8(buf).unwrap();
                        stderr.write(s.as_bytes()).unwrap();
                    }
                    Err(e) => {
                        if e.kind() != std::io::ErrorKind::WouldBlock {
                            println!("{}", e);
                        }
                    }
                }
            }
        });

        // The unwrap() means the main thread will panic if the inner threads panicked.
        let stdin_result: Result<u64, io::Error> = stdin_handle.join().unwrap();
        let stdout_result: Result<u64, io::Error> = stdout_handle.join().unwrap();
        let stderr_result: Result<u64, io::Error> = stderr_handle.join().unwrap();

        stdin_result.and(stdout_result).and(stderr_result)
    }).unwrap(); // Should never panic because all scoped threads have been joined.

    // Return Err if any of the threads errored.
    all_result.unwrap();

    // Wait for SSH channel to close.
    channel.wait_close().unwrap();
    let exit_status = channel.exit_status().unwrap();
    // Ok(exit_status)
    exit(exit_status)
}

@Timmmm
Copy link
Contributor Author

Timmmm commented Jul 17, 2020

It looks like you are trying to do a non-blocking spin-loop style read of stdout and stderr but as far as I can see you never set them to non-blocking mode, so you'll never get WouldBlock, and unless I am mistaken your code will suffer from the same problem as my original io::copy code, which it is essentially equivalent to.

I guess if you set the streams to non-blocking mode it will work (if you can even do that for these streams) but nobody wants 200% CPU usage just reading idle streams.

@cococolanosugar
Copy link

@Timmmm

Thanks for your tips.
I resolve the 200% CPU usage problem by mio's event loop.
And simple code like this:

`
fn main() {

let addr = "192.168.33.30:22".parse().unwrap();

let mut tcp = MioTcpStream::connect(&addr).unwrap();


let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(1024);
poll.register(&tcp, SSH_TOKEN, Ready::readable(), PollOpt::edge()).unwrap();


let mut sess = Session::new().unwrap();
sess.set_tcp_stream(tcp);
sess.handshake().unwrap();
sess.userauth_password("root", "vagrant").unwrap();

let mut channel = sess.channel_session().unwrap();

channel.request_pty("xterm", None, None).unwrap();

channel.shell().unwrap();

sess.set_blocking(false);

let mut ssh_stdin = channel.stream(0);
let mut ssh_stdout = channel.stream(0);
let mut ssh_stderr = channel.stderr();

let all_result: Result<u64, io::Error> = crossbeam::scope(|s| {
    let stdin_handle = s.spawn(|_| {
        let stdin = io::stdin();
        let mut stdin = stdin.lock();

        loop {
            let mut line = String::new();
            stdin.read_line(&mut line).unwrap();
            ssh_stdin.write(line.as_bytes()).unwrap();
            ssh_stdin.flush().unwrap();
        }
    });

    let poll_handle = s.spawn(|_| {
        loop {
            poll.poll(&mut events, None).unwrap();
            for event in events.iter() {
                match event.token() {
                    SSH_TOKEN => {

                        if event.readiness().is_readable() {
                            let stdout = io::stdout();
                            let mut stdout = stdout.lock();
                            let mut buf = vec![0; 4096];
                            match ssh_stdout.read(&mut buf) {
                                Ok(_) => {
                                    let s = String::from_utf8(buf).unwrap();
                                    stdout.write(s.as_bytes()).unwrap();
                                }
                                Err(e) => {
                                    if e.kind() != std::io::ErrorKind::WouldBlock {
                                        println!("{}", e);
                                    }
                                }
                            }

                            let stderr = io::stderr();
                            let mut stderr = stderr.lock();
                            let mut buf = vec![0; 4096];
                            match ssh_stderr.read(&mut buf) {
                                Ok(_) => {
                                    let s = String::from_utf8(buf).unwrap();
                                    stderr.write(s.as_bytes()).unwrap();
                                }
                                Err(e) => {
                                    if e.kind() != std::io::ErrorKind::WouldBlock {
                                        println!("{}", e);
                                    }
                                }
                            }
                        }

                    }
                    _ => unreachable!(),
                }
            }
        }
    });

    let stdin_result: Result<u64, io::Error> = stdin_handle.join().unwrap();
    let poll_result: Result<u64, io::Error> = poll_handle.join().unwrap();

    stdin_result.and(poll_result)

}).unwrap(); // Should never panic because all scoped threads have been joined.

all_result.unwrap();

// Wait for SSH channel to close.
channel.wait_close().unwrap();
let exit_status = channel.exit_status().unwrap();
exit(exit_status)

}
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants