Skip to content

Commit

Permalink
Introduce the AirbyteStreamNamespaceNamePair to the DefaultReplicatio…
Browse files Browse the repository at this point in the history
…nWorker. (#19360)

#19191 made me realise the DefaultReplicationWorker's metric tracking today has a bug where we aren't accounting for namespace when tracking metrics today. i.e. Streams with the same name and duplicate namespace will merge metrics.

While reading the code to figure out a fix, I realised we don't have a good conceptual representation of stream namespace <> name pairs within the platform today. We use a concatenated string. Though this works, it will become harder and harder to read/track as we do more operations that involve namespace i.e. progress bars and column selection.

This PR introduces the AirbyteStreamNameNamespacePair object into the platform code to make it more convenient to work with Streams in the future. (Especially if we proceed with the project to make streams a first-class citizen!)

The AirbyteStreamNameNamespacePair object was written to deal with the same issue of namespace <> name pair manipulation within the Java destination code. It implements the Comparable interface, which makes it convenient to use for Collections operations.

For an example of how this is consumed, see #19361.
  • Loading branch information
davinchia committed Nov 16, 2022
1 parent 8f2cf21 commit 0d4a2bb
Show file tree
Hide file tree
Showing 79 changed files with 128 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.exception.RecordSchemaValidationException;
Expand All @@ -22,9 +23,9 @@

public class RecordSchemaValidator {

private final Map<String, JsonNode> streams;
private final Map<AirbyteStreamNameNamespacePair, JsonNode> streams;

public RecordSchemaValidator(final Map<String, JsonNode> streamNamesToSchemas) {
public RecordSchemaValidator(final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
// streams is Map of a stream source namespace + name mapped to the stream schema
// for easy access when we check each record's schema
this.streams = streamNamesToSchemas;
Expand All @@ -37,7 +38,8 @@ public RecordSchemaValidator(final Map<String, JsonNode> streamNamesToSchemas) {
* @param message
* @throws RecordSchemaValidationException
*/
public void validateSchema(final AirbyteRecordMessage message, final String messageStream) throws RecordSchemaValidationException {
public void validateSchema(final AirbyteRecordMessage message, final AirbyteStreamNameNamespacePair messageStream)
throws RecordSchemaValidationException {
final JsonNode messageData = message.getData();
final JsonNode matchingSchema = streams.get(messageStream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;

public class WorkerMetricReporter {

Expand All @@ -21,9 +22,9 @@ public WorkerMetricReporter(final MetricClient metricClient, final String docker
this.metricClient = metricClient;
}

public void trackSchemaValidationError(final String stream) {
public void trackSchemaValidationError(final AirbyteStreamNameNamespacePair stream) {
metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo),
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream));
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream.toString()));
}

public void trackStateMetricTrackerError() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand All @@ -22,11 +23,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,18 +131,12 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out
throw new WorkerException(defaultErrorMessage);
}

public static Map<String, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
return syncInput.getCatalog().getStreams().stream().collect(
Collectors.toMap(
k -> {
return streamNameWithNamespace(k.getStream().getNamespace(), k.getStream().getName());
},
k -> AirbyteStreamNameNamespacePair.fromAirbyteStream(k.getStream()),
v -> v.getStream().getJsonSchema()));

}

public static String streamNameWithNamespace(final @Nullable String namespace, final String streamName) {
return Objects.toString(namespace, "").trim() + streamName.trim();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -287,7 +288,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Long recordsRead = 0L;
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
Expand Down Expand Up @@ -516,14 +517,14 @@ private List<FailureReason> getFailureReasons(AtomicReference<FailureReason> rep
}

private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
if (message.getRecord() == null) {
return;
}

final AirbyteRecordMessage record = message.getRecord();
final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream());
final AirbyteStreamNameNamespacePair messageStream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
// avoid noise by validating only if the stream has less than 10 records with validation errors
final boolean streamHasLessThenTenErrs =
validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
Expand All @@ -34,13 +35,14 @@ void setup() throws Exception {
@Test
void testValidateValidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), STREAM_NAME);
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), AirbyteStreamNameNamespacePair.fromRecordMessage(VALID_RECORD.getRecord()));
}

@Test
void testValidateInvalidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(), STREAM_NAME));
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(INVALID_RECORD.getRecord())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.internal.HeartbeatMonitor;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.time.Duration;
Expand Down Expand Up @@ -128,17 +129,17 @@ void testProcessDies() {
void testMapStreamNamesToSchemasWithNullNamespace() {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig();
final StandardSyncInput syncInput = syncPair.getValue();
final Map<String, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get("user_preferences"));
final Map<AirbyteStreamNameNamespacePair, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", null)));
}

@Test
void testMapStreamNamesToSchemasWithMultipleNamespaces() {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig(true);
final StandardSyncInput syncInput = syncPair.getValue();
final Map<String, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get("namespaceuser_preferences"));
assertNotNull(mapOutput.get("namespace2user_preferences"));
final Map<AirbyteStreamNameNamespacePair, JsonNode> mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput);
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace")));
assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace2")));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.protocol.models.AirbyteLogMessage.Level;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.*;
Expand Down Expand Up @@ -160,8 +161,10 @@ void test() throws Exception {
verify(destination).accept(RECORD_MESSAGE2);
verify(source, atLeastOnce()).close();
verify(destination).close();
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord()));
}

@Test
Expand All @@ -185,9 +188,12 @@ void testInvalidSchema() throws Exception {
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(destination).accept(RECORD_MESSAGE3);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), STREAM_NAME);
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord()));
verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE3.getRecord()));
verify(source).close();
verify(destination).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
Expand All @@ -18,6 +17,7 @@
import io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer;
Expand All @@ -16,6 +15,7 @@
import io.airbyte.integrations.destination.s3.jsonl.JsonLSerializedBuffer;
import io.airbyte.integrations.destination.s3.jsonl.S3JsonlFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.concurrent.Callable;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package io.airbyte.integrations.destination.s3.avro;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package io.airbyte.integrations.destination.s3.csv;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.util.CompressionType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.util.CompressionType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.OutputStream;
import java.io.PrintWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
package io.airbyte.integrations.destination.s3.parquet;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.File;
import java.io.FileInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand Down
Loading

0 comments on commit 0d4a2bb

Please sign in to comment.