Skip to content

Commit

Permalink
🐛 : make sure logs from threads in replication workers are added to l…
Browse files Browse the repository at this point in the history
…og file (#3874)
  • Loading branch information
cgardens committed Jun 4, 2021
1 parent 2621e03 commit bd3a40f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.workers.protocols.MessageTracker;
import io.airbyte.workers.protocols.Source;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -45,6 +46,7 @@
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class DefaultReplicationWorker implements ReplicationWorker {

Expand Down Expand Up @@ -110,6 +112,7 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
final StandardTapConfig sourceConfig = WorkerUtils.syncToTapConfig(syncInput);

final ExecutorService executorService = Executors.newFixedThreadPool(2);
final Map<String, String> mdc = MDC.getCopyOfContextMap();

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
Expand All @@ -120,14 +123,16 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
final Future<?> destinationOutputThreadFuture = executorService.submit(getDestinationOutputRunnable(
destination,
cancelled,
destinationMessageTracker));
destinationMessageTracker,
mdc));

final Future<?> replicationThreadFuture = executorService.submit(getReplicationRunnable(
source,
destination,
cancelled,
mapper,
sourceMessageTracker));
sourceMessageTracker,
mdc));

LOGGER.info("Waiting for source thread to join.");
replicationThreadFuture.get();
Expand Down Expand Up @@ -193,8 +198,11 @@ private static Runnable getReplicationRunnable(Source<AirbyteMessage> source,
Destination<AirbyteMessage> destination,
AtomicBoolean cancelled,
Mapper<AirbyteMessage> mapper,
MessageTracker<AirbyteMessage> sourceMessageTracker) {
MessageTracker<AirbyteMessage> sourceMessageTracker,
Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional = source.attemptRead();
Expand All @@ -214,8 +222,11 @@ private static Runnable getReplicationRunnable(Source<AirbyteMessage> source,

private static Runnable getDestinationOutputRunnable(Destination<AirbyteMessage> destination,
AtomicBoolean cancelled,
MessageTracker<AirbyteMessage> destinationMessageTracker) {
MessageTracker<AirbyteMessage> destinationMessageTracker,
Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
try {
while (!cancelled.get() && !destination.isFinished()) {
final Optional<AirbyteMessage> messageOptional = destination.attemptRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.ConfigSchema;
Expand All @@ -55,20 +56,23 @@
import io.airbyte.workers.protocols.airbyte.AirbyteMessageUtils;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.NamespacingMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class DefaultReplicationWorkerTest {

Expand Down Expand Up @@ -96,6 +100,8 @@ class DefaultReplicationWorkerTest {
@SuppressWarnings("unchecked")
@BeforeEach
void setup() throws Exception {
MDC.clear();

jobRoot = Files.createDirectories(Files.createTempDirectory("test").resolve(WORKSPACE_ROOT));

final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig();
Expand All @@ -118,6 +124,11 @@ void setup() throws Exception {
when(mapper.mapMessage(RECORD_MESSAGE2)).thenReturn(RECORD_MESSAGE2);
}

@AfterEach
void tearDown() {
MDC.clear();
}

@Test
void test() throws Exception {
final ReplicationWorker worker = new DefaultReplicationWorker(
Expand All @@ -139,6 +150,32 @@ void test() throws Exception {
verify(destination).close();
}

@Test
void testLoggingInThreads() throws IOException, WorkerException {
// set up the mdc so that actually log to a file, so that we can verify that file logging captures
// threads.
final Path jobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
WorkerUtils.setJobMdc(jobRoot, "1");

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
source,
mapper,
destination,
sourceMessageTracker,
destinationMessageTracker);

worker.run(syncInput, jobRoot);

final Path logPath = jobRoot.resolve(WorkerConstants.LOG_FILENAME);
final String logs = IOs.readFile(logPath);

// make sure we get logs from the threads.
assertTrue(logs.contains("Replication thread started."));
assertTrue(logs.contains("Destination output thread started."));
}

@SuppressWarnings({"BusyWait"})
@Test
void testCancellation() throws InterruptedException {
Expand Down

0 comments on commit bd3a40f

Please sign in to comment.