Skip to content

Commit fe040fc

Browse files
authored
fix(ext/node): retry named-pipe connect on ERROR_PIPE_BUSY (Windows) (#33974)
On Windows, when all of a named-pipe server's instances are in use (e.g. Docker Desktop under load from testcontainers spinning up containers), `CreateFileW` returns `ERROR_PIPE_BUSY` (231). libuv handles this transparently by queueing the connect to a worker thread that calls `WaitNamedPipeW` and retries — node code like `docker-modem` relies on this. In Deno today the same condition surfaces to JavaScript as `Error: connect EINVAL //./pipe/docker_engine`, which is the intermittent failure reported in the linked issue. The fix mirrors libuv's `pipe_connect_thread_proc`: when the synchronous `ClientOptions::open()` in `uv_pipe_connect` fails with `ERROR_PIPE_BUSY` (231), spawn a blocking task that loops on `WaitNamedPipeW` (30s timeout per attempt) followed by a retried open, until an instance becomes available or the wait itself fails (including by timing out), and fire the deferred connect callback with the resolved status once the worker completes. Also cleans up a dead branch in `io_error_to_uv` that claimed `ERROR_PIPE_BUSY` mapped to `PermissionDenied` in Rust std (it actually maps to `Uncategorized`, so the check was unreachable), and adds explicit raw-os-error mappings for the common Win32 pipe codes so they no longer fall through to `UV_EINVAL`. Fixes #33923
1 parent 271a8a7 commit fe040fc

3 files changed

Lines changed: 323 additions & 10 deletions

File tree

libs/core/uv_compat.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,24 @@ pub const UV_EOF: i32 = -4095;
8181
/// Map a `std::io::Error` to the closest libuv error code.
8282
pub(crate) fn io_error_to_uv(err: &std::io::Error) -> c_int {
8383
use std::io::ErrorKind;
84+
// On Windows, several Win32 error codes don't get a stable ErrorKind
85+
// mapping from std (they all end up as `Uncategorized`). Handle the
86+
// pipe-related ones explicitly first so they don't fall through to
87+
// the catch-all UV_EINVAL.
88+
#[cfg(windows)]
89+
if let Some(code) = err.raw_os_error() {
90+
match code {
91+
231 => return UV_EBUSY, // ERROR_PIPE_BUSY
92+
536 => return UV_EAGAIN, // ERROR_PIPE_LISTENING
93+
230 => return UV_EPIPE, // ERROR_BAD_PIPE
94+
// `ERROR_PIPE_NOT_CONNECTED` maps to `UV_EPIPE` to match libuv's
95+
// `uv_translate_sys_error` — node code pattern-matches against
96+
// libuv's actual values, so returning the semantically closer
97+
// `UV_ENOTCONN` would break those callers.
98+
233 => return UV_EPIPE, // ERROR_PIPE_NOT_CONNECTED
99+
_ => {}
100+
}
101+
}
84102
match err.kind() {
85103
ErrorKind::AddrInUse => UV_EADDRINUSE,
86104
ErrorKind::AddrNotAvailable => UV_EINVAL,
@@ -93,15 +111,7 @@ pub(crate) fn io_error_to_uv(err: &std::io::Error) -> c_int {
93111
ErrorKind::InvalidInput => UV_EINVAL,
94112
ErrorKind::WouldBlock => UV_EAGAIN,
95113
ErrorKind::TimedOut => UV_ETIMEDOUT,
96-
ErrorKind::PermissionDenied => {
97-
// On Windows, ERROR_PIPE_BUSY (231) is mapped to PermissionDenied
98-
// by Rust std, but it means the named pipe is already in use.
99-
#[cfg(windows)]
100-
if let Some(231) = err.raw_os_error() {
101-
return UV_EADDRINUSE;
102-
}
103-
UV_EACCES
104-
}
114+
ErrorKind::PermissionDenied => UV_EACCES,
105115
_ => {
106116
// On Unix, try to use the raw OS error for a more accurate mapping.
107117
#[cfg(unix)]

libs/core/uv_compat/pipe.rs

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,20 @@ pub struct uv_pipe_t {
8080
Box<dyn std::future::Future<Output = std::io::Result<()>> + Send>,
8181
>,
8282
>,
83+
/// In-flight `WaitNamedPipeW` retry task spawned when a synchronous
84+
/// `ClientOptions::open()` returned `ERROR_PIPE_BUSY`. The deferred
85+
/// connect callback fires once this task completes (success or final
86+
/// failure). Mirrors libuv's `pipe_connect_thread_proc`.
87+
#[cfg(windows)]
88+
#[allow(
89+
clippy::type_complexity,
90+
reason = "JoinHandle<io::Result<NamedPipeClient>> is inherently complex"
91+
)]
92+
pub(crate) internal_win_connect_retry: Option<
93+
tokio::task::JoinHandle<
94+
std::io::Result<tokio::net::windows::named_pipe::NamedPipeClient>,
95+
>,
96+
>,
8397

8498
// Connected stream (from connect or accept)
8599
#[cfg(unix)]
@@ -219,6 +233,8 @@ pub fn new_pipe(ipc: bool) -> uv_pipe_t {
219233
internal_win_client: None,
220234
#[cfg(windows)]
221235
internal_win_connect_fut: None,
236+
#[cfg(windows)]
237+
internal_win_connect_retry: None,
222238
#[cfg(unix)]
223239
internal_stream: None,
224240
#[cfg(unix)]
@@ -813,6 +829,32 @@ pub unsafe fn uv_pipe_connect(
813829
}
814830
0
815831
}
832+
Err(e) if e.raw_os_error() == Some(231) => {
833+
// ERROR_PIPE_BUSY: all of the server's pipe instances are in use.
834+
// Match libuv (`pipe_connect_thread_proc`) by retrying on a worker
835+
// thread using `WaitNamedPipeW`. The connect callback fires
836+
// asynchronously once the retry resolves.
837+
if !req.is_null() {
838+
(*req).handle = pipe as *mut super::stream::uv_stream_t;
839+
}
840+
let path_owned = path.to_owned();
841+
let join = tokio::task::spawn_blocking(move || {
842+
wait_named_pipe_and_open(&path_owned)
843+
});
844+
(*pipe).internal_win_connect_retry = Some(join);
845+
(*pipe).internal_connect = Some(PipeConnectPending { req, cb });
846+
(*pipe).flags |= UV_HANDLE_ACTIVE;
847+
848+
let inner = super::get_inner((*pipe).loop_);
849+
let mut handles = inner.pipe_handles.borrow_mut();
850+
if !handles.iter().any(|&h| std::ptr::eq(h, pipe)) {
851+
handles.push(pipe);
852+
}
853+
if let Some(w) = (*pipe).internal_waker.as_ref() {
854+
w.mark_ready();
855+
}
856+
0
857+
}
816858
Err(e) => {
817859
// If the path exists but isn't a named pipe, return ENOTSOCK
818860
// (matching libuv behavior). ClientOptions::open returns NotFound
@@ -828,6 +870,49 @@ pub unsafe fn uv_pipe_connect(
828870
}
829871
}
830872

873+
/// Wait for a Windows named pipe instance to become free and open a client.
874+
///
875+
/// Mirrors libuv's `pipe_connect_thread_proc`: loop on `WaitNamedPipeW`
876+
/// (30s per attempt) until an instance is available, then try to open it
877+
/// via `CreateFileW`. If another client races us, the wait/open loop
878+
/// continues until success or `WaitNamedPipeW` itself fails (e.g. the
879+
/// server is gone or the 30s wait times out); any other open error is returned as-is.
880+
#[cfg(windows)]
881+
pub(crate) fn wait_named_pipe_and_open(
882+
path: &str,
883+
) -> std::io::Result<tokio::net::windows::named_pipe::NamedPipeClient> {
884+
use std::os::windows::ffi::OsStrExt;
885+
let wide: Vec<u16> = std::ffi::OsStr::new(path)
886+
.encode_wide()
887+
.chain(std::iter::once(0))
888+
.collect();
889+
loop {
890+
// SAFETY: `wide` is a null-terminated wide string for the lifetime
891+
// of this call. WaitNamedPipeW has no other safety requirements.
892+
let ok = unsafe {
893+
windows_sys::Win32::System::Pipes::WaitNamedPipeW(wide.as_ptr(), 30000)
894+
};
895+
if ok == 0 {
896+
// Any `WaitNamedPipeW` failure — including `ERROR_SEM_TIMEOUT` —
897+
// exits the loop. Matches libuv's `pipe_connect_thread_proc`:
898+
// the 30s timeout signals the server is unresponsive and the
899+
// connect should be reported as failed rather than retried
900+
// indefinitely.
901+
return Err(std::io::Error::last_os_error());
902+
}
903+
match tokio::net::windows::named_pipe::ClientOptions::new().open(path) {
904+
Ok(client) => return Ok(client),
905+
Err(e) if e.raw_os_error() == Some(231) => {
906+
// Another client raced us to the freed instance. Loop and wait
907+
// for another one. Yield first to avoid a hot spin.
908+
std::thread::yield_now();
909+
continue;
910+
}
911+
Err(e) => return Err(e),
912+
}
913+
}
914+
}
915+
831916
/// Attempt a non-blocking write to a pipe. Returns the number of bytes
832917
/// written (>= 0) or a negative `UV_*` error code. `UV_EAGAIN` when the
833918
/// pipe would block. Mirrors `uv__try_write()` in libuv.
@@ -963,6 +1048,9 @@ pub(crate) unsafe fn close_pipe(pipe: *mut uv_pipe_t) {
9631048
if let Some(handle) = (*pipe).internal_pending_read.take() {
9641049
handle.abort();
9651050
}
1051+
if let Some(retry) = (*pipe).internal_win_connect_retry.take() {
1052+
retry.abort();
1053+
}
9661054
if let Some(handle) = (*pipe).internal_handle.take() {
9671055
// SAFETY: handle is a valid OS handle from get_osfhandle.
9681056
windows_sys::Win32::Foundation::CloseHandle(handle as _);
@@ -1115,7 +1203,50 @@ pub(crate) unsafe fn poll_pipe_handle(
11151203

11161204
unsafe {
11171205
// 1. Poll deferred connect callback (from uv_pipe_connect on Windows).
1118-
if let Some(pending) = (*pipe_ptr).internal_connect.take() {
1206+
//
1207+
// Two sub-cases:
1208+
// a) Sync-success: ClientOptions::open() returned a connected client
1209+
// immediately; the client is already stored in `internal_win_client`
1210+
// and we just need to fire cb(req, 0).
1211+
// b) Retry-in-flight: ClientOptions::open() returned ERROR_PIPE_BUSY
1212+
// and a worker task is calling WaitNamedPipeW + retry. Wait for the
1213+
// task to finish before firing cb with the resolved status.
1214+
if let Some(ref mut join) = (*pipe_ptr).internal_win_connect_retry {
1215+
match std::pin::Pin::new(join).poll(cx) {
1216+
Poll::Pending => { /* keep waiting */ }
1217+
Poll::Ready(join_res) => {
1218+
(*pipe_ptr).internal_win_connect_retry = None;
1219+
let status = match join_res {
1220+
Ok(Ok(client)) => {
1221+
// Same FILE_TYPE_PIPE sanity check as the sync path.
1222+
use std::os::windows::io::AsRawHandle;
1223+
let handle = client.as_raw_handle();
1224+
let file_type =
1225+
windows_sys::Win32::Storage::FileSystem::GetFileType(
1226+
handle as _,
1227+
);
1228+
if file_type
1229+
!= windows_sys::Win32::Storage::FileSystem::FILE_TYPE_PIPE
1230+
{
1231+
drop(client);
1232+
super::UV_ENOTSOCK
1233+
} else {
1234+
(*pipe_ptr).internal_win_client = Some(client);
1235+
0
1236+
}
1237+
}
1238+
Ok(Err(ref e)) => io_error_to_uv(e),
1239+
Err(_) => super::UV_ECANCELED,
1240+
};
1241+
if let Some(pending) = (*pipe_ptr).internal_connect.take()
1242+
&& let Some(cb) = pending.cb
1243+
{
1244+
cb(pending.req, status);
1245+
}
1246+
any_work = true;
1247+
}
1248+
}
1249+
} else if let Some(pending) = (*pipe_ptr).internal_connect.take() {
11191250
if let Some(cb) = pending.cb {
11201251
cb(pending.req, 0);
11211252
}

libs/core/uv_compat/tests.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2989,3 +2989,175 @@ async fn pipe_open_not_active_until_read_start() {
29892989
})
29902990
.await;
29912991
}
2992+
2993+
/// Regression test for https://github.com/denoland/deno/issues/33923.
2994+
///
2995+
/// On Windows, when all named-pipe instances are in use (e.g., Docker
2996+
/// Desktop under load), `CreateFileW` returns `ERROR_PIPE_BUSY` (231).
2997+
/// libuv handles this by transparently retrying via `WaitNamedPipeW` on
2998+
/// a worker thread; node code (like `docker-modem`) relies on this. We
2999+
/// must do the same — without it, the connection surfaces as
3000+
/// `connect EINVAL` to JavaScript.
3001+
#[cfg(windows)]
3002+
#[tokio::test(flavor = "current_thread")]
3003+
async fn pipe_connect_retries_on_error_pipe_busy() {
3004+
use tokio::net::windows::named_pipe::ClientOptions;
3005+
use tokio::net::windows::named_pipe::ServerOptions;
3006+
// Unique pipe name per test run.
3007+
let pipe_name =
3008+
format!(r"\\.\pipe\deno-uvcompat-busy-{}", std::process::id());
3009+
3010+
// Server with max_instances=1 -- only one client can be connected at
3011+
// a time. The first server instance must use first_pipe_instance(true).
3012+
let server = ServerOptions::new()
3013+
.first_pipe_instance(true)
3014+
.max_instances(1)
3015+
.create(&pipe_name)
3016+
.unwrap();
3017+
3018+
// Client 1 connects, consuming the only instance.
3019+
let _client1 = ClientOptions::new().open(&pipe_name).unwrap();
3020+
// Drive the server's ConnectNamedPipe so the pair is fully established.
3021+
server.connect().await.unwrap();
3022+
3023+
// Sanity: a second synchronous open MUST now return ERROR_PIPE_BUSY.
3024+
// (If this assertion fails, the test setup no longer triggers the
3025+
// condition we're trying to exercise.)
3026+
let busy_err = ClientOptions::new().open(&pipe_name).unwrap_err();
3027+
assert_eq!(
3028+
busy_err.raw_os_error(),
3029+
Some(231),
3030+
"expected ERROR_PIPE_BUSY, got {:?}",
3031+
busy_err
3032+
);
3033+
3034+
// Now exercise the retry helper. It should block in WaitNamedPipeW
3035+
// until we drop the existing server and create a fresh instance.
3036+
let pipe_name_clone = pipe_name.clone();
3037+
let join = tokio::task::spawn_blocking(move || {
3038+
pipe::wait_named_pipe_and_open(&pipe_name_clone)
3039+
});
3040+
3041+
// Give the worker a moment to actually enter WaitNamedPipeW. Without
3042+
// this the new server instance below could appear before the wait
3043+
// starts, masking whether the retry actually waits.
3044+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3045+
assert!(!join.is_finished(), "wait must block while pipe is busy");
3046+
3047+
// Free the instance: drop the original server, then create a new one.
3048+
drop(server);
3049+
let new_server = ServerOptions::new()
3050+
.first_pipe_instance(false)
3051+
.create(&pipe_name)
3052+
.unwrap();
3053+
// Drive the server side so the client's CreateFileW can complete.
3054+
let connect_fut = tokio::spawn(async move {
3055+
new_server.connect().await.unwrap();
3056+
new_server
3057+
});
3058+
3059+
// The retry helper must observe the new instance and succeed.
3060+
let result = tokio::time::timeout(std::time::Duration::from_secs(5), join)
3061+
.await
3062+
.expect("wait_named_pipe_and_open did not complete in time")
3063+
.expect("spawn_blocking join failed")
3064+
.expect("wait_named_pipe_and_open returned an error");
3065+
3066+
// Hand-off succeeded: result is a connected client.
3067+
drop(result);
3068+
let _ = connect_fut.await;
3069+
}
3070+
3071+
/// End-to-end check that `uv_pipe_connect` returns successfully (0 from
3072+
/// the function, status=0 to the connect callback) even when the initial
3073+
/// `ClientOptions::open()` fails with `ERROR_PIPE_BUSY`. Prior to the
3074+
/// retry fix, this scenario surfaced as `UV_EINVAL` (the
3075+
/// `connect EINVAL //./pipe/...` error from #33923).
3076+
#[cfg(windows)]
3077+
#[tokio::test(flavor = "current_thread")]
3078+
async fn uv_pipe_connect_busy_then_succeeds() {
3079+
use std::sync::atomic::AtomicI32;
3080+
use std::sync::atomic::Ordering;
3081+
3082+
use tokio::net::windows::named_pipe::ClientOptions;
3083+
use tokio::net::windows::named_pipe::ServerOptions;
3084+
3085+
let pipe_name =
3086+
format!(r"\\.\pipe\deno-uvcompat-busy-cb-{}", std::process::id());
3087+
3088+
// Saturate the pipe so the synchronous open in uv_pipe_connect hits
3089+
// ERROR_PIPE_BUSY.
3090+
let server = ServerOptions::new()
3091+
.first_pipe_instance(true)
3092+
.max_instances(1)
3093+
.create(&pipe_name)
3094+
.unwrap();
3095+
let _client1 = ClientOptions::new().open(&pipe_name).unwrap();
3096+
server.connect().await.unwrap();
3097+
3098+
run_test(async |runtime, uv_loop| {
3099+
// Static result slot for the connect callback: an `extern "C"` fn
3100+
// cannot capture locals, and a static stays valid for the whole test.
3101+
static RESULT: AtomicI32 = AtomicI32::new(i32::MIN);
3102+
RESULT.store(i32::MIN, Ordering::SeqCst);
3103+
3104+
unsafe extern "C" fn connect_cb(_req: *mut uv_connect_t, status: i32) {
3105+
RESULT.store(status, Ordering::SeqCst);
3106+
}
3107+
3108+
let mut pipe = pipe::new_pipe(false);
3109+
unsafe {
3110+
assert_ok(pipe::uv_pipe_init(uv_loop, &mut pipe, 0));
3111+
}
3112+
3113+
let mut req = new_connect();
3114+
let rc = unsafe {
3115+
pipe::uv_pipe_connect(&mut req, &mut pipe, &pipe_name, Some(connect_cb))
3116+
};
3117+
// The retry path returns 0 synchronously; the callback fires later.
3118+
assert_eq!(rc, 0, "uv_pipe_connect should defer, not fail with {rc}");
3119+
assert_eq!(
3120+
RESULT.load(Ordering::SeqCst),
3121+
i32::MIN,
3122+
"callback must not fire before the pipe is free"
3123+
);
3124+
3125+
// Free up the pipe and create a new server instance so the worker's
3126+
// WaitNamedPipeW returns.
3127+
drop(server);
3128+
let new_server = ServerOptions::new()
3129+
.first_pipe_instance(false)
3130+
.create(&pipe_name)
3131+
.unwrap();
3132+
// Drive the server side concurrently so the client's CreateFileW
3133+
// (issued from the retry worker) can complete.
3134+
let server_task = tokio::spawn(async move {
3135+
new_server.connect().await.unwrap();
3136+
new_server
3137+
});
3138+
3139+
// Tick the loop until the callback fires (with timeout).
3140+
let deadline =
3141+
std::time::Instant::now() + std::time::Duration::from_secs(5);
3142+
while RESULT.load(Ordering::SeqCst) == i32::MIN {
3143+
if std::time::Instant::now() > deadline {
3144+
panic!("connect callback did not fire within timeout");
3145+
}
3146+
tick(runtime).await;
3147+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3148+
}
3149+
assert_eq!(
3150+
RESULT.load(Ordering::SeqCst),
3151+
0,
3152+
"expected status=0, got {}",
3153+
RESULT.load(Ordering::SeqCst)
3154+
);
3155+
3156+
let _ = server_task.await;
3157+
unsafe {
3158+
uv_close(&mut pipe as *mut uv_pipe_t as *mut uv_handle_t, None);
3159+
}
3160+
tick(runtime).await;
3161+
})
3162+
.await;
3163+
}

0 commit comments

Comments
 (0)