Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,7 @@ Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
}

Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(metadata_->snapshots,
[this, &snapshot_id](const auto& snapshot) {
return snapshot->snapshot_id == snapshot_id;
});
if (iter == metadata_->snapshots.end()) {
return NotFound("Snapshot with ID {} is not found", snapshot_id);
}
return *iter;
return metadata_->SnapshotById(snapshot_id);
}

const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {
Expand Down
21 changes: 15 additions & 6 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@ std::string ToString(const MetadataLogEntry& entry) {
}

Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
auto iter = std::ranges::find_if(schemas, [this](const auto& schema) {
return schema->schema_id() == current_schema_id;
return SchemaById(current_schema_id);
}

Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
const std::optional<int32_t>& schema_id) const {
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
return schema->schema_id() == schema_id;
});
if (iter == schemas.end()) {
return NotFound("Current schema is not found");
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
}
return *iter;
}
Expand All @@ -77,11 +82,15 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
}

Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
return snapshot->snapshot_id == current_snapshot_id;
return SnapshotById(current_snapshot_id);
}

Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
return snapshot->snapshot_id == snapshot_id;
});
if (iter == snapshots.end()) {
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
return NotFound("Snapshot with ID {} is not found", snapshot_id);
}
return *iter;
}
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,17 @@ struct ICEBERG_EXPORT TableMetadata {

/// \brief Get the current schema, return NotFoundError if not found
Result<std::shared_ptr<Schema>> Schema() const;
/// \brief Get the current schema by ID, return NotFoundError if not found
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding some test cases for these new functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, if there are no other questions, I will start adding test cases

Result<std::shared_ptr<iceberg::Schema>> SchemaById(
const std::optional<int32_t>& schema_id) const;
/// \brief Get the current partition spec, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> SortOrder() const;
/// \brief Get the current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> Snapshot() const;
/// \brief Get the snapshot of this table with the given id
Result<std::shared_ptr<iceberg::Snapshot>> SnapshotById(int64_t snapshot_id) const;

friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);
};
Expand Down
30 changes: 7 additions & 23 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
namespace iceberg {

// implement FileScanTask
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
: data_file_(std::move(file)) {}
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
: data_file_(std::move(data_file)) {}

const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }

Expand Down Expand Up @@ -94,32 +94,13 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
return InvalidArgument("No snapshot ID specified for table {}",
table_metadata->table_uuid);
}
auto iter = std::ranges::find_if(
table_metadata->snapshots,
[id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; });
if (iter == table_metadata->snapshots.end()) {
return NotFound("Snapshot with ID {} is not found", *snapshot_id);
}
context_.snapshot = *iter;
ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id));

if (!context_.projected_schema) {
const auto& snapshot = context_.snapshot;
auto schema_id =
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
if (!schema_id) {
return InvalidArgument("No schema ID found in snapshot {} for table {}",
snapshot->snapshot_id, table_metadata->table_uuid);
}

const auto& schemas = table_metadata->schemas;
const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) {
return schema->schema_id() == id;
});
if (it == schemas.end()) {
return InvalidArgument("Schema {} in snapshot {} is not found",
*snapshot->schema_id, snapshot->snapshot_id);
}
const auto& schema = *it;
ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));

if (column_names_.empty()) {
context_.projected_schema = schema;
Expand All @@ -139,6 +120,9 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
context_.projected_schema =
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
}
} else if (!column_names_.empty()) {
return InvalidArgument(
"Cannot specify column names when a projected schema is provided");
}

return std::make_unique<DataTableScan>(std::move(context_), file_io_);
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ICEBERG_EXPORT ScanTask {
/// \brief Task representing a data file and its corresponding delete files.
class ICEBERG_EXPORT FileScanTask : public ScanTask {
public:
explicit FileScanTask(std::shared_ptr<DataFile> file);
explicit FileScanTask(std::shared_ptr<DataFile> data_file);

/// \brief The data file that should be read by this scan task.
const std::shared_ptr<DataFile>& data_file() const;
Expand Down
25 changes: 23 additions & 2 deletions test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,23 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) {
/*optional=*/false);
schema_fields.emplace_back(/*field_id=*/3, "z", iceberg::int64(),
/*optional=*/false);
auto expected_schema =
std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
auto expected_schema = std::make_shared<Schema>(schema_fields, /*schema_id=*/1);
auto schema = metadata->Schema();
ASSERT_TRUE(schema.has_value());
EXPECT_EQ(*(schema.value().get()), *expected_schema);

// schema with ID 1
auto schema_v1 = metadata->SchemaById(1);
ASSERT_TRUE(schema_v1.has_value());
EXPECT_EQ(*(schema_v1.value().get()), *expected_schema);

// schema with ID 0
auto expected_schema_v0 = std::make_shared<Schema>(
std::vector<SchemaField>{schema_fields.at(0)}, /*schema_id=*/0);
auto schema_v0 = metadata->SchemaById(0);
ASSERT_TRUE(schema_v0.has_value());
EXPECT_EQ(*(schema_v0.value().get()), *expected_schema_v0);

// Compare partition spec
EXPECT_EQ(metadata->default_spec_id, 0);
std::vector<PartitionField> partition_fields;
Expand Down Expand Up @@ -165,6 +176,16 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) {
EXPECT_EQ(*metadata->snapshots[i], expected_snapshots[i]);
}

// snapshot with ID 3051729675574597004
auto snapshot_v0 = metadata->SnapshotById(3051729675574597004);
ASSERT_TRUE(snapshot_v0.has_value());
EXPECT_EQ(*snapshot_v0.value(), expected_snapshots[0]);

// snapshot with ID 3055729675574597004
auto snapshot_v1 = metadata->SnapshotById(3055729675574597004);
ASSERT_TRUE(snapshot_v1.has_value());
EXPECT_EQ(*snapshot_v1.value(), expected_snapshots[1]);

// Compare snapshot logs
std::vector<SnapshotLogEntry> expected_snapshot_log{
{
Expand Down
Loading