Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the AirbyteStreamNamespaceNamePair to the DefaultReplicationWorker. #19360

Merged
merged 17 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.exception.RecordSchemaValidationException;
Expand All @@ -22,9 +23,9 @@

public class RecordSchemaValidator {

private final Map<String, JsonNode> streams;
private final Map<AirbyteStreamNameNamespacePair, JsonNode> streams;

public RecordSchemaValidator(final Map<String, JsonNode> streamNamesToSchemas) {
public RecordSchemaValidator(final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
// streams is Map of a stream source namespace + name mapped to the stream schema
// for easy access when we check each record's schema
this.streams = streamNamesToSchemas;
Expand All @@ -37,7 +38,8 @@ public RecordSchemaValidator(final Map<String, JsonNode> streamNamesToSchemas) {
* @param message
* @throws RecordSchemaValidationException
*/
public void validateSchema(final AirbyteRecordMessage message, final String messageStream) throws RecordSchemaValidationException {
public void validateSchema(final AirbyteRecordMessage message, final AirbyteStreamNameNamespacePair messageStream)
throws RecordSchemaValidationException {
final JsonNode messageData = message.getData();
final JsonNode matchingSchema = streams.get(messageStream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;

public class WorkerMetricReporter {

Expand All @@ -21,9 +22,9 @@ public WorkerMetricReporter(final MetricClient metricClient, final String docker
this.metricClient = metricClient;
}

public void trackSchemaValidationError(final String stream) {
public void trackSchemaValidationError(final AirbyteStreamNameNamespacePair stream) {
metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo),
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream));
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream.toString()));
davinchia marked this conversation as resolved.
Show resolved Hide resolved
}

public void trackStateMetricTrackerError() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand All @@ -22,11 +23,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,18 +131,12 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out
throw new WorkerException(defaultErrorMessage);
}

public static Map<String, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
return syncInput.getCatalog().getStreams().stream().collect(
Collectors.toMap(
k -> {
return streamNameWithNamespace(k.getStream().getNamespace(), k.getStream().getName());
},
k -> AirbyteStreamNameNamespacePair.fromAirbyteStream(k.getStream()),
v -> v.getStream().getJsonSchema()));

}

public static String streamNameWithNamespace(final @Nullable String namespace, final String streamName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed as no longer in use.

return Objects.toString(namespace, "").trim() + streamName.trim();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -287,7 +288,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Long recordsRead = 0L;
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
Expand Down Expand Up @@ -516,14 +517,14 @@ private List<FailureReason> getFailureReasons(AtomicReference<FailureReason> rep
}

private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
if (message.getRecord() == null) {
return;
}

final AirbyteRecordMessage record = message.getRecord();
final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream());
final AirbyteStreamNameNamespacePair messageStream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
// avoid noise by validating only if the stream has less than 10 records with validation errors
final boolean streamHasLessThenTenErrs =
validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
Expand All @@ -34,13 +35,14 @@ void setup() throws Exception {
@Test
void testValidateValidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), STREAM_NAME);
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), AirbyteStreamNameNamespacePair.fromRecordMessage(VALID_RECORD.getRecord()));
}

@Test
void testValidateInvalidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(), STREAM_NAME));
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(INVALID_RECORD.getRecord())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.internal.HeartbeatMonitor;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.time.Duration;
Expand Down Expand Up @@ -128,17 +129,17 @@ void testProcessDies() {
void testMapStreamNamesToSchemasWithNullNamespace() {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig();
final StandardSyncInput syncInput = syncPair.getValue();
final Map<String, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get("user_preferences"));
final Map<AirbyteStreamNameNamespacePair, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", null)));
}

@Test
void testMapStreamNamesToSchemasWithMultipleNamespaces() {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig(true);
final StandardSyncInput syncInput = syncPair.getValue();
final Map<String, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get("namespaceuser_preferences"));
assertNotNull(mapOutput.get("namespace2user_preferences"));
final Map<AirbyteStreamNameNamespacePair, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace")));
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace2")));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.protocol.models.AirbyteLogMessage.Level;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.*;
Expand Down Expand Up @@ -160,8 +161,10 @@ void test() throws Exception {
verify(destination).accept(RECORD_MESSAGE2);
verify(source, atLeastOnce()).close();
verify(destination).close();
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord()));
}

@Test
Expand All @@ -185,9 +188,12 @@ void testInvalidSchema() throws Exception {
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(destination).accept(RECORD_MESSAGE3);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE3.getRecord()));
verify(source).close();
verify(destination).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
Expand All @@ -18,6 +17,7 @@
import io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer;
Expand All @@ -16,6 +15,7 @@
import io.airbyte.integrations.destination.s3.jsonl.JsonLSerializedBuffer;
import io.airbyte.integrations.destination.s3.jsonl.S3JsonlFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.concurrent.Callable;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package io.airbyte.integrations.destination.s3.avro;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package io.airbyte.integrations.destination.s3.csv;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.util.CompressionType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.util.CompressionType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.OutputStream;
import java.io.PrintWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
package io.airbyte.integrations.destination.s3.parquet;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.File;
import java.io.FileInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Loading