Skip to content

Commit

Permalink
Only STATE and RECORD messages from the Source should be sent to the …
Browse files Browse the repository at this point in the history
…Destination (#16418)

* Only STATE and RECORD messages from the Source should be sent to the Destination

* add test

* update mocks

* fixup tests

* revert mock changes
  • Loading branch information
evantahler committed Sep 9, 2022
1 parent 6b4c523 commit a66b973
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.workers.*;
import io.airbyte.workers.exception.RecordSchemaValidationException;
Expand Down Expand Up @@ -336,11 +337,15 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

messageTracker.acceptFromSource(message);

try {
destination.accept(message);
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) {
destination.accept(message);
}
} catch (final Exception e) {
throw new DestinationException("Destination process message delivery failed", e);
}

recordsRead += 1;

if (recordsRead % 1000 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -40,6 +41,7 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.protocol.models.AirbyteLogMessage.Level;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.validation.json.JsonSchemaValidator;
Expand Down Expand Up @@ -181,6 +183,7 @@ void testInvalidSchema() throws Exception {
verify(destination).start(destinationConfig, jobRoot);
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(destination).accept(RECORD_MESSAGE3);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), STREAM_NAME);
Expand Down Expand Up @@ -292,6 +295,36 @@ void testReplicationRunnableWorkerFailure() throws Exception {
.anyMatch(f -> f.getFailureOrigin().equals(FailureOrigin.REPLICATION) && f.getStacktrace().contains(WORKER_ERROR_MESSAGE)));
}

@Test
void testOnlyStateAndRecordMessagesDeliveredToDestination() throws Exception {
final AirbyteMessage LOG_MESSAGE = AirbyteMessageUtils.createLogMessage(Level.INFO, "a log message");
final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createTraceMessage("a trace message", 123456.0);
when(mapper.mapMessage(LOG_MESSAGE)).thenReturn(LOG_MESSAGE);
when(mapper.mapMessage(TRACE_MESSAGE)).thenReturn(TRACE_MESSAGE);
when(source.isFinished()).thenReturn(false, false, false, false, true);
when(source.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.of(LOG_MESSAGE), Optional.of(TRACE_MESSAGE),
Optional.of(RECORD_MESSAGE2));

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
source,
mapper,
destination,
messageTracker,
recordSchemaValidator,
workerMetricReporter);

worker.run(syncInput, jobRoot);

verify(source).start(sourceConfig, jobRoot);
verify(destination).start(destinationConfig, jobRoot);
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(destination, never()).accept(LOG_MESSAGE);
verify(destination, never()).accept(TRACE_MESSAGE);
}

@Test
void testDestinationNonZeroExitValue() throws Exception {
when(destination.getExitValue()).thenReturn(1);
Expand Down

0 comments on commit a66b973

Please sign in to comment.