Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS =
"iceberg.coordinator-executor-keep-alive-timeout-ms";

private static final String COMMIT_MAX_CONSECUTIVE_FAILURES_PROP =
"iceberg.control.commit.max-consecutive-failures";
private static final int COMMIT_MAX_CONSECUTIVE_FAILURES_DEFAULT = 1;

@VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

public static final ConfigDef CONFIG_DEF = newConfigDef();
Expand Down Expand Up @@ -235,6 +239,12 @@ private static ConfigDef newConfigDef() {
120000L,
Importance.LOW,
"config to control coordinator executor keep alive time");
configDef.define(
COMMIT_MAX_CONSECUTIVE_FAILURES_PROP,
ConfigDef.Type.INT,
COMMIT_MAX_CONSECUTIVE_FAILURES_DEFAULT,
Importance.MEDIUM,
"Maximum number of consecutive commit failures before the coordinator terminates");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two small things I'd fold in while we're here:

The property name iceberg.connect.commit.max-consecutive-failures introduces a new iceberg.connect.commit.* namespace — the rest of the commit knobs in this file live under a different prefix. Once this ships it's public and renaming costs a deprecation cycle. Worth aligning with the existing commit-prefix convention now.

And I'd add ConfigDef.Range.atLeast(1) as the validator argument — 0 or negative silently makes the feature unreachable.

return configDef;
}

Expand Down Expand Up @@ -355,6 +365,10 @@ public long keepAliveTimeoutInMs() {
return getLong(COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS);
}

public int commitMaxConsecutiveFailures() {
return getInt(COMMIT_MAX_CONSECUTIVE_FAILURES_PROP);
}

