Skip to content

Commit

Permalink
[v10] Swap out select for poll (#22676) and Loop for poll (#22746) (#…
Browse files Browse the repository at this point in the history
…22800)

* Swap out `select` for `poll` (#22676)

* Swap out select for poll

wait_for_fd now uses poll instead of select, allowing us to wait for
file descriptors > 1024.

* Restructuring the function to simplify and adds a log statement on error

* Loop for `poll` (#22746)

* Swap out select for poll

wait_for_fd now uses poll instead of select, allowing us to wait for
file descriptors > 1024.

* Restructuring the function to simplify and adds a log statement on error

* Puts poll in a loop that checks for EINTR and EAGAIN
and calls it again if either of those errors were the source
of the error.
  • Loading branch information
ibeckermayer committed Mar 8, 2023
1 parent 10cae08 commit 89f97e0
Showing 1 changed file with 123 additions and 88 deletions.
211 changes: 123 additions & 88 deletions lib/srv/desktop/rdp/rdpclient/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ extern crate log;
extern crate num_derive;

use errors::try_error;
use libc::{fd_set, select, FD_SET};
use rand::Rng;
use rand::SeedableRng;
use rdp::core::event::*;
Expand All @@ -66,6 +65,7 @@ use rdpdr::path::UnixPath;
use rdpdr::ServerCreateDriveRequest;
use std::convert::TryFrom;
use std::ffi::{CStr, CString, NulError};
use std::fmt::Debug;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::{Cursor, Read, Write};
Expand Down Expand Up @@ -798,20 +798,43 @@ impl Drop for CGOBitmap {
}

#[cfg(unix)]
fn wait_for_fd(fd: usize) -> bool {
unsafe {
let mut raw_fds: fd_set = mem::zeroed();

FD_SET(fd as i32, &mut raw_fds);

let result = select(
fd as i32 + 1,
&mut raw_fds,
ptr::null_mut(),
ptr::null_mut(),
ptr::null_mut(),
);
result == 1
fn wait_for_fd(fd: usize) -> RdpResult<()> {
let fds = &mut libc::pollfd {
fd: fd as i32,
events: libc::POLLIN,
revents: 0,
};
loop {
let res = unsafe { libc::poll(fds, 1, -1) };

// We only use a single fd and can't timeout, so
// res will either be 1 for success or -1 for failure.
if res != 1 {
let os_err = std::io::Error::last_os_error();
match os_err.raw_os_error() {
Some(libc::EINTR) | Some(libc::EAGAIN) => continue,
_ => return Err(RdpError::Io(os_err)),
}
}

// res == 1
// POLLIN means that the fd is ready to be read from,
// POLLHUP means that the other side of the pipe was closed,
// but we still may have data to read.
if fds.revents & (libc::POLLIN | libc::POLLHUP) != 0 {
return Ok(()); // ready for a read
} else if fds.revents & libc::POLLNVAL != 0 {
return Err(RdpError::Io(IoError::new(
std::io::ErrorKind::InvalidInput,
"invalid fd",
)));
} else {
// fds.revents & libc::POLLERR != 0
return Err(RdpError::Io(IoError::new(
std::io::ErrorKind::Other,
"error on fd",
)));
}
}
}

Expand Down Expand Up @@ -1149,97 +1172,109 @@ fn read_rdp_output_inner(client: &Client) -> ReadRdpOutputReturns {
let tcp_fd = client.tcp_fd;
let client_ref = client.go_ref;

// Read incoming events.
//
// Wait for some data to be available on the TCP socket FD before consuming it. This prevents
// us from locking the mutex in Client permanently while no data is available.
while wait_for_fd(tcp_fd) {
let mut err = CGOErrCode::ErrCodeSuccess;
let res = client.rdp_client.lock().unwrap().read(|rdp_event| {
// This callback can be called multiple times per rdp_client.read()
// (if multiple messages were received since the last call). Therefore,
// we check that the previous call to handle_bitmap succeeded, so we don't
// have a situation where handle_bitmap fails repeatedly and creates a
// bunch of repetitive error messages in the logs. If it fails once,
// we assume the connection is broken and stop trying to send bitmaps.
if err == CGOErrCode::ErrCodeSuccess {
match rdp_event {
RdpEvent::Bitmap(bitmap) => {
let mut cbitmap = match CGOBitmap::try_from(bitmap) {
Ok(cb) => cb,
Err(e) => {
error!(
loop {
// Read incoming events.
//
// Wait for some data to be available on the TCP socket FD before consuming it. This prevents
// us from locking the mutex in Client permanently while no data is available.
match wait_for_fd(tcp_fd) {
Ok(_) => {
let mut err = CGOErrCode::ErrCodeSuccess;
let res = client.rdp_client.lock().unwrap().read(|rdp_event| {
// This callback can be called multiple times per rdp_client.read()
// (if multiple messages were received since the last call). Therefore,
// we check that the previous call to handle_png succeeded, so we don't
// have a situation where handle_png fails repeatedly and creates a
// bunch of repetitive error messages in the logs. If it fails once,
// we assume the connection is broken and stop trying to send PNGs.
if err == CGOErrCode::ErrCodeSuccess {
match rdp_event {
RdpEvent::Bitmap(bitmap) => {
let mut cbitmap = match CGOBitmap::try_from(bitmap) {
Ok(cb) => cb,
Err(e) => {
error!(
"failed to convert RDP bitmap to CGO representation: {:?}",
e
);
return;
return;
}
};
unsafe {
err = handle_bitmap(client_ref, &mut cbitmap) as CGOErrCode;
};
}
// No other events should be sent by the server to us.
_ => {
debug!("got unexpected pointer event from RDP server, ignoring");
}
}
}
});
match res {
Err(RdpError::Io(io_err)) if io_err.kind() == ErrorKind::UnexpectedEof => {
debug!("client disconnect detected");
return ReadRdpOutputReturns {
user_message: "Client successfully disconnected".to_string(),
disconnect_code: CGODisconnectCode::DisconnectCodeClient,
err_code: CGOErrCode::ErrCodeSuccess,
};
unsafe {
err = handle_bitmap(client_ref, &mut cbitmap) as CGOErrCode;
}
Err(RdpError::RdpError(rdp_err))
if rdp_err.kind() == RdpErrorKind::Disconnect =>
{
// RdpErrorKind::Disconnect means we encountered a Disconnect Provider Ultimatum.
// If we don't know why that was sent, return an failure, otherwise return a success.
debug!("server disconnect detected");
let disconnect_code = CGODisconnectCode::DisconnectCodeServer;
let server_error = client.rdp_client.lock().unwrap().global.server_error();
let mut message = server_error.to_string();
let mut err_code = CGOErrCode::ErrCodeSuccess;
if server_error == ServerError::None || server_error.is_error() {
err_code = CGOErrCode::ErrCodeFailure;
if server_error == ServerError::None {
message =
"RDP server disconnected for an unknown reason".to_string();
}
}
return ReadRdpOutputReturns {
user_message: message,
disconnect_code,
err_code,
};
}
// No other events should be sent by the server to us.
_ => {
debug!("got unexpected pointer event from RDP server, ignoring");
Err(e) => {
return ReadRdpOutputReturns {
user_message: format!("RDP read failed: {e:?}"),
disconnect_code: CGODisconnectCode::DisconnectCodeUnknown,
err_code: CGOErrCode::ErrCodeFailure,
};
}
}
}
});
match res {
Err(RdpError::Io(io_err)) if io_err.kind() == ErrorKind::UnexpectedEof => {
debug!("client disconnect detected");
return ReadRdpOutputReturns {
user_message: "Client successfully disconnected".to_string(),
disconnect_code: CGODisconnectCode::DisconnectCodeClient,
err_code: CGOErrCode::ErrCodeSuccess,
};
}
Err(RdpError::RdpError(rdp_err)) if rdp_err.kind() == RdpErrorKind::Disconnect => {
// RdpErrorKind::Disconnect means we encountered a Disconnect Provider Ultimatum.
// If we don't know why that was sent, return an failure, otherwise return a success.
debug!("server disconnect detected");
let disconnect_code = CGODisconnectCode::DisconnectCodeServer;
let server_error = client.rdp_client.lock().unwrap().global.server_error();
let mut message = server_error.to_string();
let mut err_code = CGOErrCode::ErrCodeSuccess;
if server_error == ServerError::None || server_error.is_error() {
err_code = CGOErrCode::ErrCodeFailure;
if server_error == ServerError::None {
message = "RDP server disconnected for an unknown reason".to_string();
Ok(()) => {
// continue
}
}
return ReadRdpOutputReturns {
user_message: message,
disconnect_code,
err_code,
};
if err != CGOErrCode::ErrCodeSuccess {
return ReadRdpOutputReturns {
user_message: "failed forwarding RDP bitmap frame".to_string(),
disconnect_code: CGODisconnectCode::DisconnectCodeUnknown,
err_code: err,
};
}
}
Err(e) => {
Err(err) => {
let user_message = match err {
RdpError::Io(err) => err.to_string(),
_ => "RDP read failed for an unknown reason".to_string(),
};
return ReadRdpOutputReturns {
user_message: format!("RDP read failed: {e:?}"),
user_message,
disconnect_code: CGODisconnectCode::DisconnectCodeUnknown,
err_code: CGOErrCode::ErrCodeFailure,
};
}
Ok(()) => {
// continue
}
}
if err != CGOErrCode::ErrCodeSuccess {
return ReadRdpOutputReturns {
user_message: "failed forwarding RDP bitmap frame".to_string(),
disconnect_code: CGODisconnectCode::DisconnectCodeUnknown,
err_code: err,
};
}
}

ReadRdpOutputReturns {
user_message: "RDP read failed for an unknown reason".to_string(),
disconnect_code: CGODisconnectCode::DisconnectCodeUnknown,
err_code: CGOErrCode::ErrCodeFailure,
}
}

/// CGOMousePointerEvent is a CGO-compatible version of PointerEvent that we pass back to Go.
Expand Down

0 comments on commit 89f97e0

Please sign in to comment.