Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(

/// \brief A reference to a snapshot, either a branch or a tag.
struct ICEBERG_EXPORT SnapshotRef {
static constexpr std::string_view kMainBranch = "main";

struct ICEBERG_EXPORT Branch {
/// A positive number for the minimum number of snapshots to keep in a branch while
/// expiring snapshots. Defaults to table property
Expand Down
74 changes: 70 additions & 4 deletions src/iceberg/table_requirements.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

#include <memory>

#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Expand All @@ -36,12 +36,78 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableUpdateContext::Build
return std::move(requirements_);
}

void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
if (!added_last_assigned_field_id_) {
if (base_ != nullptr) {
AddRequirement(
std::make_unique<table::AssertLastAssignedFieldId>(base_->last_column_id));
}
added_last_assigned_field_id_ = true;
}
}

void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
if (!added_current_schema_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
base_->current_schema_id.value()));
}
added_current_schema_id_ = true;
}
}

void TableUpdateContext::RequireLastAssignedPartitionIdUnchanged() {
if (!added_last_assigned_partition_id_) {
if (base_ != nullptr) {
AddRequirement(std::make_unique<table::AssertLastAssignedPartitionId>(
base_->last_partition_id));
}
added_last_assigned_partition_id_ = true;
}
}

void TableUpdateContext::RequireDefaultSpecIdUnchanged() {
if (!added_default_spec_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(
std::make_unique<table::AssertDefaultSpecID>(base_->default_spec_id));
}
added_default_spec_id_ = true;
}
}

void TableUpdateContext::RequireDefaultSortOrderIdUnchanged() {
if (!added_default_sort_order_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(std::make_unique<table::AssertDefaultSortOrderID>(
base_->default_sort_order_id));
}
added_default_sort_order_id_ = true;
}
}

void TableUpdateContext::RequireNoBranchesChanged() {
if (base_ != nullptr && !is_replace_) {
for (const auto& [name, ref] : base_->refs) {
if (ref->type() == SnapshotRefType::kBranch && name != SnapshotRef::kMainBranch) {
AddRequirement(
std::make_unique<table::AssertRefSnapshotID>(name, ref->snapshot_id));
}
}
}
}

bool TableUpdateContext::AddChangedRef(const std::string& ref_name) {
auto [_, inserted] = changed_refs_.insert(ref_name);
return inserted;
}

Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForCreateTable(
const std::vector<std::unique_ptr<TableUpdate>>& table_updates) {
TableUpdateContext context(nullptr, false);
context.AddRequirement(std::make_unique<table::AssertDoesNotExist>());
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand All @@ -52,7 +118,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForRep
TableUpdateContext context(&base, true);
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand All @@ -63,7 +129,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForUpd
TableUpdateContext context(&base, false);
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand Down
44 changes: 23 additions & 21 deletions src/iceberg/table_requirements.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
/// for optimistic concurrency control when committing table changes.

#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "iceberg/iceberg_export.h"
Expand Down Expand Up @@ -68,27 +70,24 @@ class ICEBERG_EXPORT TableUpdateContext {
/// \brief Build and return the list of requirements
Result<std::vector<std::unique_ptr<TableRequirement>>> Build();

// Getters for deduplication flags
bool added_last_assigned_field_id() const { return added_last_assigned_field_id_; }
bool added_current_schema_id() const { return added_current_schema_id_; }
bool added_last_assigned_partition_id() const {
return added_last_assigned_partition_id_;
}
bool added_default_spec_id() const { return added_default_spec_id_; }
bool added_default_sort_order_id() const { return added_default_sort_order_id_; }

// Setters for deduplication flags
void set_added_last_assigned_field_id(bool value) {
added_last_assigned_field_id_ = value;
}
void set_added_current_schema_id(bool value) { added_current_schema_id_ = value; }
void set_added_last_assigned_partition_id(bool value) {
added_last_assigned_partition_id_ = value;
}
void set_added_default_spec_id(bool value) { added_default_spec_id_ = value; }
void set_added_default_sort_order_id(bool value) {
added_default_sort_order_id_ = value;
}
// Helper methods to deduplicate requirements to add.
/// \brief Require that the last assigned field ID remains unchanged
void RequireLastAssignedFieldIdUnchanged();
/// \brief Require that the current schema ID remains unchanged
void RequireCurrentSchemaIdUnchanged();
/// \brief Require that the last assigned partition ID remains unchanged
void RequireLastAssignedPartitionIdUnchanged();
/// \brief Require that the default spec ID remains unchanged
void RequireDefaultSpecIdUnchanged();
/// \brief Require that the default sort order ID remains unchanged
void RequireDefaultSortOrderIdUnchanged();
/// \brief Require that no branches have been changed
void RequireNoBranchesChanged();

/// \brief Track a changed ref and return whether it was newly added
/// \param ref_name The name of the ref being changed
/// \return true if this is the first time the ref is being changed
bool AddChangedRef(const std::string& ref_name);

private:
const TableMetadata* base_;
Expand All @@ -102,6 +101,9 @@ class ICEBERG_EXPORT TableUpdateContext {
bool added_last_assigned_partition_id_ = false;
bool added_default_spec_id_ = false;
bool added_default_sort_order_id_ = false;

// Track refs that have been changed to avoid duplicate requirements
std::unordered_set<std::string> changed_refs_;
};

/// \brief Factory class for generating table requirements
Expand Down
87 changes: 46 additions & 41 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "iceberg/exception.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"

namespace iceberg::table {
Expand All @@ -32,9 +31,8 @@ void AssignUUID::ApplyTo(TableMetadataBuilder& builder) const {
builder.AssignUUID(uuid_);
}

Status AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
void AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
// AssignUUID does not generate additional requirements.
return {};
}

// UpgradeFormatVersion
Expand All @@ -43,8 +41,8 @@ void UpgradeFormatVersion::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("UpgradeFormatVersion::GenerateRequirements not implemented");
void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
// UpgradeFormatVersion doesn't generate any requirements
}

// AddSchema
Expand All @@ -53,8 +51,8 @@ void AddSchema::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddSchema::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSchema::GenerateRequirements not implemented");
void AddSchema::GenerateRequirements(TableUpdateContext& context) const {
context.RequireLastAssignedFieldIdUnchanged();
}

// SetCurrentSchema
Expand All @@ -63,8 +61,8 @@ void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetCurrentTableSchema::GenerateRequirements not implemented");
void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
context.RequireCurrentSchemaIdUnchanged();
}

