diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 1bad8cc4b..5f45cd938 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -529,7 +529,9 @@ Result> PartitionSpecFromJson( ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json)); partition_fields.push_back(std::move(*partition_field)); } - return std::make_unique(schema, spec_id, std::move(partition_fields)); + // TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each + // partition field from schema and then verify it + return std::make_unique(spec_id, std::move(partition_fields)); } Result> SnapshotRefFromJson(const nlohmann::json& json) { @@ -902,8 +904,10 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version, std::move(field->transform())); } - auto spec = std::make_unique( - current_schema, PartitionSpec::kInitialSpecId, std::move(fields)); + // TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each + // partition field from schema and then verify it + auto spec = + std::make_unique(PartitionSpec::kInitialSpecId, std::move(fields)); default_spec_id = spec->spec_id(); partition_specs.push_back(std::move(spec)); } diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 894bc6eb3..d4741e3b8 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -173,6 +173,7 @@ ICEBERG_EXPORT Result ToJsonString(const PartitionSpec& partition_s /// objects. Each `PartitionField` will be parsed using the `PartitionFieldFromJson` /// function. /// +/// \param schema The current schema. /// \param json The JSON object representing a `PartitionSpec`. /// \return An `expected` value containing either a `PartitionSpec` object or an error. If /// the JSON is malformed or missing expected fields, an error will be returned. diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc index c0e267de4..76ed0eca9 100644 --- a/src/iceberg/manifest_adapter.cc +++ b/src/iceberg/manifest_adapter.cc @@ -19,6 +19,8 @@ #include "iceberg/manifest_adapter.h" +#include + #include #include "iceberg/arrow/nanoarrow_status_internal.h" @@ -140,8 +142,11 @@ Result ManifestAdapter::FinishAppending() { } ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr partition_spec, + std::shared_ptr current_schema, ManifestContent content) - : partition_spec_(std::move(partition_spec)), content_(content) { + : partition_spec_(std::move(partition_spec)), + current_schema_(std::move(current_schema)), + content_(content) { if (!partition_spec_) { partition_spec_ = PartitionSpec::Unpartitioned(); } diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 87505068d..269a73bb6 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -62,7 +62,7 @@ class ICEBERG_EXPORT ManifestAdapter { class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { public: ManifestEntryAdapter(std::shared_ptr partition_spec, - ManifestContent content); + std::shared_ptr current_schema, ManifestContent content); ~ManifestEntryAdapter() override; @@ -92,6 +92,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { protected: std::shared_ptr partition_spec_; + std::shared_ptr current_schema_; std::shared_ptr manifest_schema_; const ManifestContent content_; }; diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 8b295e12d..9c2be5615 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -70,9 +70,23 @@ Result> OpenFileWriter( Result> ManifestWriter::MakeV1Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_spec) { - auto adapter = - std::make_unique(snapshot_id, std::move(partition_spec)); + std::shared_ptr file_io, std::shared_ptr partition_spec, + std::shared_ptr current_schema) { + if (manifest_location.empty()) { + return InvalidArgument("Manifest location cannot be empty"); + } + if (!file_io) { + return InvalidArgument("FileIO cannot be null"); + } + if (!partition_spec) { + return InvalidArgument("PartitionSpec cannot be null"); + } + if (!current_schema) { + return InvalidArgument("Current schema cannot be null"); + } + + auto adapter = std::make_unique( + snapshot_id, std::move(partition_spec), std::move(current_schema)); ICEBERG_RETURN_UNEXPECTED(adapter->Init()); ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); @@ -86,9 +100,21 @@ Result> ManifestWriter::MakeV1Writer( Result> ManifestWriter::MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_spec, - ManifestContent content) { + std::shared_ptr current_schema, ManifestContent content) { + if (manifest_location.empty()) { + return InvalidArgument("Manifest location cannot be empty"); + } + if (!file_io) { + return InvalidArgument("FileIO cannot be null"); + } + if (!partition_spec) { + return InvalidArgument("PartitionSpec cannot be null"); + } + if (!current_schema) { + return InvalidArgument("Current schema cannot be null"); + } auto adapter = std::make_unique( - snapshot_id, std::move(partition_spec), content); + snapshot_id, std::move(partition_spec), std::move(current_schema), content); ICEBERG_RETURN_UNEXPECTED(adapter->Init()); ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); @@ -102,9 +128,23 @@ Result> ManifestWriter::MakeV2Writer( Result> ManifestWriter::MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_spec, ManifestContent content) { + std::shared_ptr partition_spec, std::shared_ptr current_schema, + ManifestContent content) { + if (manifest_location.empty()) { + return InvalidArgument("Manifest location cannot be empty"); + } + if (!file_io) { + return InvalidArgument("FileIO cannot be null"); + } + if (!partition_spec) { + return InvalidArgument("PartitionSpec cannot be null"); + } + if (!current_schema) { + return InvalidArgument("Current schema cannot be null"); + } auto adapter = std::make_unique( - snapshot_id, first_row_id, std::move(partition_spec), content); + snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema), + content); ICEBERG_RETURN_UNEXPECTED(adapter->Init()); ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index be6c83107..37a2c5a11 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -62,22 +62,26 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \param partition_spec Partition spec for the manifest. + /// \param current_schema Current table schema. /// \return A Result containing the writer or an error. static Result> MakeV1Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_spec); + std::shared_ptr file_io, std::shared_ptr partition_spec, + std::shared_ptr current_schema); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \param partition_spec Partition spec for the manifest. + /// \param current_schema Schema containing the source fields referenced by partition + /// spec. /// \param content Content of the manifest. /// \return A Result containing the writer or an error. static Result> MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_spec, - ManifestContent content); + std::shared_ptr current_schema, ManifestContent content); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. @@ -85,12 +89,15 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \param partition_spec Partition spec for the manifest. + /// \param current_schema Schema containing the source fields referenced by partition + /// spec. /// \param content Content of the manifest. /// \return A Result containing the writer or an error. static Result> MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_spec, ManifestContent content); + std::shared_ptr partition_spec, + std::shared_ptr current_schema, ManifestContent content); private: static constexpr int64_t kBatchSize = 1024; diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 3fa5d86eb..f0a211443 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "iceberg/schema.h" @@ -31,10 +32,9 @@ namespace iceberg { -PartitionSpec::PartitionSpec(std::shared_ptr schema, int32_t spec_id, - std::vector fields, +PartitionSpec::PartitionSpec(int32_t spec_id, std::vector fields, std::optional last_assigned_field_id) - : schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) { + : spec_id_(spec_id), fields_(std::move(fields)) { if (last_assigned_field_id) { last_assigned_field_id_ = last_assigned_field_id.value(); } else if (fields_.empty()) { @@ -48,34 +48,25 @@ PartitionSpec::PartitionSpec(std::shared_ptr schema, int32_t spec_id, const std::shared_ptr& PartitionSpec::Unpartitioned() { static const std::shared_ptr unpartitioned = - std::make_shared( - /*schema=*/std::make_shared(std::vector{}), kInitialSpecId, - std::vector{}, kLegacyPartitionDataIdStart - 1); + std::make_shared(kInitialSpecId, std::vector{}, + kLegacyPartitionDataIdStart - 1); return unpartitioned; } -const std::shared_ptr& PartitionSpec::schema() const { return schema_; } - int32_t PartitionSpec::spec_id() const { return spec_id_; } std::span PartitionSpec::fields() const { return fields_; } -Result> PartitionSpec::PartitionType() { +Result> PartitionSpec::PartitionType(const Schema& schema) { if (fields_.empty()) { - return nullptr; - } - { - std::scoped_lock lock(mutex_); - if (partition_type_ != nullptr) { - return partition_type_; - } + return std::make_unique(std::vector{}); } std::vector partition_fields; for (const auto& partition_field : fields_) { // Get the source field from the original schema by source_id ICEBERG_ASSIGN_OR_RAISE(auto source_field, - schema_->FindFieldById(partition_field.source_id())); + schema.FindFieldById(partition_field.source_id())); if (!source_field.has_value()) { // TODO(xiao.dong) when source field is missing, // should return an error or just use UNKNOWN type @@ -97,11 +88,7 @@ Result> PartitionSpec::PartitionType() { /*optional=*/true); } - std::scoped_lock lock(mutex_); - if (partition_type_ == nullptr) { - partition_type_ = std::make_shared(std::move(partition_fields)); - } - return partition_type_; + return std::make_unique(std::move(partition_fields)); } std::string PartitionSpec::ToString() const { diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 88a081bf7..d1dd3a0fb 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -23,7 +23,7 @@ /// Partition specs for Iceberg tables. #include -#include +#include #include #include #include @@ -32,6 +32,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/partition_field.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" #include "iceberg/util/formattable.h" namespace iceberg { @@ -56,24 +57,20 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \param fields The partition fields. /// \param last_assigned_field_id The last assigned field ID. If not provided, it will /// be calculated from the fields. - PartitionSpec(std::shared_ptr schema, int32_t spec_id, - std::vector fields, + PartitionSpec(int32_t spec_id, std::vector fields, std::optional last_assigned_field_id = std::nullopt); /// \brief Get an unsorted partition spec singleton. static const std::shared_ptr& Unpartitioned(); - /// \brief Get the table schema - const std::shared_ptr& schema() const; - /// \brief Get the spec ID. int32_t spec_id() const; /// \brief Get a list view of the partition fields. std::span fields() const; - /// \brief Get the partition type. - Result> PartitionType(); + /// \brief Get the partition type binding to the input schema. + Result> PartitionType(const Schema&); std::string ToString() const override; @@ -87,14 +84,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Compare two partition specs for equality. bool Equals(const PartitionSpec& other) const; - std::shared_ptr schema_; const int32_t spec_id_; std::vector fields_; int32_t last_assigned_field_id_; - - // FIXME: use similar lazy initialization pattern as in StructType - std::mutex mutex_; - std::shared_ptr partition_type_; }; } // namespace iceberg diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 0ec2f0539..bc52a1fad 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -31,6 +31,7 @@ #include "iceberg/schema_field.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/type.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -269,7 +270,11 @@ Result>> DataTableScan::PlanFiles() co std::vector> tasks; ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec()); - auto partition_schema = partition_spec->schema(); + + // Get the table schema and partition type + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema()); + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr partition_schema, + partition_spec->PartitionType(*current_schema)); for (const auto& manifest_file : manifest_files) { ICEBERG_ASSIGN_OR_RAISE( diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc index 50f1e9175..656aa7d7f 100644 --- a/src/iceberg/test/json_internal_test.cc +++ b/src/iceberg/test/json_internal_test.cc @@ -144,11 +144,9 @@ TEST(JsonPartitionTest, PartitionSpec) { std::vector{SchemaField(3, "region", iceberg::string(), false), SchemaField(5, "ts", iceberg::int64(), false)}, /*schema_id=*/100); - auto identity_transform = Transform::Identity(); - PartitionSpec spec(schema, 1, - {PartitionField(3, 101, "region", identity_transform), - PartitionField(5, 102, "ts", identity_transform)}); + PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform), + PartitionField(5, 102, "ts", identity_transform)}); auto json = ToJson(spec); nlohmann::json expected_json = R"({"spec-id": 1, "fields": [ diff --git a/src/iceberg/test/manifest_reader_writer_test.cc b/src/iceberg/test/manifest_reader_writer_test.cc index d625d5cf9..dbcd8bfc7 100644 --- a/src/iceberg/test/manifest_reader_writer_test.cc +++ b/src/iceberg/test/manifest_reader_writer_test.cc @@ -28,6 +28,7 @@ #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" #include "iceberg/manifest_writer.h" +#include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/test/matchers.h" #include "iceberg/test/temp_file_test_base.h" @@ -152,9 +153,11 @@ class ManifestReaderV1Test : public ManifestReaderTestBase { void TestWriteManifest(const std::string& manifest_list_path, std::shared_ptr partition_spec, - const std::vector& manifest_entries) { - auto result = ManifestWriter::MakeV1Writer(1, manifest_list_path, file_io_, - std::move(partition_spec)); + const std::vector& manifest_entries, + std::shared_ptr table_schema) { + auto result = + ManifestWriter::MakeV1Writer(1, manifest_list_path, file_io_, + std::move(partition_spec), std::move(table_schema)); ASSERT_TRUE(result.has_value()) << result.error().message; auto writer = std::move(result.value()); auto status = writer->AddAll(manifest_entries); @@ -183,11 +186,11 @@ TEST_F(ManifestReaderV1Test, WritePartitionedTest) { auto identity_transform = Transform::Identity(); std::vector fields{ PartitionField(1, 1000, "order_ts_hour", identity_transform)}; - auto partition_spec = std::make_shared(table_schema, 1, fields); + auto partition_spec = std::make_shared(1, fields); auto expected_entries = PreparePartitionedTestData(); auto write_manifest_path = CreateNewTempFilePath(); - TestWriteManifest(write_manifest_path, partition_spec, expected_entries); + TestWriteManifest(write_manifest_path, partition_spec, expected_entries, table_schema); TestManifestReadingByPath(write_manifest_path, expected_entries, partition_schema); } @@ -259,10 +262,11 @@ class ManifestReaderV2Test : public ManifestReaderTestBase { void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path, std::shared_ptr partition_spec, - const std::vector& manifest_entries) { - auto result = - ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_, - std::move(partition_spec), ManifestContent::kData); + const std::vector& manifest_entries, + std::shared_ptr table_schema) { + auto result = ManifestWriter::MakeV2Writer( + snapshot_id, manifest_list_path, file_io_, std::move(partition_spec), + std::move(table_schema), ManifestContent::kData); ASSERT_TRUE(result.has_value()) << result.error().message; auto writer = std::move(result.value()); auto status = writer->AddAll(manifest_entries); @@ -292,16 +296,24 @@ TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) { } TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) { + iceberg::SchemaField table_field(1, "order_ts_hour_source", int32(), true); + iceberg::SchemaField partition_field(1000, "order_ts_hour", int32(), true); + auto table_schema = std::make_shared(std::vector({table_field})); auto expected_entries = PrepareNonPartitionedTestData(); auto write_manifest_path = CreateNewTempFilePath(); - TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr, expected_entries); + TestWriteManifest(679879563479918846LL, write_manifest_path, + PartitionSpec::Unpartitioned(), expected_entries, table_schema); TestManifestReadingByPath(write_manifest_path, expected_entries); } TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) { + iceberg::SchemaField table_field(1, "order_ts_hour_source", int32(), true); + iceberg::SchemaField partition_field(1000, "order_ts_hour", int32(), true); + auto table_schema = std::make_shared(std::vector({table_field})); auto expected_entries = PrepareMetadataInheritanceTestData(); auto write_manifest_path = CreateNewTempFilePath(); - TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr, expected_entries); + TestWriteManifest(679879563479918846LL, write_manifest_path, + PartitionSpec::Unpartitioned(), expected_entries, table_schema); ManifestFile manifest_file{ .manifest_path = write_manifest_path, .manifest_length = 100, diff --git a/src/iceberg/test/metadata_serde_test.cc b/src/iceberg/test/metadata_serde_test.cc index 9c1dd4bcc..9dac1e324 100644 --- a/src/iceberg/test/metadata_serde_test.cc +++ b/src/iceberg/test/metadata_serde_test.cc @@ -100,7 +100,7 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) { /*schema_id=*/std::nullopt); auto expected_spec = std::make_shared( - expected_schema, /*spec_id=*/0, + /*spec_id=*/0, std::vector{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x", Transform::Identity())}); @@ -144,7 +144,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) { /*schema_id=*/1); auto expected_spec = std::make_shared( - expected_schema_2, /*spec_id=*/0, + /*spec_id=*/0, std::vector{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x", Transform::Identity())}); @@ -224,7 +224,7 @@ TEST(MetadataSerdeTest, DeserializeV2ValidMinimal) { /*schema_id=*/0); auto expected_spec = std::make_shared( - expected_schema, /*spec_id=*/0, + /*spec_id=*/0, std::vector{PartitionField(/*source_id=*/1, /*field_id=*/1000, "x", Transform::Identity())}); @@ -271,8 +271,8 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) { /*optional=*/false)}, /*schema_id=*/0); - auto expected_spec = std::make_shared(expected_schema, /*spec_id=*/0, - std::vector{}); + auto expected_spec = + std::make_shared(/*spec_id=*/0, std::vector{}); auto expected_snapshot = std::make_shared(Snapshot{ .snapshot_id = 3055729675574597004, diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc index d50b53782..4ae5e33f8 100644 --- a/src/iceberg/test/partition_spec_test.cc +++ b/src/iceberg/test/partition_spec_test.cc @@ -39,20 +39,17 @@ TEST(PartitionSpecTest, Basics) { { SchemaField field1(5, "ts", iceberg::timestamp(), true); SchemaField field2(7, "bar", iceberg::string(), true); - auto const schema = - std::make_shared(std::vector{field1, field2}, 100); auto identity_transform = Transform::Identity(); PartitionField pt_field1(5, 1000, "day", identity_transform); PartitionField pt_field2(5, 1001, "hour", identity_transform); - PartitionSpec spec(schema, 100, {pt_field1, pt_field2}); + PartitionSpec spec(100, {pt_field1, pt_field2}); ASSERT_EQ(spec, spec); ASSERT_EQ(100, spec.spec_id()); std::span fields = spec.fields(); ASSERT_EQ(2, fields.size()); ASSERT_EQ(pt_field1, fields[0]); ASSERT_EQ(pt_field2, fields[1]); - ASSERT_EQ(*schema, *spec.schema()); auto spec_str = "partition_spec[spec_id<100>,\n day (1000 identity(5))\n hour (1001 " "identity(5))\n]"; @@ -64,18 +61,16 @@ TEST(PartitionSpecTest, Basics) { TEST(PartitionSpecTest, Equality) { SchemaField field1(5, "ts", iceberg::timestamp(), true); SchemaField field2(7, "bar", iceberg::string(), true); - auto const schema = - std::make_shared(std::vector{field1, field2}, 100); auto identity_transform = Transform::Identity(); PartitionField pt_field1(5, 1000, "day", identity_transform); PartitionField pt_field2(7, 1001, "hour", identity_transform); PartitionField pt_field3(7, 1001, "hour", identity_transform); - PartitionSpec schema1(schema, 100, {pt_field1, pt_field2}); - PartitionSpec schema2(schema, 101, {pt_field1, pt_field2}); - PartitionSpec schema3(schema, 101, {pt_field1}); - PartitionSpec schema4(schema, 101, {pt_field3, pt_field1}); - PartitionSpec schema5(schema, 100, {pt_field1, pt_field2}); - PartitionSpec schema6(schema, 100, {pt_field2, pt_field1}); + PartitionSpec schema1(100, {pt_field1, pt_field2}); + PartitionSpec schema2(101, {pt_field1, pt_field2}); + PartitionSpec schema3(101, {pt_field1}); + PartitionSpec schema4(101, {pt_field3, pt_field1}); + PartitionSpec schema5(100, {pt_field1, pt_field2}); + PartitionSpec schema6(100, {pt_field2, pt_field1}); ASSERT_EQ(schema1, schema1); ASSERT_NE(schema1, schema2); @@ -93,14 +88,13 @@ TEST(PartitionSpecTest, Equality) { TEST(PartitionSpecTest, PartitionSchemaTest) { SchemaField field1(5, "ts", iceberg::timestamp(), true); SchemaField field2(7, "bar", iceberg::string(), true); - auto const schema = - std::make_shared(std::vector{field1, field2}, 100); + Schema schema({field1, field2}, 100); auto identity_transform = Transform::Identity(); PartitionField pt_field1(5, 1000, "day", identity_transform); PartitionField pt_field2(7, 1001, "hour", identity_transform); - PartitionSpec spec(schema, 100, {pt_field1, pt_field2}); + PartitionSpec spec(100, {pt_field1, pt_field2}); - auto partition_schema = spec.PartitionType(); + auto partition_schema = spec.PartitionType(schema); ASSERT_TRUE(partition_schema.has_value()); ASSERT_EQ(2, partition_schema.value()->fields().size()); EXPECT_EQ(pt_field1.name(), partition_schema.value()->fields()[0].name()); @@ -144,7 +138,7 @@ TEST(PartitionSpecTest, PartitionTypeTest) { auto parsed_spec_result = PartitionSpecFromJson(schema, json); ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message; - auto partition_schema = parsed_spec_result.value()->PartitionType(); + auto partition_schema = parsed_spec_result.value()->PartitionType(*schema); SchemaField pt_field1(1000, "ts_day", date(), true); SchemaField pt_field2(1001, "id_bucket", int32(), true); diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc index 917d30a8e..2da81ea5e 100644 --- a/src/iceberg/v1_metadata.cc +++ b/src/iceberg/v1_metadata.cc @@ -19,18 +19,23 @@ #include "iceberg/v1_metadata.h" +#include + #include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" +#include "iceberg/type.h" #include "iceberg/util/macros.h" namespace iceberg { ManifestEntryAdapterV1::ManifestEntryAdapterV1( - std::optional snapshot_id, std::shared_ptr partition_spec) - : ManifestEntryAdapter(std::move(partition_spec), ManifestContent::kData), + std::optional snapshot_id, std::shared_ptr partition_spec, + std::shared_ptr current_schema) + : ManifestEntryAdapter(std::move(partition_spec), std::move(current_schema), + ManifestContent::kData), snapshot_id_(snapshot_id) {} std::shared_ptr ManifestEntryAdapterV1::EntrySchema( @@ -72,16 +77,14 @@ std::shared_ptr ManifestEntryAdapterV1::DataFileSchema( } Status ManifestEntryAdapterV1::Init() { - // TODO(gangwu): fix the schema to use current table schema. - // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*current_schema_)) ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); metadata_["format-version"] = "1"; - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); - if (!partition_type) { - partition_type = std::make_shared(std::vector{}); - } + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, + partition_spec_->PartitionType(*current_schema_)); + manifest_schema_ = EntrySchema(std::move(partition_type)); return ToArrowSchema(*manifest_schema_, &schema_); } diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index d97190123..4279230ab 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -19,16 +19,19 @@ #pragma once -/// \file iceberg/v1_metadata.h #include "iceberg/manifest_adapter.h" +/// \file iceberg/v1_metadata.h + namespace iceberg { /// \brief Adapter to convert V1 ManifestEntry to `ArrowArray`. class ManifestEntryAdapterV1 : public ManifestEntryAdapter { public: ManifestEntryAdapterV1(std::optional snapshot_id, - std::shared_ptr partition_spec); + std::shared_ptr partition_spec, + std::shared_ptr current_schema); + Status Init() override; Status Append(const ManifestEntry& entry) override; diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc index 2545c30e6..e5f222605 100644 --- a/src/iceberg/v2_metadata.cc +++ b/src/iceberg/v2_metadata.cc @@ -30,8 +30,9 @@ namespace iceberg { ManifestEntryAdapterV2::ManifestEntryAdapterV2( std::optional snapshot_id, std::shared_ptr partition_spec, - ManifestContent content) - : ManifestEntryAdapter(std::move(partition_spec), content), + std::shared_ptr current_schema, ManifestContent content) + : ManifestEntryAdapter(std::move(partition_spec), std::move(current_schema), + std::move(content)), snapshot_id_(snapshot_id) {} std::shared_ptr ManifestEntryAdapterV2::EntrySchema( @@ -74,16 +75,14 @@ std::shared_ptr ManifestEntryAdapterV2::DataFileType( } Status ManifestEntryAdapterV2::Init() { - // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*current_schema_)) ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); metadata_["format-version"] = "2"; metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete"; - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); - if (!partition_type) { - partition_type = std::make_shared(std::vector{}); - } + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, + partition_spec_->PartitionType(*current_schema_)); manifest_schema_ = EntrySchema(std::move(partition_type)); return ToArrowSchema(*manifest_schema_, &schema_); } diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 1b2b43ac4..4459d9bce 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -30,7 +30,8 @@ class ManifestEntryAdapterV2 : public ManifestEntryAdapter { public: ManifestEntryAdapterV2(std::optional snapshot_id, std::shared_ptr partition_spec, - ManifestContent content); + std::shared_ptr current_schema, ManifestContent content); + Status Init() override; Status Append(const ManifestEntry& entry) override; diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc index 3b033035a..64d674a46 100644 --- a/src/iceberg/v3_metadata.cc +++ b/src/iceberg/v3_metadata.cc @@ -19,6 +19,8 @@ #include "iceberg/v3_metadata.h" +#include + #include "iceberg/json_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -30,8 +32,10 @@ namespace iceberg { ManifestEntryAdapterV3::ManifestEntryAdapterV3( std::optional snapshot_id, std::optional first_row_id, - std::shared_ptr partition_spec, ManifestContent content) - : ManifestEntryAdapter(std::move(partition_spec), content), + std::shared_ptr partition_spec, std::shared_ptr current_schema, + ManifestContent content) + : ManifestEntryAdapter(std::move(partition_spec), std::move(current_schema), + std::move(content)), snapshot_id_(snapshot_id), first_row_id_(first_row_id) {} @@ -80,16 +84,14 @@ std::shared_ptr ManifestEntryAdapterV3::DataFileType( } Status ManifestEntryAdapterV3::Init() { - // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_)) + ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*current_schema_)) ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_)); metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); metadata_["format-version"] = "3"; metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete"; - ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); - if (!partition_type) { - partition_type = std::make_shared(std::vector{}); - } + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, + partition_spec_->PartitionType(*current_schema_)); manifest_schema_ = EntrySchema(std::move(partition_type)); return ToArrowSchema(*manifest_schema_, &schema_); } diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index 421b9c0fe..a8262bc67 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -31,7 +31,8 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter { ManifestEntryAdapterV3(std::optional snapshot_id, std::optional first_row_id, std::shared_ptr partition_spec, - ManifestContent content); + std::shared_ptr current_schema, ManifestContent content); + Status Init() override; Status Append(const ManifestEntry& entry) override;