Skip to content

Commit

Permalink
[5889] Bmoric/log application name for all workers [2/2] (#7268)
Browse files Browse the repository at this point in the history
This is adding the application name in the logs produced by the docker containers.

This is related to #5889.
  • Loading branch information
benmoriceau committed Oct 28, 2021
1 parent aa2caf1 commit 9d30bb0
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.commons.io;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.logging.MdcScope;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -21,13 +22,17 @@ public class LineGobbler implements VoidCallable {
private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);

public static void gobble(final InputStream is, final Consumer<String> consumer) {
gobble(is, consumer, "generic");
gobble(is, consumer, "generic", MdcScope.DEFAULT);
}

public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller) {
public static void gobble(final InputStream is, final Consumer<String> consumer, final MdcScope mdcScope) {
gobble(is, consumer, "generic", mdcScope);
}

public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller, final MdcScope mdcScope) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Map<String, String> mdc = MDC.getCopyOfContextMap();
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller);
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller, mdcScope);
executor.submit(gobbler);
}

Expand All @@ -36,24 +41,35 @@ public static void gobble(final InputStream is, final Consumer<String> consumer,
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;
private final MdcScope containerLogMDC;

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this(is, consumer, executor, mdc, "generic");
this(is, consumer, executor, mdc, "generic", MdcScope.DEFAULT);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final MdcScope mdcScope) {
this(is, consumer, executor, mdc, "generic", mdcScope);
}

LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller) {
final String caller,
final MdcScope mdcScope) {
this.is = IOs.newBufferedReader(is);
this.consumer = consumer;
this.executor = executor;
this.mdc = mdc;
this.caller = caller;
this.containerLogMDC = mdcScope;
}

@Override
Expand All @@ -62,7 +78,9 @@ public void voidCall() {
try {
String line;
while ((line = is.readLine()) != null) {
consumer.accept(line);
try (containerLogMDC) {
consumer.accept(line);
}
}
} catch (final IOException i) {
LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.logging;

import com.google.common.annotations.VisibleForTesting;

public class LoggingHelper {

public enum Color {
Expand Down Expand Up @@ -31,7 +33,8 @@ public String getCode() {

public static final String LOG_SOURCE_MDC_KEY = "log_source";

private static final String RESET = "\u001B[0m";
@VisibleForTesting
public static final String RESET = "\u001B[0m";

public static String applyColor(final Color color, final String msg) {
return color.getCode() + msg + RESET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.commons.logging;

import io.airbyte.commons.logging.LoggingHelper.Color;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.MDC;

/**
Expand All @@ -25,6 +28,8 @@
*/
public class MdcScope implements AutoCloseable {

public final static MdcScope DEFAULT = new Builder().build();

private final Map<String, String> originalContextMap;

public MdcScope(final Map<String, String> keyValuesToAdd) {
Expand All @@ -35,8 +40,41 @@ public MdcScope(final Map<String, String> keyValuesToAdd) {
}

@Override
public void close() throws Exception {
public void close() {
MDC.setContextMap(originalContextMap);
}

public static class Builder {

private Optional<String> maybeLogPrefix = Optional.empty();
private Optional<Color> maybePrefixColor = Optional.empty();

public Builder setLogPrefix(final String logPrefix) {
this.maybeLogPrefix = Optional.ofNullable(logPrefix);

return this;
}

public Builder setPrefixColor(final Color color) {
this.maybePrefixColor = Optional.ofNullable(color);

return this;
}

public MdcScope build() {
final Map<String, String> extraMdcEntries = new HashMap<>();

maybeLogPrefix.stream().forEach(logPrefix -> {
final String potentiallyColoredLog = maybePrefixColor
.map(color -> LoggingHelper.applyColor(color, logPrefix))
.orElse(logPrefix);

extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog);
});

return new MdcScope(extraMdcEntries);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,14 @@ public void testMDCModified() {

});

} catch (final Exception e) {
e.printStackTrace();
}
}

@Test
@DisplayName("The MDC context is properly restored")
public void testMDCRestore() {
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {} catch (final Exception e) {
e.printStackTrace();
}
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {}

final Map<String, String> mdcState = MDC.getCopyOfContextMap();

Assertions.assertThat(mdcState).containsAllEntriesOf(originalMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -28,6 +31,10 @@ public class DbtTransformationRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("dbt")
.setPrefixColor(Color.CYAN)
.build();

private final ProcessFactory processFactory;
private final NormalizationRunner normalizationRunner;
Expand All @@ -48,7 +55,7 @@ public void start() throws Exception {
* transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend
* on the NormalizationRunner to configure the dbt project with the appropriate destination settings
* and pull the custom git repository into the workspace.
*
* <p>
* Once the workspace folder/files is setup to run, we invoke the custom transformation command as
* provided by the user to execute whatever extra transformation has been implemented.
*/
Expand Down Expand Up @@ -87,8 +94,8 @@ public boolean transform(final String jobId,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
dbtArguments);

LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC);

WorkerUtils.wait(process);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -27,6 +30,10 @@
public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN)
.build();

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down Expand Up @@ -109,8 +116,8 @@ private boolean runProcess(final String jobId,
process = processFactory.create(jobId, attempt, jobRoot, normalizationImageName, false, files, null, resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args);

LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC);

WorkerUtils.wait(process);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -30,6 +33,10 @@
public class DefaultAirbyteDestination implements AirbyteDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("destination")
.setPrefixColor(Color.MAGENTA)
.build();

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
Expand All @@ -41,7 +48,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
private Iterator<AirbyteMessage> messageIterator = null;

public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC));

}

Expand All @@ -63,7 +70,7 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME,
Jsons.serialize(destinationConfig.getCatalog()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination");
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination", CONTAINER_LOG_MDC);

writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -35,6 +38,11 @@ public class DefaultAirbyteSource implements AirbyteSource {
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS);
private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);

private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("source")
.setPrefixColor(Color.BLUE)
.build();

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final HeartbeatMonitor heartbeatMonitor;
Expand All @@ -43,7 +51,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private Iterator<AirbyteMessage> messageIterator = null;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
}

@VisibleForTesting
Expand All @@ -67,7 +75,7 @@ public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) thr
sourceConfig.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME,
sourceConfig.getState() == null ? null : Jsons.serialize(sourceConfig.getState().getState()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source");
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source", CONTAINER_LOG_MDC);

messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
Expand All @@ -28,16 +30,22 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteStreamFactory.class);

private final MdcScope containerLogMDC;
private final AirbyteProtocolPredicate protocolValidator;
private final Logger logger;

public DefaultAirbyteStreamFactory() {
this(new AirbyteProtocolPredicate(), LOGGER);
this(new Builder().build());
}

DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger) {
public DefaultAirbyteStreamFactory(final MdcScope containerLogMDC) {
this(new AirbyteProtocolPredicate(), LOGGER, containerLogMDC);
}

DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger, final MdcScope containerLogMDC) {
protocolValidator = protocolPredicate;
this.logger = logger;
this.containerLogMDC = containerLogMDC;
}

@Override
Expand All @@ -50,7 +58,9 @@ public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
// we log as info all the lines that are not valid json
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
logger.info(line);
try (containerLogMDC) {
logger.info(line);
}
}
return jsonLine.stream();
})
Expand All @@ -73,7 +83,9 @@ public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
.filter(airbyteMessage -> {
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
internalLog(airbyteMessage.getLog());
try (containerLogMDC) {
internalLog(airbyteMessage.getLog());
}
}
return !isLog;
});
Expand Down
Loading

0 comments on commit 9d30bb0

Please sign in to comment.