From 8b55e2e3bde31a6e038513540943c52bd988d11a Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 20 May 2026 19:27:28 +0700 Subject: [PATCH 1/2] Kafka Connect: Capture docker container logs for integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The integration tests drive their workloads through a testcontainers docker compose stack (kafka, connect, iceberg REST catalog, minio). The Kafka Connect coordinator does the actual snapshot commit work, and its logs live in the Connect container's stdout — never in the JVM test worker. So when an Awaitility timeout surfaced as a bare AssertionError in CI, there was no way to see why the commit did not happen. Attach a withLogConsumer for the connect, kafka, and iceberg services in TestContext, writing each service's container output to ${rootDir}/build/testlogs/-container.log. The location is passed in from the integrationTest Gradle task via a `dockerLogDir` system property and falls back to a no-op when unset (so the constructor still works under IDEs or ad-hoc runs). The Kafka Connect CI artifact upload already covers `**/build/testlogs`, so on failure the docker logs come along automatically. Co-Authored-By: Claude Opus 4.7 (1M context) --- kafka-connect/build.gradle | 3 + .../apache/iceberg/connect/TestContext.java | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 43eb245d93a3..a38dd72b798d 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -194,6 +194,9 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { testClassesDirs = sourceSets.integration.output.classesDirs classpath = sourceSets.integration.runtimeClasspath jvmArgs += project.property('extraJvmArgs') + + def logDir = "${rootDir}/build/testlogs" + systemProperty 'dockerLogDir', logDir } processResources { diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java index 2a1ded6cd8a1..eab0a3eb9a00 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java @@ -19,10 +19,20 @@ package org.apache.iceberg.connect; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedWriter; import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; @@ -34,6 +44,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.testcontainers.containers.ComposeContainer; +import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.wait.strategy.Wait; public class TestContext { @@ -62,9 +73,58 @@ private TestContext() { new ComposeContainer(new File("./docker/docker-compose.yml")) .withStartupTimeout(Duration.ofMinutes(2)) .waitingFor("connect", Wait.forHttp("/connectors")); + attachContainerLogConsumers(container); container.start(); } + private static void attachContainerLogConsumers(ComposeContainer container) { + String logDir = System.getProperty("dockerLogDir"); + if (logDir == null) { + return; + } + Path dir = Paths.get(logDir); + try { + Files.createDirectories(dir); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + for (String service : List.of("connect", "kafka", "iceberg")) { + container.withLogConsumer(service, dockerLogFileConsumer(service, dir)); + } + } + + private static Consumer dockerLogFileConsumer(String service, Path dir) { + try { + BufferedWriter writer = + Files.newBufferedWriter( + dir.resolve(service + "-container.log"), + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + try { + writer.flush(); + writer.close(); + } catch (IOException ignored) { + // best-effort + } + })); + return frame -> { + try { + writer.write(frame.getUtf8String()); + writer.flush(); + } catch (IOException ignored) { + // best-effort log capture, never fail the test + } + }; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public void startConnector(KafkaConnectUtils.Config config) { KafkaConnectUtils.startConnector(config); KafkaConnectUtils.ensureConnectorRunning(config.getName()); From 1738a02614b4ebbec0c4e999315400add01731e3 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 20 May 2026 19:27:58 +0700 Subject: [PATCH 2/2] Kafka Connect: Capture per-test output and reports for integration tests in CI The `integrationTest` Gradle task carried no `addTestOutputListener` and no `testLogging` block, so test-process stdout/stderr was lost and the Gradle console output for CI showed only a bare `FAILED` line and the assertion source location, with no stack trace or AssertJ description. The Kafka Connect CI workflow uploaded only `**/build/testlogs`, which is populated by the unit test task in the root `build.gradle` but not by `integrationTest`. Mirror the existing `test` block from the root `build.gradle` inside the `integrationTest` task: stream per-test output to `${rootDir}/build/testlogs/${project.name}-integration.log` (a separate file from the unit-test log), and emit verbose `testLogging` events with `exceptionFormat "full"` on CI. Extend the Kafka Connect CI artifact upload to also include `**/build/reports/tests/integrationTest` so the HTML reports with per-test stack traces are preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/kafka-connect-ci.yml | 1 + kafka-connect/build.gradle | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/.github/workflows/kafka-connect-ci.yml b/.github/workflows/kafka-connect-ci.yml index c8de0e177ba3..9e981273620f 100644 --- a/.github/workflows/kafka-connect-ci.yml +++ b/.github/workflows/kafka-connect-ci.yml @@ -108,3 +108,4 @@ jobs: name: test logs path: | **/build/testlogs + **/build/reports/tests/integrationTest diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index a38dd72b798d..8813653de4df 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -196,7 +196,31 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { jvmArgs += project.property('extraJvmArgs') def logDir = "${rootDir}/build/testlogs" + def logFile = "${logDir}/${project.name}-integration.log" + mkdir("${logDir}") + delete("${logFile}") systemProperty 'dockerLogDir', logDir + def buildLog = new File(logFile) + addTestOutputListener(new TestOutputListener() { + def lastDescriptor + @Override + void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) { + if (lastDescriptor != testDescriptor) { + buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n" + lastDescriptor = testDescriptor + } + buildLog << testOutputEvent.destination << " " << testOutputEvent.message + } + }) + + testLogging { + if (System.getenv('CI') != null) { + events "started", "passed", "skipped", "failed" + } else { + events "failed" + } + exceptionFormat "full" + } } processResources {