Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/kafka-connect-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,4 @@ jobs:
name: test logs
path: |
**/build/testlogs
**/build/reports/tests/integrationTest
27 changes: 27 additions & 0 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,33 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') {
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath
jvmArgs += project.property('extraJvmArgs')

def logDir = "${rootDir}/build/testlogs"
def logFile = "${logDir}/${project.name}-integration.log"
mkdir("${logDir}")
delete("${logFile}")
systemProperty 'dockerLogDir', logDir
def buildLog = new File(logFile)
addTestOutputListener(new TestOutputListener() {
def lastDescriptor
@Override
void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) {
if (lastDescriptor != testDescriptor) {
buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n"
lastDescriptor = testDescriptor
}
buildLog << testOutputEvent.destination << " " << testOutputEvent.message
}
})

testLogging {
if (System.getenv('CI') != null) {
events "started", "passed", "skipped", "failed"
} else {
events "failed"
}
exceptionFormat "full"
}
}

processResources {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@
package org.apache.iceberg.connect;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -34,6 +44,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.ComposeContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.wait.strategy.Wait;

public class TestContext {
Expand Down Expand Up @@ -62,9 +73,58 @@ private TestContext() {
new ComposeContainer(new File("./docker/docker-compose.yml"))
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor("connect", Wait.forHttp("/connectors"));
attachContainerLogConsumers(container);
container.start();
}

private static void attachContainerLogConsumers(ComposeContainer container) {
String logDir = System.getProperty("dockerLogDir");
if (logDir == null) {
return;
}
Path dir = Paths.get(logDir);
try {
Files.createDirectories(dir);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
for (String service : List.of("connect", "kafka", "iceberg")) {
container.withLogConsumer(service, dockerLogFileConsumer(service, dir));
}
}

private static Consumer<OutputFrame> dockerLogFileConsumer(String service, Path dir) {
try {
BufferedWriter writer =
Files.newBufferedWriter(
dir.resolve(service + "-container.log"),
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
try {
writer.flush();
writer.close();
} catch (IOException ignored) {
// best-effort
}
}));
return frame -> {
try {
writer.write(frame.getUtf8String());
writer.flush();
} catch (IOException ignored) {
// best-effort log capture, never fail the test
}
};
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void startConnector(KafkaConnectUtils.Config config) {
KafkaConnectUtils.startConnector(config);
KafkaConnectUtils.ensureConnectorRunning(config.getName());
Expand Down