public TableSinkConfig tableConfig(String tableName) {
return tableConfigMap.computeIfAbsent(
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
Expand Down Expand Up @@ -82,6 +83,7 @@ class Coordinator extends Channel {
private final CommitState commitState;
private volatile boolean terminated;
private final String taskId;
private int consecutiveCommitFailures;

Coordinator(
Catalog catalog,
Expand Down Expand Up @@ -150,16 +152,33 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
} catch (RuntimeException e) {
if (!partialCommit) {
consecutiveCommitFailures = 0;
}
} catch (CommitFailedException e) {
if (partialCommit) {
LOG.warn(
"Partial commit {} failed for task {}, will retry",
commitState.currentCommitId(),
taskId,
e);
} else {
LOG.error("Commit {} failed for task {}", commitState.currentCommitId(), taskId, e);
throw e;
consecutiveCommitFailures++;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd narrow this catch. catch (RuntimeException e) sweeps in two classes of exception we really don't want to retry:

CommitStateUnknownException — its javadoc is explicit that retrying may result in duplicate records or unintentional modifications, and SnapshotProducer itself re-throws it rather than retrying. We have no idempotency check before the next attempt, so we'd be the ones producing the duplicates.

Permanent failures like ValidationException, ForbiddenException, NotAuthorizedException, BadRequestException are also RuntimeException. With a 5-minute commit interval and default=3 they'd burn ~15 minutes silently before terminating — the modified testCoordinatorCommittedOffsetValidation (now pinned to =1 to keep its assertion) is a tell that retrying validation failures isn't desired here either.

I'd either catch CommitFailedException specifically, or keep the wider catch and short-circuit the non-retryable types:

} catch (RuntimeException e) {
  if (e instanceof CommitStateUnknownException) {
    throw e;
  }
  // ... existing partialCommit / retry logic, ideally gated on (e instanceof CommitFailedException)
}

wdyt?

if (consecutiveCommitFailures >= config.commitMaxConsecutiveFailures()) {
LOG.error(
"Commit {} failed for task {} ({} consecutive failures, terminating)",
commitState.currentCommitId(),
taskId,
consecutiveCommitFailures,
e);
throw e;
}
LOG.warn(
"Commit {} failed for task {} ({} consecutive failure(s), will retry)",
commitState.currentCommitId(),
taskId,
consecutiveCommitFailures,
e);
}
} finally {
commitState.endCurrentCommit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void before() {
when(config.commitThreads()).thenReturn(1);
when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.commitMaxConsecutiveFailures()).thenReturn(3);

TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
when(partitionInfo.partition()).thenReturn(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public void testCommitNoFiles() {

@Test
public void testCommitError() {
when(config.commitMaxConsecutiveFailures()).thenReturn(1);

// this spec isn't registered with the table
PartitionSpec badPartitionSpec =
PartitionSpec.builderFor(SCHEMA).withSpecId(1).identity("id").build();
Expand All @@ -152,6 +154,8 @@ public void testCommitError() {

@Test
public void testCommitFailedExceptionPropagates() {
when(config.commitMaxConsecutiveFailures()).thenReturn(1);

Table spiedTable = spy(table);
AppendFiles spiedAppend = spy(table.newAppend());
doThrow(new CommitFailedException("Glue detected concurrent update"))
Expand All @@ -170,6 +174,103 @@ public void testCommitFailedExceptionPropagates() {
.hasMessageContaining("Glue detected concurrent update");
}

@Test
public void testCommitBoundedRetry() {
when(config.commitMaxConsecutiveFailures()).thenReturn(3);
when(config.commitIntervalMs()).thenReturn(0);
when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);

Table spiedTable = spy(table);
AppendFiles spiedAppend = spy(table.newAppend());
doThrow(new CommitFailedException("transient error")).when(spiedAppend).commit();
when(spiedTable.newAppend()).thenReturn(spiedAppend);
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);

SinkTaskContext context = mock(SinkTaskContext.class);
Coordinator coordinator =
new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
coordinator.start();
initConsumer();

// first two failures should not throw
triggerCommitCycle(coordinator);
triggerCommitCycle(coordinator);

// third consecutive failure should terminate
assertThatThrownBy(() -> triggerCommitCycle(coordinator))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("transient error");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testCommitBoundedRetry only covers fail-fail-terminate. The reset-on-success path (fail, fail, succeed, fail, fail — must not terminate) is the half of the contract that a future refactor could silently break, and it's not exercised anywhere. A second case that interleaves a successful cycle between failures would lock that in.

Separately — once the classifier above lands, a one-liner asserting CommitStateUnknownException propagates on the first failure (rather than being retried) would pin the most important new invariant.

}

@Test
public void testCommitCounterResetsOnSuccess() {
when(config.commitMaxConsecutiveFailures()).thenReturn(3);
when(config.commitIntervalMs()).thenReturn(0);
when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);

Table spiedTable = spy(table);
AppendFiles failingAppend = spy(table.newAppend());
doThrow(new CommitFailedException("transient error")).when(failingAppend).commit();

// fail, fail, succeed, fail, fail — should NOT terminate
when(spiedTable.newAppend())
.thenReturn(failingAppend)
.thenReturn(failingAppend)
.thenCallRealMethod()
.thenReturn(failingAppend)
.thenReturn(failingAppend);
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);

SinkTaskContext context = mock(SinkTaskContext.class);
Coordinator coordinator =
new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
coordinator.start();
initConsumer();

// two failures
triggerCommitCycle(coordinator);
triggerCommitCycle(coordinator);

// success resets counter
triggerCommitCycle(coordinator);

// two more failures — still under threshold
triggerCommitCycle(coordinator);
triggerCommitCycle(coordinator);
}

private long nextOffset = 1;

private void triggerCommitCycle(Coordinator coordinator) {
coordinator.process();

byte[] startBytes = producer.history().get(producer.history().size() - 1).value();
Event startEvent = AvroUtil.decode(startBytes);
UUID commitId = ((StartCommit) startEvent.payload()).commitId();

Event commitResponse =
new Event(
config.connectGroupId(),
new DataWritten(
StructType.of(),
commitId,
TableReference.of("catalog", TableIdentifier.of("db", "tbl"), null),
ImmutableList.of(EventTestUtil.createDataFile()),
ImmutableList.of()));
byte[] bytes = AvroUtil.encode(commitResponse);
consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, nextOffset++, "key", bytes));

Event commitReady =
new Event(
config.connectGroupId(),
new DataComplete(
commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, null))));
bytes = AvroUtil.encode(commitReady);
consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, nextOffset++, "key", bytes));

coordinator.process();
}

private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
byte[] bytes = producer.history().get(idx).value();
Event commitTable = AvroUtil.decode(bytes);
Expand Down Expand Up @@ -288,6 +389,8 @@ public void testCoordinatorCommittedOffsetMerging() {

@Test
public void testCoordinatorCommittedOffsetValidation() {
when(config.commitMaxConsecutiveFailures()).thenReturn(1);

// This test demonstrates that the Coordinator's validateAndCommit method
// prevents commits when another independent commit has updated the offsets
// during the commit process
Expand Down