From 18b023fbab9930dbeb5e465cf9b3b4c81d7d0892 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 16 Jun 2022 10:26:17 -0700 Subject: [PATCH] Tweak schema validation code for readability. (#13721) * Refactor schema validation for better reading. * Better comments. * Format. --- .../general/DefaultReplicationWorker.java | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index bc1d3e14e47db..4a45bf15de11e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -306,31 +306,11 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, } catch (final Exception e) { throw new SourceException("Source process read attempt failed", e); } - if (messageOptional.isPresent()) { - if (messageOptional.get().getRecord() != null) { - final AirbyteRecordMessage message = messageOptional.get().getRecord(); - final String messageStream = WorkerUtils.streamNameWithNamespace(message.getNamespace(), message.getStream()); - // validate a record's schema if there are less than 10 records with validation errors - if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) { - try { - recordSchemaValidator.validateSchema(messageOptional.get().getRecord(), messageStream); - } catch (final RecordSchemaValidationException e) { - final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream); - if (exceptionWithCount == null) { - validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); - } else { - final Integer currentCount = exceptionWithCount.getRight(); - final Set<String> currentErrorMessages = exceptionWithCount.getLeft(); - final Set<String>
updatedErrorMessages = - Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); - validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); - } - } - - } - } - final AirbyteMessage message = mapper.mapMessage(messageOptional.get()); + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); messageTracker.acceptFromSource(message); try { @@ -386,6 +366,36 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, }; } + private static void validateSchema(RecordSchemaValidator recordSchemaValidator, + Map<String, ImmutablePair<Set<String>, Integer>> validationErrors, + AirbyteMessage message) { + if (message.getRecord() == null) { + return; + } + + final AirbyteRecordMessage record = message.getRecord(); + final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream()); + // avoid noise by validating only if the stream has less than 10 records with validation errors + final boolean streamHasLessThenTenErrs = + validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; + if (streamHasLessThenTenErrs) { + try { + recordSchemaValidator.validateSchema(record, messageStream); + } catch (final RecordSchemaValidationException e) { + final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream); + if (exceptionWithCount == null) { + validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); + } else { + final Integer currentCount = exceptionWithCount.getRight(); + final Set<String> currentErrorMessages = exceptionWithCount.getLeft(); + final Set<String> updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); + validationErrors.put(messageStream, 
new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); + } + } + + } + } + private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination, final AtomicBoolean cancelled, final MessageTracker messageTracker,