From c66169731926983c8d09e204da52cf2d1609d8b2 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 17 Oct 2025 11:15:45 +0800 Subject: [PATCH 1/2] feat: write manifest avro metadata --- src/iceberg/avro/avro_writer.cc | 10 ++++++- src/iceberg/manifest_adapter.h | 3 ++ src/iceberg/manifest_writer.cc | 52 +++++++++++++++++---------------- src/iceberg/v1_metadata.cc | 12 ++++---- src/iceberg/v2_metadata.cc | 13 +++++---- src/iceberg/v3_metadata.cc | 12 ++++---- 6 files changed, 61 insertions(+), 41 deletions(-) diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index ded0d4810..e76149b17 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -68,8 +68,16 @@ class AvroWriter::Impl { ICEBERG_ASSIGN_OR_RAISE(auto output_stream, CreateOutputStream(options, kDefaultBufferSize)); arrow_output_stream_ = output_stream->arrow_output_stream(); + std::map> metadata; + for (const auto& [key, value] : options.properties) { + std::vector vec; + vec.reserve(value.size()); + vec.assign(value.begin(), value.end()); + metadata.emplace(key, std::move(vec)); + } writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( - std::move(output_stream), *avro_schema_); + std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/, + ::avro::NULL_CODEC /*codec*/, metadata); datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_); ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); return {}; diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 7c2a8bbec..50f3b0caf 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter { Status StartAppending(); Result FinishAppending(); int64_t size() const { return size_; } + const std::unordered_map& metadata() const { + return metadata_; + } protected: ArrowArray array_; diff --git a/src/iceberg/manifest_writer.cc 
b/src/iceberg/manifest_writer.cc index 7bdfee7b6..a27fb4558 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -53,14 +53,16 @@ Status ManifestWriter::Close() { return writer_->Close(); } -Result> OpenFileWriter(std::string_view location, - std::shared_ptr schema, - std::shared_ptr file_io) { +Result> OpenFileWriter( + std::string_view location, std::shared_ptr schema, + std::shared_ptr file_io, + std::unordered_map properties) { ICEBERG_ASSIGN_OR_RAISE( - auto writer, - WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location), - .schema = std::move(schema), - .io = std::move(file_io)})); + auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(location), + .schema = std::move(schema), + .io = std::move(file_io), + .properties = std::move(properties)})); return writer; } @@ -73,9 +75,9 @@ Result> ManifestWriter::MakeV1Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -88,9 +90,9 @@ Result> ManifestWriter::MakeV2Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -104,9 +106,9 @@ Result> ManifestWriter::MakeV3Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - 
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -142,9 +144,9 @@ Result> ManifestListWriter::MakeV1Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -158,9 +160,9 @@ Result> ManifestListWriter::MakeV2Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -175,9 +177,9 @@ Result> ManifestListWriter::MakeV3Writer( ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); auto schema = adapter->schema(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, - OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), + std::move(file_io), adapter->metadata())); return std::make_unique(std::move(writer), std::move(adapter)); } diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc index ba381f836..226065aac 100644 --- a/src/iceberg/v1_metadata.cc +++ b/src/iceberg/v1_metadata.cc @@ -19,6 +19,9 @@ #include "iceberg/v1_metadata.h" +#include + +#include "iceberg/json_internal.h" 
#include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" @@ -47,15 +50,14 @@ Status ManifestEntryAdapterV1::Init() { DataFile::kSplitOffsets.field_id(), DataFile::kSortOrderId.field_id(), }; - // TODO(xiao.dong) schema to json - metadata_["schema"] = "{}"; - // TODO(xiao.dong) partition spec to json - metadata_["partition-spec"] = "{}"; + ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); + metadata_["schema"] = ToJson(*manifest_schema_).dump(); if (partition_spec_ != nullptr) { + metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "1"; - return InitSchema(kManifestEntryFieldIds); + return {}; } Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) { diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc index e2bbb91b6..e9f9f2549 100644 --- a/src/iceberg/v2_metadata.cc +++ b/src/iceberg/v2_metadata.cc @@ -19,9 +19,13 @@ #include "iceberg/v2_metadata.h" +#include + +#include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" +#include "iceberg/util/macros.h" namespace iceberg { @@ -50,16 +54,15 @@ Status ManifestEntryAdapterV2::Init() { DataFile::kSortOrderId.field_id(), DataFile::kReferencedDataFile.field_id(), }; - // TODO(xiao.dong) schema to json - metadata_["schema"] = "{}"; - // TODO(xiao.dong) partition spec to json - metadata_["partition-spec"] = "{}"; + ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); + metadata_["schema"] = ToJson(*manifest_schema_).dump(); if (partition_spec_ != nullptr) { + metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "2"; metadata_["content"] = "data"; - return InitSchema(kManifestEntryFieldIds); + return {}; } Status 
ManifestEntryAdapterV2::Append(const ManifestEntry& entry) { diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc index e4605987b..03a8c3c33 100644 --- a/src/iceberg/v3_metadata.cc +++ b/src/iceberg/v3_metadata.cc @@ -19,6 +19,9 @@ #include "iceberg/v3_metadata.h" +#include + +#include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" @@ -54,16 +57,15 @@ Status ManifestEntryAdapterV3::Init() { DataFile::kContentOffset.field_id(), DataFile::kContentSize.field_id(), }; - // TODO(xiao.dong) schema to json - metadata_["schema"] = "{}"; - // TODO(xiao.dong) partition spec to json - metadata_["partition-spec"] = "{}"; + ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); + metadata_["schema"] = ToJson(*manifest_schema_).dump(); if (partition_spec_ != nullptr) { + metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "3"; metadata_["content"] = "data"; - return InitSchema(kManifestEntryFieldIds); + return {}; } Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) { From 26d4a3f7b7aa6c2c75e4e8f522c4eff6ab599665 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 17 Oct 2025 14:57:59 +0800 Subject: [PATCH 2/2] add ToJsonString to avoid include nlohmann/json.hpp --- src/iceberg/json_internal.cc | 12 ++++++++++++ src/iceberg/json_internal.h | 24 ++++++++++++++++++++++++ src/iceberg/test/metadata_io_test.cc | 3 +-- src/iceberg/v1_metadata.cc | 6 ++---- src/iceberg/v2_metadata.cc | 6 ++---- src/iceberg/v3_metadata.cc | 6 ++---- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 6adf5ab15..0ad546197 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -309,6 +309,10 @@ nlohmann::json ToJson(const Schema& schema) { return json; } +Result 
ToJsonString(const Schema& schema) { + return ToJsonString(ToJson(schema)); +} + nlohmann::json ToJson(const SnapshotRef& ref) { nlohmann::json json; json[kSnapshotId] = ref.snapshot_id; @@ -490,6 +494,10 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) { return json; } +Result ToJsonString(const PartitionSpec& partition_spec) { + return ToJsonString(ToJson(partition_spec)); +} + Result> PartitionFieldFromJson( const nlohmann::json& json, bool allow_field_id_missing) { ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue(json, kSourceId)); @@ -785,6 +793,10 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) { return json; } +Result ToJsonString(const TableMetadata& table_metadata) { + return ToJsonString(ToJson(table_metadata)); +} + namespace { /// \brief Parse the schemas from the JSON object. diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 7d13459b5..d5eb5bcd4 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -80,6 +80,12 @@ ICEBERG_EXPORT Result> SortOrderFromJson( /// \return The JSON representation of the schema. ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema); +/// \brief Convert an Iceberg Schema to JSON. +/// +/// \param[in] schema The Iceberg schema to convert. +/// \return The JSON string of the schema. +ICEBERG_EXPORT Result ToJsonString(const Schema& schema); + /// \brief Convert JSON to an Iceberg Schema. /// /// \param[in] json The JSON representation of the schema. @@ -148,6 +154,18 @@ ICEBERG_EXPORT Result> PartitionFieldFromJson( /// array. ICEBERG_EXPORT nlohmann::json ToJson(const PartitionSpec& partition_spec); +/// \brief Serializes a `PartitionSpec` object to JSON. +/// +/// This function converts a `PartitionSpec` object into a JSON representation. +/// The resulting JSON includes the spec ID and a list of `PartitionField` objects. +/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)` +/// function. 
+/// +/// \param partition_spec The `PartitionSpec` object to be serialized. +/// \return A JSON string of the `PartitionSpec` with its spec ID and fields +/// array. +ICEBERG_EXPORT Result ToJsonString(const PartitionSpec& partition_spec); + /// \brief Deserializes a JSON object into a `PartitionSpec` object. /// /// This function parses the provided JSON and creates a `PartitionSpec` object. @@ -246,6 +264,12 @@ ICEBERG_EXPORT Result MetadataLogEntryFromJson( /// \return A JSON object representing the `TableMetadata`. ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata); +/// \brief Serializes a `TableMetadata` object to JSON. +/// +/// \param table_metadata The `TableMetadata` object to be serialized. +/// \return A JSON string of the `TableMetadata`. +ICEBERG_EXPORT Result ToJsonString(const TableMetadata& table_metadata); + /// \brief Deserializes a JSON object into a `TableMetadata` object. /// /// \param json The JSON object representing a `TableMetadata`. diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index 1590af4d9..aff1e9a42 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -90,8 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) { TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) { TableMetadata metadata = PrepareMetadata(); - auto json = ToJson(metadata); - auto ret = ToJsonString(json); + auto ret = ToJsonString(metadata); ASSERT_TRUE(ret.has_value()); auto json_string = ret.value(); diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc index 226065aac..52c52cde9 100644 --- a/src/iceberg/v1_metadata.cc +++ b/src/iceberg/v1_metadata.cc @@ -19,8 +19,6 @@ #include "iceberg/v1_metadata.h" -#include - #include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -51,9 +49,9 @@ Status ManifestEntryAdapterV1::Init() { DataFile::kSortOrderId.field_id(), }; 
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - metadata_["schema"] = ToJson(*manifest_schema_).dump(); + ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) if (partition_spec_ != nullptr) { - metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "1"; diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc index e9f9f2549..c1407b12f 100644 --- a/src/iceberg/v2_metadata.cc +++ b/src/iceberg/v2_metadata.cc @@ -19,8 +19,6 @@ #include "iceberg/v2_metadata.h" -#include - #include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -55,9 +53,9 @@ Status ManifestEntryAdapterV2::Init() { DataFile::kReferencedDataFile.field_id(), }; ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - metadata_["schema"] = ToJson(*manifest_schema_).dump(); + ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) if (partition_spec_ != nullptr) { - metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "2"; diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc index 03a8c3c33..61474f697 100644 --- a/src/iceberg/v3_metadata.cc +++ b/src/iceberg/v3_metadata.cc @@ -19,8 +19,6 @@ #include "iceberg/v3_metadata.h" -#include - #include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -58,9 +56,9 @@ Status ManifestEntryAdapterV3::Init() { DataFile::kContentSize.field_id(), }; ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - metadata_["schema"] = ToJson(*manifest_schema_).dump(); 
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) if (partition_spec_ != nullptr) { - metadata_["partition-spec"] = ToJson(*partition_spec_).dump(); + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); } metadata_["format-version"] = "3";