Skip to content

feat(table): Implement snapshot expiration #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
41 changes: 36 additions & 5 deletions table/metadata.go
Original file line number Diff line number Diff line change
@@ -187,14 +187,14 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
b.loc = metadata.Location()
b.lastUpdatedMS = metadata.LastUpdatedMillis()
b.lastColumnId = metadata.LastColumnID()
b.schemaList = metadata.Schemas()
b.schemaList = slices.Clone(metadata.Schemas())
b.currentSchemaID = metadata.CurrentSchema().ID
b.specs = metadata.PartitionSpecs()
b.specs = slices.Clone(metadata.PartitionSpecs())
b.defaultSpecID = metadata.DefaultPartitionSpec()
b.lastPartitionID = metadata.LastPartitionSpecID()
b.props = metadata.Properties()
b.snapshotList = metadata.Snapshots()
b.sortOrderList = metadata.SortOrders()
b.props = maps.Clone(metadata.Properties())
b.snapshotList = slices.Clone(metadata.Snapshots())
b.sortOrderList = slices.Clone(metadata.SortOrders())
Comment on lines -190 to +197
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there's a way we could lazily do this cloning? Avoid performing the copies until we actually need to?

b.defaultSortOrderID = metadata.DefaultSortOrder()
if metadata.Version() > 1 {
seq := metadata.LastSequenceNumber()
@@ -335,6 +335,22 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, err
return b, nil
}

// RemoveSnapshots drops every snapshot whose ID appears in snapshotIds from
// the builder's snapshot list and snapshot log, and records a
// remove-snapshots update for the change.
//
// It returns an error, without modifying the builder, when snapshotIds
// contains the current snapshot. A builder that has no current snapshot
// (nil currentSnapshotID) is accepted instead of panicking on the
// dereference.
func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) (*MetadataBuilder, error) {
	if b.currentSnapshotID != nil && slices.Contains(snapshotIds, *b.currentSnapshotID) {
		return nil, errors.New("current snapshot cannot be removed")
	}

	// Build the membership set once so both filters below run in linear time
	// rather than rescanning snapshotIds for every element.
	toRemove := make(map[int64]struct{}, len(snapshotIds))
	for _, id := range snapshotIds {
		toRemove[id] = struct{}{}
	}

	b.snapshotList = slices.DeleteFunc(b.snapshotList, func(e Snapshot) bool {
		_, found := toRemove[e.SnapshotID]

		return found
	})
	b.snapshotLog = slices.DeleteFunc(b.snapshotLog, func(e SnapshotLogEntry) bool {
		_, found := toRemove[e.SnapshotID]

		return found
	})
	b.updates = append(b.updates, NewRemoveSnapshotsUpdate(snapshotIds))

	return b, nil
}

