Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go back v0 as default #22278

Merged
merged 9 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-bootloader/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ airbyte:
target:
range:
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}
secret:
persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE}
store:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public class MigrationContainer<T extends Migration> {

private final List<T> migrationsToRegister;
private final SortedMap<String, T> migrations = new TreeMap<>();
private String mostRecentMajorVersion = "";

// mostRecentMajorVersion defaults to v0 as no migration is required
private String mostRecentMajorVersion = "0";

public MigrationContainer(final List<T> migrations) {
this.migrationsToRegister = migrations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
import io.airbyte.validation.json.JsonSchemaValidator;
import jakarta.inject.Singleton;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

@Singleton
// Disable V1 Migration, uncomment to re-enable
// @Singleton
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> {

private final JsonSchemaValidator validator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) {
return false;
}

/**
* Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
*
* @param configuredAirbyteCatalog to migrate
*/
public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
if (containsV1DataTypes(configuredAirbyteCatalog)) {
downgradeSchema(configuredAirbyteCatalog);
}
}

/**
* Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
*
* @param airbyteCatalog to migrate
*/
public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
if (containsV1DataTypes(airbyteCatalog)) {
downgradeSchema(airbyteCatalog);
}
}

/**
* Performs an in-place migration of the schema from v1 to v0
*
* @param configuredAirbyteCatalog to migrate
*/
private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
for (final var stream : configuredAirbyteCatalog.getStreams()) {
SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema());
}
}

/**
* Performs an in-place migration of the schema from v1 to v0
*
* @param airbyteCatalog to migrate
*/
private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
for (final var stream : airbyteCatalog.getStreams()) {
SchemaMigrationV1.downgradeSchema(stream.getJsonSchema());
}
}

/**
* Returns true if catalog contains v1 data types
*/
private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
if (configuredAirbyteCatalog == null) {
return false;
}

return configuredAirbyteCatalog
.getStreams()
.stream().findFirst()
.map(ConfiguredAirbyteStream::getStream)
.map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
.orElse(false);
}

/**
* Returns true if catalog contains v1 data types
*/
private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
if (airbyteCatalog == null) {
return false;
}

return airbyteCatalog
.getStreams()
.stream().findFirst()
.map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
.orElse(false);
}

private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
return false;
}
return hasV1DataType(airbyteStream.getJsonSchema());
}

/**
* Performs of search of a v0 data type node, returns true at the first node found.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Performs of search of a v0 data type node, returns true at the first node found.
* Performs of search of a v1 data type node, returns true at the first node found.

*/
private static boolean hasV1DataType(final JsonNode schema) {
if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) {
return true;
}

for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
if (hasV1DataType(subSchema)) {
return true;
}
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import jakarta.inject.Singleton;

@Singleton
// Disable V1 Migration, uncomment to re-enable
// @Singleton
public class ConfiguredAirbyteCatalogMigrationV1
implements ConfiguredAirbyteCatalogMigration<io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, ConfiguredAirbyteCatalog> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
* Detects any schema that looks like a reference type declaration, e.g.: { "$ref":
* "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] }
*/
private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
if (!schema.isObject()) {
// Non-object schemas (i.e. true/false) never need to be modified
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MigratorsMicronautTest {

// This should contain the list of all the supported majors of the airbyte protocol except the most
// recent one since the migrations themselves are keyed on the lower version.
final Set<String> SUPPORTED_VERSIONS = Set.of("0");
final Set<String> SUPPORTED_VERSIONS = Set.of();

@Test
void testAirbyteMessageMigrationInjection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
import io.airbyte.commons.protocol.ConfiguredAirbyteCatalogMigrator;
import io.airbyte.commons.protocol.migrations.v1.AirbyteMessageMigrationV1;
import io.airbyte.commons.protocol.migrations.v1.ConfiguredAirbyteCatalogMigrationV1;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer;
Expand Down Expand Up @@ -45,10 +43,12 @@ void beforeEach() {
List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer())));
serDeProvider.initialize();
final AirbyteMessageMigrator airbyteMessageMigrator = new AirbyteMessageMigrator(
List.of(new AirbyteMessageMigrationV1()));
// TODO once data types v1 is re-enabled, this test should contain the migration
List.of(/* new AirbyteMessageMigrationV1() */));
airbyteMessageMigrator.initialize();
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator = new ConfiguredAirbyteCatalogMigrator(
List.of(new ConfiguredAirbyteCatalogMigrationV1()));
// TODO once data types v1 is re-enabled, this test should contain the migration
List.of(/* new ConfiguredAirbyteCatalogMigrationV1() */));
configuredAirbyteCatalogMigrator.initialize();
migratorFactory = spy(new AirbyteProtocolVersionedMigratorFactory(airbyteMessageMigrator, configuredAirbyteCatalogMigrator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("1.0.0");
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("0.3.0");
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN = new Version("0.0.0");
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
private static ConfiguredAirbyteCatalog parseConfiguredAirbyteCatalog(final String configuredAirbyteCatalogString) {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = Jsons.deserialize(configuredAirbyteCatalogString, ConfiguredAirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog);
return configuredAirbyteCatalog;
}

Expand Down Expand Up @@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec
public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) {
final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog);
return airbyteCatalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
configRepository.writeSourceConnectionNoSecrets(source);

final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING_V1));
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
configRepository.writeActorCatalogFetchEvent(
actorCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);

Expand Down Expand Up @@ -201,7 +201,8 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
assertEquals(expectedActorCatalog, Jsons.object(catalogNewConfig.get().getCatalog(), AirbyteCatalog.class));

final int catalogDbEntry2 = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class);
assertEquals(2, catalogDbEntry2);
// TODO this should be 2 once we re-enable datatypes v1
assertEquals(1, catalogDbEntry2);
}

@Test
Expand Down Expand Up @@ -484,13 +485,16 @@ void testGetStandardSyncUsingOperation() throws IOException {
}

private List<StandardSync> copyWithV1Types(final List<StandardSync> syncs) {
return syncs.stream()
.map(standardSync -> {
final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync), StandardSync.class);
copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
return copiedStandardSync;
})
.toList();
return syncs;
// TODO adjust with data types feature flag testing
// return syncs.stream()
// .map(standardSync -> {
// final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync),
// StandardSync.class);
// copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
// return copiedStandardSync;
// })
// .toList();
}

private void assertSyncsMatch(final List<StandardSync> expectedSyncs, final List<StandardSync> actualSyncs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString)
final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
} else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
}
return jobConfig;
}
Expand All @@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString)
final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
} else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
}
return jobOutput;
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ airbyte:
root: ${WORKSPACE_ROOT}
protocol:
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}

temporal:
cloud:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,6 @@ public List<JsonNode> retrieveSourceRecords(final Database database, final Strin
return database.query(context -> context.fetch(String.format("SELECT * FROM %s;", table)))
.stream()
.map(Record::intoMap)
.map(rec -> {
// The protocol requires converting numbers to strings. source-postgres does that internally,
// but we're querying the DB directly, so we have to do it manually.
final Map<String, Object> stringifiedNumbers = new HashMap<>();
for (final String key : rec.keySet()) {
Object o = rec.get(key);
if (o instanceof Number) {
o = o.toString();
}
stringifiedNumbers.put(key, o);
}
return stringifiedNumbers;
})
.map(Jsons::jsonNode)
.collect(Collectors.toList());
}
Expand Down
Loading