Skip to content

Commit

Permalink
Log sync summary and sync activity seperators (#16314)
Browse files Browse the repository at this point in the history
* Log activity start and end

* pretty-print JSON replication and failure summaries

* simplify
  • Loading branch information
evantahler committed Sep 6, 2022
1 parent 68ab523 commit f15234b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.logging.MdcScope;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -26,6 +28,35 @@ public static void gobble(final InputStream is, final Consumer<String> consumer)
gobble(is, consumer, GENERIC, MdcScope.DEFAULT_BUILDER);
}

public static void gobble(final String message, final Consumer<String> consumer) {
final InputStream stringAsSteam = new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8));
gobble(stringAsSteam, consumer);
}

public static void gobble(final String message) {
gobble(message, LOGGER::info);
}

/**
* Used to emit a visual separator in the user-facing logs indicating a start of a meaningful
* temporal activity
*
* @param message
*/
public static void startSection(final String message) {
gobble("\r\n----- START " + message + " -----\r\n\r\n");
}

/**
* Used to emit a visual separator in the user-facing logs indicating a end of a meaningful temporal
* activity
*
* @param message
*/
public static void endSection(final String message) {
gobble("\r\n----- END " + message + " -----\r\n\r\n");
}

public static void gobble(final InputStream is, final Consumer<String> consumer, final MdcScope.Builder mdcScopeBuilder) {
gobble(is, consumer, GENERIC, mdcScopeBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.general;

import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.Worker;
Expand Down Expand Up @@ -42,6 +43,7 @@ public DbtTransformationWorker(final String jobId,
@Override
public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();
LineGobbler.startSection("DBT TRANSFORMATION");

try (dbtTransformationRunner) {
LOGGER.info("Running dbt transformation.");
Expand All @@ -65,6 +67,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr

final Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
LOGGER.info("Dbt Transformation executed in {}.", duration.toMinutesPart());
LineGobbler.endSection("DBT TRANSFORMATION");

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public DefaultCheckConnectionWorker(final WorkerConfigs workerConfigs, final Int

@Override
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
LineGobbler.startSection("CHECK");

try {
process = integrationLauncher.check(
Expand Down Expand Up @@ -88,6 +89,7 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa

LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode);
LOGGER.debug("Check connection job received output: {}", output);
LineGobbler.endSection("CHECK");
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output);
} else {
final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.general;

import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.NormalizationInput;
Expand Down Expand Up @@ -54,7 +55,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
final long startTime = System.currentTimeMillis();

try (normalizationRunner) {
LOGGER.info("Running normalization.");
LineGobbler.startSection("DEFAULT NORMALIZATION");
normalizationRunner.start();

Path normalizationRoot = null;
Expand Down Expand Up @@ -92,6 +93,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
}

LOGGER.info("Normalization summary: {}", summary);
LineGobbler.endSection("DEFAULT NORMALIZATION");

return summary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.workers.general;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
Expand Down Expand Up @@ -117,6 +119,7 @@ public DefaultReplicationWorker(final String jobId,
@Override
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);
LineGobbler.startSection("REPLICATION");

// todo (cgardens) - this should not be happening in the worker. this is configuration information
// that is independent of workflow executions.
Expand Down Expand Up @@ -246,7 +249,6 @@ else if (hasFailed.get()) {
.withStartTime(startTime)
.withEndTime(System.currentTimeMillis());

LOGGER.info("sync summary: {}", summary);
final ReplicationOutput output = new ReplicationOutput()
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());
Expand Down Expand Up @@ -293,6 +295,11 @@ else if (hasFailed.get()) {
metricReporter.trackStateMetricTrackerError();
}

final ObjectMapper mapper = new ObjectMapper();
LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));

LineGobbler.endSection("REPLICATION");
return output;
} catch (final Exception e) {
throw new WorkerException("Sync failed", e);
Expand Down

0 comments on commit f15234b

Please sign in to comment.