Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions ext/zstdruby/streaming_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ struct streaming_compress_t {
ZSTD_CCtx* ctx;
VALUE buf;
size_t buf_size;
VALUE pending; /* accumulate compressed bytes produced by write() */
};

static void
Expand All @@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
struct streaming_compress_t *sc = p;
#ifdef HAVE_RB_GC_MARK_MOVABLE
rb_gc_mark_movable(sc->buf);
rb_gc_mark_movable(sc->pending);
#else
rb_gc_mark(sc->buf);
rb_gc_mark(sc->pending);
#endif
}

Expand All @@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
{
struct streaming_compress_t *sc = p;
sc->buf = rb_gc_location(sc->buf);
sc->pending = rb_gc_location(sc->pending);
}
#endif

Expand All @@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
sc->ctx = NULL;
sc->buf = Qnil;
sc->buf_size = 0;
sc->pending = Qnil;
return obj;
}

Expand All @@ -87,6 +92,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
sc->ctx = ctx;
sc->buf = rb_str_new(NULL, buffOutSize);
sc->buf_size = buffOutSize;
sc->pending = rb_str_new(0, 0);

return obj;
}
Expand Down Expand Up @@ -143,7 +149,6 @@ static VALUE
rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
{
size_t total = 0;
VALUE result = rb_str_new(0, 0);
struct streaming_compress_t* sc;
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
const char* output_data = RSTRING_PTR(sc->buf);
Expand All @@ -161,6 +166,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
}
/* collect produced bytes */
if (output.pos > 0) {
rb_str_cat(sc->pending, output.dst, output.pos);
}
total += RSTRING_LEN(str);
}
}
Expand Down Expand Up @@ -193,17 +202,23 @@ rb_streaming_compress_flush(VALUE obj)
{
struct streaming_compress_t* sc;
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
VALUE result = no_compress(sc, ZSTD_e_flush);
return result;
VALUE drained = no_compress(sc, ZSTD_e_flush);
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
VALUE out = sc->pending;
sc->pending = rb_str_new(0, 0);
return out;
}

static VALUE
rb_streaming_compress_finish(VALUE obj)
{
struct streaming_compress_t* sc;
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
VALUE result = no_compress(sc, ZSTD_e_end);
return result;
VALUE drained = no_compress(sc, ZSTD_e_end);
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
VALUE out = sc->pending;
sc->pending = rb_str_new(0, 0);
return out;
}

extern VALUE rb_mZstd, cStreamingCompress;
Expand Down
150 changes: 150 additions & 0 deletions spec/streaming_128kb_cliff_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# spec/streaming_128kb_cliff_spec.rb
require "spec_helper"
require "stringio"
require "zstd-ruby"

RSpec.describe "Zstd streaming 128KB cliff" do
MAGIC = [0x28, 0xB5, 0x2F, 0xFD].freeze

def hex(str)
str.unpack1("H*")
end

def has_magic?(bin)
bytes = bin.bytes
bytes[0, 4] == MAGIC
end

shared_examples "round-trip streaming compress" do |size|
it "round-trips #{size} bytes and starts with zstd magic" do
# Produce data
src = "a" * size

sc = Zstd::StreamingCompress.new
sc << src
compressed = sc.finish

expect(has_magic?(compressed)).to be(true), "missing magic: #{hex(compressed)[0, 16]}"
expect(Zstd.decompress(compressed)).to eq(src)
end
end

context "exactly around 128 KiB" do
include_examples "round-trip streaming compress", 131_071
include_examples "round-trip streaming compress", 131_072 # the cliff
include_examples "round-trip streaming compress", 131_073
end

context "multiple writes crossing the boundary" do
it "64KiB + 64KiB (exact boundary) works" do
part = "x" * 65_536
sc = Zstd::StreamingCompress.new
sc << part
sc << part
compressed = sc.finish
expect(has_magic?(compressed)).to be(true)
expect(Zstd.decompress(compressed)).to eq(part * 2)
end

it "64KiB + 64KiB + 1 works" do
a = "a" * 65_536
b = "b" * 65_536
c = "c"
sc = Zstd::StreamingCompress.new
sc << a << b << c
compressed = sc.finish
expect(has_magic?(compressed)).to be(true)
expect(Zstd.decompress(compressed)).to eq(a + b + c)
end
end

context "flush/end draining" do
it "returns all produced bytes across flush and finish" do
sc = Zstd::StreamingCompress.new
sc << ("a" * 70_000)
out = sc.flush
expect(out).not_to be_empty
sc << ("b" * 70_000)
out << sc.finish
expect(has_magic?(out)).to be(true)
expect(Zstd.decompress(out)).to eq(("a" * 70_000) + ("b" * 70_000))
end
end

context "GC.compact interaction" do
it "survives compaction around the boundary" do
sc = Zstd::StreamingCompress.new
sc << ("a" * 80_000)
GC.compact if GC.respond_to?(:compact)
sc << ("b" * 60_000) # total now > 128KiB
compressed = sc.finish
expect(has_magic?(compressed)).to be(true)
expect(Zstd.decompress(compressed)).to eq(("a" * 80_000) + ("b" * 60_000))
end
end

context "larger payload sanity" do
it "round-trips ~1 MiB" do
src = "z" * 1_048_576
sc = Zstd::StreamingCompress.new(level: 3)
sc << src
compressed = sc.finish
expect(has_magic?(compressed)).to be(true)
expect(Zstd.decompress(compressed)).to eq(src)
end
end

describe Zstd::StreamWriter do
it "produces a valid frame and round-trips at exactly 128KiB" do
io = StringIO.new
sw = Zstd::StreamWriter.new(io)
sw.write("a" * 131_072)
sw.finish

io.rewind
bin = io.read
expect(has_magic?(bin)).to be(true), "missing magic: #{hex(bin)[0, 16]}"

io.rewind
sr = Zstd::StreamReader.new(io)
out = sr.read(2_000_000)
expect(out.size).to eq(131_072)
expect(out).to eq("a" * 131_072)
io.close
end

it "handles boundary-crossing writes with flush in between" do
io = StringIO.new
sw = Zstd::StreamWriter.new(io)
sw.write("a" * 70_000)
sw.write("b" * 70_000) # crosses 128KiB internally
sw.finish

io.rewind
bin = io.read
expect(has_magic?(bin)).to be(true)
io.rewind
sr = Zstd::StreamReader.new(io)
out = sr.read(1_000_000)
expect(out).to eq("a" * 70_000 + "b" * 70_000)
io.close
end

it "survives GC.compact mid-stream" do
io = StringIO.new
sw = Zstd::StreamWriter.new(io)
sw.write("x" * 90_000)
GC.compact if GC.respond_to?(:compact)
sw.write("y" * 50_000)
sw.finish

io.rewind
bin = io.read
expect(has_magic?(bin)).to be(true)
io.rewind
sr = Zstd::StreamReader.new(io)
out = sr.read(200_000)
expect(out).to eq("x" * 90_000 + "y" * 50_000)
end
end
end