From 0e6f97f9f7f517515232f5eed03840605a53d3fe Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Fri, 26 Sep 2025 19:07:46 +0900
Subject: [PATCH 1/2] Add pending mechanism to handle 128kb or more data for zstd frames

Signed-off-by: Hiroshi Hatake
---
 ext/zstdruby/streaming_compress.c | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/ext/zstdruby/streaming_compress.c b/ext/zstdruby/streaming_compress.c
index 8614d47..8f491eb 100644
--- a/ext/zstdruby/streaming_compress.c
+++ b/ext/zstdruby/streaming_compress.c
@@ -4,6 +4,7 @@ struct streaming_compress_t {
   ZSTD_CCtx* ctx;
   VALUE buf;
   size_t buf_size;
+  VALUE pending; /* accumulate compressed bytes produced by write() */
 };
 
 static void
@@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
   struct streaming_compress_t *sc = p;
 #ifdef HAVE_RB_GC_MARK_MOVABLE
   rb_gc_mark_movable(sc->buf);
+  rb_gc_mark_movable(sc->pending);
 #else
   rb_gc_mark(sc->buf);
+  rb_gc_mark(sc->pending);
 #endif
 }
 
@@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
 {
   struct streaming_compress_t *sc = p;
   sc->buf = rb_gc_location(sc->buf);
+  sc->pending = rb_gc_location(sc->pending);
 }
 #endif
 
@@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
   sc->ctx = NULL;
   sc->buf = Qnil;
   sc->buf_size = 0;
+  sc->pending = Qnil;
   return obj;
 }
 
@@ -86,6 +91,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
   sc->ctx = ctx;
   sc->buf = rb_str_new(NULL, buffOutSize);
   sc->buf_size = buffOutSize;
+  sc->pending = rb_str_new(0, 0);
   return obj;
 }
 
@@ -142,7 +148,6 @@ static VALUE
 rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
 {
   size_t total = 0;
-  VALUE result = rb_str_new(0, 0);
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
   const char* output_data = RSTRING_PTR(sc->buf);
@@ -160,6 +165,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
       if (ZSTD_isError(ret)) {
         rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
       }
+      /* collect produced bytes */
+      if (output.pos > 0) {
+        rb_str_cat(sc->pending, output.dst, output.pos);
+      }
       total += RSTRING_LEN(str);
     }
   }
@@ -192,8 +201,11 @@ rb_streaming_compress_flush(VALUE obj)
 {
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
-  VALUE result = no_compress(sc, ZSTD_e_flush);
-  return result;
+  VALUE drained = no_compress(sc, ZSTD_e_flush);
+  rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
+  VALUE out = sc->pending;
+  sc->pending = rb_str_new(0, 0);
+  return out;
 }
 
 static VALUE
@@ -201,8 +213,11 @@ rb_streaming_compress_finish(VALUE obj)
 {
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
-  VALUE result = no_compress(sc, ZSTD_e_end);
-  return result;
+  VALUE drained = no_compress(sc, ZSTD_e_end);
+  rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
+  VALUE out = sc->pending;
+  sc->pending = rb_str_new(0, 0);
+  return out;
 }
 
 extern VALUE rb_mZstd, cStreamingCompress;

From ddcbf26f3dd43bf6aebd0866cd0696bcd444a5ac Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Fri, 26 Sep 2025 19:11:39 +0900
Subject: [PATCH 2/2] Add test cases for exceeding 128kb cliff

Signed-off-by: Hiroshi Hatake
---
 spec/streaming_128kb_cliff_spec.rb | 150 +++++++++++++++++++++++++++++
 1 file changed, 150 insertions(+)
 create mode 100644 spec/streaming_128kb_cliff_spec.rb

