Skip to content

Commit

Permalink
Add ReplicationWorkerHelper (#6511)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed May 11, 2023
1 parent e9d73b4 commit 066a81b
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.context;

import java.util.UUID;

/**
 * Identifiers describing a single Replication.
 * <p>
 * Carries the ids of the objects taking part in a sync and nothing else; configuration does not
 * belong here.
 *
 * @param connectionId id of the connection being synced
 * @param sourceId id of the source the sync reads from
 * @param destinationId id of the destination the sync writes to
 */
public record ReplicationContext(
                                 UUID connectionId,
                                 UUID sourceId,
                                 UUID destinationId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.context.ReplicationContext;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.AirbyteDestination;
Expand All @@ -56,7 +55,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -85,6 +83,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {

private final String jobId;
private final int attempt;
private final ReplicationWorkerHelper replicationWorkerHelper;
private final AirbyteSource source;
private final AirbyteMapper mapper;
private final AirbyteDestination destination;
Expand All @@ -95,9 +94,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AtomicBoolean cancelled;
private final AtomicBoolean hasFailed;
private final RecordSchemaValidator recordSchemaValidator;
private final FieldSelector fieldSelector;
private final WorkerMetricReporter metricReporter;
private final ConnectorConfigUpdater connectorConfigUpdater;
private final boolean fieldSelectionEnabled;
private final HeartbeatTimeoutChaperone srcHeartbeatTimeoutChaperone;

Expand All @@ -116,16 +113,15 @@ public DefaultReplicationWorker(final String jobId,
final HeartbeatTimeoutChaperone srcHeartbeatTimeoutChaperone) {
this.jobId = jobId;
this.attempt = attempt;
this.replicationWorkerHelper = new ReplicationWorkerHelper(fieldSelector, mapper, messageTracker, syncPersistence, connectorConfigUpdater);
this.source = source;
this.mapper = mapper;
this.destination = destination;
this.messageTracker = messageTracker;
this.syncPersistence = syncPersistence;
this.executors = Executors.newFixedThreadPool(2);
this.recordSchemaValidator = recordSchemaValidator;
this.fieldSelector = fieldSelector;
this.metricReporter = metricReporter;
this.connectorConfigUpdater = connectorConfigUpdater;
this.fieldSelectionEnabled = fieldSelectionEnabled;
this.srcHeartbeatTimeoutChaperone = srcHeartbeatTimeoutChaperone;

Expand Down Expand Up @@ -207,11 +203,14 @@ private void replicate(final Path jobRoot,
source.start(sourceConfig, jobRoot);
timeTracker.trackDestinationWriteStartTime();

final ReplicationContext replicationContext = new ReplicationContext(connectionId, sourceConfig.getSourceId(),
destinationConfig.getDestinationId());
replicationWorkerHelper.beforeReplication(sourceConfig.getCatalog());

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> readFromDstThread = CompletableFuture.runAsync(
readFromDstRunnable(destination, cancelled, messageTracker, syncPersistence, connectorConfigUpdater, mdc, timeTracker,
destinationConfig.getDestinationId(), connectionId, commitStatesAsap),
readFromDstRunnable(destination, cancelled, replicationWorkerHelper, replicationContext, mdc, timeTracker, commitStatesAsap),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
Expand All @@ -227,15 +226,11 @@ private void replicate(final Path jobRoot,
final CompletableFuture<Void> readSrcAndWriteDstThread = CompletableFuture.runAsync(readFromSrcAndWriteToDstRunnable(
source,
destination,
sourceConfig.getCatalog(),
replicationWorkerHelper,
replicationContext,
cancelled,
mapper,
messageTracker,
connectorConfigUpdater,
mdc,
fieldSelector,
timeTracker,
sourceConfig.getSourceId()), executors)
timeTracker), executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
ApmTraceUtils.addExceptionToTrace(ex);
Expand Down Expand Up @@ -284,13 +279,10 @@ static FailureReason getFailureReason(final Throwable ex, final long jobId, fina
@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable readFromDstRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final SyncPersistence syncPersistence,
final ConnectorConfigUpdater connectorConfigUpdater,
final ReplicationWorkerHelper replicationWorkerHelper,
final ReplicationContext replicationContext,
final Map<String, String> mdc,
final ThreadedTimeTracker timeHolder,
final UUID destinationId,
final UUID connectionId,
final boolean commitStatesAsap) {
return () -> {
MDC.setContextMap(mdc);
Expand All @@ -304,21 +296,7 @@ private static Runnable readFromDstRunnable(final AirbyteDestination destination
throw new DestinationException("Destination process read attempt failed", e);
}
if (messageOptional.isPresent()) {
final AirbyteMessage message = messageOptional.get();
LOGGER.info("State in DefaultReplicationWorker from destination: {}", message);

messageTracker.acceptFromDestination(message);
if (commitStatesAsap && message.getType() == Type.STATE) {
syncPersistence.persist(connectionId, message.getState());
}

try {
if (message.getType() == Type.CONTROL) {
acceptDstControlMessage(destinationId, message.getControl(), connectorConfigUpdater);
}
} catch (final Exception e) {
LOGGER.error("Error updating destination configuration", e);
}
replicationWorkerHelper.processMessageFromDestination(messageOptional.get(), commitStatesAsap, replicationContext);
}
}
timeHolder.trackDestinationWriteEndTime();
Expand Down Expand Up @@ -346,21 +324,15 @@ private static Runnable readFromDstRunnable(final AirbyteDestination destination
@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource source,
final AirbyteDestination destination,
final ConfiguredAirbyteCatalog catalog,
final ReplicationWorkerHelper replicationWorkerHelper,
final ReplicationContext replicationContext,
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final ConnectorConfigUpdater connectorConfigUpdater,
final Map<String, String> mdc,
final FieldSelector fieldSelector,
final ThreadedTimeTracker timeHolder,
final UUID sourceId) {
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
long recordsRead = 0L;

fieldSelector.populateFields(catalog);
try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
Expand All @@ -372,34 +344,18 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou

if (messageOptional.isPresent()) {
final AirbyteMessage airbyteMessage = messageOptional.get();
fieldSelector.filterSelectedFields(airbyteMessage);
fieldSelector.validateSchema(airbyteMessage);

final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

messageTracker.acceptFromSource(message);

try {
if (message.getType() == Type.CONTROL) {
acceptSrcControlMessage(sourceId, message.getControl(), connectorConfigUpdater);
final Optional<AirbyteMessage> processedAirbyteMessage =
replicationWorkerHelper.processMessageFromSource(airbyteMessage, replicationContext);

if (processedAirbyteMessage.isPresent()) {
final AirbyteMessage message = processedAirbyteMessage.get();
try {
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) {
destination.accept(message);
}
} catch (final Exception e) {
throw new DestinationException("Destination process message delivery failed", e);
}
} catch (final Exception e) {
LOGGER.error("Error updating source configuration", e);
}

try {
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) {
destination.accept(message);
}
} catch (final Exception e) {
throw new DestinationException("Destination process message delivery failed", e);
}

recordsRead += 1;

if (recordsRead % 5000 == 0) {
LOGGER.info("Records read: {} ({})", recordsRead,
FileUtils.byteCountToDisplaySize(messageTracker.getSyncStatsTracker().getTotalBytesEmitted()));
}
} else {
LOGGER.info("Source has no more messages, closing connection.");
Expand All @@ -411,10 +367,8 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
}
}
timeHolder.trackSourceReadEndTime();
LOGGER.info("Total records read: {} ({})", recordsRead,
FileUtils.byteCountToDisplaySize(messageTracker.getSyncStatsTracker().getTotalBytesEmitted()));

fieldSelector.reportMetrics(sourceId);
replicationWorkerHelper.endOfSource(replicationContext);

try {
destination.notifyEndOfInput();
Expand Down Expand Up @@ -443,22 +397,6 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
};
}

private static void acceptSrcControlMessage(final UUID sourceId,
final AirbyteControlMessage controlMessage,
final ConnectorConfigUpdater connectorConfigUpdater) {
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
connectorConfigUpdater.updateSource(sourceId, controlMessage.getConnectorConfig().getConfig());
}
}

private static void acceptDstControlMessage(final UUID destinationId,
final AirbyteControlMessage controlMessage,
final ConnectorConfigUpdater connectorConfigUpdater) {
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
connectorConfigUpdater.updateDestination(destinationId, controlMessage.getConnectorConfig().getConfig());
}
}

private ReplicationOutput getReplicationOutput(final StandardSyncInput syncInput,
final WorkerDestinationConfig destinationConfig,
final AtomicReference<FailureReason> replicationRunnableFailureRef,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.context.ReplicationContext;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.FieldSelector;
import io.airbyte.workers.internal.book_keeping.MessageTracker;
import io.airbyte.workers.internal.sync_persistence.SyncPersistence;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the business logic that was pulled out of the DefaultReplicationWorker.
 * <p>
 * An instance is expected to live for the duration of one sync.
 * <p>
 * Further decomposition by responsibility is still pending. In the meantime, this class owns the
 * DefaultReplicationWorker processing that is not related to control flow.
 */
class ReplicationWorkerHelper {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationWorkerHelper.class);

  /** Number of processed source messages between two progress log lines. */
  private static final long PROGRESS_LOG_INTERVAL = 5000L;

  private final FieldSelector fieldSelector;
  private final AirbyteMapper mapper;
  private final MessageTracker messageTracker;
  private final SyncPersistence syncPersistence;
  private final ConnectorConfigUpdater connectorConfigUpdater;

  // Incremented once per call to processMessageFromSource; plain field, no synchronization.
  private long recordsRead = 0L;

  /**
   * Stores the collaborators used while processing messages. The message counter starts at zero.
   *
   * @param fieldSelector used to populate, filter and validate fields and to report metrics
   * @param mapper applied to every message coming from the source
   * @param messageTracker notified of every source and destination message
   * @param syncPersistence used to persist destination states when committing states asap
   * @param connectorConfigUpdater used to apply connector config control messages
   */
  public ReplicationWorkerHelper(final FieldSelector fieldSelector,
                                 final AirbyteMapper mapper,
                                 final MessageTracker messageTracker,
                                 final SyncPersistence syncPersistence,
                                 final ConnectorConfigUpdater connectorConfigUpdater) {
    this.fieldSelector = fieldSelector;
    this.mapper = mapper;
    this.messageTracker = messageTracker;
    this.syncPersistence = syncPersistence;
    this.connectorConfigUpdater = connectorConfigUpdater;
  }

  /**
   * Hook to call before replication starts; populates the field selector from the catalog.
   *
   * @param catalog catalog handed to the field selector
   */
  public void beforeReplication(final ConfiguredAirbyteCatalog catalog) {
    fieldSelector.populateFields(catalog);
  }

  /**
   * Hook to call once the source is done; logs the message total and reports field selector
   * metrics for the source.
   *
   * @param replicationContext provides the source id used for metric reporting
   */
  public void endOfSource(final ReplicationContext replicationContext) {
    LOGGER.info("Total records read: {} ({})", recordsRead, emittedBytesForDisplay());

    fieldSelector.reportMetrics(replicationContext.sourceId());
  }

  /**
   * Runs a message read from the source through field selection, schema validation and mapping,
   * then hands the mapped message to the message tracker.
   * <p>
   * A CONTROL message carrying a connector config triggers a source configuration update; any
   * failure while doing so is logged and not rethrown. The message counter is bumped for every
   * message, whatever its type, and a progress line is logged every 5000 messages.
   *
   * @param airbyteMessage message as read from the source
   * @param replicationContext provides the source id used for configuration updates
   * @return the mapped message, always present
   */
  public Optional<AirbyteMessage> processMessageFromSource(final AirbyteMessage airbyteMessage, final ReplicationContext replicationContext) {
    fieldSelector.filterSelectedFields(airbyteMessage);
    fieldSelector.validateSchema(airbyteMessage);

    final AirbyteMessage mapped = mapper.mapMessage(airbyteMessage);
    messageTracker.acceptFromSource(mapped);
    applySourceControlMessage(mapped, replicationContext);

    recordsRead++;
    if (recordsRead % PROGRESS_LOG_INTERVAL == 0) {
      LOGGER.info("Records read: {} ({})", recordsRead, emittedBytesForDisplay());
    }

    return Optional.of(mapped);
  }

  /**
   * Handles a message read from the destination: logs it, hands it to the message tracker,
   * optionally persists its state and applies connector config control messages.
   * <p>
   * Failures while updating the destination configuration are logged and not rethrown.
   *
   * @param message message as read from the destination
   * @param commitStatesAsap when true, STATE messages are persisted right away
   * @param replicationContext provides the connection and destination ids
   */
  public void processMessageFromDestination(final AirbyteMessage message,
                                            final boolean commitStatesAsap,
                                            final ReplicationContext replicationContext) {
    LOGGER.info("State in DefaultReplicationWorker from destination: {}", message);

    messageTracker.acceptFromDestination(message);

    final boolean persistStateNow = commitStatesAsap && message.getType() == Type.STATE;
    if (persistStateNow) {
      syncPersistence.persist(replicationContext.connectionId(), message.getState());
    }

    applyDestinationControlMessage(message, replicationContext);
  }

  /** Formats the total bytes emitted, as reported by the sync stats tracker, for log output. */
  private String emittedBytesForDisplay() {
    return FileUtils.byteCountToDisplaySize(messageTracker.getSyncStatsTracker().getTotalBytesEmitted());
  }

  /** Updates the source configuration for a CONNECTOR_CONFIG control message; errors are only logged. */
  private void applySourceControlMessage(final AirbyteMessage message, final ReplicationContext replicationContext) {
    // Everything, including the type check, stays inside the try so that any failure is logged
    // rather than propagated.
    try {
      if (message.getType() != Type.CONTROL) {
        return;
      }
      final UUID sourceId = replicationContext.sourceId();
      final AirbyteControlMessage control = message.getControl();
      if (control.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
        connectorConfigUpdater.updateSource(sourceId, control.getConnectorConfig().getConfig());
      }
    } catch (final Exception e) {
      LOGGER.error("Error updating source configuration", e);
    }
  }

  /** Updates the destination configuration for a CONNECTOR_CONFIG control message; errors are only logged. */
  private void applyDestinationControlMessage(final AirbyteMessage message, final ReplicationContext replicationContext) {
    // Everything, including the type check, stays inside the try so that any failure is logged
    // rather than propagated.
    try {
      if (message.getType() != Type.CONTROL) {
        return;
      }
      final UUID destinationId = replicationContext.destinationId();
      final AirbyteControlMessage control = message.getControl();
      if (control.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) {
        connectorConfigUpdater.updateDestination(destinationId, control.getConnectorConfig().getConfig());
      }
    } catch (final Exception e) {
      LOGGER.error("Error updating destination configuration", e);
    }
  }

}

0 comments on commit 066a81b

Please sign in to comment.