From f25d76eed78c10ef98ad64dcb9144232bad5e29c Mon Sep 17 00:00:00 2001
From: Spring_MT
Date: Sat, 27 Sep 2025 00:28:44 +0900
Subject: [PATCH] Merge pull request #114 from
 cosmo0920/add-pending-handler-to-concat-128kb-or-more-data

Add pending handler to concat 128kb or more data
---
 ext/zstdruby/streaming_compress.c  |  25 ++++-
 spec/streaming_128kb_cliff_spec.rb | 150 +++++++++++++++++++++++++++++
 2 files changed, 170 insertions(+), 5 deletions(-)
 create mode 100644 spec/streaming_128kb_cliff_spec.rb

diff --git a/ext/zstdruby/streaming_compress.c b/ext/zstdruby/streaming_compress.c
index af01a3a..90c3a51 100644
--- a/ext/zstdruby/streaming_compress.c
+++ b/ext/zstdruby/streaming_compress.c
@@ -4,6 +4,7 @@ struct streaming_compress_t {
   ZSTD_CCtx* ctx;
   VALUE buf;
   size_t buf_size;
+  VALUE pending; /* accumulate compressed bytes produced by write() */
 };
 
 static void
@@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
   struct streaming_compress_t *sc = p;
 #ifdef HAVE_RB_GC_MARK_MOVABLE
   rb_gc_mark_movable(sc->buf);
+  rb_gc_mark_movable(sc->pending);
 #else
   rb_gc_mark(sc->buf);
+  rb_gc_mark(sc->pending);
 #endif
 }
 
@@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
 {
   struct streaming_compress_t *sc = p;
   sc->buf = rb_gc_location(sc->buf);
+  sc->pending = rb_gc_location(sc->pending);
 }
 #endif
 
@@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
   sc->ctx = NULL;
   sc->buf = Qnil;
   sc->buf_size = 0;
+  sc->pending = Qnil;
   return obj;
 }
 
@@ -87,6 +92,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
   sc->ctx = ctx;
   sc->buf = rb_str_new(NULL, buffOutSize);
   sc->buf_size = buffOutSize;
+  sc->pending = rb_str_new(0, 0);
   return obj;
 }
 
@@ -143,7 +149,6 @@ static VALUE
 rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
 {
   size_t total = 0;
-  VALUE result = rb_str_new(0, 0);
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
   const char* output_data = RSTRING_PTR(sc->buf);
@@ -161,6 +166,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
       if (ZSTD_isError(ret)) {
         rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
       }
+      /* collect produced bytes */
+      if (output.pos > 0) {
+        rb_str_cat(sc->pending, output.dst, output.pos);
+      }
       total += RSTRING_LEN(str);
     }
   }
@@ -193,8 +202,11 @@ rb_streaming_compress_flush(VALUE obj)
 {
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
-  VALUE result = no_compress(sc, ZSTD_e_flush);
-  return result;
+  VALUE drained = no_compress(sc, ZSTD_e_flush);
+  rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
+  VALUE out = sc->pending;
+  sc->pending = rb_str_new(0, 0);
+  return out;
 }
 
 static VALUE
@@ -202,8 +214,11 @@ rb_streaming_compress_finish(VALUE obj)
 {
   struct streaming_compress_t* sc;
   TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
-  VALUE result = no_compress(sc, ZSTD_e_end);
-  return result;
+  VALUE drained = no_compress(sc, ZSTD_e_end);
+  rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
+  VALUE out = sc->pending;
+  sc->pending = rb_str_new(0, 0);
+  return out;
 }
 
 extern VALUE rb_mZstd, cStreamingCompress;

diff --git a/spec/streaming_128kb_cliff_spec.rb b/spec/streaming_128kb_cliff_spec.rb
new file mode 100644
index 0000000..085af3c
--- /dev/null
+++ b/spec/streaming_128kb_cliff_spec.rb
@@ -0,0 +1,150 @@
+# spec/streaming_128kb_cliff_spec.rb
+require "spec_helper"
+require "stringio"
"zstd-ruby" + +RSpec.describe "Zstd streaming 128KB cliff" do + MAGIC = [0x28, 0xB5, 0x2F, 0xFD].freeze + + def hex(str) + str.unpack1("H*") + end + + def has_magic?(bin) + bytes = bin.bytes + bytes[0, 4] == MAGIC + end + + shared_examples "round-trip streaming compress" do |size| + it "round-trips #{size} bytes and starts with zstd magic" do + # Produce data + src = "a" * size + + sc = Zstd::StreamingCompress.new + sc << src + compressed = sc.finish + + expect(has_magic?(compressed)).to be(true), "missing magic: #{hex(compressed)[0, 16]}" + expect(Zstd.decompress(compressed)).to eq(src) + end + end + + context "exactly around 128 KiB" do + include_examples "round-trip streaming compress", 131_071 + include_examples "round-trip streaming compress", 131_072 # the cliff + include_examples "round-trip streaming compress", 131_073 + end + + context "multiple writes crossing the boundary" do + it "64KiB + 64KiB (exact boundary) works" do + part = "x" * 65_536 + sc = Zstd::StreamingCompress.new + sc << part + sc << part + compressed = sc.finish + expect(has_magic?(compressed)).to be(true) + expect(Zstd.decompress(compressed)).to eq(part * 2) + end + + it "64KiB + 64KiB + 1 works" do + a = "a" * 65_536 + b = "b" * 65_536 + c = "c" + sc = Zstd::StreamingCompress.new + sc << a << b << c + compressed = sc.finish + expect(has_magic?(compressed)).to be(true) + expect(Zstd.decompress(compressed)).to eq(a + b + c) + end + end + + context "flush/end draining" do + it "returns all produced bytes across flush and finish" do + sc = Zstd::StreamingCompress.new + sc << ("a" * 70_000) + out = sc.flush + expect(out).not_to be_empty + sc << ("b" * 70_000) + out << sc.finish + expect(has_magic?(out)).to be(true) + expect(Zstd.decompress(out)).to eq(("a" * 70_000) + ("b" * 70_000)) + end + end + + context "GC.compact interaction" do + it "survives compaction around the boundary" do + sc = Zstd::StreamingCompress.new + sc << ("a" * 80_000) + GC.compact if GC.respond_to?(:compact) + sc << ("b" * 60_000) # total now > 128KiB + compressed = sc.finish + expect(has_magic?(compressed)).to be(true) + expect(Zstd.decompress(compressed)).to eq(("a" * 80_000) + ("b" * 60_000)) + end + end + + context "larger payload sanity" do + it "round-trips ~1 MiB" do + src = "z" * 1_048_576 + sc = Zstd::StreamingCompress.new(level: 3) + sc << src + compressed = sc.finish + expect(has_magic?(compressed)).to be(true) + expect(Zstd.decompress(compressed)).to eq(src) + end + end + + describe Zstd::StreamWriter do + it "produces a valid frame and round-trips at exactly 128KiB" do + io = StringIO.new + sw = Zstd::StreamWriter.new(io) + sw.write("a" * 131_072) + sw.finish + + io.rewind + bin = io.read + expect(has_magic?(bin)).to be(true), "missing magic: #{hex(bin)[0, 16]}" + + io.rewind + sr = Zstd::StreamReader.new(io) + out = sr.read(2_000_000) + expect(out.size).to eq(131_072) + expect(out).to eq("a" * 131_072) + io.close + end + + it "handles boundary-crossing writes with flush in between" do + io = StringIO.new + sw = Zstd::StreamWriter.new(io) + sw.write("a" * 70_000) + sw.write("b" * 70_000) # crosses 128KiB internally + sw.finish + + io.rewind + bin = io.read + expect(has_magic?(bin)).to be(true) + io.rewind + sr = Zstd::StreamReader.new(io) + out = sr.read(1_000_000) + expect(out).to eq("a" * 70_000 + "b" * 70_000) + io.close + end + + it "survives GC.compact mid-stream" do + io = StringIO.new + sw = Zstd::StreamWriter.new(io) + sw.write("x" * 90_000) + GC.compact if GC.respond_to?(:compact) + sw.write("y" * 50_000) + sw.finish + + 
+      io.rewind
+      bin = io.read
+      expect(has_magic?(bin)).to be(true)
+      io.rewind
+      sr = Zstd::StreamReader.new(io)
+      out = sr.read(200_000)
+      expect(out).to eq("x" * 90_000 + "y" * 50_000)
+    end
+  end
+end
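
For reference, a minimal usage sketch of the behaviour this patch locks in: write() now appends every block it drains into sc->pending, and flush/finish return pending plus whatever the final ZSTD_e_flush/ZSTD_e_end pass produces. The sketch assumes only the public API exercised by the spec above (Zstd::StreamingCompress#<<, #finish, Zstd.decompress); the 200_000-byte payload is illustrative, chosen to exceed the ~128 KiB internal output buffer the write path drains into.

    require "zstd-ruby"

    sc = Zstd::StreamingCompress.new
    sc << ("a" * 200_000)         # more than one internal output buffer's worth
    frame = sc.finish             # whole frame, not just the bytes from the final drain
    Zstd.decompress(frame) == ("a" * 200_000)  # => true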