diff --git a/spec/streaming_128kb_cliff_spec.rb b/spec/streaming_128kb_cliff_spec.rb
new file mode 100644
index 0000000..085af3c
--- /dev/null
+++ b/spec/streaming_128kb_cliff_spec.rb
@@ -0,0 +1,150 @@
+# spec/streaming_128kb_cliff_spec.rb
+require "spec_helper"
+require "stringio"
+require "zstd-ruby"
+
+RSpec.describe "Zstd streaming 128KB cliff" do
+  MAGIC = [0x28, 0xB5, 0x2F, 0xFD].freeze
+
+  def hex(str)
+    str.unpack1("H*")
+  end
+
+  def has_magic?(bin)
+    bytes = bin.bytes
+    bytes[0, 4] == MAGIC
+  end
+
+  shared_examples "round-trip streaming compress" do |size|
+    it "round-trips #{size} bytes and starts with zstd magic" do
+      # Produce data
+      src = "a" * size
+
+      sc = Zstd::StreamingCompress.new
+      sc << src
+      compressed = sc.finish
+
+      expect(has_magic?(compressed)).to be(true), "missing magic: #{hex(compressed)[0, 16]}"
+      expect(Zstd.decompress(compressed)).to eq(src)
+    end
+  end
+
+  context "exactly around 128 KiB" do
+    include_examples "round-trip streaming compress", 131_071
+    include_examples "round-trip streaming compress", 131_072 # the cliff
+    include_examples "round-trip streaming compress", 131_073
+  end
+
+  context "multiple writes crossing the boundary" do
+    it "64KiB + 64KiB (exact boundary) works" do
+      part = "x" * 65_536
+      sc = Zstd::StreamingCompress.new
+      sc << part
+      sc << part
+      compressed = sc.finish
+      expect(has_magic?(compressed)).to be(true)
+      expect(Zstd.decompress(compressed)).to eq(part * 2)
+    end
+
+    it "64KiB + 64KiB + 1 works" do
+      a = "a" * 65_536
+      b = "b" * 65_536
+      c = "c"
+      sc = Zstd::StreamingCompress.new
+      sc << a << b << c
+      compressed = sc.finish
+      expect(has_magic?(compressed)).to be(true)
+      expect(Zstd.decompress(compressed)).to eq(a + b + c)
+    end
+  end
+
+  context "flush/end draining" do
+    it "returns all produced bytes across flush and finish" do
+      sc = Zstd::StreamingCompress.new
+      sc << ("a" * 70_000)
+      out = sc.flush
+      expect(out).not_to be_empty
+      sc << ("b" * 70_000)
+      out << sc.finish
+      expect(has_magic?(out)).to be(true)
+      expect(Zstd.decompress(out)).to eq(("a" * 70_000) + ("b" * 70_000))
+    end
+  end
+
+  context "GC.compact interaction" do
+    it "survives compaction around the boundary" do
+      sc = Zstd::StreamingCompress.new
+      sc << ("a" * 80_000)
+      GC.compact if GC.respond_to?(:compact)
+      sc << ("b" * 60_000) # total now > 128KiB
+      compressed = sc.finish
+      expect(has_magic?(compressed)).to be(true)
+      expect(Zstd.decompress(compressed)).to eq(("a" * 80_000) + ("b" * 60_000))
+    end
+  end
+
+  context "larger payload sanity" do
+    it "round-trips ~1 MiB" do
+      src = "z" * 1_048_576
+      sc = Zstd::StreamingCompress.new(level: 3)
+      sc << src
+      compressed = sc.finish
+      expect(has_magic?(compressed)).to be(true)
+      expect(Zstd.decompress(compressed)).to eq(src)
+    end
+  end
+
+  describe Zstd::StreamWriter do
+    it "produces a valid frame and round-trips at exactly 128KiB" do
+      io = StringIO.new
+      sw = Zstd::StreamWriter.new(io)
+      sw.write("a" * 131_072)
+      sw.finish
+
+      io.rewind
+      bin = io.read
+      expect(has_magic?(bin)).to be(true), "missing magic: #{hex(bin)[0, 16]}"
+
+      io.rewind
+      sr = Zstd::StreamReader.new(io)
+      out = sr.read(2_000_000)
+      expect(out.size).to eq(131_072)
+      expect(out).to eq("a" * 131_072)
+      io.close
+    end
+
+    it "handles boundary-crossing writes with flush in between" do
+      io = StringIO.new
+      sw = Zstd::StreamWriter.new(io)
+      sw.write("a" * 70_000)
+      sw.write("b" * 70_000) # crosses 128KiB internally
+      sw.finish
+
+      io.rewind
+      bin = io.read
+      expect(has_magic?(bin)).to be(true)
+      io.rewind
+      sr = Zstd::StreamReader.new(io)
+      out = sr.read(1_000_000)
+      expect(out).to eq("a" * 70_000 + "b" * 70_000)
+      io.close
+    end
+
+    it "survives GC.compact mid-stream" do
+      io = StringIO.new
+      sw = Zstd::StreamWriter.new(io)
+      sw.write("x" * 90_000)
+      GC.compact if GC.respond_to?(:compact)
+      sw.write("y" * 50_000)
+      sw.finish
+
+      io.rewind
+      bin = io.read
+      expect(has_magic?(bin)).to be(true)
+      io.rewind
+      sr = Zstd::StreamReader.new(io)
+      out = sr.read(200_000)
+      expect(out).to eq("x" * 90_000 + "y" * 50_000)
+    end
+  end
+end