func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) (*MetadataBuilder, error) {
var sortOrders []SortOrder
if !initial {
@@ -593,6 +609,21 @@ func (b *MetadataBuilder) SetSnapshotRef(
return b, nil
}

// RemoveSnapshotRef deletes the named snapshot ref from the builder and
// records a remove-snapshot-ref update.
//
// An error is returned when no ref with that name exists, or when name is
// MainBranch; in both cases the builder is left unchanged.
func (b *MetadataBuilder) RemoveSnapshotRef(name string) (*MetadataBuilder, error) {
	_, exists := b.refs[name]

	switch {
	case !exists:
		return nil, fmt.Errorf("snapshot ref not found: %s", name)
	case name == MainBranch:
		return nil, errors.New("cannot remove main branch's snapshot ref")
	}

	delete(b.refs, name)
	b.updates = append(b.updates, NewRemoveSnapshotRefUpdate(name))

	return b, nil
}

func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
if b.uuid == uuid {
return b, nil
15 changes: 14 additions & 1 deletion table/properties.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,11 @@

package table

import "github.com/apache/iceberg-go/table/internal"
import (
"math"

"github.com/apache/iceberg-go/table/internal"
)

const (
WriteDataPathKey = "write.data.path"
@@ -71,4 +75,13 @@ const (

WriteTargetFileSizeBytesKey = "write.target-file-size-bytes"
WriteTargetFileSizeBytesDefault = 512 * 1024 * 1024 // 512 MB

MinSnapshotsToKeepKey = "min-snapshots-to-keep"
MinSnapshotsToKeepDefault = math.MaxInt

MaxSnapshotAgeMsKey = "max-snapshot-age-ms"
MaxSnapshotAgeMsDefault = math.MaxInt

MaxRefAgeMsKey = "max-ref-age-ms"
MaxRefAgeMsDefault = math.MaxInt
)
51 changes: 51 additions & 0 deletions table/table_test.go
Original file line number Diff line number Diff line change
@@ -886,6 +886,57 @@ func (t *TableWritingTestSuite) TestReplaceDataFiles() {
}, staged.CurrentSnapshot().Summary)
}

// TestExpireSnapshots commits five single-file appends, then expires
// snapshots with a zero max age while retaining the last two, and checks that
// exactly two snapshots and two snapshot-log entries remain.
func (t *TableWritingTestSuite) TestExpireSnapshots() {
	const (
		numCommits = 5
		retainLast = 2
	)

	fs := iceio.LocalFS{}

	// Use names specific to this test so its fixtures and table identifier do
	// not overlap with those of TestReplaceDataFiles.
	files := make([]string, 0, numCommits)
	for i := range numCommits {
		filePath := fmt.Sprintf("%s/expire_snapshots_v%d/data-%d.parquet", t.location, t.formatVersion, i)
		t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
		files = append(files, filePath)
	}

	ident := table.Identifier{"default", "expire_snapshots_v" + strconv.Itoa(t.formatVersion)}
	meta, err := table.NewMetadata(t.tableSchemaPromotedTypes, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, t.location, iceberg.Properties{"format-version": strconv.Itoa(t.formatVersion)})
	t.Require().NoError(err)

	ctx := context.Background()

	tbl := table.New(
		ident,
		meta,
		t.getMetadataLoc(),
		func(ctx context.Context) (iceio.IO, error) {
			return fs, nil
		},
		&mockedCatalog{},
	)

	tblfs, err := tbl.FS(ctx)
	t.Require().NoError(err)

	// One commit per file, so each file produces its own snapshot.
	for i := range numCommits {
		tx := tbl.NewTransaction()
		t.Require().NoError(tx.AddFiles(ctx, files[i:i+1], nil, false))
		tbl, err = tx.Commit(ctx)
		t.Require().NoError(err)
	}

	mflist, err := tbl.CurrentSnapshot().Manifests(tblfs)
	t.Require().NoError(err)
	t.Len(mflist, numCommits)
	t.Require().Len(tbl.Metadata().Snapshots(), numCommits)

	tx := tbl.NewTransaction()
	t.Require().NoError(tx.ExpireSnapshots(table.WithOlderThan(0), table.WithRetainLast(retainLast)))
	tbl, err = tx.Commit(ctx)
	t.Require().NoError(err)
	t.Require().Len(tbl.Metadata().Snapshots(), retainLast)
	t.Require().Len(slices.Collect(tbl.Metadata().SnapshotLogs()), retainLast)
}

func (t *TableWritingTestSuite) TestWriteSpecialCharacterColumn() {
ident := table.Identifier{"default", "write_special_character_column"}
colNameWithSpecialChar := "letter/abc"
122 changes: 121 additions & 1 deletion table/transaction.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package table

import (
"cmp"
"context"
"encoding/json"
"errors"
@@ -146,6 +147,115 @@ func (t *Transaction) UpdateSpec(caseSensitive bool) *UpdateSpec {
return NewUpdateSpec(t, caseSensitive)
}

// expireSnapshotsCfg holds the optional settings supplied to
// Transaction.ExpireSnapshots. A nil field means the corresponding option was
// not provided.
type expireSnapshotsCfg struct {
	// minSnapshotsToKeep is set by WithRetainLast.
	minSnapshotsToKeep *int
	// maxSnapshotAgeMs is set by WithOlderThan, in milliseconds.
	maxSnapshotAgeMs *int64
}

// ExpireSnapshotsOpt is a functional option that mutates the configuration
// used by Transaction.ExpireSnapshots.
type ExpireSnapshotsOpt func(*expireSnapshotsCfg)

// WithRetainLast returns an option that sets the minimum number of snapshots
// to keep to n.
func WithRetainLast(n int) ExpireSnapshotsOpt {
	return func(c *expireSnapshotsCfg) { c.minSnapshotsToKeep = &n }
}

// WithOlderThan returns an option that sets the maximum snapshot age to t,
// stored as a whole number of milliseconds.
func WithOlderThan(t time.Duration) ExpireSnapshotsOpt {
	return func(c *expireSnapshotsCfg) {
		ageMs := t.Milliseconds()
		c.maxSnapshotAgeMs = &ageMs
	}
}

// ExpireSnapshots stages the updates that remove expired snapshot refs and
// every snapshot that is no longer retained by any remaining ref.
//
// For each ref in the transaction's metadata:
//   - A ref other than MainBranch whose head snapshot is older than its max
//     ref age (the ref's MaxRefAgeMs, falling back to WithOlderThan) is
//     scheduled for removal. MainBranch is never scheduled for removal.
//   - The head snapshot of every retained ref is kept, so a retained ref
//     never points at a removed snapshot.
//   - For retained branches, ancestors are walked from the head and kept
//     until a snapshot is both older than the max snapshot age and at least
//     the minimum number of snapshots has already been kept. Per-ref
//     settings take precedence over WithRetainLast / WithOlderThan.
//
// An error is returned when a ref's snapshot cannot be looked up, or when a
// required retention value is supplied neither by the ref nor by opts.
func (t *Transaction) ExpireSnapshots(opts ...ExpireSnapshotsOpt) error {
	var (
		cfg         expireSnapshotsCfg
		updates     []Update
		snapsToKeep = make(map[int64]struct{})
		// Captured once so every age comparison uses the same reference time.
		nowMs = time.Now().UnixMilli()
	)

	for _, opt := range opts {
		opt(&cfg)
	}

	for refName, ref := range t.meta.refs {
		head, err := t.meta.SnapshotByID(ref.SnapshotID)
		if err != nil {
			return err
		}

		// The main branch must never be expired; only other refs are subject
		// to the max-ref-age check.
		if refName != MainBranch {
			maxRefAgeMs := cmp.Or(ref.MaxRefAgeMs, cfg.maxSnapshotAgeMs)
			if maxRefAgeMs == nil {
				return errors.New("cannot find a valid value for maxRefAgeMs")
			}

			if nowMs-head.TimestampMs > *maxRefAgeMs {
				updates = append(updates, NewRemoveSnapshotRefUpdate(refName))

				continue
			}
		}

		// The ref survives, so the snapshot it points at must survive too.
		snapsToKeep[ref.SnapshotID] = struct{}{}

		// Tags retain only their own snapshot; the ancestor settings below
		// apply to branches only.
		if ref.SnapshotRefType != BranchRef {
			continue
		}

		var (
			minSnapshotsToKeep = cmp.Or(ref.MinSnapshotsToKeep, cfg.minSnapshotsToKeep)
			maxSnapshotAgeMs   = cmp.Or(ref.MaxSnapshotAgeMs, cfg.maxSnapshotAgeMs)
		)

		if minSnapshotsToKeep == nil || maxSnapshotAgeMs == nil {
			return errors.New("cannot find a valid value for minSnapshotsToKeep and maxSnapshotAgeMs")
		}

		var (
			numSnapshots int
			snapID       = ref.SnapshotID
		)

		for {
			// NOTE(review): if a parent was removed by an earlier expiration,
			// this lookup presumably fails and aborts the whole call; confirm
			// SnapshotByID semantics for missing ancestors.
			cur, err := t.meta.SnapshotByID(snapID)
			if err != nil {
				return err
			}

			tooOld := nowMs-cur.TimestampMs > *maxSnapshotAgeMs
			if tooOld && numSnapshots >= *minSnapshotsToKeep {
				break
			}

			snapsToKeep[cur.SnapshotID] = struct{}{}

			if cur.ParentSnapshotID == nil {
				break
			}

			snapID = *cur.ParentSnapshotID
			numSnapshots++
		}
	}

	var snapsToDelete []int64

	for _, snap := range t.meta.snapshotList {
		if _, found := snapsToKeep[snap.SnapshotID]; !found {
			snapsToDelete = append(snapsToDelete, snap.SnapshotID)
		}
	}

	// Avoid staging a remove-snapshots update that removes nothing.
	if len(snapsToDelete) > 0 {
		updates = append(updates, NewRemoveSnapshotsUpdate(snapsToDelete))
	}

	return t.apply(updates, nil)
}

func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
rdr := array.NewTableReader(tbl, batchSize)
defer rdr.Release()
@@ -401,8 +511,18 @@ func (t *Transaction) Commit(ctx context.Context) (*Table, error) {

if len(t.meta.updates) > 0 {
t.reqs = append(t.reqs, AssertTableUUID(t.meta.uuid))
tbl, err := t.tbl.doCommit(ctx, t.meta.updates, t.reqs)
if err != nil {
return tbl, err
}

for _, u := range t.meta.updates {
if perr := u.PostCommit(ctx, t.tbl, tbl); perr != nil {
err = errors.Join(err, perr)
}
}

return t.tbl.doCommit(ctx, t.meta.updates, t.reqs)
return tbl, err
}

return t.tbl, nil
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.