out_s3: add format=parquet with selectable page compression codecs#11885
out_s3: add format=parquet with selectable page compression codecs#11885rituparnakhaund wants to merge 3 commits into
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
💤 Files with no reviewable changes (1)
🚧 Files skipped from review as they are similar to previous changes (5)
📝 WalkthroughWalkthroughAdds runtime-selectable Parquet page-level compression codecs, updates the Parquet conversion API to accept a codec, maps/configures codecs in the S3 plugin and upload flow, adjusts payload cleanup, and adds unit tests for Parquet outputs. ChangesS3 Parquet Page-Level Compression
Sequence DiagramsequenceDiagram
participant Config as S3 Config
participant S3Upload as S3 Upload Path
participant ParquetLib as Parquet Conversion (out_s3_compress_parquet)
participant ArrowWriter as Arrow/GParquet
Config->>Config: parse format + compression (resolve parquet codec)
S3Upload->>ParquetLib: out_s3_compress_parquet(json, size, &buf, &sz, codec)
ParquetLib->>ArrowWriter: create writer with GParquetWriterProperties(codec)
ArrowWriter-->>ParquetLib: parquet buffer
ParquetLib-->>S3Upload: parquet buffer returned
S3Upload->>S3Upload: upload buffer to S3
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
6e4fd79 to
000f6af
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 000f6af6cc
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_s3/s3.c`:
- Around line 740-761: The Parquet path currently accepts a user-specified
log_key which later breaks Parquet conversion; in the FLB_S3_FORMAT_PARQUET
branch (the block that calls enable_parquet_format(ctx)) add a check after
enable_parquet_format succeeds: if ctx->log_key != NULL then call
flb_plg_error(ctx->ins, "'log_key' is not supported when format is parquet") and
return -1; use the existing symbols FLB_S3_FORMAT_PARQUET,
enable_parquet_format, ctx->log_key and ctx->ins to locate and implement the
check.
- Around line 1316-1331: The Parquet branch (out_s3_compress_parquet) allocates
payload_buf but later cleanup only frees when ctx->compression !=
FLB_AWS_COMPRESS_NONE, leaking payload_buf whenever ctx->compression was cleared
for Parquet; ensure payload_buf is independently freed whenever it was
allocated: after successful upload in the put_object path and in every
early-return/error path (including the FLB_RETRY path where s3_store_file_unlock
and chunk->failures are adjusted). Add a conditional free of payload_buf (e.g.,
flb_free(payload_buf) guarded by whether out_s3_compress_parquet set it) in the
out_s3_compress_parquet success branch and all subsequent exit points regardless
of ctx->compression.
- Around line 114-125: map_to_parquet_codec currently maps unknown compression
types to FLB_PARQUET_COMPRESSION_NONE which silently downgrades user config;
change map_to_parquet_codec to return a sentinel (e.g., -1) for unsupported
codecs and in the caller (where ctx->parquet_compression is set) check for -1,
call flb_plg_error(ctx->ins, "unsupported compression '%s' for format=parquet",
tmp) and return -1 to fail init, and only set ctx->compression =
FLB_AWS_COMPRESS_NONE after a successful mapping; reference map_to_parquet_codec
and ctx->parquet_compression for locating changes.
In `@tests/internal/aws_compress.c`:
- Around line 347-353: The test currently calls out_s3_compress_parquet and
always performs memcmp/out_size checks even if ret != 0, risking dereferencing
invalid out_buf or using out_size after a failed conversion; update the test
around the out_s3_compress_parquet call (and the analogous blocks testing other
compressions) to guard subsequent assertions by early-failing or skipping those
checks when ret != 0 (e.g., assert/require the conversion succeeded before using
out_buf/out_size, or return/continue the test case on failure), and ensure
out_buf is only dereferenced and flb_free'd when it was actually populated by a
successful call.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4502cf5b-2e0a-4cf0-86d5-02fb93b9e780
📒 Files selected for processing (7)
include/fluent-bit/aws/flb_aws_compress.hplugins/out_s3/s3.cplugins/out_s3/s3.hsrc/aws/compression/arrow/compress.csrc/aws/compression/arrow/compress.hsrc/aws/flb_aws_compress.ctests/internal/aws_compress.c
💤 Files with no reviewable changes (1)
- src/aws/flb_aws_compress.c
facaa57 to
8abd42f
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_s3/s3.c`:
- Around line 714-715: The default Parquet codec is being set to
FLB_PARQUET_COMPRESSION_NONE (ctx->parquet_compression) which causes plain
format=parquet to produce uncompressed output; change the default initialization
to FLB_PARQUET_COMPRESSION_SNAPPY so new default codec is Snappy, and preserve
backward compatibility by keeping the deprecated compression=parquet code path
(the branch that explicitly handles compression="parquet") to force
ctx->parquet_compression = FLB_PARQUET_COMPRESSION_NONE when that deprecated
option is used; update both the initializations around where ctx->s3_format is
set and the identical block later (the region referenced around lines 879-886)
so both locations use SNAPPY as the default and still override to NONE for the
deprecated path.
- Around line 879-886: The deprecated branch that treats "compression=parquet"
as an alias still enables Parquet without rejecting the log_key option; update
the block handling strcasecmp(tmp, "parquet") so it performs the same validation
as the new format guard: call the same check that rejects/validates log_key (the
logic used by enable_parquet_format or the code that inspects log_key) before
enabling Parquet, and if log_key is present return -1 (after emitting the
existing flb_plg_warn about deprecation) so the deprecated path cannot bypass
the log_key rejection; ensure you reference the same ctx/ins structures and
preserve the warning text via flb_plg_warn and the call to
enable_parquet_format(ctx).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4b6ca01a-c367-4245-acb0-ff825b3e9049
📒 Files selected for processing (6)
e2e-tests.txtfluent-bit.imlplugins/out_s3/s3.cplugins/out_s3/s3.hsrc/aws/compression/arrow/compress.ctests/internal/aws_compress.c
✅ Files skipped from review due to trivial changes (1)
- fluent-bit.iml
🚧 Files skipped from review as they are similar to previous changes (3)
- plugins/out_s3/s3.h
- src/aws/compression/arrow/compress.c
- tests/internal/aws_compress.c
8abd42f to
daf983a
Compare
The Parquet writer passed NULL for GParquetWriterProperties, causing all output files to have UNCOMPRESSED pages. This adds a parquet_compression parameter that configures the page-level codec (snappy, gzip, zstd, or none) via the GLib Parquet writer properties API. The parquet entry is removed from the compression dispatch table since the new function signature includes the codec parameter and no longer matches the generic compress function pointer type. The S3 plugin now calls out_s3_compress_parquet directly. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
The compression config option conflated byte-level compression (gzip, zstd, snappy) with format conversion (parquet), making it impossible to produce Parquet files with page-level compression. This adds 'format parquet' as a new option. When format is parquet, the compression option (snappy, zstd, gzip) controls the page-level codec inside the Parquet file. Default is uncompressed to preserve existing behavior. The old 'compression=parquet' syntax is preserved as a deprecated alias that emits a warning and maps to format=parquet with no page-level compression (identical to current behavior). Arrow support is untouched and continues to work via 'compression=arrow' as before. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
Add unit tests for out_s3_compress_parquet covering snappy, zstd, gzip and uncompressed page-level codecs, verifying the PAR1 magic markers and that compression reduces the output size for repetitive input. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
daf983a to
d26fe19
Compare
The S3 output plugin currently overloads the compression option to handle both byte-level compression (gzip, zstd, snappy) and format conversion (parquet). This makes it impossible
for users to produce Parquet files with page-level compression, because compression can only hold one value.
When compression=parquet is set, the code path ultimately calls gparquet_arrow_file_writer_new_arrow with NULL for writer properties. This causes the Arrow GLib binding to use parquet::default_writer_properties(), which sets the page-level codec to Compression::UNCOMPRESSED.
Every column page ends up uncompressed despite the key being "compression" ?
This patch separates concerns:
externally.
Backwards compatibility is preserved: compression=parquet still works but emits a deprecation warning and maps to format=parquet with no page-level compression (identical to current
behavior).
Addresses feedback from #10691 requesting codec and page size configurability.
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
Verified with parquet-tools inspect:
Before (current master):
compression: UNCOMPRESSED (space_saved: 0%)
After this patch:
compression: SNAPPY
[X] Debug log output from testing the change :
Testing strategy : Ran
build/bin/fluent-bitlocally on my mac to write to my s3 bucket with different configs.Here are the e2e-tests.txt
Individually ran this for each of the tests and no corruption found. The 17 tests are taking too long, so will post here as soon as I see an update.
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
fluent/fluent-bit-docs#2591
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Chores
compression=parquetdeprecated with a warning; config help updated.Tests