diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 28be935591ed8..7ce35ae0b9ee4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -17,6 +17,7 @@ import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.config.WorkerSourceConfig; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.workers.*; import io.airbyte.workers.exception.RecordSchemaValidationException; @@ -336,11 +337,15 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, final AirbyteMessage message = mapper.mapMessage(airbyteMessage); messageTracker.acceptFromSource(message); + try { - destination.accept(message); + if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { + destination.accept(message); + } } catch (final Exception e) { throw new DestinationException("Destination process message delivery failed", e); } + recordsRead += 1; if (recordsRead % 1000 == 0) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index 4a0768669e263..95b2fcc91933f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,6 +41,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricClientFactory; +import io.airbyte.protocol.models.AirbyteLogMessage.Level; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.validation.json.JsonSchemaValidator; @@ -181,6 +183,7 @@ void testInvalidSchema() throws Exception { verify(destination).start(destinationConfig, jobRoot); verify(destination).accept(RECORD_MESSAGE1); verify(destination).accept(RECORD_MESSAGE2); + verify(destination).accept(RECORD_MESSAGE3); verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME); verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME); verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), STREAM_NAME); @@ -292,6 +295,36 @@ void testReplicationRunnableWorkerFailure() throws Exception { .anyMatch(f -> f.getFailureOrigin().equals(FailureOrigin.REPLICATION) && f.getStacktrace().contains(WORKER_ERROR_MESSAGE))); } + @Test + void testOnlyStateAndRecordMessagesDeliveredToDestination() throws Exception { + final AirbyteMessage LOG_MESSAGE = AirbyteMessageUtils.createLogMessage(Level.INFO, "a log message"); + final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createTraceMessage("a trace message", 123456.0); + when(mapper.mapMessage(LOG_MESSAGE)).thenReturn(LOG_MESSAGE); + when(mapper.mapMessage(TRACE_MESSAGE)).thenReturn(TRACE_MESSAGE); + when(source.isFinished()).thenReturn(false, false, false, false, true); + when(source.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.of(LOG_MESSAGE), Optional.of(TRACE_MESSAGE), + Optional.of(RECORD_MESSAGE2)); + + final ReplicationWorker worker = new DefaultReplicationWorker( + JOB_ID, + JOB_ATTEMPT, + source, + mapper, + destination, + messageTracker, + recordSchemaValidator, + workerMetricReporter); + + worker.run(syncInput, jobRoot); + + verify(source).start(sourceConfig, jobRoot); + verify(destination).start(destinationConfig, jobRoot); + verify(destination).accept(RECORD_MESSAGE1); + verify(destination).accept(RECORD_MESSAGE2); + verify(destination, never()).accept(LOG_MESSAGE); + verify(destination, never()).accept(TRACE_MESSAGE); + } + @Test void testDestinationNonZeroExitValue() throws Exception { when(destination.getExitValue()).thenReturn(1);