Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 330 additions & 0 deletions table/replace_partitions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
"context"
"errors"
"fmt"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/google/uuid"
)

// ReplacePartitions is used to replace data in a table, partition by partition.
//
// This interface is similar to OverwriteFiles but only replaces specific partitions
// rather than files. Only data files that belong to partitions that match added files
// will be replaced.
//
// As an example, if you have a table partitioned by columns "year" and "month" and this
// action adds two data files with partition data (2021, 1) and (2021, 2), then all existing
// data files that contain records where year=2021 and month=1 OR year=2021 and month=2
// will be replaced.
type ReplacePartitions interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and all other PR-s looks like a 1-1 adaptation for java iceberg interfaces

this approach is not really work with golang, it's not ideomatic.

Better to stick with what we have with for example snapshot

No need to define interface just for the sake of interface.

// AddFile adds a data file to the table.
AddFile(dataFile iceberg.DataFile) ReplacePartitions

// AddFiles adds multiple data files to the table.
AddFiles(dataFiles ...iceberg.DataFile) ReplacePartitions

// ValidateAddedFilesMatchOverwriteFilter ensures that files added concurrently
// do not conflict with the replace operation.
ValidateAddedFilesMatchOverwriteFilter() ReplacePartitions

// ValidateFromSnapshot ensures that no delete files matching this operation
// are added between the snapshot used to plan the operation and the current snapshot.
ValidateFromSnapshot(snapshotID int64) ReplacePartitions

// Set sets a summary property in the snapshot produced by this update.
Set(property, value string) ReplacePartitions

// StageOnly stages the snapshot but does not update the current snapshot ID.
StageOnly() ReplacePartitions

// Commit executes this update and commits the result.
Commit(ctx context.Context) (*Table, error)
}

// BaseReplacePartitions implements the ReplacePartitions interface.
type BaseReplacePartitions struct {
txn *Transaction
addedFiles []iceberg.DataFile
snapshotProps iceberg.Properties
validateAddedFiles bool
fromSnapshotID *int64
stageOnly bool
validatePartitionsFilter bool
}

// NewReplacePartitions creates a new BaseReplacePartitions instance.
func NewReplacePartitions(txn *Transaction) ReplacePartitions {
return &BaseReplacePartitions{
txn: txn,
addedFiles: make([]iceberg.DataFile, 0),
snapshotProps: make(iceberg.Properties),
validateAddedFiles: false,
stageOnly: false,
validatePartitionsFilter: false,
}
}

// AddFile adds a data file to the table.
func (rp *BaseReplacePartitions) AddFile(dataFile iceberg.DataFile) ReplacePartitions {
rp.addedFiles = append(rp.addedFiles, dataFile)
return rp

Check failure on line 91 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 91 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 91 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)
}

// AddFiles adds multiple data files to the table.
func (rp *BaseReplacePartitions) AddFiles(dataFiles ...iceberg.DataFile) ReplacePartitions {
rp.addedFiles = append(rp.addedFiles, dataFiles...)
return rp

Check failure on line 97 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 97 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 97 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)
}

// ValidateAddedFilesMatchOverwriteFilter ensures that files added concurrently
// do not conflict with the replace operation.
func (rp *BaseReplacePartitions) ValidateAddedFilesMatchOverwriteFilter() ReplacePartitions {
rp.validateAddedFiles = true
return rp

Check failure on line 104 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 104 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 104 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)
}

// ValidateFromSnapshot ensures that no delete files matching this operation
// are added between the snapshot used to plan the operation and the current snapshot.
func (rp *BaseReplacePartitions) ValidateFromSnapshot(snapshotID int64) ReplacePartitions {
rp.fromSnapshotID = &snapshotID
return rp
}

// Set sets a summary property in the snapshot produced by this update.
func (rp *BaseReplacePartitions) Set(property, value string) ReplacePartitions {
rp.snapshotProps[property] = value
return rp
}

// StageOnly stages the snapshot but does not update the current snapshot ID.
func (rp *BaseReplacePartitions) StageOnly() ReplacePartitions {
rp.stageOnly = true
return rp
}

// Commit executes this update and commits the result.
func (rp *BaseReplacePartitions) Commit(ctx context.Context) (*Table, error) {
if len(rp.addedFiles) == 0 {
return nil, errors.New("cannot commit replace partitions operation with no files added")
}

fs, err := rp.txn.tbl.fsF(ctx)
if err != nil {
return nil, err
}

// Create a replace partitions snapshot producer
producer := rp.createSnapshotProducer(fs.(io.WriteFileIO))

// Add all the new files
for _, file := range rp.addedFiles {
producer.appendDataFile(file)
}

// Find and mark files to be deleted based on partition matching
if err := rp.markFilesForDeletion(ctx, producer); err != nil {
return nil, err
}

// Perform validation if requested
if err := rp.performValidation(ctx); err != nil {
return nil, err
}

// Commit the changes
updates, reqs, err := producer.commit()
if err != nil {
return nil, err
}

if err := rp.txn.apply(updates, reqs); err != nil {
return nil, err
}

if rp.stageOnly {
// If stageOnly is true, return the staged table without committing the transaction
stagedTable, err := rp.txn.StagedTable()
if err != nil {
return nil, err
}
return stagedTable.Table, nil
}

// Commit the transaction
return rp.txn.Commit(ctx)
}

