
Bug(table): RowDelta.validate uses AlwaysTrue filter - false conflicts for concurrent appends to different partitions #978

@mzzz-zzm

Apache Iceberg version

main (development)

Please describe the bug 🐞

RowDelta.validate() checks for conflicting concurrent data files by calling validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level). The AlwaysTrue{} filter matches every data file in every partition. As a result, a concurrent FastAppend to a completely different partition causes a RowDelta equality-delete commit to be rejected with ErrConflictingDataFiles, even though the two operations cannot possibly interfere with each other.

In practice, any busy partitioned table with a streaming DELETE/UPDATE job (which uses RowDelta with equality-delete files) will almost never be able to commit under the default SERIALIZABLE isolation level, because some other writer is always appending to some partition.

Affected component

table/row_delta.go: RowDelta.validate()

Steps to reproduce

The test is self-contained. Save the file below as table/bug_repro_rowdelta_partition_test.go in an unmodified checkout of main and run:

go test ./table/ -run TestBugRepro_RowDeltaFalseConflictDifferentPartition -v

Output on an unfixed upstream main:

--- FAIL: TestBugRepro_RowDeltaFalseConflictDifferentPartition
    bug_repro_rowdelta_partition_test.go:232:
        Error:   Received unexpected error:
                 commit failed, refresh and try again: concurrent data files added:
                 snapshot ... added data file .../worker-b-cold.parquet matching filter AlwaysTrue()
        Messages: serializable isolation must allow an eq-delete when the only
                  concurrent data file is in a completely different partition (AlwaysTrue bug)

The test makes a baseline commit first, so Writer A starts from a non-empty base. This isolates the bug from the separate empty-base conflict-bypass bug. Worker B's data is in partition category="cold", and Writer A's equality delete targets category="hot". The two partitions are completely disjoint.

Full source of table/bug_repro_rowdelta_partition_test.go:

package table_test

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"testing"

	"github.com/apache/iceberg-go"
	iceio "github.com/apache/iceberg-go/io"
	"github.com/apache/iceberg-go/table"
	"github.com/stretchr/testify/require"
)

type rowDeltaRepro3Catalog struct {
	mu       sync.Mutex
	current  table.Metadata
	location string
}

func newRowDeltaRepro3Catalog(meta table.Metadata, location string) *rowDeltaRepro3Catalog {
	return &rowDeltaRepro3Catalog{current: meta, location: location}
}

func (c *rowDeltaRepro3Catalog) LoadTable(_ context.Context, ident table.Identifier) (*table.Table, error) {
	c.mu.Lock()
	meta := c.current
	c.mu.Unlock()
	return table.New(
		ident, meta, c.location+"/metadata/v1.metadata.json",
		func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil },
		c,
	), nil
}

func (c *rowDeltaRepro3Catalog) CommitTable(
	_ context.Context,
	ident table.Identifier,
	reqs []table.Requirement,
	updates []table.Update,
) (table.Metadata, string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, req := range reqs {
		if err := req.Validate(c.current); err != nil {
			return nil, "", fmt.Errorf("%w: CAS check failed: %v", table.ErrCommitFailed, err)
		}
	}
	newMeta, err := table.UpdateTableMetadata(c.current, updates, "")
	if err != nil {
		return nil, "", err
	}
	c.current = newMeta
	return newMeta, c.location + "/metadata/committed.metadata.json", nil
}

func makeRepro3DataFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
	t.Helper()
	var partition map[int]any
	if category != "" {
		partition = map[int]any{1000: category}
	}
	b, err := iceberg.NewDataFileBuilder(
		*spec, iceberg.EntryContentData, path,
		iceberg.ParquetFile, partition, nil, nil, 100, 1024,
	)
	require.NoError(t, err)
	return b.Build()
}

func makeRepro3EqDeleteFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
	t.Helper()
	var partition map[int]any
	if category != "" {
		partition = map[int]any{1000: category}
	}
	b, err := iceberg.NewDataFileBuilder(
		*spec, iceberg.EntryContentEqDeletes, path,
		iceberg.ParquetFile, partition, nil, nil, 5, 512,
	)
	require.NoError(t, err)
	return b.EqualityFieldIDs([]int{3}).Build()
}

func TestBugRepro_RowDeltaFalseConflictDifferentPartition(t *testing.T) {
	location := filepath.ToSlash(t.TempDir())
	ctx := context.Background()

	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "category", Type: iceberg.PrimitiveTypes.String, Required: false},
		iceberg.NestedField{ID: 2, Name: "value", Type: iceberg.PrimitiveTypes.Int64, Required: false},
		iceberg.NestedField{ID: 3, Name: "event_id", Type: iceberg.PrimitiveTypes.Int64, Required: false},
	)
	spec := iceberg.NewPartitionSpec(
		iceberg.PartitionField{
			SourceIDs: []int{1},
			FieldID:   1000,
			Name:      "category",
			Transform: iceberg.IdentityTransform{},
		},
	)
	props := iceberg.Properties{
		table.PropertyFormatVersion:        "2",
		table.CommitNumRetriesKey:          "1",
		table.CommitMinRetryWaitMsKey:      "0",
		table.CommitMaxRetryWaitMsKey:      "0",
		table.CommitTotalRetryTimeoutMsKey: "60000",
	}

	metaEmpty, err := table.NewMetadata(schema, &spec, table.UnsortedSortOrder, location, props)
	require.NoError(t, err)

	cat := newRowDeltaRepro3Catalog(metaEmpty, location)
	fsF := func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }

	newTbl := func(base table.Metadata) *table.Table {
		return table.New([]string{"default", "repro3"}, base,
			location+"/metadata/v1.metadata.json", fsF, cat)
	}

	// Step 1: Establish baseline snapshot S0 so Writer A starts from a
	// non-empty base (isolates AlwaysTrue bug from the empty-base bug).
	baseline := makeRepro3DataFile(t, &spec, location+"/data/baseline.parquet", "warm")
	txSetup := newTbl(metaEmpty).NewTransaction()
	require.NoError(t, txSetup.AddDataFiles(ctx, []iceberg.DataFile{baseline}, nil))
	_, err = txSetup.Commit(ctx)
	require.NoError(t, err, "baseline commit must succeed")

	cat.mu.Lock()
	metaS0 := cat.current
	cat.mu.Unlock()

	// Step 2: Worker B appends a data file in partition category="cold".
	workerBTbl, err := cat.LoadTable(ctx, []string{"default", "repro3"})
	require.NoError(t, err)
	dfCold := makeRepro3DataFile(t, &spec, location+"/data/worker-b-cold.parquet", "cold")
	txB := workerBTbl.NewTransaction()
	require.NoError(t, txB.AddDataFiles(ctx, []iceberg.DataFile{dfCold}, nil))
	_, err = txB.Commit(ctx)
	require.NoError(t, err, "Worker B must commit successfully")

	// Step 3: Writer A starts from S0, adds equality-delete in category="hot".
	writerATbl := newTbl(metaS0)
	eqDelHot := makeRepro3EqDeleteFile(t, &spec, location+"/data/writer-a-eq-del-hot.parquet", "hot")
	txA := writerATbl.NewTransaction()
	rd := txA.NewRowDelta(nil)
	rd.AddDeletes(eqDelHot)
	require.NoError(t, rd.Commit(ctx))

	// Step 4: Writer A commits.
	// Attempt 1: CAS fails (S0 vs Worker B's HEAD) → ErrCommitFailed.
	// Retry: newConflictContext returns [Worker_B_snapshot] as concurrent.
	//        RowDelta.validate calls validateNoConflictingDataFiles(AlwaysTrue{}).
	//        AlwaysTrue matches Worker B's "cold" file → ErrConflictingDataFiles.
	// Expected: no error ("cold" ≠ "hot", partitions are disjoint).
	_, err = txA.Commit(ctx)
	require.NoError(t, err,
		"serializable isolation must allow an eq-delete when the only concurrent "+
			"data file is in a completely different partition (AlwaysTrue bug)")
}
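The retry path in Step 4 can be pictured with a toy model. This is a hypothetical sketch: `snapshot` and `concurrentSince` are illustrative names, not iceberg-go API. It captures only the idea from the comments above: on retry, the snapshots committed between Writer A's base (S0) and the current HEAD count as concurrent. With the test's history, that set is exactly Worker B's snapshot.

```go
package main

import "fmt"

// snapshot is a hypothetical, heavily simplified stand-in for an Iceberg
// snapshot: an ID, its parent's ID (0 = no parent), and the partition values
// of the data files it added.
type snapshot struct {
	id, parentID int64
	addedParts   []string
}

// concurrentSince walks parent links from head back to base and returns the
// snapshots committed after base, oldest first. It errors if base is not an
// ancestor of head. Illustrative only; not iceberg-go's newConflictContext.
func concurrentSince(byID map[int64]snapshot, base, head int64) ([]snapshot, error) {
	var out []snapshot
	for id := head; id != base; {
		s, ok := byID[id]
		if !ok {
			return nil, fmt.Errorf("base %d is not an ancestor of head %d", base, head)
		}
		out = append([]snapshot{s}, out...) // prepend to keep oldest first
		id = s.parentID
	}
	return out, nil
}

func main() {
	byID := map[int64]snapshot{
		1: {id: 1, addedParts: []string{"warm"}},              // S0: baseline commit
		2: {id: 2, parentID: 1, addedParts: []string{"cold"}}, // Worker B's append
	}
	// Writer A's transaction was built on S0 (id 1); HEAD is now id 2.
	cc, err := concurrentSince(byID, 1, 2)
	if err != nil {
		panic(err)
	}
	for _, s := range cc {
		fmt.Println(s.id, s.addedParts) // prints "2 [cold]"
	}
}
```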

Root cause

The relevant section of table/row_delta.go on main:

// Comment in code:
// "does not yet surface the bound predicate. AlwaysTrue is the 
//  conservative fallback [...] Follow-up:"
if err := validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level); err != nil {
    return err
}

validateNoConflictingDataFiles iterates over every ADDED entry in every concurrent snapshot and reports a conflict for any data file whose partition satisfies the supplied filter.
Because the filter is iceberg.AlwaysTrue{}, every data file in every partition satisfies it. Any concurrent append anywhere in the table is therefore reported as a conflict to the RowDelta committer.

The Java reference implementation (MergingSnapshotProducer.validateNoNewDataFiles) derives the conflict filter from the equality-delete predicate itself, so it checks only rows that could match the delete. The Go implementation has not yet threaded that predicate through and falls back to AlwaysTrue as a temporary conservative measure. As a result, SERIALIZABLE RowDelta commits are effectively impossible on any partitioned table that receives concurrent appends, which makes the default isolation level unusable for this workload.

Expected behaviour

The conflict filter should be scoped to the partition(s) covered by the equality-delete files in the RowDelta. A concurrent append to a completely different partition cannot be affected by the equality deletes and must not block the commit. Only concurrent data files whose partition intersects with the equality-delete partition(s) should be flagged as a conflict.
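One way to scope the filter is sketched below under simplifying assumptions. It assumes a single identity-partitioned spec on `category`, with each equality-delete file carrying one partition value. The function name `scopedConflictFilter` is hypothetical. A real fix in iceberg-go would presumably build an expression over the partition field and handle partition-spec evolution, both of which this sketch ignores.

```go
package main

import "fmt"

// scopedConflictFilter returns a predicate over a concurrent data file's
// "category" partition value. A file is in scope only if its partition is
// covered by one of the RowDelta's equality-delete files.
// An empty category marks a delete file with no partition (e.g. written under
// an unpartitioned spec). Such a delete can apply to rows anywhere, so the
// sketch falls back to matching everything, like AlwaysTrue today.
// With no equality deletes at all, nothing is in scope for this check.
func scopedConflictFilter(eqDeleteCategories []string) func(string) bool {
	covered := make(map[string]struct{}, len(eqDeleteCategories))
	for _, c := range eqDeleteCategories {
		if c == "" {
			return func(string) bool { return true }
		}
		covered[c] = struct{}{}
	}
	return func(category string) bool {
		_, ok := covered[category]
		return ok
	}
}

func main() {
	filter := scopedConflictFilter([]string{"hot"}) // Writer A's eq-delete partition
	fmt.Println(filter("cold"), filter("hot"))      // prints "false true"

	global := scopedConflictFilter([]string{"hot", ""}) // includes an unpartitioned delete
	fmt.Println(global("cold"))                         // prints "true"
}
```

The fallback for a delete without a partition value keeps the conservative behaviour only where it is still needed, instead of applying it to every commit.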
