diff --git a/Cargo.toml b/Cargo.toml index 4dcfac9..35ef6a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ doc = false name = "wreq_ruby" [dependencies] -magnus = { version = "0.8.2", features = ["bytes"] } +magnus = { version = "0.8.2", features = ["bytes", "rb-sys"] } rb-sys = { version = "0.9.128", default-features = false } tokio = { version = "1.52.3", features = ["full"] } wreq = { git = "https://github.com/0x676e67/wreq", features = [ diff --git a/src/client/body.rs b/src/client/body.rs index e8bbb4a..eb19762 100644 --- a/src/client/body.rs +++ b/src/client/body.rs @@ -12,7 +12,7 @@ use magnus::{ pub use self::{ form::Form, json::Json, - stream::{BodyReceiver, BodySender, ReceiverStream}, + stream::{BodySender, ReceiverStream}, }; /// Represents the body of an HTTP request. diff --git a/src/client/body/stream.rs b/src/client/body/stream.rs index dbf13b4..286651e 100644 --- a/src/client/body/stream.rs +++ b/src/client/body/stream.rs @@ -5,20 +5,11 @@ use std::{ }; use bytes::Bytes; -use futures_util::{Stream, StreamExt, TryFutureExt}; +use futures_util::{Stream, TryFutureExt}; use magnus::{Error, RString, TryConvert, Value}; -use tokio::sync::{ - Mutex, - mpsc::{self}, -}; - -use crate::{ - error::{memory_error, mpsc_send_error_to_magnus}, - rt, -}; +use tokio::sync::mpsc; -/// A receiver for streaming HTTP response bodies. -pub struct BodyReceiver(Mutex> + Send>>>); +use crate::error::{memory_error, mpsc_send_error_to_magnus}; /// A sender for streaming HTTP request bodies. #[magnus::wrap(class = "Wreq::BodySender", free_immediately, size)] @@ -29,32 +20,6 @@ struct InnerBodySender { rx: Option>, } -// ===== impl BodyReceiver ===== - -impl BodyReceiver { - /// Create a new [`BodyReceiver`] instance. - #[inline] - pub fn new(stream: impl Stream> + Send + 'static) -> BodyReceiver { - BodyReceiver(Mutex::new(Box::pin(stream))) - } -} - -impl Iterator for BodyReceiver { - type Item = Bytes; - - fn next(&mut self) -> Option { - rt::maybe_block_on(async { - self.0 - .lock() - .await - .as_mut() - .next() - .await - .and_then(|r| r.ok()) - }) - } -} - // ===== impl BodySender ===== impl BodySender { @@ -78,7 +43,7 @@ impl BodySender { let bytes = data.to_bytes(); let inner = rb_self.0.read().unwrap(); if let Some(ref tx) = inner.tx { - rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?; + crate::rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?; } Ok(()) } diff --git a/src/client/resp.rs b/src/client/resp.rs index 1aeab97..5091ca8 100644 --- a/src/client/resp.rs +++ b/src/client/resp.rs @@ -2,14 +2,17 @@ use std::{net::SocketAddr, sync::Arc}; use arc_swap::ArcSwapOption; use bytes::Bytes; -use futures_util::TryFutureExt; +use futures_util::{StreamExt, TryFutureExt}; use http::{Extensions, HeaderMap, response::Response as HttpResponse}; use http_body_util::BodyExt; -use magnus::{Error, Module, RArray, RModule, Ruby, Value, block::Yield, scan_args::scan_args}; +use magnus::{ + Error, IntoValue, Module, RArray, RModule, Ruby, Value, block::Proc, scan_args::scan_args, + value::ReprValue, +}; use wreq::Uri; use crate::{ - client::body::{BodyReceiver, Json}, + client::body::Json, cookie::Cookie, error::{memory_error, wreq_error_to_magnus}, gvl, @@ -18,6 +21,22 @@ use crate::{ rt, }; +// RAII wrapper that calls rb_gc_unregister_address on drop, ensuring the GC +// registration is always cleaned up regardless of how the scope exits (normal +// return, early error return via `?`, or panic). +struct GcGuard(*mut rb_sys::VALUE); + +impl Drop for GcGuard { + fn drop(&mut self) { + unsafe { rb_sys::rb_gc_unregister_address(self.0) } + } +} + +// SAFETY: GcGuard is only ever created while holding the GVL, and its Drop +// runs either on the same thread or inside with_gvl (also GVL-held). +// The pointer it holds is into a Box that outlives the guard. +unsafe impl Send for GcGuard {} + /// A response from a request. #[magnus::wrap(class = "Wreq::Response", free_immediately, size)] pub struct Response { @@ -199,12 +218,112 @@ impl Response { }) } - /// Get a chunk iterator for the response body. - pub fn chunks(&self) -> Result, Error> { - self.response(true) - .map(wreq::Response::bytes_stream) - .map(BodyReceiver::new) - .map(Yield::Iter) + /// Stream the response body, yielding each chunk to the given block with + /// proper GVL management. + /// + /// The iteration loop is driven from Rust: + /// 1. GVL is released while waiting for the next chunk (network I/O) + /// 2. GVL is re-acquired to yield the chunk to the Ruby block + /// 3. GVL is released again for the next I/O operation + /// + /// This allows other Ruby threads to run during network I/O, and ensures + /// streaming errors are properly propagated instead of silently swallowed. + pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result { + if unsafe { rb_sys::rb_block_given_p() == 0 } { + return Err(Error::new( + ruby.exception_local_jump_error(), + "no block given (yield)", + )); + } + + // FIX (issue 3): response() is called FIRST, before any GC registration. + // If it fails and returns Err, we exit here without ever registering + // anything — so there is nothing to unregister. + let response = rb_self.response(true)?; + let stream = response.bytes_stream(); + + // Heap-allocate the block VALUE so rb_gc_register_address has a stable + // address to track. GcGuard guarantees rb_gc_unregister_address is called + // on every exit path (FIX issues 3 and 5). + let mut block_raw = Box::new(unsafe { rb_sys::rb_block_proc() }); + let block_ptr: *mut rb_sys::VALUE = block_raw.as_mut(); + unsafe { rb_sys::rb_gc_register_address(block_ptr) }; + let _gc_guard = GcGuard(block_ptr); // dropped at end of scope unconditionally + + // FIX (issue 2): capture the heap address as `usize` (Copy + Send) rather + // than as `*mut VALUE` (!Send). The pointer is reconstructed inside the + // loop, only when needed, from within a with_gvl callback. + let block_addr: usize = block_ptr as usize; + + // Drive the streaming loop without the GVL. + // FIX (issue 1): `ruby: &Ruby` is NOT moved into the async block. + // The Ruby handle is obtained fresh via Ruby::get() inside with_gvl, + // where we are guaranteed to hold the GVL. Capturing &Ruby across + // GVL releases is semantically wrong even though Ruby is a ZST. + let result = gvl::nogvl_cancellable(|flag| { + rt::runtime().block_on(async move { + let mut stream = Box::pin(stream); + loop { + let chunk = tokio::select! { + biased; + _ = flag.cancelled() => return Err(crate::error::interrupt_error()), + result = stream.next() => result, + }; + + match chunk { + Some(Ok(bytes)) => { + // FIX (issue 2): reconstruct the pointer from usize here, + // inside the closure, rather than at capture time. + // Read the current VALUE — GC compaction may have updated + // the referent via the registered address. + let current_block_raw = + unsafe { *(block_addr as *const rb_sys::VALUE) }; + + let yield_result: Result<(), Error> = + tokio::task::block_in_place(|| { + gvl::with_gvl(|| { + // FIX (issue 1): obtain Ruby handle fresh now + // that we hold the GVL. Never use a captured + // &Ruby across a GVL release. + let ruby = magnus::Ruby::get() + .expect("Ruby::get() failed inside with_gvl — GVL not held as expected"); + + let block_value = unsafe { + magnus::rb_sys::FromRawValue::from_raw( + current_block_raw, + ) + }; + + // FIX (issue 6): accurate error message. + // This path means VALUE reconstruction failed, + // not that GC collected the block (the GcGuard + // prevents that). + let block = + Proc::from_value(block_value).ok_or_else(|| { + Error::new( + ruby.exception_runtime_error(), + "invalid block VALUE: reconstruction failed \ + (this is a wreq-ruby bug, not GC collection)", + ) + })?; + + let chunk_value = bytes.into_value_with(&ruby); + block.call::<_, Value>((chunk_value,))?; + Ok(()) + }) + }); + yield_result?; + } + Some(Err(e)) => return Err(wreq_error_to_magnus(e)), + None => return Ok(()), + } + } + }) + }); + // _gc_guard drops here — rb_gc_unregister_address called unconditionally. + + result?; + Ok(ruby.qnil().as_value()) } /// Close the response body, dropping any resources. diff --git a/src/gvl.rs b/src/gvl.rs index 1573618..799aa07 100644 --- a/src/gvl.rs +++ b/src/gvl.rs @@ -3,7 +3,7 @@ use std::{ffi::c_void, mem::MaybeUninit, ptr::null_mut}; -use rb_sys::rb_thread_call_without_gvl; +use rb_sys::{rb_thread_call_with_gvl, rb_thread_call_without_gvl}; use tokio::sync::watch; /// Container for safely passing closure and result through C callback. @@ -77,6 +77,61 @@ unsafe extern "C" fn unblock_func(arg: *mut c_void) { } } +// ── Separate arg container for with_gvl ────────────────────────────────────── +// +// Uses Option instead of MaybeUninit. If the closure panics and never +// writes a result, args.result stays None and the subsequent .expect() gives a +// clear panic message rather than reading uninitialized memory (which would be +// UB). The FFI unwind is still UB, but this is the best we can do short of +// catching the panic before the FFI boundary. + +struct GvlArgs { + func: Option, + result: Option, +} + +unsafe extern "C" fn call_with_gvl(arg: *mut c_void) -> *mut c_void +where + F: FnOnce() -> R, +{ + let args = unsafe { &mut *(arg as *mut GvlArgs) }; + let func = args.func.take().expect("call_with_gvl called twice"); + args.result = Some(func()); + null_mut() +} + +/// Executes the given closure while holding the Ruby GVL. +/// +/// Must be called from a context where the GVL has been released +/// (e.g., inside a [`nogvl`] or [`nogvl_cancellable`] callback). +/// Re-acquires the GVL, runs the closure, then releases it again. +/// +/// # Safety +/// +/// The closure MUST NOT panic. A panic unwinds through the FFI boundary, +/// which is undefined behavior. Unlike `nogvl` (which uses `MaybeUninit`), +/// this uses `Option` so a failed result produces a clear `.expect()` +/// message rather than silent UB — but the FFI unwind remains UB regardless. +pub fn with_gvl(func: F) -> R +where + F: FnOnce() -> R, + R: Sized, +{ + let mut args = GvlArgs { + func: Some(func), + result: None, + }; + + let arg_ptr = &mut args as *mut _ as *mut c_void; + + unsafe { + rb_thread_call_with_gvl(Some(call_with_gvl::), arg_ptr); + } + + args.result + .expect("with_gvl: closure did not produce a result (panic crossed FFI boundary?)") +} + /// Executes the given closure without holding the Ruby GVL (Global VM Lock). /// /// WARNING: Do NOT nest calls to [`nogvl`] or [`nogvl_cancellable`] inside each other. diff --git a/src/rt.rs b/src/rt.rs index 1bd71f5..acf55ed 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::LazyLock; use tokio::runtime::{Builder, Runtime}; @@ -11,6 +12,11 @@ static RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to initialize Tokio runtime") }); +/// Returns a reference to the global Tokio runtime. +pub(crate) fn runtime() -> &'static Runtime { + &RUNTIME +} + /// Block on a future to completion on the global Tokio runtime, /// with support for cancellation via the provided `CancelFlag`. pub fn try_block_on(future: F) -> F::Output @@ -27,21 +33,3 @@ where }) }) } - -/// Block on a future to completion on the global Tokio runtime, -/// returning `None` if cancelled via the provided `CancelFlag`. -#[inline] -pub fn maybe_block_on(future: F) -> F::Output -where - F: Future>, -{ - gvl::nogvl_cancellable(|flag| { - RUNTIME.block_on(async move { - tokio::select! { - biased; - _ = flag.cancelled() => None, - result = future => result, - } - }) - }) -} diff --git a/test/gvl_streaming_test.rb b/test/gvl_streaming_test.rb new file mode 100644 index 0000000..b24f7fa --- /dev/null +++ b/test/gvl_streaming_test.rb @@ -0,0 +1,508 @@ +# frozen_string_literal: true + +require 'test_helper' + +# Test suite for GVL-aware streaming (issue #57). +# +# Requires httpbin running on localhost:8080: +# docker run -d -p 8080:80 --name httpbin kennethreitz/httpbin +# +# Covers: +# - Basic functionality (yield count, encoding, return value, empty body) +# - Block control flow (break, next, exception) +# - Double-consumption / body ownership +# - GVL correctness (other threads run during I/O) +# - Thread interruption (kill, raise) +# - Streaming error propagation +# - Content integrity (streamed == buffered) +# - GC safety (forced collection during streaming) +# - Concurrency (multiple threads, multiple clients) +# - Regression: original issue #57 (Mutex starvation) + +class GvlStreamingTest < Minitest::Test + BASE = 'http://localhost:8080' + + # =========================================================================== + # Helpers + # =========================================================================== + + def client + Wreq::Client.new + end + + def get(path, **opts) + client.get("#{BASE}#{path}", **opts) + end + + # Collect all chunks from a response into an array. + def collect_chunks(resp) + chunks = [] + resp.chunks { |c| chunks << c } + chunks + end + + # =========================================================================== + # 1. Basic functionality + # =========================================================================== + + def test_chunks_yields_correct_count + chunks = collect_chunks(get('/stream/5')) + assert_equal 5, chunks.size + end + + def test_chunks_yields_strings + get('/stream/3').chunks do |chunk| + assert_kind_of String, chunk + end + end + + def test_chunks_yields_binary_encoding + get('/stream/3').chunks do |chunk| + assert_includes [Encoding::BINARY, Encoding::ASCII_8BIT], chunk.encoding, + "Expected binary encoding, got #{chunk.encoding}" + end + end + + def test_chunks_yields_non_empty_chunks + get('/stream/3').chunks do |chunk| + assert chunk.bytesize > 0, 'Chunks must not be empty' + end + end + + def test_chunks_returns_nil + result = get('/stream/3').chunks { |_c| :ignored_return_value } + assert_nil result + end + + def test_chunks_block_return_value_is_always_nil + # Whatever the block returns, chunks itself must return nil. + [42, 'string', :symbol, [], {}, true, false].each do |val| + result = get('/stream/1').chunks { |_c| val } + assert_nil result, "chunks should return nil when block returns #{val.inspect}" + end + end + + def test_chunks_empty_body + chunk_count = 0 + get('/status/204').chunks { |_c| chunk_count += 1 } + assert_equal 0, chunk_count + end + + def test_chunks_single_chunk_total_bytes + total = 0 + get('/bytes/1024').chunks { |c| total += c.bytesize } + assert_equal 1024, total + end + + def test_chunks_without_block_raises_local_jump_error + resp = get('/stream/3') + assert_raises(LocalJumpError) { resp.chunks } + end + + # =========================================================================== + # 2. Block control flow + # =========================================================================== + + def test_break_inside_block_stops_iteration + chunks_seen = 0 + # /stream/10 yields 10 chunks; we break after 2. + get('/stream/10').chunks do |_c| + chunks_seen += 1 + break if chunks_seen == 2 + end + assert_equal 2, chunks_seen, 'break should stop iteration after 2 chunks' + end + + def test_break_inside_block_does_not_raise + # break must not propagate as an exception to the caller. + assert_silent do + get('/stream/5').chunks do |_c| + break + end + end + end + + def test_next_inside_block_skips_to_next_chunk + processed = [] + get('/stream/5').chunks do |c| + next if processed.size == 2 # skip third chunk processing + + processed << c + end + # next skips the block body but iteration continues; we still get 5 yields + # but only push 4 times (skip once when size==2 means index 2 skipped). + assert_equal 4, processed.size + end + + def test_exception_in_block_propagates_to_caller + raised = nil + begin + get('/stream/5').chunks do |_c| + raise 'block error' + end + rescue RuntimeError => e + raised = e + end + refute_nil raised + assert_equal 'block error', raised.message + end + + def test_exception_in_block_stops_after_correct_chunk_count + count = 0 + begin + get('/stream/5').chunks do |_c| + count += 1 + raise 'stop' if count == 3 + end + rescue RuntimeError + end + assert_equal 3, count + end + + def test_exception_class_preserved_through_block + begin + get('/stream/3').chunks { raise ArgumentError, 'bad arg' } + rescue ArgumentError => e + assert_equal 'bad arg', e.message + return + end + flunk 'ArgumentError should have propagated' + end + + # =========================================================================== + # 3. Body ownership / double-consumption + # =========================================================================== + + def test_chunks_called_twice_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + def test_text_after_chunks_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.text } + end + + def test_bytes_after_chunks_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.bytes } + end + + def test_chunks_after_text_raises_memory_error + resp = get('/stream/3') + resp.text + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + # Regression for issue #3 in the fix analysis: + # If response() raises (body already consumed), we must NOT leak a GC + # registration. This is validated indirectly: if a stale pointer is + # registered, subsequent GC cycles corrupt the heap and later tests crash. + def test_gc_registration_not_leaked_when_response_already_consumed + resp = get('/stream/3') + resp.chunks { |_c| } + + # Force several GC cycles. If a stale pointer was registered, this crashes. + 10.times { GC.start(full_mark: true, immediate_sweep: true) } + + # Confirm the error is still raised cleanly (body correctly consumed). + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + # =========================================================================== + # 4. Content integrity + # =========================================================================== + + def test_streamed_content_matches_buffered_content + full = client.get("#{BASE}/bytes/4096").bytes + stream = ''.b + client.get("#{BASE}/bytes/4096").chunks { |c| stream << c } + assert_equal full.bytesize, stream.bytesize + end + + def test_streamed_content_is_valid_json_per_chunk + get('/stream/5').chunks do |chunk| + assert_match(/\{.*\}/, chunk, 'Each chunk from /stream/N should be a JSON object') + end + end + + def test_large_stream_total_size + # /bytes/N returns exactly N random bytes. Stream it and verify. + size = 256 * 1024 # 256 KB + total = 0 + client.get("#{BASE}/bytes/#{size}").chunks { |c| total += c.bytesize } + assert_equal size, total + end + + def test_many_chunks_all_received + # /stream/N yields N JSON objects, one per chunk. + n = 20 + chunks = collect_chunks(get("/stream/#{n}")) + assert_equal n, chunks.size + end + + # =========================================================================== + # 5. GVL correctness + # =========================================================================== + + # Core GVL test: a background thread must make measurable progress while + # the main thread is blocked waiting for network chunks. + # + # Uses an Atomic-style counter via Array#push (GIL-safe on MRI) rather than + # a plain integer to avoid a data race in the assertion read. + def test_other_threads_run_during_network_wait + resp = client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1") + + ticks = [] + ticker = Thread.new do + 30.times do + ticks << 1 + sleep 0.1 + end + end + + chunks_received = 0 + resp.chunks { |_c| chunks_received += 1 } + ticker.join(10) + + # With 3 chunks spaced 1s apart, the ticker should have accumulated + # at least ~20 ticks (3 seconds * 10 ticks/sec) if the GVL is released. + # We use a conservative threshold of 10 to allow for slow CI. + assert ticks.size > 10, + "Ticker only reached #{ticks.size} ticks — GVL may not be released during I/O. " \ + 'Expected > 10 ticks during a 3-second drip stream.' + assert chunks_received >= 1 + end + + # Regression for issue #57: streaming must not starve a thread that holds + # a Mutex. Before the fix, BodyReceiver held the GVL continuously, so a + # thread waiting on mutex.synchronize would never be scheduled. + def test_streaming_does_not_starve_mutex_waiters + mutex = Mutex.new + mutex_acquired_at = nil + + # This thread acquires the mutex after a short delay. + # If the GVL is held by the streaming thread, it will never run. + waiter = Thread.new do + sleep 0.5 + mutex.synchronize { mutex_acquired_at = Time.now } + end + + Time.now + client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1").chunks { |_c| } + stream_end = Time.now + + waiter.join(10) + + refute_nil mutex_acquired_at, 'Mutex waiter thread never ran' + + # The waiter should have acquired the mutex well before streaming finished. + assert mutex_acquired_at < stream_end, + "Mutex was only acquired at #{mutex_acquired_at}, after streaming ended at #{stream_end}. " \ + 'The streaming thread may have held the GVL for the entire duration.' + end + + def test_multiple_concurrent_streams_same_client + results = Array.new(2) + threads = 2.times.map do |i| + Thread.new do + chunks = collect_chunks(client.get("#{BASE}/stream/3")) + results[i] = chunks.size + end + end + threads.each { |t| t.join(15) } + assert_equal [3, 3], results + end + + def test_multiple_concurrent_streams_different_clients + results = Array.new(3) + threads = 3.times.map do |i| + Thread.new do + c = Wreq::Client.new + chunks = collect_chunks(c.get("#{BASE}/stream/3")) + results[i] = chunks.size + end + end + threads.each { |t| t.join(15) } + assert_equal [3, 3, 3], results + end + + # =========================================================================== + # 6. Thread interruption + # =========================================================================== + + def test_thread_kill_during_network_wait + started = false + thread = Thread.new do + resp = client.get("#{BASE}/drip?duration=10&numbytes=10") + started = true + resp.chunks { |_c| } + rescue StandardError => _e + end + + # Wait until the thread has actually started streaming before killing. + sleep 0.1 until started || !thread.alive? + sleep 0.5 # let it block on first chunk wait + thread.kill + assert thread.join(5), 'Thread should terminate after kill' + end + + def test_thread_kill_during_block_execution + started = false + thread = Thread.new do + resp = client.get("#{BASE}/stream/5") + started = true + resp.chunks do |_c| + sleep 10 # block in the Ruby block, not in I/O + end + rescue StandardError => _e + end + + sleep 0.1 until started || !thread.alive? + sleep 0.3 + thread.kill + assert thread.join(5), 'Thread should terminate when killed during block execution' + end + + def test_thread_raise_during_streaming + error_class = Class.new(StandardError) + received_error = nil + started = false + + thread = Thread.new do + resp = client.get("#{BASE}/drip?duration=10&numbytes=10") + started = true + resp.chunks { |_c| } + rescue StandardError => e + received_error = e + end + + sleep 0.1 until started || !thread.alive? + sleep 0.5 + thread.raise(error_class, 'injected') + assert thread.join(5), 'Thread should terminate after raise' + assert_instance_of error_class, received_error, + "Expected injected error class, got #{received_error.class}" + end + + # =========================================================================== + # 7. Streaming error propagation + # =========================================================================== + + def test_streaming_error_raises_ruby_exception + # Use a very short timeout against a slow drip to force a mid-body error. + resp = client.get("#{BASE}/drip?duration=10&numbytes=10", timeout: 1) + error_raised = false + begin + resp.chunks { |_c| } + rescue Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError + error_raised = true + end + assert error_raised, 'A streaming timeout should raise a Wreq error' + end + + def test_streaming_error_is_not_silently_swallowed + # Before the fix, BodyReceiver swallowed errors via .and_then(|r| r.ok()). + # This test verifies that an error causes an exception, not silent EOF. + resp = client.get("#{BASE}/drip?duration=10&numbytes=10", timeout: 1) + chunks_received = 0 + begin + resp.chunks do |_c| + chunks_received += 1 + end + rescue Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError + # expected + end + # We should have received fewer chunks than the full 10 (cut short by timeout) + # and an error should have been raised (not just silently stopped at 0 chunks). + assert chunks_received < 10, + 'Should not have received all 10 chunks before timeout' + end + + # =========================================================================== + # 8. GC safety + # =========================================================================== + + def test_block_not_gc_collected_during_streaming + # Force GC between every chunk. If the block Proc is not GC-pinned, + # this will crash or raise "invalid block VALUE". + chunks_received = 0 + client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1").chunks do |_c| + chunks_received += 1 + GC.start(full_mark: true, immediate_sweep: true) + GC.compact if GC.respond_to?(:compact) + end + assert_equal 3, chunks_received, + 'All chunks must arrive even with forced GC + compaction between yields' + end + + def test_gc_compaction_during_streaming + skip 'GC.compact not available' unless GC.respond_to?(:compact) + chunks = [] + client.get("#{BASE}/drip?duration=2&numbytes=2&delay=1").chunks do |c| + chunks << c + GC.compact + end + assert chunks.size >= 1 + chunks.each { |c| assert c.bytesize > 0 } + end + + def test_aggressive_gc_between_chunks_does_not_corrupt_data + stream = ''.b + client.get("#{BASE}/bytes/8192").chunks do |c| + GC.start + stream << c + end + assert_equal 8192, stream.bytesize + end + + # =========================================================================== + # 9. close() integration + # =========================================================================== + + def test_close_after_full_stream_does_not_raise + resp = get('/stream/3') + resp.chunks { |_c| } + assert_silent { resp.close } + end + + def test_close_after_partial_stream_does_not_raise + resp = get('/stream/5') + count = 0 + begin + resp.chunks do |_c| + count += 1 + break if count == 2 + end + rescue StandardError + end + assert_silent { resp.close } + end + + # =========================================================================== + # 10. Client / module method variants + # =========================================================================== + + def test_chunks_via_module_method + chunks = [] + Wreq.get("#{BASE}/stream/3").chunks { |c| chunks << c } + assert_equal 3, chunks.size + end + + def test_chunks_via_client_instance + chunks = [] + client.get("#{BASE}/stream/3").chunks { |c| chunks << c } + assert_equal 3, chunks.size + end + + def test_chunks_on_post_response + chunks = [] + client.post("#{BASE}/post", body: 'hello').chunks { |c| chunks << c } + assert chunks.size >= 1 + combined = chunks.join + assert combined.bytesize > 0 + end +end