From cc067160eed7dabc210b7d84e673874dbbb81869 Mon Sep 17 00:00:00 2001
From: Gang Wu
Date: Wed, 12 Nov 2025 11:57:15 +0800
Subject: [PATCH] feat: make avro and parquet reader writer more configurable

- Added WriterProperties and ReaderProperties with predefined keys
- Supported writing key-value metadata to file
- Allowed manifest writer to customize avro schema name
---
 src/iceberg/avro/avro_reader.cc         |  2 +-
 src/iceberg/avro/avro_writer.cc         | 17 +++++--
 src/iceberg/file_reader.cc              | 11 +++++
 src/iceberg/file_reader.h               | 26 +++++++---
 src/iceberg/file_writer.cc              | 11 +++++
 src/iceberg/file_writer.h               | 31 +++++++++++-
 src/iceberg/manifest_reader.h           | 10 ++++
 src/iceberg/manifest_reader_internal.cc | 10 ++++
 src/iceberg/manifest_reader_internal.h  |  4 ++
 src/iceberg/manifest_writer.cc          | 63 +++++++++++++++----------
 src/iceberg/parquet/parquet_reader.cc   |  3 +-
 src/iceberg/parquet/parquet_writer.cc   |  3 +-
 src/iceberg/test/avro_test.cc           | 28 +++++++++--
 src/iceberg/test/parquet_test.cc        | 56 +++++++++++++++-------
 src/iceberg/type_fwd.h                  |  2 +
 15 files changed, 213 insertions(+), 64 deletions(-)

diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 80087c8d5..932dd0f18 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -82,7 +82,7 @@ class AvroReader::Impl {
       return InvalidArgument("Projected schema is required by Avro reader");
     }
 
-    batch_size_ = options.batch_size;
+    batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
     read_schema_ = options.projection;
 
     // Open the input stream and adapt to the avro interface.
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 4e86a6846..9c8d58d78 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -66,23 +66,30 @@ class AvroWriter::Impl {
     ::avro::NodePtr root;
     ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
 
+    if (const auto& schema_name =
+            options.properties->Get(WriterProperties::kAvroSchemaName);
+        !schema_name.empty()) {
+      root->setName(::avro::Name(schema_name));
+    }
     avro_schema_ = std::make_shared<::avro::ValidSchema>(root);
 
     // Open the output stream and adapt to the avro interface.
-    constexpr int64_t kDefaultBufferSize = 1024 * 1024;
-    ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
-                            CreateOutputStream(options, kDefaultBufferSize));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto output_stream,
+        CreateOutputStream(options,
+                           options.properties->Get(WriterProperties::kAvroBufferSize)));
     arrow_output_stream_ = output_stream->arrow_output_stream();
 
     std::map<std::string, std::vector<uint8_t>> metadata;
-    for (const auto& [key, value] : options.properties) {
+    for (const auto& [key, value] : options.metadata) {
      std::vector<uint8_t> vec;
      vec.reserve(value.size());
      vec.assign(value.begin(), value.end());
      metadata.emplace(key, std::move(vec));
     }
     writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-        std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
+        std::move(output_stream), *avro_schema_,
+        options.properties->Get(WriterProperties::kAvroSyncInterval),
         ::avro::NULL_CODEC /*codec*/, metadata);
     datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc
index 59f2189ce..d0e291d96 100644
--- a/src/iceberg/file_reader.cc
+++ b/src/iceberg/file_reader.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
   return reader;
 }
 
+std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
+  return std::make_unique<ReaderProperties>();
+}
+
+std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
+    const std::unordered_map<std::string, std::string>& properties) {
+  auto reader_properties = std::make_unique<ReaderProperties>();
+  reader_properties->configs_ = properties;
+  return reader_properties;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index d25a5e451..d7e3b092d 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -30,6 +30,7 @@
 #include "iceberg/file_format.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
 
 namespace iceberg {
 
@@ -42,7 +43,7 @@ class ICEBERG_EXPORT Reader {
   Reader& operator=(const Reader&) = delete;
 
   /// \brief Open the reader.
-  virtual Status Open(const struct ReaderOptions& options) = 0;
+  virtual Status Open(const ReaderOptions& options) = 0;
 
   /// \brief Close the reader.
   virtual Status Close() = 0;
@@ -67,19 +68,30 @@ struct ICEBERG_EXPORT Split {
   size_t length;
 };
 
+class ReaderProperties : public ConfigBase {
+ public:
+  template <typename T>
+  using Entry = const ConfigBase::Entry<T>;
+
+  /// \brief The batch size to read.
+  inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
+
+  /// \brief Create a default ReaderProperties instance.
+  static std::unique_ptr<ReaderProperties> default_properties();
+
+  /// \brief Create a ReaderProperties instance from a map of key-value pairs.
+  static std::unique_ptr<ReaderProperties> FromMap(
+      const std::unordered_map<std::string, std::string>& properties);
+};
+
 /// \brief Options for creating a reader.
 struct ICEBERG_EXPORT ReaderOptions {
-  static constexpr int64_t kDefaultBatchSize = 4096;
-
   /// \brief The path to the file to read.
   std::string path;
   /// \brief The total length of the file.
   std::optional<int64_t> length;
   /// \brief The split to read.
   std::optional<Split> split;
-  /// \brief The batch size to read. Only applies to implementations that support
-  /// batching.
-  int64_t batch_size = kDefaultBatchSize;
   /// \brief FileIO instance to open the file. Reader implementations should down cast it
   /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
   /// `ArrowFileSystemFileIO` as the default implementation.
@@ -93,7 +105,7 @@ struct ICEBERG_EXPORT ReaderOptions {
   /// that may have different field names than the current schema.
   std::shared_ptr<NameMapping> name_mapping;
   /// \brief Format-specific or implementation-specific properties.
-  std::unordered_map<std::string, std::string> properties;
+  std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
 };
 
 /// \brief Factory function to create a reader of a specific file format.
diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc
index e5dbea347..477562db2 100644
--- a/src/iceberg/file_writer.cc
+++ b/src/iceberg/file_writer.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
   return writer;
 }
 
+std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
+  return std::make_unique<WriterProperties>();
+}
+
+std::unique_ptr<WriterProperties> WriterProperties::FromMap(
+    const std::unordered_map<std::string, std::string>& properties) {
+  auto writer_properties = std::make_unique<WriterProperties>();
+  writer_properties->configs_ = properties;
+  return writer_properties;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index fba97d3a2..ea4e42389 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -31,9 +31,34 @@
 #include "iceberg/metrics.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
 
 namespace iceberg {
 
+class WriterProperties : public ConfigBase {
+ public:
+  template <typename T>
+  using Entry = const ConfigBase::Entry<T>;
+
+  /// \brief The name of the Avro root node schema to write.
+  inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};
+
+  /// \brief The buffer size used by Avro output stream.
+  inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};
+
+  /// \brief The sync interval used by Avro writer.
+  inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};
+
+  /// TODO(gangwu): add more properties, like compression codec, compression level, etc.
+
+  /// \brief Create a default WriterProperties instance.
+  static std::unique_ptr<WriterProperties> default_properties();
+
+  /// \brief Create a WriterProperties instance from a map of key-value pairs.
+  static std::unique_ptr<WriterProperties> FromMap(
+      const std::unordered_map<std::string, std::string>& properties);
+};
+
 /// \brief Options for creating a writer.
 struct ICEBERG_EXPORT WriterOptions {
   /// \brief The path to the file to write.
@@ -44,8 +69,10 @@ struct ICEBERG_EXPORT WriterOptions {
   /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
   /// `ArrowFileSystemFileIO` as the default implementation.
   std::shared_ptr<FileIO> io;
+  /// \brief Metadata to write to the file.
+  std::unordered_map<std::string, std::string> metadata;
   /// \brief Format-specific or implementation-specific properties.
-  std::unordered_map<std::string, std::string> properties;
+  std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
 };
 
 /// \brief Base writer class to write data from different file formats.
@@ -57,7 +84,7 @@ class ICEBERG_EXPORT Writer {
   Writer& operator=(const Writer&) = delete;
 
   /// \brief Open the writer.
-  virtual Status Open(const struct WriterOptions& options) = 0;
+  virtual Status Open(const WriterOptions& options) = 0;
 
   /// \brief Close the writer.
   virtual Status Close() = 0;
diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h
index b1a462a52..e16282889 100644
--- a/src/iceberg/manifest_reader.h
+++ b/src/iceberg/manifest_reader.h
@@ -36,8 +36,13 @@ namespace iceberg {
 class ICEBERG_EXPORT ManifestReader {
  public:
   virtual ~ManifestReader() = default;
+
+  /// \brief Read all manifest entries in the manifest file.
   virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
 
+  /// \brief Get the metadata of the manifest file.
+  virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;
+
   /// \brief Creates a reader for a manifest file.
   /// \param manifest A ManifestFile object containing metadata about the manifest.
   /// \param file_io File IO implementation to use.
@@ -61,8 +66,13 @@ class ICEBERG_EXPORT ManifestReader {
 class ICEBERG_EXPORT ManifestListReader {
  public:
   virtual ~ManifestListReader() = default;
+
+  /// \brief Read all manifest files in the manifest list file.
   virtual Result<std::vector<ManifestFile>> Files() const = 0;
 
+  /// \brief Get the metadata of the manifest list file.
+  virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;
+
   /// \brief Creates a reader for the manifest list.
   /// \param manifest_list_location Path to the manifest list file.
   /// \param file_io File IO implementation to use.
diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc
index 002d4a433..346f19a8e 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -548,6 +548,11 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
   return manifest_entries;
 }
 
+Result<std::unordered_map<std::string, std::string>> ManifestReaderImpl::Metadata()
+    const {
+  return reader_->Metadata();
+}
+
 Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
   std::vector<ManifestFile> manifest_files;
   ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
@@ -569,6 +574,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
   return manifest_files;
 }
 
+Result<std::unordered_map<std::string, std::string>> ManifestListReaderImpl::Metadata()
+    const {
+  return reader_->Metadata();
+}
+
 Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
   if (index >= 0 && index < static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
     return static_cast<ManifestFileField>(index);
diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h
index 13e3d2a80..e12892ef3 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -40,6 +40,8 @@ class ManifestReaderImpl : public ManifestReader {
 
   Result<std::vector<ManifestEntry>> Entries() const override;
 
+  Result<std::unordered_map<std::string, std::string>> Metadata() const override;
+
  private:
   std::shared_ptr<Schema> schema_;
   std::unique_ptr<Reader> reader_;
@@ -55,6 +57,8 @@ class ManifestListReaderImpl : public ManifestListReader {
 
   Result<std::vector<ManifestFile>> Files() const override;
 
+  Result<std::unordered_map<std::string, std::string>> Metadata() const override;
+
  private:
   std::shared_ptr<Schema> schema_;
   std::unique_ptr<Reader> reader_;
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 9c2be5615..fd730f2f4 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -58,13 +58,20 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
 Result<std::unique_ptr<Writer>> OpenFileWriter(
     std::string_view location, std::shared_ptr<Schema> schema,
     std::shared_ptr<FileIO> file_io,
-    std::unordered_map<std::string, std::string> properties) {
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
-                                               {.path = std::string(location),
-                                                .schema = std::move(schema),
-                                                .io = std::move(file_io),
-                                                .properties = std::move(properties)}));
+    std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
+  auto writer_properties = WriterProperties::default_properties();
+  if (!schema_name.empty()) {
+    writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
+                                           FileFormatType::kAvro,
+                                           {
+                                               .path = std::string(location),
+                                               .schema = std::move(schema),
+                                               .io = std::move(file_io),
+                                               .metadata = std::move(metadata),
+                                               .properties = std::move(writer_properties),
+                                           }));
   return writer;
 }
 
@@ -91,9 +98,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
 }
 
@@ -119,9 +127,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
 }
 
@@ -149,9 +158,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
 }
 
@@ -191,9 +201,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
   return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
 }
 
@@ -207,9 +218,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
   return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
 }
 
@@ -224,9 +236,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, std::move(schema),
-                                         std::move(file_io), adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
   return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
 }
 
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index 64373d044..96476587d 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -122,7 +122,8 @@ class ParquetReader::Impl {
     // Prepare reader properties
     ::parquet::ReaderProperties reader_properties(pool_);
     ::parquet::ArrowReaderProperties arrow_reader_properties;
-    arrow_reader_properties.set_batch_size(options.batch_size);
+    arrow_reader_properties.set_batch_size(
+        options.properties->Get(ReaderProperties::kBatchSize));
     arrow_reader_properties.set_arrow_extensions_enabled(true);
 
     // Open the Parquet file reader
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index 61f46711b..9c8f08147 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -68,7 +68,8 @@ class ParquetWriter::Impl {
     ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
 
     auto file_writer = ::parquet::ParquetFileWriter::Open(
-        output_stream_, std::move(schema_node), std::move(writer_properties));
+        output_stream_, std::move(schema_node), std::move(writer_properties),
+        std::make_shared<::arrow::KeyValueMetadata>(options.metadata));
     ICEBERG_ARROW_RETURN_NOT_OK(
         ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_,
                                            std::move(arrow_writer_properties), &writer_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 4ca8ca148..1d421ede5 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -123,9 +123,13 @@ class AvroReaderTest : public TempFileTestBase {
     auto export_result = ::arrow::ExportArray(*array, &arrow_array);
     ASSERT_TRUE(export_result.ok());
 
-    auto writer_result = WriterFactoryRegistry::Open(
-        FileFormatType::kAvro,
-        {.path = temp_avro_file_, .schema = schema, .io = file_io_});
+    std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, {"k2", "v2"}};
+
+    auto writer_result =
+        WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = temp_avro_file_,
+                                                            .schema = schema,
+                                                            .io = file_io_,
+                                                            .metadata = metadata});
     ASSERT_TRUE(writer_result.has_value());
     auto writer = std::move(writer_result.value());
     ASSERT_THAT(writer->Write(&arrow_array), IsOk());
@@ -144,6 +148,15 @@ class AvroReaderTest : public TempFileTestBase {
     auto reader = std::move(reader_result.value());
     ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
     ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+
+    auto metadata_result = reader->Metadata();
+    ASSERT_THAT(metadata_result, IsOk());
+    auto read_metadata = std::move(metadata_result.value());
+    for (const auto& [key, value] : metadata) {
+      auto it = read_metadata.find(key);
+      ASSERT_NE(it, read_metadata.end());
+      ASSERT_EQ(it->second, value);
+    }
   }
 
   std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
@@ -191,9 +204,14 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared())});
 
+  auto reader_properties = ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
   auto reader_result = ReaderFactoryRegistry::Open(
-      FileFormatType::kAvro,
-      {.path = temp_avro_file_, .batch_size = 2, .io = file_io_, .projection = schema});
+      FileFormatType::kAvro, {.path = temp_avro_file_,
+                              .io = file_io_,
+                              .projection = schema,
+                              .properties = std::move(reader_properties)});
   ASSERT_THAT(reader_result, IsOk());
 
   auto reader = std::move(reader_result.value());
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index cbf49fb1d..a9be5fb76 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -63,7 +63,8 @@ Status WriteArray(std::shared_ptr<::arrow::Array> data,
 }
 
 Status ReadArray(std::shared_ptr<::arrow::Array>& out,
-                 const ReaderOptions& reader_options) {
+                 const ReaderOptions& reader_options,
+                 std::unordered_map<std::string, std::string>* metadata) {
   ICEBERG_ASSIGN_OR_RAISE(
       auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options));
   ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
@@ -77,6 +78,11 @@ Status ReadArray(std::shared_ptr<::arrow::Array>& out,
   ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema());
   ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
                                  ::arrow::ImportArray(&arrow_c_array, &arrow_schema));
+
+  if (metadata) {
+    ICEBERG_ASSIGN_OR_RAISE(*metadata, reader->Metadata());
+  }
+
   return {};
 }
 
@@ -85,18 +91,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
   std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
   const std::string basePath = "base.parquet";
 
+  std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, {"k2", "v2"}};
+
   auto writer_data = WriterFactoryRegistry::Open(
-      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = file_io});
+      FileFormatType::kParquet,
+      {.path = basePath, .schema = schema, .io = file_io, .metadata = metadata});
   ASSERT_THAT(writer_data, IsOk())
       << "Failed to create writer: " << writer_data.error().message;
   auto writer = std::move(writer_data.value());
   ASSERT_THAT(WriteArray(data, *writer), IsOk());
 
-  ASSERT_THAT(ReadArray(out, {.path = basePath,
-                              .length = writer->length(),
-                              .io = file_io,
-                              .projection = schema}),
+  std::unordered_map<std::string, std::string> read_metadata;
+  ASSERT_THAT(ReadArray(out,
+                        {.path = basePath,
+                         .length = writer->length(),
+                         .io = file_io,
+                         .projection = schema},
+                        &read_metadata),
              IsOk());
+  ASSERT_EQ(read_metadata, metadata);
 
   ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
 }
@@ -231,11 +244,14 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
 
-  auto reader_result =
-      ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = temp_parquet_file_,
-                                                             .batch_size = 2,
-                                                             .io = file_io_,
-                                                             .projection = schema});
+  auto reader_properties = ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
+  auto reader_result = ReaderFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = temp_parquet_file_,
+                                 .io = file_io_,
+                                 .projection = schema,
+                                 .properties = std::move(reader_properties)});
   ASSERT_THAT(reader_result, IsOk());
 
   auto reader = std::move(reader_result.value());
@@ -271,13 +287,19 @@ TEST_F(ParquetReaderTest, ReadSplit) {
       R"([[1], [2], [3]])", R"([[1], [2]])", R"([[3]])", "", "",
   };
 
+  std::shared_ptr<ReaderProperties> reader_properties =
+      ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{100});
+
   for (size_t i = 0; i < splits.size(); ++i) {
-    auto reader_result =
-        ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = temp_parquet_file_,
-                                                               .split = splits[i],
-                                                               .batch_size = 100,
-                                                               .io = file_io_,
-                                                               .projection = schema});
+    auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kParquet,
+                                                     {
+                                                         .path = temp_parquet_file_,
+                                                         .split = splits[i],
+                                                         .io = file_io_,
+                                                         .projection = schema,
+                                                         .properties = reader_properties,
+                                                     });
     ASSERT_THAT(reader_result, IsOk());
     auto reader = std::move(reader_result.value());
     if (!expected_json[i].empty()) {
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 62608a9b2..5485d83fe 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -139,6 +139,8 @@ class ManifestListWriter;
 class ManifestReader;
 class ManifestWriter;
 
+struct ReaderOptions;
+struct WriterOptions;
 class Reader;
 class Writer;
 
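
Usage sketch (illustrative only, not part of the diff): the snippet below shows how the new ReaderProperties/WriterProperties entries and the WriterOptions::metadata field introduced by this patch are meant to be wired together. The helper name WriteThenRead, the file path, the metadata key, and the Avro schema name are placeholder assumptions, and the include for the ICEBERG_ASSIGN_OR_RAISE/ICEBERG_RETURN_UNEXPECTED macros is omitted; only the predefined property keys, the options fields, and the factory calls come from this patch.

#include "iceberg/file_reader.h"
#include "iceberg/file_writer.h"

namespace iceberg {

// Hypothetical helper: `schema` and `file_io` are assumed to be supplied by the
// caller; only the property plumbing below mirrors what this patch introduces.
Status WriteThenRead(std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io) {
  // Writer side: start from the defaults (or FromMap) and override predefined keys.
  auto writer_properties = WriterProperties::default_properties();
  writer_properties->Set(WriterProperties::kAvroSchemaName, std::string("example_record"));

  ICEBERG_ASSIGN_OR_RAISE(
      auto writer,
      WriterFactoryRegistry::Open(FileFormatType::kAvro,
                                  {.path = "/tmp/example.avro",  // placeholder path
                                   .schema = schema,
                                   .io = file_io,
                                   .metadata = {{"created-by", "usage-sketch"}},
                                   .properties = std::move(writer_properties)}));
  // Data writes omitted in this sketch.
  ICEBERG_RETURN_UNEXPECTED(writer->Close());

  // Reader side: the batch size now travels through ReaderProperties instead of
  // the removed ReaderOptions::batch_size field.
  auto reader_properties = ReaderProperties::default_properties();
  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{1024});

  ICEBERG_ASSIGN_OR_RAISE(
      auto reader,
      ReaderFactoryRegistry::Open(FileFormatType::kAvro,
                                  {.path = "/tmp/example.avro",
                                   .io = file_io,
                                   .projection = schema,
                                   .properties = std::move(reader_properties)}));

  // The key-value metadata written above is surfaced again via Reader::Metadata().
  ICEBERG_ASSIGN_OR_RAISE(auto metadata, reader->Metadata());
  if (metadata.find("created-by") == metadata.end()) {
    return InvalidArgument("expected key-value metadata to round trip");
  }
  return reader->Close();
}

}  // namespace iceberg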