Skip to content

Spark: serializable-isolation tests can run until external CI/resource limits #16359

@kevinjqliu

Description

@kevinjqliu

Summary

Some spark-extensions serializable-isolation concurrency tests wait on Future.get() without a timeout while worker threads loop up to Integer.MAX_VALUE.

If the expected ValidationException is not thrown, the test has no in-method upper bound and can continue running until CI timeout, disk exhaustion, or manual interruption.

Example

Representative source:

public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
// cannot run tests with concurrency for Hadoop tables without atomic renames
assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop");
// if caching is off, the table is eagerly refreshed during runtime filtering
// this can cause a validation exception as concurrent changes would be visible
assumeThat(cachingCatalogEnabled()).isTrue();
createAndInitTable("id INT, dep STRING");
createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
tableName, MERGE_ISOLATION_LEVEL, "serializable");
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
createBranchIfNeeded();
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
AtomicBoolean shouldAppend = new AtomicBoolean(true);
// merge thread
Future<?> mergeFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.until(() -> barrier.get() >= currentNumOperations * 2);
sql(
"MERGE INTO %s t USING source s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " UPDATE SET dep = 'x'",
commitTarget());
barrier.incrementAndGet();
}
});
// append thread
Future<?> appendFuture =
executorService.submit(
() -> {
// load the table via the validation catalog to use another table instance
Table table = validationCatalog.loadTable(tableIdent);
GenericRecord record = GenericRecord.create(table.schema());
record.set(0, 1); // id
record.set(1, "hr"); // dep
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2);
if (!shouldAppend.get()) {
return;
}
for (int numAppends = 0; numAppends < 5; numAppends++) {
DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
AppendFiles appendFiles = table.newFastAppend().appendFile(dataFile);
if (branch != null) {
appendFiles.toBranch(branch);
}
appendFiles.commit();
}
barrier.incrementAndGet();
}
});
try {
assertThatThrownBy(mergeFuture::get)
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
shouldAppend.set(false);
appendFuture.cancel(true);
}

The merge thread loops nearly unboundedly:

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
  ...
  sql("MERGE INTO ... WHEN MATCHED THEN UPDATE SET dep = 'x'", commitTarget());
  barrier.incrementAndGet();
}

The test then waits without a timeout:

assertThatThrownBy(mergeFuture::get)
    .isInstanceOf(ExecutionException.class)
    .cause()
    .isInstanceOf(ValidationException.class)
    .hasMessageContaining("Found conflicting files that can contain");

Cleanup is in finally, so it is not reached unless mergeFuture.get() returns or throws.

CI evidence

This is not only theoretical. In PR #16303, CI was retriggered multiple times before the root cause was understood.

The failure was later identified as a legitimate runaway-loop issue: the test kept running until GitHub Actions reported No space left on device.

Affected area

The pattern appears in serializable-isolation row-level operation tests, including:

  • TestMerge.testMergeWithSerializableIsolation
  • TestDelete.testDeleteWithSerializableIsolation
  • TestUpdate.testUpdateWithSerializableIsolation

These tests are inherited by the corresponding copy-on-write / merge-on-read test classes where present.

Suggested fix

Add an explicit upper bound so these tests fail fast with useful diagnostics instead of relying on CI timeouts or disk exhaustion.

Possible options:

  • use Future.get(timeout, unit);
  • add a JUnit timeout;
  • bound the operation loop to a reasonable retry count;
  • ensure both futures are cancelled and the executor is shut down on timeout.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions