diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index ed533cbd..3bbb3b82 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -90,14 +90,7 @@ Result> Table::current_snapshot() const { } Result> Table::SnapshotById(int64_t snapshot_id) const { - auto iter = std::ranges::find_if(metadata_->snapshots, - [this, &snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == snapshot_id; - }); - if (iter == metadata_->snapshots.end()) { - return NotFound("Snapshot with ID {} is not found", snapshot_id); - } - return *iter; + return metadata_->SnapshotById(snapshot_id); } const std::vector>& Table::snapshots() const { diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index b820517b..e58d06ae 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -47,11 +47,16 @@ std::string ToString(const MetadataLogEntry& entry) { } Result> TableMetadata::Schema() const { - auto iter = std::ranges::find_if(schemas, [this](const auto& schema) { - return schema->schema_id() == current_schema_id; + return SchemaById(current_schema_id); +} + +Result> TableMetadata::SchemaById( + const std::optional& schema_id) const { + auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) { + return schema->schema_id() == schema_id; }); if (iter == schemas.end()) { - return NotFound("Current schema is not found"); + return NotFound("Schema with ID {} is not found", schema_id.value_or(-1)); } return *iter; } @@ -77,11 +82,15 @@ Result> TableMetadata::SortOrder() const { } Result> TableMetadata::Snapshot() const { - auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) { - return snapshot->snapshot_id == current_snapshot_id; + return SnapshotById(current_snapshot_id); +} + +Result> TableMetadata::SnapshotById(int64_t snapshot_id) const { + auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) { + return snapshot->snapshot_id == snapshot_id; }); if (iter == 
snapshots.end()) { - return NotFound("Current snapshot with ID {} is not found", current_snapshot_id); + return NotFound("Snapshot with ID {} is not found", snapshot_id); } return *iter; } diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index c34091ae..da4d677b 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -123,12 +123,17 @@ struct ICEBERG_EXPORT TableMetadata { /// \brief Get the current schema, return NotFoundError if not found Result> Schema() const; + /// \brief Get the schema with the given ID, return NotFoundError if not found + Result> SchemaById( + const std::optional& schema_id) const; /// \brief Get the current partition spec, return NotFoundError if not found Result> PartitionSpec() const; /// \brief Get the current sort order, return NotFoundError if not found Result> SortOrder() const; /// \brief Get the current snapshot, return NotFoundError if not found Result> Snapshot() const; + /// \brief Get the snapshot of this table with the given id + Result> SnapshotById(int64_t snapshot_id) const; friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs); }; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 45539ef8..25702b6a 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -34,8 +34,8 @@ namespace iceberg { // implement FileScanTask -FileScanTask::FileScanTask(std::shared_ptr file) - : data_file_(std::move(file)) {} +FileScanTask::FileScanTask(std::shared_ptr data_file) + : data_file_(std::move(data_file)) {} const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } @@ -94,32 +94,13 @@ Result> TableScanBuilder::Build() { return InvalidArgument("No snapshot ID specified for table {}", table_metadata->table_uuid); } - auto iter = std::ranges::find_if( - table_metadata->snapshots, - [id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; }); - if (iter == table_metadata->snapshots.end()) { - return 
NotFound("Snapshot with ID {} is not found", *snapshot_id); - } - context_.snapshot = *iter; + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id)); if (!context_.projected_schema) { const auto& snapshot = context_.snapshot; auto schema_id = snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id; - if (!schema_id) { - return InvalidArgument("No schema ID found in snapshot {} for table {}", - snapshot->snapshot_id, table_metadata->table_uuid); - } - - const auto& schemas = table_metadata->schemas; - const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) { - return schema->schema_id() == id; - }); - if (it == schemas.end()) { - return InvalidArgument("Schema {} in snapshot {} is not found", - *snapshot->schema_id, snapshot->snapshot_id); - } - const auto& schema = *it; + ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id)); if (column_names_.empty()) { context_.projected_schema = schema; @@ -139,6 +120,9 @@ Result> TableScanBuilder::Build() { context_.projected_schema = std::make_shared(std::move(projected_fields), schema->schema_id()); } + } else if (!column_names_.empty()) { + return InvalidArgument( + "Cannot specify column names when a projected schema is provided"); } return std::make_unique(std::move(context_), file_io_); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 09dd0885..dcfa7220 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -45,7 +45,7 @@ class ICEBERG_EXPORT ScanTask { /// \brief Task representing a data file and its corresponding delete files. class ICEBERG_EXPORT FileScanTask : public ScanTask { public: - explicit FileScanTask(std::shared_ptr file); + explicit FileScanTask(std::shared_ptr data_file); /// \brief The data file that should be read by this scan task. 
const std::shared_ptr& data_file() const; diff --git a/test/metadata_serde_test.cc b/test/metadata_serde_test.cc index 9070503c..73e5dd3a 100644 --- a/test/metadata_serde_test.cc +++ b/test/metadata_serde_test.cc @@ -106,12 +106,23 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { /*optional=*/false); schema_fields.emplace_back(/*field_id=*/3, "z", iceberg::int64(), /*optional=*/false); - auto expected_schema = - std::make_shared(std::move(schema_fields), /*schema_id=*/1); + auto expected_schema = std::make_shared(schema_fields, /*schema_id=*/1); auto schema = metadata->Schema(); ASSERT_TRUE(schema.has_value()); EXPECT_EQ(*(schema.value().get()), *expected_schema); + // schema with ID 1 + auto schema_v1 = metadata->SchemaById(1); + ASSERT_TRUE(schema_v1.has_value()); + EXPECT_EQ(*(schema_v1.value().get()), *expected_schema); + + // schema with ID 0 + auto expected_schema_v0 = std::make_shared( + std::vector{schema_fields.at(0)}, /*schema_id=*/0); + auto schema_v0 = metadata->SchemaById(0); + ASSERT_TRUE(schema_v0.has_value()); + EXPECT_EQ(*(schema_v0.value().get()), *expected_schema_v0); + // Compare partition spec EXPECT_EQ(metadata->default_spec_id, 0); std::vector partition_fields; @@ -165,6 +176,16 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { EXPECT_EQ(*metadata->snapshots[i], expected_snapshots[i]); } + // snapshot with ID 3051729675574597004 + auto snapshot_v0 = metadata->SnapshotById(3051729675574597004); + ASSERT_TRUE(snapshot_v0.has_value()); + EXPECT_EQ(*snapshot_v0.value(), expected_snapshots[0]); + + // snapshot with ID 3055729675574597004 + auto snapshot_v1 = metadata->SnapshotById(3055729675574597004); + ASSERT_TRUE(snapshot_v1.has_value()); + EXPECT_EQ(*snapshot_v1.value(), expected_snapshots[1]); + // Compare snapshot logs std::vector expected_snapshot_log{ {