func (rp *BaseReplacePartitions) createSnapshotProducer(fs io.WriteFileIO) *snapshotProducer {
commitUUID := uuid.New()

Check failure on line 180 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

File is not properly formatted (gofmt)

Check failure on line 180 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

File is not properly formatted (gofmt)

Check failure on line 180 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

File is not properly formatted (gofmt)
// For replace partitions, we use the REPLACE operation if there are existing files,
// otherwise APPEND for the first data
operation := OpReplace
if rp.txn.meta.currentSnapshot() == nil {
operation = OpAppend
}

return createSnapshotProducer(operation, rp.txn, fs, &commitUUID, rp.snapshotProps)
}

func (rp *BaseReplacePartitions) markFilesForDeletion(ctx context.Context, producer *snapshotProducer) error {
currentSnapshot := rp.txn.meta.currentSnapshot()
if currentSnapshot == nil {
// No existing data to replace
return nil
}

fs, err := rp.txn.tbl.fsF(ctx)
if err != nil {
return err
}

// Get the partition types for all specs
allSpecs := rp.txn.meta.specs
specs := make(map[int]iceberg.PartitionSpec)
for _, spec := range allSpecs {
specs[spec.ID()] = spec
}
schema := rp.txn.meta.CurrentSchema()

// Create partition evaluators for matching
addedPartitions := make(map[int]map[string]bool) // specID -> partition key -> exists

// Extract partition keys from added files
for _, addedFile := range rp.addedFiles {
specID := int(addedFile.SpecID())
spec, exists := specs[specID]
if !exists {
return fmt.Errorf("partition spec with ID %d not found", specID)
}

partitionType := spec.PartitionType(schema)
partitionKey := partitionKeyString(addedFile.Partition(), partitionType)

if addedPartitions[specID] == nil {
addedPartitions[specID] = make(map[string]bool)
}
addedPartitions[specID][partitionKey] = true
}

// Iterate through existing data files and mark for deletion if partition matches
for dataFile, err := range currentSnapshot.dataFiles(fs, nil) {
if err != nil {
return err
}

specID := int(dataFile.SpecID())
spec, exists := specs[specID]
if !exists {
continue // Skip files with unknown specs
}

partitionType := spec.PartitionType(schema)
partitionKey := partitionKeyString(dataFile.Partition(), partitionType)

// Check if this partition should be replaced
if partitions, exists := addedPartitions[specID]; exists && partitions[partitionKey] {
producer.deleteDataFile(dataFile)
}
}

return nil
}

func (rp *BaseReplacePartitions) performValidation(ctx context.Context) error {
if !rp.validateAddedFiles && rp.fromSnapshotID == nil {
return nil // No validation requested
}

currentSnapshot := rp.txn.meta.currentSnapshot()
if currentSnapshot == nil {
return nil // No validation needed for empty tables
}

if rp.fromSnapshotID != nil {
// Validate that no conflicting changes happened since the specified snapshot
baseSnapshot, err := rp.txn.meta.SnapshotByID(*rp.fromSnapshotID)
if err != nil || baseSnapshot == nil {
return fmt.Errorf("snapshot %d not found for validation", *rp.fromSnapshotID)
}

if currentSnapshot.SnapshotID != *rp.fromSnapshotID {
// Check if any conflicting changes were made
if err := rp.validateNoConflictingChanges(ctx, baseSnapshot, currentSnapshot); err != nil {
return err
}
}
}

if rp.validateAddedFiles {
// Validate that added files don't conflict with the overwrite filter
// This is a more complex validation that would check for concurrent modifications
// For now, we'll implement a basic check
return rp.validateAddedFilesMatchFilter(ctx)
}

return nil
}

func (rp *BaseReplacePartitions) validateNoConflictingChanges(ctx context.Context, baseSnapshot, currentSnapshot *Snapshot) error {
// This is a simplified validation - in a full implementation, you would
// check for conflicting delete files, schema changes, etc.
// For now, we'll just check if the partition spec or schema changed

if baseSnapshot.SchemaID != nil && currentSnapshot.SchemaID != nil &&
*baseSnapshot.SchemaID != *currentSnapshot.SchemaID {
return errors.New("schema changed since snapshot used for planning replace partitions")
}

return nil
}

func (rp *BaseReplacePartitions) validateAddedFilesMatchFilter(ctx context.Context) error {
// This would perform more sophisticated validation to ensure that
// concurrently added files don't conflict with the replace operation
// For now, we'll implement a basic check
return nil
}

// partitionKeyString creates a string representation of a partition for comparison
func partitionKeyString(partition map[int]any, partitionType *iceberg.StructType) string {
if len(partition) == 0 {
return "__HIVE_DEFAULT_PARTITION__"
}

result := ""
for i, field := range partitionType.FieldList {
if i > 0 {
result += "/"
}
value := partition[field.ID]
if value == nil {
result += fmt.Sprintf("%s=__HIVE_DEFAULT_PARTITION__", field.Name)

Check failure on line 323 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

string-format: fmt.Sprintf can be replaced with string concatenation (perfsprint)

Check failure on line 323 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

string-format: fmt.Sprintf can be replaced with string concatenation (perfsprint)

Check failure on line 323 in table/replace_partitions.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

string-format: fmt.Sprintf can be replaced with string concatenation (perfsprint)
} else {
result += fmt.Sprintf("%s=%v", field.Name, value)
}
}

return result
}
Loading
Loading