// AddPartitionSpec
Expand All @@ -73,8 +71,8 @@ void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTablePartitionSpec::GenerateRequirements not implemented");
void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
context.RequireLastAssignedPartitionIdUnchanged();
}

// SetDefaultPartitionSpec
Expand All @@ -83,9 +81,8 @@ void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented(
"SetDefaultTablePartitionSpec::GenerateRequirements not implemented");
void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSpecIdUnchanged();
}

// RemovePartitionSpecs
Expand All @@ -94,9 +91,9 @@ void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented(
"RemoveTablePartitionSpecs::GenerateRequirements not implemented");
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSpecIdUnchanged();
context.RequireNoBranchesChanged();
}

// RemoveSchemas
Expand All @@ -105,28 +102,29 @@ void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSchemas::GenerateRequirements not implemented");
void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
context.RequireCurrentSchemaIdUnchanged();
context.RequireNoBranchesChanged();
}

// AddSortOrder

void AddSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.AddSortOrder(sort_order_);
}

Status AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSortOrder::GenerateRequirements not implemented");
void AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
// AddSortOrder doesn't generate any requirements
}

// SetDefaultSortOrder

void SetDefaultSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetDefaultSortOrder(sort_order_id_);
}

Status SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetDefaultTableSortOrder::GenerateRequirements not implemented");
void SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSortOrderIdUnchanged();
}

// AddSnapshot
Expand All @@ -135,16 +133,16 @@ void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSnapshot::GenerateRequirements not implemented");
void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
// AddSnapshot doesn't generate any requirements
}

// RemoveSnapshots

void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}

Status RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSnapshots::GenerateRequirements not implemented");
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
// RemoveSnapshots doesn't generate any requirements
}

// RemoveSnapshotRef
Expand All @@ -153,8 +151,8 @@ void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSnapshotRef::GenerateRequirements not implemented");
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
// RemoveSnapshotRef doesn't generate any requirements
}

// SetSnapshotRef
Expand All @@ -163,8 +161,17 @@ void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableSnapshotRef::GenerateRequirements not implemented");
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
bool added = context.AddChangedRef(ref_name_);
if (added && context.base() != nullptr && !context.is_replace()) {
const auto& refs = context.base()->refs;
auto it = refs.find(ref_name_);
// Require that the ref does not exist (nullopt) or is the same as the base snapshot
std::optional<int64_t> base_snapshot_id =
(it != refs.end()) ? std::make_optional(it->second->snapshot_id) : std::nullopt;
context.AddRequirement(
std::make_unique<table::AssertRefSnapshotID>(ref_name_, base_snapshot_id));
}
}

// SetProperties
Expand All @@ -173,9 +180,8 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
builder.SetProperties(updated_);
}

Status SetProperties::GenerateRequirements(TableUpdateContext& context) const {
// No requirements
return {};
void SetProperties::GenerateRequirements(TableUpdateContext& context) const {
// SetProperties doesn't generate any requirements
}

// RemoveProperties
Expand All @@ -184,9 +190,8 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
builder.RemoveProperties(removed_);
}

Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
// No requirements
return {};
void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
// RemoveProperties doesn't generate any requirements
}

// SetLocation
Expand All @@ -195,8 +200,8 @@ void SetLocation::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetLocation::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableLocation::GenerateRequirements not implemented");
void SetLocation::GenerateRequirements(TableUpdateContext& context) const {
// SetLocation doesn't generate any requirements
}

} // namespace iceberg::table
Loading
Loading