diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index e76149b17..4e86a6846 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -55,6 +55,12 @@ Result> CreateOutputStream(const WriterOptions class AvroWriter::Impl { public: + ~Impl() { + if (arrow_schema_.release != nullptr) { + ArrowSchemaRelease(&arrow_schema_); + } + } + Status Open(const WriterOptions& options) { write_schema_ = options.schema; diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc index b568077a9..c0e267de4 100644 --- a/src/iceberg/manifest_adapter.cc +++ b/src/iceberg/manifest_adapter.cc @@ -139,6 +139,14 @@ Result ManifestAdapter::FinishAppending() { return &array_; } +ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr partition_spec, + ManifestContent content) + : partition_spec_(std::move(partition_spec)), content_(content) { + if (!partition_spec_) { + partition_spec_ = PartitionSpec::Unpartitioned(); + } +} + ManifestEntryAdapter::~ManifestEntryAdapter() { if (array_.release != nullptr) { ArrowArrayRelease(&array_); @@ -148,14 +156,6 @@ ManifestEntryAdapter::~ManifestEntryAdapter() { } } -Result> ManifestEntryAdapter::GetManifestEntryType() { - if (partition_spec_ == nullptr) [[unlikely]] { - return ManifestEntry::TypeFromPartitionType(nullptr); - } - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); - return ManifestEntry::TypeFromPartitionType(std::move(partition_type)); -} - Status ManifestEntryAdapter::AppendPartitionValues( ArrowArray* array, const std::shared_ptr& partition_type, const std::vector& partition_values) { @@ -436,37 +436,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) { return {}; } -Status ManifestEntryAdapter::InitSchema(const std::unordered_set& fields_ids) { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_type, GetManifestEntryType()) - auto fields_span = manifest_entry_type->fields(); - std::vector fields; - // TODO(xiao.dong) Make this a common function to recursively handle - // all nested fields in the schema - for (const auto& field : fields_span) { - if (field.field_id() == 2) { - // handle data_file field - auto data_file_struct = internal::checked_pointer_cast(field.type()); - std::vector data_file_fields; - for (const auto& data_file_field : data_file_struct->fields()) { - if (fields_ids.contains(data_file_field.field_id())) { - data_file_fields.emplace_back(data_file_field); - } - } - auto type = std::make_shared(data_file_fields); - auto data_file_field = SchemaField::MakeRequired( - field.field_id(), std::string(field.name()), std::move(type)); - fields.emplace_back(std::move(data_file_field)); - } else { - if (fields_ids.contains(field.field_id())) { - fields.emplace_back(field); - } - } - } - manifest_schema_ = std::make_shared(fields); - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_)); - return {}; -} - ManifestFileAdapter::~ManifestFileAdapter() { if (array_.release != nullptr) { ArrowArrayRelease(&array_); @@ -671,16 +640,4 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { return {}; } -Status ManifestFileAdapter::InitSchema(const std::unordered_set& fields_ids) { - std::vector fields; - for (const auto& field : ManifestFile::Type().fields()) { - if (fields_ids.contains(field.field_id())) { - fields.emplace_back(field); - } - } - manifest_list_schema_ = std::make_shared(fields); - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_)); - return {}; -} - } // namespace iceberg diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 50f3b0caf..87505068d 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -61,22 +61,18 @@ class ICEBERG_EXPORT ManifestAdapter { /// Implemented by different versions with version-specific schemas. class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { public: - explicit ManifestEntryAdapter(std::shared_ptr partition_spec) - : partition_spec_(std::move(partition_spec)) {} + ManifestEntryAdapter(std::shared_ptr partition_spec, + ManifestContent content); + ~ManifestEntryAdapter() override; virtual Status Append(const ManifestEntry& entry) = 0; const std::shared_ptr& schema() const { return manifest_schema_; } - protected: - virtual Result> GetManifestEntryType(); + ManifestContent content() const { return content_; } - /// \brief Initialize version-specific schema. - /// - /// \param fields_ids Field IDs to include in the manifest schema. The schema will be - /// initialized to include only the fields with these IDs. - Status InitSchema(const std::unordered_set& fields_ids); + protected: Status AppendInternal(const ManifestEntry& entry); Status AppendDataFile(ArrowArray* array, const std::shared_ptr& data_file_type, @@ -97,6 +93,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { protected: std::shared_ptr partition_spec_; std::shared_ptr manifest_schema_; + const ManifestContent content_; }; /// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`. @@ -110,12 +107,9 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter { const std::shared_ptr& schema() const { return manifest_list_schema_; } + virtual std::optional next_row_id() const { return std::nullopt; } + protected: - /// \brief Initialize version-specific schema. - /// - /// \param fields_ids Field IDs to include in the manifest list schema. The schema will - /// be initialized to include only the fields with these IDs. - Status InitSchema(const std::unordered_set& fields_ids); Status AppendInternal(const ManifestFile& file); static Status AppendPartitionSummary( ArrowArray* array, const std::shared_ptr& summary_type, diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc index 5a963b5d6..fee845d36 100644 --- a/src/iceberg/manifest_entry.cc +++ b/src/iceberg/manifest_entry.cc @@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const { std::shared_ptr DataFile::Type(std::shared_ptr partition_type) { if (!partition_type) { - partition_type = PartitionSpec::Unpartitioned()->schema(); + partition_type = std::make_shared(std::vector{}); } return std::make_shared(std::vector{ kContent, diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h index 2b9987b06..c67a63ed5 100644 --- a/src/iceberg/manifest_entry.h +++ b/src/iceberg/manifest_entry.h @@ -57,6 +57,15 @@ ICEBERG_EXPORT constexpr Result ManifestStatusFromInt( } } +enum class ManifestContent { + kData = 0, + kDeletes = 1, +}; + +ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept; +ICEBERG_EXPORT constexpr Result ManifestContentFromString( + std::string_view str) noexcept; + /// \brief DataFile carries data file path, partition tuple, metrics, ... struct ICEBERG_EXPORT DataFile { /// \brief Content of a data file @@ -185,6 +194,8 @@ struct ICEBERG_EXPORT DataFile { 101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet"); inline static const int32_t kPartitionFieldId = 102; inline static const std::string kPartitionField = "partition"; + inline static const std::string kPartitionDoc = + "Partition data tuple, schema based on the partition spec"; inline static const SchemaField kRecordCount = SchemaField::MakeRequired( 103, "record_count", iceberg::int64(), "Number of records in the file"); inline static const SchemaField kFileSize = SchemaField::MakeRequired( diff --git a/src/iceberg/manifest_list.cc b/src/iceberg/manifest_list.cc index b9907604f..853a0182b 100644 --- a/src/iceberg/manifest_list.cc +++ b/src/iceberg/manifest_list.cc @@ -33,12 +33,25 @@ const StructType& PartitionFieldSummary::Type() { return kInstance; } -const StructType& ManifestFile::Type() { - static const StructType kInstance( - {kManifestPath, kManifestLength, kPartitionSpecId, kContent, kSequenceNumber, - kMinSequenceNumber, kAddedSnapshotId, kAddedFilesCount, kExistingFilesCount, - kDeletedFilesCount, kAddedRowsCount, kExistingRowsCount, kDeletedRowsCount, - kPartitions, kKeyMetadata, kFirstRowId}); +const std::shared_ptr& ManifestFile::Type() { + static const auto kInstance = std::make_shared(std::vector{ + kManifestPath, + kManifestLength, + kPartitionSpecId, + kContent, + kSequenceNumber, + kMinSequenceNumber, + kAddedSnapshotId, + kAddedFilesCount, + kExistingFilesCount, + kDeletedFilesCount, + kAddedRowsCount, + kExistingRowsCount, + kDeletedRowsCount, + kPartitions, + kKeyMetadata, + kFirstRowId, + }); return kInstance; } diff --git a/src/iceberg/manifest_list.h b/src/iceberg/manifest_list.h index d6e7f1dd2..17dc28c8e 100644 --- a/src/iceberg/manifest_list.h +++ b/src/iceberg/manifest_list.h @@ -197,7 +197,7 @@ struct ICEBERG_EXPORT ManifestFile { bool operator==(const ManifestFile& other) const = default; - static const StructType& Type(); + static const std::shared_ptr& Type(); }; /// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index 1e9085255..60c15aa35 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -71,14 +71,15 @@ Result> ManifestReader::Make( Result> ManifestListReader::Make( std::string_view manifest_list_location, std::shared_ptr file_io) { - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open( - FileFormatType::kAvro, - {.path = std::string(manifest_list_location), - .io = std::move(file_io), - .projection = schema})); + std::shared_ptr schema = ManifestFile::Type(); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ReaderFactoryRegistry::Open(FileFormatType::kAvro, + { + .path = std::string(manifest_list_location), + .io = std::move(file_io), + .projection = schema, + })); return std::make_unique(std::move(reader), std::move(schema)); } diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index a27fb4558..8b295e12d 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -53,6 +53,8 @@ Status ManifestWriter::Close() { return writer_->Close(); } +ManifestContent ManifestWriter::content() const { return adapter_->content(); } + Result> OpenFileWriter( std::string_view location, std::shared_ptr schema, std::shared_ptr file_io, @@ -83,9 +85,10 @@ Result> ManifestWriter::MakeV1Writer( Result> ManifestWriter::MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_spec) { - auto adapter = - std::make_unique(snapshot_id, std::move(partition_spec)); + std::shared_ptr file_io, std::shared_ptr partition_spec, + ManifestContent content) { + auto adapter = std::make_unique( + snapshot_id, std::move(partition_spec), content); ICEBERG_RETURN_UNEXPECTED(adapter->Init()); ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); @@ -99,9 +102,9 @@ Result> ManifestWriter::MakeV2Writer( Result> ManifestWriter::MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_spec) { - auto adapter = std::make_unique(snapshot_id, first_row_id, - std::move(partition_spec)); + std::shared_ptr partition_spec, ManifestContent content) { + auto adapter = std::make_unique( + snapshot_id, first_row_id, std::move(partition_spec), content); ICEBERG_RETURN_UNEXPECTED(adapter->Init()); ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); @@ -136,6 +139,10 @@ Status ManifestListWriter::Close() { return writer_->Close(); } +std::optional ManifestListWriter::next_row_id() const { + return adapter_->next_row_id(); +} + Result> ManifestListWriter::MakeV1Writer( int64_t snapshot_id, std::optional parent_snapshot_id, std::string_view manifest_list_location, std::shared_ptr file_io) { @@ -169,7 +176,7 @@ Result> ManifestListWriter::MakeV2Writer( Result> ManifestListWriter::MakeV3Writer( int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, + int64_t sequence_number, int64_t first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, sequence_number, first_row_id); diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 69e191219..be6c83107 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -54,6 +54,9 @@ class ICEBERG_EXPORT ManifestWriter { /// \brief Close writer and flush to storage. Status Close(); + /// \brief Get the content of the manifest. + ManifestContent content() const; + /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. /// \param manifest_location Path to the manifest file. @@ -69,10 +72,12 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \param partition_spec Partition spec for the manifest. + /// \param content Content of the manifest. /// \return A Result containing the writer or an error. static Result> MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_spec); + std::shared_ptr file_io, std::shared_ptr partition_spec, + ManifestContent content); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. @@ -80,11 +85,12 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \param partition_spec Partition spec for the manifest. + /// \param content Content of the manifest. /// \return A Result containing the writer or an error. static Result> MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_spec); + std::shared_ptr partition_spec, ManifestContent content); private: static constexpr int64_t kBatchSize = 1024; @@ -114,6 +120,9 @@ class ICEBERG_EXPORT ManifestListWriter { /// \brief Close writer and flush to storage. Status Close(); + /// \brief Get the next row id to assign. + std::optional next_row_id() const; + /// \brief Creates a writer for the v1 manifest list. /// \param snapshot_id ID of the snapshot. /// \param parent_snapshot_id ID of the parent snapshot. @@ -146,7 +155,7 @@ class ICEBERG_EXPORT ManifestListWriter { /// \return A Result containing the writer or an error. static Result> MakeV3Writer( int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, + int64_t sequence_number, int64_t first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); private: diff --git a/src/iceberg/schema_field.h b/src/iceberg/schema_field.h index 4190dba7d..c7c826d87 100644 --- a/src/iceberg/schema_field.h +++ b/src/iceberg/schema_field.h @@ -79,6 +79,18 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable { return lhs.Equals(rhs); } + SchemaField AsRequired() const { + auto copy = *this; + copy.optional_ = false; + return copy; + } + + SchemaField AsOptional() const { + auto copy = *this; + copy.optional_ = true; + return copy; + } + private: /// \brief Compare two fields for equality. [[nodiscard]] bool Equals(const SchemaField& other) const; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fd8cbc972..544e92f7c 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -121,6 +121,7 @@ if(ICEBERG_BUILD_BUNDLE) avro_schema_test.cc avro_stream_test.cc manifest_list_reader_writer_test.cc + manifest_list_versions_test.cc manifest_reader_writer_test.cc test_common.cc) diff --git a/src/iceberg/test/manifest_list_versions_test.cc b/src/iceberg/test/manifest_list_versions_test.cc new file mode 100644 index 000000000..1f329ba20 --- /dev/null +++ b/src/iceberg/test/manifest_list_versions_test.cc @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/file_reader.h" +#include "iceberg/file_writer.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader.h" +#include "iceberg/manifest_writer.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/v1_metadata.h" + +namespace iceberg { + +constexpr int kRowLineageFormatVersion = 3; +constexpr const char* kPath = "s3://bucket/table/m1.avro"; +constexpr int64_t kLength = 1024L; +constexpr int32_t kSpecId = 1; +constexpr int64_t kSeqNum = 34L; +constexpr int64_t kMinSeqNum = 10L; +constexpr int64_t kSnapshotId = 987134631982734L; +constexpr int32_t kAddedFiles = 2; +constexpr int64_t kAddedRows = 5292L; +constexpr int32_t kExistingFiles = 343; +constexpr int64_t kExistingRows = 857273L; +constexpr int32_t kDeletedFiles = 1; +constexpr int64_t kDeletedRows = 22910L; +constexpr int64_t kFirstRowId = 100L; +constexpr int64_t kSnapshotFirstRowId = 130L; + +const static auto kTestManifest = ManifestFile{ + .manifest_path = kPath, + .manifest_length = kLength, + .partition_spec_id = kSpecId, + .content = ManifestFile::Content::kData, + .sequence_number = kSeqNum, + .min_sequence_number = kMinSeqNum, + .added_snapshot_id = kSnapshotId, + .added_files_count = kAddedFiles, + .existing_files_count = kExistingFiles, + .deleted_files_count = kDeletedFiles, + .added_rows_count = kAddedRows, + .existing_rows_count = kExistingRows, + .deleted_rows_count = kDeletedRows, + .partitions = {}, + .key_metadata = {}, + .first_row_id = kFirstRowId, +}; + +const static auto kDeleteManifest = ManifestFile{ + .manifest_path = kPath, + .manifest_length = kLength, + .partition_spec_id = kSpecId, + .content = ManifestFile::Content::kDeletes, + .sequence_number = kSeqNum, + .min_sequence_number = kMinSeqNum, + .added_snapshot_id = kSnapshotId, + .added_files_count = kAddedFiles, + .existing_files_count = kExistingFiles, + .deleted_files_count = kDeletedFiles, + .added_rows_count = kAddedRows, + .existing_rows_count = kExistingRows, + .deleted_rows_count = kDeletedRows, + .first_row_id = std::nullopt, +}; + +class TestManifestListVersions : public ::testing::Test { + protected: + void SetUp() override { + avro::RegisterAll(); + file_io_ = iceberg::arrow::MakeMockFileIO(); + } + + static std::string CreateManifestListPath() { + return std::format("manifest-list-{}.avro", + std::chrono::system_clock::now().time_since_epoch().count()); + } + + std::string WriteManifestList(int format_version, int64_t expected_next_row_id, + const std::vector& manifests) const { + const std::string manifest_list_path = CreateManifestListPath(); + constexpr int64_t kParentSnapshotId = kSnapshotId - 1; + + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 1) { + writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, kParentSnapshotId, + manifest_list_path, file_io_); + } else if (format_version == 2) { + writer_result = ManifestListWriter::MakeV2Writer( + kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path, file_io_); + } else if (format_version == 3) { + writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, kParentSnapshotId, + kSeqNum, kSnapshotFirstRowId, + manifest_list_path, file_io_); + } + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + EXPECT_THAT(writer->AddAll(manifests), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + if (format_version >= kRowLineageFormatVersion) { + EXPECT_EQ(writer->next_row_id(), std::make_optional(expected_next_row_id)); + } else { + EXPECT_FALSE(writer->next_row_id().has_value()); + } + + return manifest_list_path; + } + + ManifestFile WriteAndReadManifestList(int format_version) const { + return ReadManifestList( + WriteManifestList(format_version, kSnapshotFirstRowId, {kTestManifest})); + } + + ManifestFile ReadManifestList(const std::string& manifest_list_path) const { + auto reader_result = ManifestListReader::Make(manifest_list_path, file_io_); + EXPECT_THAT(reader_result, IsOk()); + + auto reader = std::move(reader_result.value()); + auto files_result = reader->Files(); + EXPECT_THAT(files_result, IsOk()); + + auto manifests = files_result.value(); + EXPECT_EQ(manifests.size(), 1); + + return manifests[0]; + } + + std::vector ReadAllManifests( + const std::string& manifest_list_path) const { + auto reader_result = ManifestListReader::Make(manifest_list_path, file_io_); + EXPECT_THAT(reader_result, IsOk()); + + auto reader = std::move(reader_result.value()); + auto files_result = reader->Files(); + EXPECT_THAT(files_result, IsOk()); + + return files_result.value(); + } + + void ReadAvro(const std::string& path, const std::shared_ptr& schema, + const std::string& expected_json) const { + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = schema}); + EXPECT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + auto arrow_schema_result = reader->Schema(); + EXPECT_THAT(arrow_schema_result, IsOk()); + auto arrow_c_schema = std::move(arrow_schema_result.value()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto expected_array = + ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie(); + + auto batch_result = reader->Next(); + EXPECT_THAT(batch_result, IsOk()); + EXPECT_TRUE(batch_result.value().has_value()); + auto arrow_c_batch = std::move(batch_result.value().value()); + + auto arrow_batch_result = + ::arrow::ImportArray(&arrow_c_batch, std::move(arrow_schema)); + auto array = arrow_batch_result.ValueOrDie(); + EXPECT_TRUE(array != nullptr); + EXPECT_TRUE(expected_array->Equals(*array)); + } + + std::shared_ptr file_io_; +}; + +TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) { + const std::string manifest_list_path = CreateManifestListPath(); + + auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, kSnapshotId - 1, + manifest_list_path, file_io_); + EXPECT_THAT(writer_result, IsOk()); + + auto writer = std::move(writer_result.value()); + auto status = writer->Add(kDeleteManifest); + + EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList)); + EXPECT_THAT(status, HasErrorMessage("Cannot store delete manifests in a v1 table")); +} + +TEST_F(TestManifestListVersions, TestV1Write) { + auto manifest = WriteAndReadManifestList(/*format_version=*/1); + + // V3 fields are not written and are defaulted + EXPECT_FALSE(manifest.first_row_id.has_value()); + + // V2 fields are not written and are defaulted + EXPECT_EQ(manifest.sequence_number, 0); + EXPECT_EQ(manifest.min_sequence_number, 0); + + // V1 fields are read correctly + EXPECT_EQ(manifest.manifest_path, kPath); + EXPECT_EQ(manifest.manifest_length, kLength); + EXPECT_EQ(manifest.partition_spec_id, kSpecId); + EXPECT_EQ(manifest.content, ManifestFile::Content::kData); + EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId); + EXPECT_EQ(manifest.added_files_count, kAddedFiles); + EXPECT_EQ(manifest.existing_files_count, kExistingFiles); + EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles); + EXPECT_EQ(manifest.added_rows_count, kAddedRows); + EXPECT_EQ(manifest.existing_rows_count, kExistingRows); + EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows); +} + +TEST_F(TestManifestListVersions, TestV2Write) { + auto manifest = WriteAndReadManifestList(2); + + // V3 fields are not written and are defaulted + EXPECT_FALSE(manifest.first_row_id.has_value()); + + // All V2 fields should be read correctly + EXPECT_EQ(manifest.manifest_path, kPath); + EXPECT_EQ(manifest.manifest_length, kLength); + EXPECT_EQ(manifest.partition_spec_id, kSpecId); + EXPECT_EQ(manifest.content, ManifestFile::Content::kData); + EXPECT_EQ(manifest.sequence_number, kSeqNum); + EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum); + EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId); + EXPECT_EQ(manifest.added_files_count, kAddedFiles); + EXPECT_EQ(manifest.added_rows_count, kAddedRows); + EXPECT_EQ(manifest.existing_files_count, kExistingFiles); + EXPECT_EQ(manifest.existing_rows_count, kExistingRows); + EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles); + EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows); +} + +TEST_F(TestManifestListVersions, TestV3Write) { + auto manifest = WriteAndReadManifestList(/*format_version=*/3); + + // All V3 fields should be read correctly + EXPECT_EQ(manifest.manifest_path, kPath); + EXPECT_EQ(manifest.manifest_length, kLength); + EXPECT_EQ(manifest.partition_spec_id, kSpecId); + EXPECT_EQ(manifest.content, ManifestFile::Content::kData); + EXPECT_EQ(manifest.sequence_number, kSeqNum); + EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum); + EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId); + EXPECT_EQ(manifest.added_files_count, kAddedFiles); + EXPECT_EQ(manifest.added_rows_count, kAddedRows); + EXPECT_EQ(manifest.existing_files_count, kExistingFiles); + EXPECT_EQ(manifest.existing_rows_count, kExistingRows); + EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles); + EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows); + EXPECT_TRUE(manifest.first_row_id.has_value()); + EXPECT_EQ(manifest.first_row_id.value(), kFirstRowId); +} + +TEST_F(TestManifestListVersions, TestV3WriteFirstRowIdAssignment) { + ManifestFile missing_first_row_id = kTestManifest; + missing_first_row_id.first_row_id = std::nullopt; + + constexpr int64_t kExpectedNextRowId = kSnapshotFirstRowId + kAddedRows + kExistingRows; + auto manifest_list_path = + WriteManifestList(/*format_version=*/3, kExpectedNextRowId, {missing_first_row_id}); + + auto manifest = ReadManifestList(manifest_list_path); + EXPECT_EQ(manifest.manifest_path, kPath); + EXPECT_EQ(manifest.manifest_length, kLength); + EXPECT_EQ(manifest.partition_spec_id, kSpecId); + EXPECT_EQ(manifest.content, ManifestFile::Content::kData); + EXPECT_EQ(manifest.sequence_number, kSeqNum); + EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum); + EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId); + EXPECT_EQ(manifest.added_files_count, kAddedFiles); + EXPECT_EQ(manifest.added_rows_count, kAddedRows); + EXPECT_EQ(manifest.existing_files_count, kExistingFiles); + EXPECT_EQ(manifest.existing_rows_count, kExistingRows); + EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles); + EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows); + EXPECT_EQ(manifest.first_row_id, std::make_optional(kSnapshotFirstRowId)); +} + +TEST_F(TestManifestListVersions, TestV3WriteMixedRowIdAssignment) { + ManifestFile missing_first_row_id = kTestManifest; + missing_first_row_id.first_row_id = std::nullopt; + + constexpr int64_t kExpectedNextRowId = + kSnapshotFirstRowId + 2 * (kAddedRows + kExistingRows); + + auto manifest_list_path = WriteManifestList( + 3, kExpectedNextRowId, {missing_first_row_id, kTestManifest, missing_first_row_id}); + + auto manifests = ReadAllManifests(manifest_list_path); + EXPECT_EQ(manifests.size(), 3); + + // all v2 fields should be read correctly + for (const auto& manifest : manifests) { + EXPECT_EQ(manifest.manifest_path, kPath); + EXPECT_EQ(manifest.manifest_length, kLength); + EXPECT_EQ(manifest.partition_spec_id, kSpecId); + EXPECT_EQ(manifest.content, ManifestFile::Content::kData); + EXPECT_EQ(manifest.sequence_number, kSeqNum); + EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum); + EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId); + EXPECT_EQ(manifest.added_files_count, kAddedFiles); + EXPECT_EQ(manifest.added_rows_count, kAddedRows); + EXPECT_EQ(manifest.existing_files_count, kExistingFiles); + EXPECT_EQ(manifest.existing_rows_count, kExistingRows); + EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles); + EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows); + } + + EXPECT_EQ(manifests[0].first_row_id, std::make_optional(kSnapshotFirstRowId)); + EXPECT_EQ(manifests[1].first_row_id, kTestManifest.first_row_id); + EXPECT_EQ(manifests[2].first_row_id, + std::make_optional(kSnapshotFirstRowId + kAddedRows + kExistingRows)); +} + +TEST_F(TestManifestListVersions, TestV1ForwardCompatibility) { + std::string manifest_list_path = + WriteManifestList(/*format_version=*/1, kSnapshotFirstRowId, {kTestManifest}); + std::string expected_array_json = R"([ + ["s3://bucket/table/m1.avro", 1024, 1, 987134631982734, 2, 343, 1, [], 5292, 857273, 22910, null] + ])"; + ReadAvro(manifest_list_path, ManifestFileAdapterV1::kManifestListSchema, + expected_array_json); +} + +TEST_F(TestManifestListVersions, TestV2ForwardCompatibility) { + // V2 manifest list files can be read by V1 readers, but the sequence numbers and + // content will be ignored. + std::string manifest_list_path = + WriteManifestList(/*format_version=*/2, kSnapshotFirstRowId, {kTestManifest}); + std::string expected_array_json = R"([ + ["s3://bucket/table/m1.avro", 1024, 1, 987134631982734, 2, 343, 1, [], 5292, 857273, 22910, null] + ])"; + ReadAvro(manifest_list_path, ManifestFileAdapterV1::kManifestListSchema, + expected_array_json); +} + +TEST_F(TestManifestListVersions, TestManifestsWithoutRowStats) { + // Create a schema without row stats columns to simulate an old manifest list file + auto schema_without_stats = std::make_shared(std::vector{ + ManifestFile::kManifestPath, + ManifestFile::kManifestLength, + ManifestFile::kPartitionSpecId, + ManifestFile::kAddedSnapshotId, + ManifestFile::kAddedFilesCount, + ManifestFile::kExistingFilesCount, + ManifestFile::kDeletedFilesCount, + ManifestFile::kPartitions, + }); + + ArrowSchema arrow_c_schema; + EXPECT_THAT(ToArrowSchema(*schema_without_stats, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + std::string json_data = R"([["path/to/manifest.avro", 1024, 1, 100, 2, 3, 4, null]])"; + auto array = ::arrow::json::ArrayFromJSONString(arrow_schema, json_data).ValueOrDie(); + ArrowArray arrow_array; + EXPECT_TRUE(::arrow::ExportArray(*array, &arrow_array).ok()); + + std::string manifest_list_path = CreateManifestListPath(); + auto writer_result = WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = manifest_list_path, .schema = schema_without_stats, .io = file_io_}); + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + EXPECT_THAT(writer->Write(&arrow_array), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + // Read back and verify + auto manifest = ReadManifestList(manifest_list_path); + + EXPECT_EQ(manifest.manifest_path, "path/to/manifest.avro"); + EXPECT_EQ(manifest.manifest_length, 1024L); + EXPECT_EQ(manifest.partition_spec_id, 1); + EXPECT_EQ(manifest.added_snapshot_id, 100L); + + EXPECT_TRUE(manifest.has_added_files()); + EXPECT_EQ(manifest.added_files_count, 2); + EXPECT_FALSE(manifest.added_rows_count.has_value()); + + EXPECT_TRUE(manifest.has_existing_files()); + EXPECT_EQ(manifest.existing_files_count, 3); + EXPECT_FALSE(manifest.existing_rows_count.has_value()); + + EXPECT_TRUE(manifest.has_deleted_files()); + EXPECT_EQ(manifest.deleted_files_count, 4); + EXPECT_FALSE(manifest.deleted_rows_count.has_value()); + + EXPECT_FALSE(manifest.first_row_id.has_value()); +} + +TEST_F(TestManifestListVersions, TestManifestsPartitionSummary) { + auto first_summary_lower_bound = Literal::Int(10).Serialize().value(); + auto first_summary_upper_bound = Literal::Int(100).Serialize().value(); + auto second_summary_lower_bound = Literal::Int(20).Serialize().value(); + auto second_summary_upper_bound = Literal::Int(200).Serialize().value(); + + std::vector partition_summaries{ + PartitionFieldSummary{ + .contains_null = false, + .contains_nan = std::nullopt, + .lower_bound = first_summary_lower_bound, + .upper_bound = first_summary_upper_bound, + }, + PartitionFieldSummary{ + .contains_null = true, + .contains_nan = false, + .lower_bound = second_summary_lower_bound, + .upper_bound = second_summary_upper_bound, + }, + }; + + ManifestFile manifest{ + .manifest_path = kPath, + .manifest_length = kLength, + .partition_spec_id = kSpecId, + .content = ManifestFile::Content::kData, + .sequence_number = kSeqNum, + .min_sequence_number = kMinSeqNum, + .added_snapshot_id = kSnapshotId, + .added_files_count = kAddedFiles, + .existing_files_count = kExistingFiles, + .deleted_files_count = kDeletedFiles, + .added_rows_count = kAddedRows, + .existing_rows_count = kExistingRows, + .deleted_rows_count = kDeletedRows, + .partitions = partition_summaries, + .key_metadata = {}, + .first_row_id = std::nullopt, + }; + + // Test for all format versions + for (int format_version = 1; format_version <= 3; ++format_version) { + int64_t expected_next_row_id = kSnapshotFirstRowId + + manifest.added_rows_count.value() + + manifest.existing_rows_count.value(); + + auto manifest_list_path = + WriteManifestList(format_version, expected_next_row_id, {manifest}); + + auto returned_manifest = ReadManifestList(manifest_list_path); + EXPECT_EQ(returned_manifest.partitions.size(), 2); + + const auto& first = returned_manifest.partitions[0]; + EXPECT_FALSE(first.contains_null); + EXPECT_FALSE(first.contains_nan.has_value()); + EXPECT_EQ(first.lower_bound, first_summary_lower_bound); + EXPECT_EQ(first.upper_bound, first_summary_upper_bound); + + const auto& second = returned_manifest.partitions[1]; + EXPECT_TRUE(second.contains_null); + EXPECT_TRUE(second.contains_nan.has_value()); + EXPECT_FALSE(second.contains_nan.value()); + EXPECT_EQ(second.lower_bound, second_summary_lower_bound); + EXPECT_EQ(second.upper_bound, second_summary_upper_bound); + } +} + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_reader_writer_test.cc b/src/iceberg/test/manifest_reader_writer_test.cc index f4e770fbe..d625d5cf9 100644 --- a/src/iceberg/test/manifest_reader_writer_test.cc +++ b/src/iceberg/test/manifest_reader_writer_test.cc @@ -260,8 +260,9 @@ class ManifestReaderV2Test : public ManifestReaderTestBase { void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path, std::shared_ptr partition_spec, const std::vector& manifest_entries) { - auto result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_, - std::move(partition_spec)); + auto result = + ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_, + std::move(partition_spec), ManifestContent::kData); ASSERT_TRUE(result.has_value()) << result.error().message; auto writer = std::move(result.value()); auto status = writer->AddAll(manifest_entries); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 436744811..62608a9b2 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -114,6 +114,7 @@ class NameMapping; enum class SnapshotRefType; enum class TransformType; +enum class ManifestContent; class Decimal; class Uuid; diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc index 52c52cde9..917d30a8e 100644 --- a/src/iceberg/v1_metadata.cc +++ b/src/iceberg/v1_metadata.cc @@ -23,91 +23,98 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" namespace iceberg { -Status ManifestEntryAdapterV1::Init() { - static std::unordered_set kManifestEntryFieldIds{ - ManifestEntry::kStatus.field_id(), - ManifestEntry::kSnapshotId.field_id(), - ManifestEntry::kDataFileFieldId, - DataFile::kFilePath.field_id(), - DataFile::kFileFormat.field_id(), - DataFile::kPartitionFieldId, - DataFile::kRecordCount.field_id(), - DataFile::kFileSize.field_id(), - 105, // kBlockSizeInBytes field id - DataFile::kColumnSizes.field_id(), - DataFile::kValueCounts.field_id(), - DataFile::kNullValueCounts.field_id(), - DataFile::kNanValueCounts.field_id(), - DataFile::kLowerBounds.field_id(), - DataFile::kUpperBounds.field_id(), - DataFile::kKeyMetadata.field_id(), - DataFile::kSplitOffsets.field_id(), - DataFile::kSortOrderId.field_id(), - }; - ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) - if (partition_spec_ != nullptr) { - ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); - metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); - } - metadata_["format-version"] = "1"; - return {}; +ManifestEntryAdapterV1::ManifestEntryAdapterV1( + std::optional snapshot_id, std::shared_ptr partition_spec) + : ManifestEntryAdapter(std::move(partition_spec), ManifestContent::kData), + snapshot_id_(snapshot_id) {} + +std::shared_ptr ManifestEntryAdapterV1::EntrySchema( + std::shared_ptr partition_type) { + return WrapFileSchema(DataFileSchema(std::move(partition_type))); } -Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) { - return AppendInternal(entry); +std::shared_ptr ManifestEntryAdapterV1::WrapFileSchema( + std::shared_ptr file_schema) { + return std::make_shared(std::vector{ + ManifestEntry::kStatus, + ManifestEntry::kSnapshotId, + SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId, + ManifestEntry::kDataFileField, std::move(file_schema)), + }); +} + +std::shared_ptr ManifestEntryAdapterV1::DataFileSchema( + std::shared_ptr partition_type) { + return std::make_shared(std::vector{ + DataFile::kFilePath, + DataFile::kFileFormat, + SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField, + std::move(partition_type)), + DataFile::kRecordCount, + DataFile::kFileSize, + SchemaField::MakeRequired(105, "block_size_in_bytes", int64(), + "Block size in bytes"), + DataFile::kColumnSizes, + DataFile::kValueCounts, + DataFile::kNullValueCounts, + DataFile::kNanValueCounts, + DataFile::kLowerBounds, + DataFile::kUpperBounds, + DataFile::kKeyMetadata, + DataFile::kSplitOffsets, + DataFile::kSortOrderId, + }); } -Result> ManifestEntryAdapterV1::GetManifestEntryType() { - // 'block_size_in_bytes' (ID 105) is a deprecated field that is REQUIRED - // in the v1 data_file schema for backward compatibility. - // Deprecated. Always write a default in v1. Do not write in v2 or v3. - static const SchemaField kBlockSizeInBytes = SchemaField::MakeRequired( - 105, "block_size_in_bytes", int64(), "Block size in bytes"); +Status ManifestEntryAdapterV1::Init() { + // TODO(gangwu): fix the schema to use current table schema. + // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); + metadata_["format-version"] = "1"; + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); if (!partition_type) { - partition_type = PartitionSpec::Unpartitioned()->schema(); + partition_type = std::make_shared(std::vector{}); } - auto datafile_type = std::make_shared(std::vector{ - DataFile::kFilePath, DataFile::kFileFormat, - SchemaField::MakeRequired(102, DataFile::kPartitionField, - std::move(partition_type)), - DataFile::kRecordCount, DataFile::kFileSize, kBlockSizeInBytes, - DataFile::kColumnSizes, DataFile::kValueCounts, DataFile::kNullValueCounts, - DataFile::kNanValueCounts, DataFile::kLowerBounds, DataFile::kUpperBounds, - DataFile::kKeyMetadata, DataFile::kSplitOffsets, DataFile::kSortOrderId}); + manifest_schema_ = EntrySchema(std::move(partition_type)); + return ToArrowSchema(*manifest_schema_, &schema_); +} - return std::make_shared( - std::vector{ManifestEntry::kStatus, ManifestEntry::kSnapshotId, - SchemaField::MakeRequired(2, ManifestEntry::kDataFileField, - std::move(datafile_type))}); +Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) { + return AppendInternal(entry); } +const std::shared_ptr ManifestFileAdapterV1::kManifestListSchema = + std::make_shared(std::vector{ + ManifestFile::kManifestPath, + ManifestFile::kManifestLength, + ManifestFile::kPartitionSpecId, + ManifestFile::kAddedSnapshotId, + ManifestFile::kAddedFilesCount, + ManifestFile::kExistingFilesCount, + ManifestFile::kDeletedFilesCount, + ManifestFile::kPartitions, + ManifestFile::kAddedRowsCount, + ManifestFile::kExistingRowsCount, + ManifestFile::kDeletedRowsCount, + ManifestFile::kKeyMetadata, + }); + Status ManifestFileAdapterV1::Init() { - static std::unordered_set kManifestFileFieldIds{ - ManifestFile::kManifestPath.field_id(), - ManifestFile::kManifestLength.field_id(), - ManifestFile::kPartitionSpecId.field_id(), - ManifestFile::kAddedSnapshotId.field_id(), - ManifestFile::kAddedFilesCount.field_id(), - ManifestFile::kExistingFilesCount.field_id(), - ManifestFile::kDeletedFilesCount.field_id(), - ManifestFile::kAddedRowsCount.field_id(), - ManifestFile::kExistingRowsCount.field_id(), - ManifestFile::kDeletedRowsCount.field_id(), - ManifestFile::kPartitions.field_id(), - ManifestFile::kKeyMetadata.field_id(), - }; metadata_["snapshot-id"] = std::to_string(snapshot_id_); metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() ? std::to_string(parent_snapshot_id_.value()) : "null"; metadata_["format-version"] = "1"; - return InitSchema(kManifestFileFieldIds); + + manifest_list_schema_ = kManifestListSchema; + return ToArrowSchema(*manifest_list_schema_, &schema_); } Status ManifestFileAdapterV1::Append(const ManifestFile& file) { diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 3c095a9e5..d97190123 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -28,13 +28,14 @@ namespace iceberg { class ManifestEntryAdapterV1 : public ManifestEntryAdapter { public: ManifestEntryAdapterV1(std::optional snapshot_id, - std::shared_ptr partition_spec) - : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {} + std::shared_ptr partition_spec); Status Init() override; Status Append(const ManifestEntry& entry) override; - protected: - Result> GetManifestEntryType() override; + static std::shared_ptr EntrySchema(std::shared_ptr partition_type); + static std::shared_ptr WrapFileSchema(std::shared_ptr file_schema); + static std::shared_ptr DataFileSchema( + std::shared_ptr partition_type); private: std::optional snapshot_id_; @@ -48,6 +49,8 @@ class ManifestFileAdapterV1 : public ManifestFileAdapter { Status Init() override; Status Append(const ManifestFile& file) override; + static const std::shared_ptr kManifestListSchema; + private: int64_t snapshot_id_; std::optional parent_snapshot_id_; diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc index c1407b12f..2545c30e6 100644 --- a/src/iceberg/v2_metadata.cc +++ b/src/iceberg/v2_metadata.cc @@ -23,44 +23,69 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" namespace iceberg { +ManifestEntryAdapterV2::ManifestEntryAdapterV2( + std::optional snapshot_id, std::shared_ptr partition_spec, + ManifestContent content) + : ManifestEntryAdapter(std::move(partition_spec), content), + snapshot_id_(snapshot_id) {} + +std::shared_ptr ManifestEntryAdapterV2::EntrySchema( + std::shared_ptr partition_type) { + return WrapFileSchema(DataFileType(std::move(partition_type))); +} +std::shared_ptr ManifestEntryAdapterV2::WrapFileSchema( + std::shared_ptr file_schema) { + return std::make_shared(std::vector{ + ManifestEntry::kStatus, + ManifestEntry::kSnapshotId, + ManifestEntry::kSequenceNumber, + ManifestEntry::kFileSequenceNumber, + SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId, + ManifestEntry::kDataFileField, std::move(file_schema)), + }); +} +std::shared_ptr ManifestEntryAdapterV2::DataFileType( + std::shared_ptr partition_type) { + return std::make_shared(std::vector{ + DataFile::kContent.AsRequired(), + DataFile::kFilePath, + DataFile::kFileFormat, + SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField, + std::move(partition_type), DataFile::kPartitionDoc), + DataFile::kRecordCount, + DataFile::kFileSize, + DataFile::kColumnSizes, + DataFile::kValueCounts, + DataFile::kNullValueCounts, + DataFile::kNanValueCounts, + DataFile::kLowerBounds, + DataFile::kUpperBounds, + DataFile::kKeyMetadata, + DataFile::kSplitOffsets, + DataFile::kEqualityIds, + DataFile::kSortOrderId, + DataFile::kReferencedDataFile, + }); +} + Status ManifestEntryAdapterV2::Init() { - static std::unordered_set kManifestEntryFieldIds{ - ManifestEntry::kStatus.field_id(), - ManifestEntry::kSnapshotId.field_id(), - ManifestEntry::kSequenceNumber.field_id(), - ManifestEntry::kFileSequenceNumber.field_id(), - ManifestEntry::kDataFileFieldId, - DataFile::kContent.field_id(), - DataFile::kFilePath.field_id(), - DataFile::kFileFormat.field_id(), - DataFile::kPartitionFieldId, - DataFile::kRecordCount.field_id(), - DataFile::kFileSize.field_id(), - DataFile::kColumnSizes.field_id(), - DataFile::kValueCounts.field_id(), - DataFile::kNullValueCounts.field_id(), - DataFile::kNanValueCounts.field_id(), - DataFile::kLowerBounds.field_id(), - DataFile::kUpperBounds.field_id(), - DataFile::kKeyMetadata.field_id(), - DataFile::kSplitOffsets.field_id(), - DataFile::kEqualityIds.field_id(), - DataFile::kSortOrderId.field_id(), - DataFile::kReferencedDataFile.field_id(), - }; - ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) - if (partition_spec_ != nullptr) { - ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); - metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); - } + // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); metadata_["format-version"] = "2"; - metadata_["content"] = "data"; - return {}; + metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete"; + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); + if (!partition_type) { + partition_type = std::make_shared(std::vector{}); + } + manifest_schema_ = EntrySchema(std::move(partition_type)); + return ToArrowSchema(*manifest_schema_, &schema_); } Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) { @@ -99,31 +124,35 @@ Result> ManifestEntryAdapterV2::GetReferenceDataFile( return std::nullopt; } +const std::shared_ptr ManifestFileAdapterV2::kManifestListSchema = + std::make_shared(std::vector{ + ManifestFile::kManifestPath, + ManifestFile::kManifestLength, + ManifestFile::kPartitionSpecId, + ManifestFile::kContent.AsRequired(), + ManifestFile::kSequenceNumber.AsRequired(), + ManifestFile::kMinSequenceNumber.AsRequired(), + ManifestFile::kAddedSnapshotId, + ManifestFile::kAddedFilesCount.AsRequired(), + ManifestFile::kExistingFilesCount.AsRequired(), + ManifestFile::kDeletedFilesCount.AsRequired(), + ManifestFile::kAddedRowsCount.AsRequired(), + ManifestFile::kExistingRowsCount.AsRequired(), + ManifestFile::kDeletedRowsCount.AsRequired(), + ManifestFile::kPartitions, + ManifestFile::kKeyMetadata, + }); + Status ManifestFileAdapterV2::Init() { - static std::unordered_set kManifestFileFieldIds{ - ManifestFile::kManifestPath.field_id(), - ManifestFile::kManifestLength.field_id(), - ManifestFile::kPartitionSpecId.field_id(), - ManifestFile::kContent.field_id(), - ManifestFile::kSequenceNumber.field_id(), - ManifestFile::kMinSequenceNumber.field_id(), - ManifestFile::kAddedSnapshotId.field_id(), - ManifestFile::kAddedFilesCount.field_id(), - ManifestFile::kExistingFilesCount.field_id(), - ManifestFile::kDeletedFilesCount.field_id(), - ManifestFile::kAddedRowsCount.field_id(), - ManifestFile::kExistingRowsCount.field_id(), - ManifestFile::kDeletedRowsCount.field_id(), - ManifestFile::kPartitions.field_id(), - ManifestFile::kKeyMetadata.field_id(), - }; metadata_["snapshot-id"] = std::to_string(snapshot_id_); metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() ? std::to_string(parent_snapshot_id_.value()) : "null"; metadata_["sequence-number"] = std::to_string(sequence_number_); metadata_["format-version"] = "2"; - return InitSchema(kManifestFileFieldIds); + + manifest_list_schema_ = kManifestListSchema; + return ToArrowSchema(*manifest_list_schema_, &schema_); } Status ManifestFileAdapterV2::Append(const ManifestFile& file) { @@ -132,9 +161,12 @@ Status ManifestFileAdapterV2::Append(const ManifestFile& file) { Result ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& file) const { if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + // if the sequence number is being assigned here, then the manifest must be created by + // the current operation. to validate this, check that the snapshot id matches the + // current commit if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( - "Found unassigned sequence number for a manifest from snapshot: %s", + "Found unassigned sequence number for a manifest from snapshot: {}", file.added_snapshot_id); } return sequence_number_; @@ -145,11 +177,15 @@ Result ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil Result ManifestFileAdapterV2::GetMinSequenceNumber( const ManifestFile& file) const { if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + // same sanity check as above if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( - "Found unassigned sequence number for a manifest from snapshot: %s", + "Found unassigned sequence number for a manifest from snapshot: {}", file.added_snapshot_id); } + // if the min sequence number is not determined, then there was no assigned sequence + // number for any file written to the wrapped manifest. replace the unassigned + // sequence number with the one for this commit return sequence_number_; } return file.min_sequence_number; diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 164a497a8..1b2b43ac4 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -29,11 +29,16 @@ namespace iceberg { class ManifestEntryAdapterV2 : public ManifestEntryAdapter { public: ManifestEntryAdapterV2(std::optional snapshot_id, - std::shared_ptr partition_spec) - : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {} + std::shared_ptr partition_spec, + ManifestContent content); Status Init() override; Status Append(const ManifestEntry& entry) override; + static std::shared_ptr EntrySchema(std::shared_ptr partition_type); + static std::shared_ptr WrapFileSchema(std::shared_ptr file_schema); + static std::shared_ptr DataFileType( + std::shared_ptr partition_type); + protected: Result> GetSequenceNumber( const ManifestEntry& entry) const override; @@ -55,6 +60,8 @@ class ManifestFileAdapterV2 : public ManifestFileAdapter { Status Init() override; Status Append(const ManifestFile& file) override; + static const std::shared_ptr kManifestListSchema; + protected: Result GetSequenceNumber(const ManifestFile& file) const override; Result GetMinSequenceNumber(const ManifestFile& file) const override; diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc index 61474f697..3b033035a 100644 --- a/src/iceberg/v3_metadata.cc +++ b/src/iceberg/v3_metadata.cc @@ -23,47 +23,75 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" namespace iceberg { +ManifestEntryAdapterV3::ManifestEntryAdapterV3( + std::optional snapshot_id, std::optional first_row_id, + std::shared_ptr partition_spec, ManifestContent content) + : ManifestEntryAdapter(std::move(partition_spec), content), + snapshot_id_(snapshot_id), + first_row_id_(first_row_id) {} + +std::shared_ptr ManifestEntryAdapterV3::EntrySchema( + std::shared_ptr partition_type) { + return WrapFileSchema(DataFileType(std::move(partition_type))); +} + +std::shared_ptr ManifestEntryAdapterV3::WrapFileSchema( + std::shared_ptr file_schema) { + return std::make_shared(std::vector{ + ManifestEntry::kStatus, + ManifestEntry::kSnapshotId, + ManifestEntry::kSequenceNumber, + ManifestEntry::kFileSequenceNumber, + SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId, + ManifestEntry::kDataFileField, std::move(file_schema)), + }); +} + +std::shared_ptr ManifestEntryAdapterV3::DataFileType( + std::shared_ptr partition_type) { + return std::make_shared(std::vector{ + DataFile::kContent.AsRequired(), + DataFile::kFilePath, + DataFile::kFileFormat, + SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField, + std::move(partition_type), DataFile::kPartitionDoc), + DataFile::kRecordCount, + DataFile::kFileSize, + DataFile::kColumnSizes, + DataFile::kValueCounts, + DataFile::kNullValueCounts, + DataFile::kNanValueCounts, + DataFile::kLowerBounds, + DataFile::kUpperBounds, + DataFile::kKeyMetadata, + DataFile::kSplitOffsets, + DataFile::kEqualityIds, + DataFile::kSortOrderId, + DataFile::kFirstRowId, + DataFile::kReferencedDataFile, + DataFile::kContentOffset, + DataFile::kContentSize, + }); +} + Status ManifestEntryAdapterV3::Init() { - static std::unordered_set kManifestEntryFieldIds{ - ManifestEntry::kStatus.field_id(), - ManifestEntry::kSnapshotId.field_id(), - ManifestEntry::kDataFileFieldId, - ManifestEntry::kSequenceNumber.field_id(), - ManifestEntry::kFileSequenceNumber.field_id(), - DataFile::kContent.field_id(), - DataFile::kFilePath.field_id(), - DataFile::kFileFormat.field_id(), - DataFile::kPartitionFieldId, - DataFile::kRecordCount.field_id(), - DataFile::kFileSize.field_id(), - DataFile::kColumnSizes.field_id(), - DataFile::kValueCounts.field_id(), - DataFile::kNullValueCounts.field_id(), - DataFile::kNanValueCounts.field_id(), - DataFile::kLowerBounds.field_id(), - DataFile::kUpperBounds.field_id(), - DataFile::kKeyMetadata.field_id(), - DataFile::kSplitOffsets.field_id(), - DataFile::kEqualityIds.field_id(), - DataFile::kSortOrderId.field_id(), - DataFile::kFirstRowId.field_id(), - DataFile::kReferencedDataFile.field_id(), - DataFile::kContentOffset.field_id(), - DataFile::kContentSize.field_id(), - }; - ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds)); - ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) - if (partition_spec_ != nullptr) { - ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); - metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); - } + // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); metadata_["format-version"] = "3"; - metadata_["content"] = "data"; - return {}; + metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete"; + + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); + if (!partition_type) { + partition_type = std::make_shared(std::vector{}); + } + manifest_schema_ = EntrySchema(std::move(partition_type)); + return ToArrowSchema(*manifest_schema_, &schema_); } Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) { @@ -126,25 +154,26 @@ Result> ManifestEntryAdapterV3::GetContentSizeInBytes( return std::nullopt; } +const std::shared_ptr ManifestFileAdapterV3::kManifestListSchema = + std::make_shared(std::vector{ + ManifestFile::kManifestPath, + ManifestFile::kManifestLength, + ManifestFile::kPartitionSpecId, + ManifestFile::kContent.AsRequired(), + ManifestFile::kSequenceNumber.AsRequired(), + ManifestFile::kMinSequenceNumber.AsRequired(), + ManifestFile::kAddedSnapshotId, + ManifestFile::kAddedFilesCount.AsRequired(), + ManifestFile::kExistingFilesCount.AsRequired(), + ManifestFile::kDeletedFilesCount.AsRequired(), + ManifestFile::kAddedRowsCount.AsRequired(), + ManifestFile::kExistingRowsCount.AsRequired(), + ManifestFile::kDeletedRowsCount.AsRequired(), + ManifestFile::kPartitions, + ManifestFile::kKeyMetadata, + ManifestFile::kFirstRowId, + }); Status ManifestFileAdapterV3::Init() { - static std::unordered_set kManifestFileFieldIds{ - ManifestFile::kManifestPath.field_id(), - ManifestFile::kManifestLength.field_id(), - ManifestFile::kPartitionSpecId.field_id(), - ManifestFile::kContent.field_id(), - ManifestFile::kSequenceNumber.field_id(), - ManifestFile::kMinSequenceNumber.field_id(), - ManifestFile::kAddedSnapshotId.field_id(), - ManifestFile::kAddedFilesCount.field_id(), - ManifestFile::kExistingFilesCount.field_id(), - ManifestFile::kDeletedFilesCount.field_id(), - ManifestFile::kAddedRowsCount.field_id(), - ManifestFile::kExistingRowsCount.field_id(), - ManifestFile::kDeletedRowsCount.field_id(), - ManifestFile::kPartitions.field_id(), - ManifestFile::kKeyMetadata.field_id(), - ManifestFile::kFirstRowId.field_id(), - }; metadata_["snapshot-id"] = std::to_string(snapshot_id_); metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() ? std::to_string(parent_snapshot_id_.value()) @@ -153,24 +182,28 @@ Status ManifestFileAdapterV3::Init() { metadata_["first-row-id"] = next_row_id_.has_value() ? std::to_string(next_row_id_.value()) : "null"; metadata_["format-version"] = "3"; - return InitSchema(kManifestFileFieldIds); + + manifest_list_schema_ = kManifestListSchema; + return ToArrowSchema(*manifest_list_schema_, &schema_); } Status ManifestFileAdapterV3::Append(const ManifestFile& file) { - auto status = AppendInternal(file); - ICEBERG_RETURN_UNEXPECTED(status); - if (WrappedFirstRowId(file) && next_row_id_.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendInternal(file)); + if (WrapFirstRowId(file) && next_row_id_.has_value()) { next_row_id_ = next_row_id_.value() + file.existing_rows_count.value_or(0) + file.added_rows_count.value_or(0); } - return status; + return {}; } Result ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& file) const { if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + // if the sequence number is being assigned here, then the manifest must be created by + // the current operation. to validate this, check that the snapshot id matches the + // current commit if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( - "Found unassigned sequence number for a manifest from snapshot: %s", + "Found unassigned sequence number for a manifest from snapshot: {}", file.added_snapshot_id); } return sequence_number_; @@ -181,11 +214,15 @@ Result ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& fil Result ManifestFileAdapterV3::GetMinSequenceNumber( const ManifestFile& file) const { if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + // same sanity check as above if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( - "Found unassigned sequence number for a manifest from snapshot: %s", + "Found unassigned sequence number for a manifest from snapshot: {}", file.added_snapshot_id); } + // if the min sequence number is not determined, then there was no assigned sequence + // number for any file written to the wrapped manifest. replace the unassigned + // sequence number with the one for this commit return sequence_number_; } return file.min_sequence_number; @@ -193,20 +230,26 @@ Result ManifestFileAdapterV3::GetMinSequenceNumber( Result> ManifestFileAdapterV3::GetFirstRowId( const ManifestFile& file) const { - if (WrappedFirstRowId(file)) { + if (WrapFirstRowId(file)) { + // if first-row-id is assigned, ensure that it is valid + if (!next_row_id_.has_value()) { + // TODO(gangwu): add ToString for ManifestFile + return InvalidManifestList("Found invalid first-row-id assignment: {}", + file.manifest_path); + } return next_row_id_; } else if (file.content != ManifestFile::Content::kData) { return std::nullopt; } else { if (!file.first_row_id.has_value()) { - return InvalidManifestList("Found unassigned first-row-id for file:{}", + return InvalidManifestList("Found unassigned first-row-id for file: {}", file.manifest_path); } - return file.first_row_id.value(); + return file.first_row_id; } } -bool ManifestFileAdapterV3::WrappedFirstRowId(const ManifestFile& file) const { +bool ManifestFileAdapterV3::WrapFirstRowId(const ManifestFile& file) const { return file.content == ManifestFile::Content::kData && !file.first_row_id.has_value(); } diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index a1076105b..421b9c0fe 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -30,13 +30,16 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter { public: ManifestEntryAdapterV3(std::optional snapshot_id, std::optional first_row_id, - std::shared_ptr partition_spec) - : ManifestEntryAdapter(std::move(partition_spec)), - snapshot_id_(snapshot_id), - first_row_id_(first_row_id) {} + std::shared_ptr partition_spec, + ManifestContent content); Status Init() override; Status Append(const ManifestEntry& entry) override; + static std::shared_ptr EntrySchema(std::shared_ptr partition_type); + static std::shared_ptr WrapFileSchema(std::shared_ptr file_schema); + static std::shared_ptr DataFileType( + std::shared_ptr partition_type); + protected: Result> GetSequenceNumber( const ManifestEntry& entry) const override; @@ -56,13 +59,16 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter { class ManifestFileAdapterV3 : public ManifestFileAdapter { public: ManifestFileAdapterV3(int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id) + int64_t sequence_number, int64_t first_row_id) : snapshot_id_(snapshot_id), parent_snapshot_id_(parent_snapshot_id), sequence_number_(sequence_number), next_row_id_(first_row_id) {} Status Init() override; Status Append(const ManifestFile& file) override; + std::optional next_row_id() const override { return next_row_id_; } + + static const std::shared_ptr kManifestListSchema; protected: Result GetSequenceNumber(const ManifestFile& file) const override; @@ -70,7 +76,7 @@ class ManifestFileAdapterV3 : public ManifestFileAdapter { Result> GetFirstRowId(const ManifestFile& file) const override; private: - bool WrappedFirstRowId(const ManifestFile& file) const; + bool WrapFirstRowId(const ManifestFile& file) const; private: int64_t snapshot_id_;