Tweak schema validation code for readability. #13721

Merged
merged 4 commits into from
Jun 16, 2022
@@ -306,31 +306,11 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
 } catch (final Exception e) {
   throw new SourceException("Source process read attempt failed", e);
 }
-if (messageOptional.isPresent()) {
-  if (messageOptional.get().getRecord() != null) {
-    final AirbyteRecordMessage message = messageOptional.get().getRecord();
-    final String messageStream = WorkerUtils.streamNameWithNamespace(message.getNamespace(), message.getStream());
-    // validate a record's schema if there are less than 10 records with validation errors
-    if (validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10) {
-      try {
-        recordSchemaValidator.validateSchema(messageOptional.get().getRecord(), messageStream);
-      } catch (final RecordSchemaValidationException e) {
-        final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream);
-        if (exceptionWithCount == null) {
-          validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1));
-        } else {
-          final Integer currentCount = exceptionWithCount.getRight();
-          final Set<String> currentErrorMessages = exceptionWithCount.getLeft();
-          final Set<String> updatedErrorMessages =
-              Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet());
-          validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1));
-        }
-      }

-    }
-  }

-  final AirbyteMessage message = mapper.mapMessage(messageOptional.get());
+if (messageOptional.isPresent()) {
+  final AirbyteMessage airbyteMessage = messageOptional.get();
+  validateSchema(recordSchemaValidator, validationErrors, airbyteMessage);
@davinchia (Contributor, Author) commented on Jun 13, 2022:

Pull all of this logic into a separate function so that each layer keeps a consistent level of detail; e.g., the top-level replication function should only know about high-level, replication-related code.

+  final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

   messageTracker.acceptFromSource(message);
   try {
@@ -386,6 +366,36 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
     };
   }

+  private static void validateSchema(RecordSchemaValidator recordSchemaValidator,
+                                     Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
+                                     AirbyteMessage message) {
+    if (message.getRecord() == null) {
Review comment (Contributor, Author):

Exit early to reduce nested braces.

+      return;
+    }

+    final AirbyteRecordMessage record = message.getRecord();
+    final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream());
+    // avoid noise by validating only if the stream has less than 10 records with validation errors
+    final boolean streamHasLessThenTenErrs =
+        validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10;
Comment on lines +378 to +380
Review comment (Contributor, Author):

Turn the condition into a boolean variable so that its name helps with readability.

+    if (streamHasLessThenTenErrs) {
+      try {
+        recordSchemaValidator.validateSchema(record, messageStream);
+      } catch (final RecordSchemaValidationException e) {
+        final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream);
+        if (exceptionWithCount == null) {
+          validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1));
+        } else {
+          final Integer currentCount = exceptionWithCount.getRight();
+          final Set<String> currentErrorMessages = exceptionWithCount.getLeft();
+          final Set<String> updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet());
+          validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1));
+        }
+      }

+    }
+  }

   private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
                                                        final AtomicBoolean cancelled,
                                                        final MessageTracker messageTracker,
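
For anyone reading the extracted validateSchema helper in isolation, the per-stream bookkeeping it performs can be sketched as a small standalone class. This is only an illustration under stated assumptions: the class name ValidationErrorTracker, its method names, and the configurable cap are hypothetical and not part of this PR, and java.util's SimpleImmutableEntry stands in for Commons Lang's ImmutablePair so the example has no dependencies. It also swaps the explicit null check from the diff for Map.merge, which is one possible way to express the same first-failure/update branching, not what the PR does.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the validationErrors map that validateSchema updates.
public class ValidationErrorTracker {

  // Stream name -> (distinct error messages, number of records that failed validation).
  private final Map<String, SimpleImmutableEntry<Set<String>, Integer>> errors = new HashMap<>();
  private final int maxErrorsPerStream;

  public ValidationErrorTracker(final int maxErrorsPerStream) {
    this.maxErrorsPerStream = maxErrorsPerStream;
  }

  // Plays the role of streamHasLessThenTenErrs: validate only while the stream is under the cap.
  public boolean shouldValidate(final String stream) {
    final SimpleImmutableEntry<Set<String>, Integer> entry = errors.get(stream);
    return entry == null || entry.getValue() < maxErrorsPerStream;
  }

  // Records one failing record. merge() inserts the pair on the first failure and
  // otherwise unions the message sets and increments the count.
  public void recordFailure(final String stream, final Set<String> messages) {
    errors.merge(stream, pair(new HashSet<>(messages), 1), (previous, latest) -> {
      final Set<String> union = new HashSet<>(previous.getKey());
      union.addAll(latest.getKey());
      return pair(union, previous.getValue() + 1);
    });
  }

  public int failureCount(final String stream) {
    final SimpleImmutableEntry<Set<String>, Integer> entry = errors.get(stream);
    return entry == null ? 0 : entry.getValue();
  }

  public Set<String> messages(final String stream) {
    final SimpleImmutableEntry<Set<String>, Integer> entry = errors.get(stream);
    return entry == null ? new HashSet<>() : new HashSet<>(entry.getKey());
  }

  private static SimpleImmutableEntry<Set<String>, Integer> pair(final Set<String> messages, final int count) {
    return new SimpleImmutableEntry<Set<String>, Integer>(messages, count);
  }

  public static void main(final String[] args) {
    // Made-up stream name and error messages, with a cap of 2 instead of 10 for brevity.
    final ValidationErrorTracker tracker = new ValidationErrorTracker(2);
    tracker.recordFailure("public.users", Set.of("id: expected integer"));
    tracker.recordFailure("public.users", Set.of("id: expected integer", "email: missing"));
    System.out.println(tracker.failureCount("public.users"));
    System.out.println(tracker.messages("public.users"));
    System.out.println(tracker.shouldValidate("public.users"));
  }
}
```

Whether merge reads better than the explicit if/else is a matter of taste; the if/else in the diff keeps the two cases visible, while merge removes the repeated map lookup.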