diff --git a/src/paimon/format/avro/avro_file_batch_reader_test.cpp b/src/paimon/format/avro/avro_file_batch_reader_test.cpp index 10ddda00..285b8c27 100644 --- a/src/paimon/format/avro/avro_file_batch_reader_test.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader_test.cpp @@ -39,7 +39,8 @@ namespace paimon::avro::test { class AvroFileBatchReaderTest : public ::testing::Test, public ::testing::WithParamInterface { public: void SetUp() override { - ASSERT_OK_AND_ASSIGN(file_format_, FileFormatFactory::Get("avro", {})); + ASSERT_OK_AND_ASSIGN(file_format_, + FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}})); fs_ = std::make_shared(); dir_ = ::paimon::test::UniqueTestDirectory::Create(); ASSERT_TRUE(dir_); diff --git a/src/paimon/format/avro/avro_file_format_test.cpp b/src/paimon/format/avro/avro_file_format_test.cpp index f51364f7..00f1a395 100644 --- a/src/paimon/format/avro/avro_file_format_test.cpp +++ b/src/paimon/format/avro/avro_file_format_test.cpp @@ -45,7 +45,8 @@ namespace paimon::avro::test { class AvroFileFormatTest : public testing::Test, public ::testing::WithParamInterface { public: void SetUp() override { - ASSERT_OK_AND_ASSIGN(file_format_, FileFormatFactory::Get("avro", {})); + ASSERT_OK_AND_ASSIGN(file_format_, + FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}})); fs_ = std::make_shared(); dir_ = ::paimon::test::UniqueTestDirectory::Create(); ASSERT_TRUE(dir_); diff --git a/src/paimon/format/avro/avro_format_defs.h b/src/paimon/format/avro/avro_format_defs.h new file mode 100644 index 00000000..f918e28c --- /dev/null +++ b/src/paimon/format/avro/avro_format_defs.h @@ -0,0 +1,25 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +namespace paimon::avro { + +// Write option: codec name that overrides the compression argument of AvroWriterBuilder::Build. +static inline const char AVRO_CODEC[] = "avro.codec"; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_format_writer.cpp b/src/paimon/format/avro/avro_format_writer.cpp index f9c69b68..4ba23e64 100644 --- a/src/paimon/format/avro/avro_format_writer.cpp +++ b/src/paimon/format/avro/avro_format_writer.cpp @@ -53,13 +53,14 @@ AvroFormatWriter::AvroFormatWriter(std::unique_ptr<::avro::DataFileWriterBase>&& Result> AvroFormatWriter::Create( std::unique_ptr out, const std::shared_ptr& schema, - const ::avro::Codec codec) { + const ::avro::Codec codec, std::optional compression_level) { try { PAIMON_ASSIGN_OR_RAISE(::avro::ValidSchema avro_schema, AvroSchemaConverter::ArrowSchemaToAvroSchema(schema)); AvroOutputStreamImpl* avro_output_stream = out.get(); - auto writer = std::make_unique<::avro::DataFileWriterBase>(std::move(out), avro_schema, - DEFAULT_SYNC_INTERVAL, codec); + auto writer = std::make_unique<::avro::DataFileWriterBase>( + std::move(out), avro_schema, DEFAULT_SYNC_INTERVAL, codec, ::avro::Metadata(), + compression_level); auto data_type = arrow::struct_(schema->fields()); return std::unique_ptr( new AvroFormatWriter(std::move(writer), avro_schema, data_type, avro_output_stream)); diff --git a/src/paimon/format/avro/avro_format_writer.h b/src/paimon/format/avro/avro_format_writer.h index e4b49548..b1de8aea 100644 --- a/src/paimon/format/avro/avro_format_writer.h +++ b/src/paimon/format/avro/avro_format_writer.h @@ -19,6 +19,7 @@ #include #include 
#include +#include #include "arrow/api.h" #include "avro/DataFile.hh" @@ -49,7 +50,7 @@ class AvroFormatWriter : public FormatWriter { public: static Result> Create( std::unique_ptr out, const std::shared_ptr& schema, - const ::avro::Codec codec); + const ::avro::Codec codec, std::optional compression_level); Status AddBatch(ArrowArray* batch) override; diff --git a/src/paimon/format/avro/avro_format_writer_test.cpp b/src/paimon/format/avro/avro_format_writer_test.cpp index ec0b5651..67ea461f 100644 --- a/src/paimon/format/avro/avro_format_writer_test.cpp +++ b/src/paimon/format/avro/avro_format_writer_test.cpp @@ -68,7 +68,8 @@ class AvroFormatWriterTest : public ::testing::Test { int32_t batch_size) { ::ArrowSchema c_schema; EXPECT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); - EXPECT_OK_AND_ASSIGN(auto file_format, FileFormatFactory::Get("avro", {})); + EXPECT_OK_AND_ASSIGN(auto file_format, + FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}})); EXPECT_OK_AND_ASSIGN(auto writer_builder, file_format->CreateWriterBuilder(&c_schema, batch_size)); EXPECT_OK_AND_ASSIGN(std::shared_ptr writer, diff --git a/src/paimon/format/avro/avro_writer_builder.h b/src/paimon/format/avro/avro_writer_builder.h index 61024831..808b288b 100644 --- a/src/paimon/format/avro/avro_writer_builder.h +++ b/src/paimon/format/avro/avro_writer_builder.h @@ -24,7 +24,10 @@ #include "avro/DataFile.hh" #include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" #include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" #include "paimon/format/avro/avro_format_writer.h" #include "paimon/format/avro/avro_output_stream_impl.h" #include "paimon/format/writer_builder.h" @@ -56,9 +59,16 @@ class AvroWriterBuilder : public WriterBuilder { Result> Build(const std::shared_ptr& out, const std::string& compression) override { auto output_stream = std::make_unique(out, BUFFER_SIZE, pool_); + // An "avro.codec" entry in the options takes precedence over `compression`. +
PAIMON_ASSIGN_OR_RAISE( + std::string file_compression, + OptionsUtils::GetValueFromMap(options_, AVRO_CODEC, compression)); PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec, - ToAvroCompressionKind(StringUtils::ToLowerCase(compression))); - return AvroFormatWriter::Create(std::move(output_stream), schema_, codec); + ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression))); + PAIMON_ASSIGN_OR_RAISE(std::optional compression_level, + GetAvroCompressionLevel(codec)); + return AvroFormatWriter::Create(std::move(output_stream), schema_, codec, + compression_level); } private: @@ -77,6 +87,16 @@ class AvroWriterBuilder : public WriterBuilder { return Status::Invalid("unknown compression " + file_compression); } } + // Compression level handed to the Avro writer: the configured zstd level for ZSTD_CODEC, + // std::nullopt for any other codec. + Result> GetAvroCompressionLevel(const ::avro::Codec& codec) { + std::optional compression_level; + if (codec == ::avro::Codec::ZSTD_CODEC) { + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_)); + compression_level = core_options.GetFileCompressionZstdLevel(); + } + return compression_level; + } std::shared_ptr pool_; std::shared_ptr schema_; diff --git a/src/paimon/format/avro/avro_writer_builder_test.cpp b/src/paimon/format/avro/avro_writer_builder_test.cpp index f6b20851..3bad2178 100644 --- a/src/paimon/format/avro/avro_writer_builder_test.cpp +++ b/src/paimon/format/avro/avro_writer_builder_test.cpp @@ -22,7 +22,7 @@ namespace paimon::avro::test { -TEST(ToAvroCompressionKindTest, HandlesValidCompressions) { +TEST(AvroWriterBuilderTest, HandlesValidCompressions) { ASSERT_OK_AND_ASSIGN(::avro::Codec zstd_codec, AvroWriterBuilder::ToAvroCompressionKind("zstd")); ASSERT_EQ(zstd_codec, ::avro::Codec::ZSTD_CODEC); @@ -44,11 +44,110 @@ TEST(ToAvroCompressionKindTest, HandlesValidCompressions) { ASSERT_EQ(deflate_codec, ::avro::Codec::DEFLATE_CODEC); } -TEST(ToAvroCompressionKindTest, HandlesInvalidCompression) { +TEST(AvroWriterBuilderTest, HandlesInvalidCompression) { 
ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("unknown_compression")); } -TEST(ToAvroCompressionKindTest, HandlesEmptyString) { +TEST(AvroWriterBuilderTest, HandlesEmptyString) { ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("")); } + +TEST(AvroWriterBuilderTest, CheckAvroCodec) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto schema = std::make_shared(fields); + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "snappy"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_NE(avro_file_writer, nullptr); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::SNAPPY_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "deflate"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_NE(avro_file_writer, nullptr); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::DEFLATE_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "zstd"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_NE(avro_file_writer, nullptr); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 1); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "zstd"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_NE(avro_file_writer, nullptr); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 
3); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "null"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_NE(avro_file_writer, nullptr); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::NULL_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "test"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_NOK(builder.Build(nullptr, "zstd")); + } +} + +TEST(AvroWriterBuilderTest, CheckAvroCompressionLevel) { + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional zstd_level, + builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC)); + ASSERT_TRUE(zstd_level.has_value()); + ASSERT_EQ(zstd_level.value(), 1); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::SNAPPY_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::DEFLATE_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::NULL_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder( + nullptr, -1, + {{Options::FILE_FORMAT, "avro"}, {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(std::optional zstd_level, + builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC)); + 
ASSERT_TRUE(zstd_level.has_value()); + ASSERT_EQ(zstd_level.value(), 3); + } +} + } // namespace paimon::avro::test diff --git a/third_party/versions.txt b/third_party/versions.txt index 21e67dee..187217aa 100644 --- a/third_party/versions.txt +++ b/third_party/versions.txt @@ -55,8 +55,8 @@ PAIMON_GTEST_PKG_NAME=gtest-${PAIMON_GTEST_BUILD_VERSION}.tar.gz PAIMON_ARROW_BUILD_VERSION=17.0.0 PAIMON_ARROW_BUILD_SHA256_CHECKSUM=9d280d8042e7cf526f8c28d170d93bfab65e50f94569f6a790982a878d8d898d PAIMON_ARROW_PKG_NAME=apache-arrow-${PAIMON_ARROW_BUILD_VERSION}.tar.gz -PAIMON_AVRO_BUILD_VERSION=54b332161524086dcb6cde8afe097097eed7f3ee -PAIMON_AVRO_BUILD_SHA256_CHECKSUM=00febd590b1e328d3a97b67a6d29a1d0243e0e41bb2b1582ec580d37698d1fe2 +PAIMON_AVRO_BUILD_VERSION=c499eefb48aa2db906c7bca14a047223806f36db +PAIMON_AVRO_BUILD_SHA256_CHECKSUM=9771f1dcfe3c01aff7ff670e873e66d3406362f71941821d482de65f3d32d780 PAIMON_AVRO_PKG_NAME=avro-${PAIMON_AVRO_BUILD_VERSION}.tar.gz PAIMON_FMT_BUILD_VERSION=11.2.0 PAIMON_FMT_BUILD_SHA256_CHECKSUM=bc23066d87ab3168f27cef3e97d545fa63314f5c79df5ea444d41d56f962c6af