Skip to content

Commit

Permalink
Tweak schema validation code for readability. (#13721)
Browse files Browse the repository at this point in the history
* Refactor schema validation for better reading.

* Better comments.

* Format.
  • Loading branch information
davinchia committed Jun 16, 2022
1 parent 62626ee commit 18b023f
Showing 1 changed file with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,11 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
} catch (final Exception e) {
throw new SourceException("Source process read attempt failed", e);
}
if (messageOptional.isPresent()) {
if (messageOptional.get().getRecord() != null) {
final AirbyteRecordMessage message = messageOptional.get().getRecord();
final String messageStream = WorkerUtils.streamNameWithNamespace(message.getNamespace(), message.getStream());
// validate a record's schema if there are less than 10 records with validation errors
if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) {
try {
recordSchemaValidator.validateSchema(messageOptional.get().getRecord(), messageStream);
} catch (final RecordSchemaValidationException e) {
final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream);
if (exceptionWithCount == null) {
validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1));
} else {
final Integer currentCount = exceptionWithCount.getRight();
final Set<String> currentErrorMessages = exceptionWithCount.getLeft();
final Set<String> updatedErrorMessages =
Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet());
validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1));
}
}

}
}

final AirbyteMessage message = mapper.mapMessage(messageOptional.get());
if (messageOptional.isPresent()) {
final AirbyteMessage airbyteMessage = messageOptional.get();
validateSchema(recordSchemaValidator, validationErrors, airbyteMessage);
final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

messageTracker.acceptFromSource(message);
try {
Expand Down Expand Up @@ -386,6 +366,36 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
};
}

private static void validateSchema(RecordSchemaValidator recordSchemaValidator,
Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
AirbyteMessage message) {
if (message.getRecord() == null) {
return;
}

final AirbyteRecordMessage record = message.getRecord();
final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream());
// avoid noise by validating only if the stream has less than 10 records with validation errors
final boolean streamHasLessThenTenErrs =
validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10;
if (streamHasLessThenTenErrs) {
try {
recordSchemaValidator.validateSchema(record, messageStream);
} catch (final RecordSchemaValidationException e) {
final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream);
if (exceptionWithCount == null) {
validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1));
} else {
final Integer currentCount = exceptionWithCount.getRight();
final Set<String> currentErrorMessages = exceptionWithCount.getLeft();
final Set<String> updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet());
validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1));
}
}

}
}

private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
Expand Down

0 comments on commit 18b023f

Please sign in to comment.