Skip to content

Commit

Permalink
Prepare injection of Versioning compatible StreamReaderFactory (#17487)
Browse files Browse the repository at this point in the history
* Use Version instead of AirbyteVersion

* Update Deserializer interface to fit better in existing flow

* Add versioned migrator

* Refactor DefaultAirbyteStreamFactory to enable Versioning

* Use explicit constructor

* Add logging on failed message upgrade
  • Loading branch information
gosusnp committed Oct 3, 2022
1 parent 54c01aa commit 5cd605d
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.Collection;
Expand Down Expand Up @@ -46,7 +46,7 @@ public void initialize() {
* Downgrade a message from the most recent version to the target version by chaining all the
* required migrations
*/
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final AirbyteVersion target) {
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
if (target.getMajorVersion().equals(mostRecentVersion)) {
return (PreviousVersion) message;
}
Expand All @@ -63,7 +63,7 @@ public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final Current
* Upgrade a message from the source version to the most recent version by chaining all the required
* migrations
*/
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final AirbyteVersion source) {
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
if (source.getMajorVersion().equals(mostRecentVersion)) {
return (CurrentVersion) message;
}
Expand All @@ -75,7 +75,7 @@ public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVe
return (CurrentVersion) result;
}

private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final AirbyteVersion version) {
private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
if (results.isEmpty()) {
throw new RuntimeException("Unsupported migration version " + version.serialize());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol;

import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.v0.AirbyteMessage;

/**
* Wraps message migration from a fixed version to the most recent version
*/
public class AirbyteMessageVersionedMigrator<OriginalMessageType> {

private final AirbyteMessageMigrator migrator;
private final Version version;

public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, final Version version) {
this.migrator = migrator;
this.version = version;
}

public OriginalMessageType downgrade(final AirbyteMessage message) {
return migrator.downgrade(message, version);
}

public AirbyteMessage upgrade(final OriginalMessageType message) {
return migrator.upgrade(message, version);
}

public Version getVersion() {
return version;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.commons.protocol.migrations;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;

/**
* AirbyteProtocol message migration interface
Expand Down Expand Up @@ -33,11 +33,11 @@ public interface AirbyteMessageMigration<PreviousVersion, CurrentVersion> {
/**
* The Old version, note that due to semver, the important piece of information is the Major.
*/
AirbyteVersion getPreviousVersion();
Version getPreviousVersion();

/**
* The New version, note that due to semver, the important piece of information is the Major.
*/
AirbyteVersion getCurrentVersion();
Version getCurrentVersion();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol.migrations;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;

/**
Expand All @@ -32,13 +32,13 @@ public io.airbyte.protocol.models.AirbyteMessage downgrade(final io.airbyte.prot
}

@Override
public AirbyteVersion getPreviousVersion() {
return new AirbyteVersion("0.2.0");
public Version getPreviousVersion() {
return new Version("0.2.0");
}

@Override
public AirbyteVersion getCurrentVersion() {
return new AirbyteVersion("0.2.0");
public Version getCurrentVersion() {
return new Version("0.2.0");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

package io.airbyte.commons.protocol.serde;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.AirbyteVersion;

public interface AirbyteMessageDeserializer<T> {

T deserialize(final String json);
T deserialize(final JsonNode json);

AirbyteVersion getTargetVersion();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.protocol.serde;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import lombok.Getter;
Expand All @@ -20,8 +21,8 @@ public AirbyteMessageGenericDeserializer(final AirbyteVersion targetVersion, fin
}

@Override
public T deserialize(String json) {
return Jsons.deserialize(json, typeClass);
public T deserialize(JsonNode json) {
return Jsons.object(json, typeClass);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AirbyteMessageMigratorTest {

static final AirbyteVersion v0 = new AirbyteVersion("0.0.0");
static final AirbyteVersion v1 = new AirbyteVersion("1.0.0");
static final AirbyteVersion v2 = new AirbyteVersion("2.0.0");
static final Version v0 = new Version("0.0.0");
static final Version v1 = new Version("1.0.0");
static final Version v2 = new Version("2.0.0");

record ObjectV0(String name0) {}

Expand All @@ -37,12 +37,12 @@ public ObjectV1 upgrade(ObjectV0 message) {
}

@Override
public AirbyteVersion getPreviousVersion() {
public Version getPreviousVersion() {
return v0;
}

@Override
public AirbyteVersion getCurrentVersion() {
public Version getCurrentVersion() {
return v1;
}

Expand All @@ -61,12 +61,12 @@ public ObjectV2 upgrade(ObjectV1 message) {
}

@Override
public AirbyteVersion getPreviousVersion() {
public Version getPreviousVersion() {
return v1;
}

@Override
public AirbyteVersion getCurrentVersion() {
public Version getCurrentVersion() {
return v2;
}

Expand Down Expand Up @@ -113,14 +113,14 @@ void testUpgrade() {
@Test
void testUnsupportedDowngradeShouldFailExplicitly() {
assertThrows(RuntimeException.class, () -> {
migrator.downgrade(new ObjectV2("woot"), new AirbyteVersion("5.0.0"));
migrator.downgrade(new ObjectV2("woot"), new Version("5.0.0"));
});
}

@Test
void testUnsupportedUpgradeShouldFailExplicitly() {
assertThrows(RuntimeException.class, () -> {
migrator.upgrade(new ObjectV0("woot"), new AirbyteVersion("4.0.0"));
migrator.upgrade(new ObjectV0("woot"), new Version("4.0.0"));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
Expand All @@ -28,7 +29,7 @@ void v0SerDeRoundTripTest() throws URISyntaxException {
.withDocumentationUrl(new URI("file:///tmp/doc")));

final String serializedMessage = ser.serialize(message);
final AirbyteMessage deserializedMessage = deser.deserialize(serializedMessage);
final AirbyteMessage deserializedMessage = deser.deserialize(Jsons.deserialize(serializedMessage));

assertEquals(message, deserializedMessage);
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
implementation project(':airbyte-analytics')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,46 +52,52 @@ public DefaultAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
return bufferedReader
.lines()
.flatMap(line -> {
final Optional<JsonNode> jsonLine = Jsons.tryDeserialize(line);
if (jsonLine.isEmpty()) {
// we log as info all the lines that are not valid json
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
try (final var mdcScope = containerLogMdcBuilder.build()) {
logger.info(line);
}
}
return jsonLine.stream();
})
// filter invalid messages
.filter(jsonLine -> {
final boolean res = protocolValidator.test(jsonLine);
if (!res) {
logger.error("Validation failed: {}", Jsons.serialize(jsonLine));
}
return res;
})
.flatMap(jsonLine -> {
final Optional<AirbyteMessage> m = Jsons.tryObject(jsonLine, AirbyteMessage.class);
if (m.isEmpty()) {
logger.error("Deserialization failed: {}", Jsons.serialize(jsonLine));
}
return m.stream();
})
// filter logs
.filter(airbyteMessage -> {
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
try (final var mdcScope = containerLogMdcBuilder.build()) {
internalLog(airbyteMessage.getLog());
}
}
return !isLog;
});
.flatMap(this::parseJson)
.filter(this::validate)
.flatMap(this::toAirbyteMessage)
.filter(this::filterLog);
}

private void internalLog(final AirbyteLogMessage logMessage) {
protected Stream<JsonNode> parseJson(final String line) {
final Optional<JsonNode> jsonLine = Jsons.tryDeserialize(line);
if (jsonLine.isEmpty()) {
// we log as info all the lines that are not valid json
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
try (final var mdcScope = containerLogMdcBuilder.build()) {
logger.info(line);
}
}
return jsonLine.stream();
}

protected boolean validate(final JsonNode json) {
final boolean res = protocolValidator.test(json);
if (!res) {
logger.error("Validation failed: {}", Jsons.serialize(json));
}
return res;
}

protected Stream<AirbyteMessage> toAirbyteMessage(final JsonNode json) {
final Optional<AirbyteMessage> m = Jsons.tryObject(json, AirbyteMessage.class);
if (m.isEmpty()) {
logger.error("Deserialization failed: {}", Jsons.serialize(json));
}
return m.stream();
}

protected boolean filterLog(final AirbyteMessage message) {
final boolean isLog = message.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
try (final var mdcScope = containerLogMdcBuilder.build()) {
internalLog(message.getLog());
}
}
return !isLog;
}

protected void internalLog(final AirbyteLogMessage logMessage) {
final String combinedMessage =
logMessage.getMessage() + (logMessage.getStackTrace() != null ? (System.lineSeparator() + "Stack Trace: " + logMessage.getStackTrace()) : "");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.internal;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator;
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extends DefaultAirbyteStreamFactory to handle version specific conversions.
*
* A VersionedAirbyteStreamFactory handles parsing and validation from a specific version of the
* Airbyte Protocol as well as upgrading messages to the current version.
*/
public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class);

private final AirbyteMessageDeserializer<T> deserializer;
private final AirbyteMessageVersionedMigrator<T> migrator;

public VersionedAirbyteStreamFactory(final AirbyteMessageDeserializer<T> deserializer,
final AirbyteMessageVersionedMigrator<T> migrator) {
this(deserializer, migrator, MdcScope.DEFAULT_BUILDER);
}

public VersionedAirbyteStreamFactory(final AirbyteMessageDeserializer<T> deserializer,
final AirbyteMessageVersionedMigrator<T> migrator,
final MdcScope.Builder containerLogMdcBuilder) {
// TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware
super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder);
this.deserializer = deserializer;
this.migrator = migrator;
}

@Override
protected Stream<AirbyteMessage> toAirbyteMessage(final JsonNode json) {
try {
final io.airbyte.protocol.models.v0.AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json));
return Stream.of(convert(message));
} catch (RuntimeException e) {
LOGGER.warn("Failed to upgrade a message from version {}: {}", migrator.getVersion(), Jsons.serialize(json));
return Stream.empty();
}
}

// TODO remove this conversion once we migrated default AirbyteMessage to be from a versioned
// namespace
private AirbyteMessage convert(final io.airbyte.protocol.models.v0.AirbyteMessage message) {
return Jsons.object(Jsons.jsonNode(message), AirbyteMessage.class);
}

}

0 comments on commit 5cd605d

Please sign in to comment.