From 3dae1f16aa3e205948ebc8959b1d12793d9725b0 Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Wed, 14 Dec 2022 12:16:50 -0800 Subject: [PATCH] Set breaking change to false when connection is fixed (#20315) * Set breaking change to false when connection is fixed --- .../config/persistence/ConfigRepository.java | 11 +++- .../WebBackendConnectionsHandler.java | 43 +++++++++++- .../WebBackendConnectionsHandlerTest.java | 66 ++++++++++++++++++- 3 files changed, 115 insertions(+), 5 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 43b62f6d810a1..614cd66a52a35 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -1305,13 +1305,22 @@ public Optional getActorCatalog(final UUID actorId, return records.stream().findFirst().map(DbConverter::buildActorCatalog); } + public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { + final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) + .from(ACTOR_CATALOG) + .join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); + return records.stream().findFirst().map(DbConverter::buildActorCatalog); + } + public Optional getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk()) .from(ACTOR_CATALOG_FETCH_EVENT) .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); - return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 21af162630d3e..a3c3886807f38 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -20,6 +20,7 @@ import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; +import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobRead; import io.airbyte.api.model.generated.OperationCreate; import io.airbyte.api.model.generated.OperationReadList; @@ -32,6 +33,7 @@ import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.StreamDescriptor; import io.airbyte.api.model.generated.StreamTransform; +import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.api.model.generated.WebBackendConnectionCreate; import io.airbyte.api.model.generated.WebBackendConnectionListItem; import io.airbyte.api.model.generated.WebBackendConnectionRead; @@ -45,6 +47,7 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -510,6 +513,25 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne final UUID connectionId = webBackendConnectionPatch.getConnectionId(); final ConnectionRead originalConnectionRead = connectionsHandler.getConnection(connectionId); + boolean breakingChange = originalConnectionRead.getBreakingChange() != null && originalConnectionRead.getBreakingChange(); + + // If there have been changes to the sync catalog, check whether these changes result in or fix a + // broken connection + if (webBackendConnectionPatch.getSyncCatalog() != null) { + // Get the most recent actor catalog fetched for this connection's source and the newly updated sync + // catalog + Optional mostRecentActorCatalog = configRepository.getMostRecentActorCatalogForSource(originalConnectionRead.getSourceId()); + AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch.getSyncCatalog(); + // Get the diff between these two catalogs to check for breaking changes + if (mostRecentActorCatalog.isPresent()) { + final io.airbyte.protocol.models.AirbyteCatalog mostRecentAirbyteCatalog = + Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); + final CatalogDiff catalogDiff = + connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog), + CatalogConverter.toProtocol(newAirbyteCatalog)); + breakingChange = containsBreakingChange(catalogDiff); + } + } // before doing any updates, fetch the existing catalog so that it can be diffed // with the final catalog to determine which streams might need to be reset. @@ -520,7 +542,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne // pass in operationIds because the patch object doesn't include operationIds that were just created // above. - final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds); + final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds, breakingChange); // persist the update and set the connectionRead to the updated form. final ConnectionRead updatedConnectionRead = connectionsHandler.updateConnection(connectionPatch); @@ -685,7 +707,8 @@ protected static ConnectionCreate toConnectionCreate(final WebBackendConnectionC */ @VisibleForTesting protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUpdate webBackendConnectionPatch, - final List finalOperationIds) { + final List finalOperationIds, + boolean breakingChange) { final ConnectionUpdate connectionPatch = new ConnectionUpdate(); connectionPatch.connectionId(webBackendConnectionPatch.getConnectionId()); @@ -703,6 +726,7 @@ protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUp connectionPatch.geography(webBackendConnectionPatch.getGeography()); connectionPatch.notifySchemaChanges(webBackendConnectionPatch.getNotifySchemaChanges()); connectionPatch.nonBreakingChangesPreference(webBackendConnectionPatch.getNonBreakingChangesPreference()); + connectionPatch.breakingChange(breakingChange); connectionPatch.operationIds(finalOperationIds); @@ -723,4 +747,19 @@ private record Stream(String name, String namespace) { } + private boolean containsBreakingChange(final CatalogDiff diff) { + for (final StreamTransform streamTransform : diff.getTransforms()) { + if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { + continue; + } + + final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); + if (anyBreakingFieldTransforms) { + return true; + } + } + + return false; + } + } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 94935832373db..943cbfff5c135 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -112,6 +112,7 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; class WebBackendConnectionsHandlerTest { @@ -667,9 +668,10 @@ void testToConnectionPatch() throws IOException { .syncCatalog(catalog) .geography(Geography.US) .nonBreakingChangesPreference(NonBreakingChangesPreference.DISABLE) - .notifySchemaChanges(false); + .notifySchemaChanges(false) + .breakingChange(false); - final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds); + final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds, false); assertEquals(expected, actual); } @@ -1049,6 +1051,66 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN verify(eventRunner, times(0)).resetConnection(any(), any(), eq(true)); } + @Test + void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationException, ConfigNotFoundException, IOException { + final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate() + .namespaceDefinition(expected.getNamespaceDefinition()) + .namespaceFormat(expected.getNamespaceFormat()) + .prefix(expected.getPrefix()) + .connectionId(expected.getConnectionId()) + .schedule(expected.getSchedule()) + .status(expected.getStatus()) + .syncCatalog(expectedWithNewSchema.getSyncCatalog()) + .skipReset(false) + .connectionId(expected.getConnectionId()); + + final UUID sourceId = UUID.randomUUID(); + + // existing connection has a breaking change + when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn( + new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId)); + + final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); + when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize( + "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}")))); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff); + + when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId())) + .thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog()); + when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList); + + final ConnectionRead connectionRead = new ConnectionRead() + .connectionId(expected.getConnectionId()) + .sourceId(expected.getSourceId()) + .destinationId(expected.getDestinationId()) + .name(expected.getName()) + .namespaceDefinition(expected.getNamespaceDefinition()) + .namespaceFormat(expected.getNamespaceFormat()) + .prefix(expected.getPrefix()) + .syncCatalog(expectedWithNewSchema.getSyncCatalog()) + .status(expected.getStatus()) + .schedule(expected.getSchedule()) + .breakingChange(false); + + when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead); + + final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); + + assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog()); + + final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); + ArgumentCaptor expectedArgumentCaptor = ArgumentCaptor.forClass(ConnectionUpdate.class); + verify(connectionsHandler, times(1)).updateConnection(expectedArgumentCaptor.capture()); + List connectionUpdateValues = expectedArgumentCaptor.getAllValues(); + // Expect the ConnectionUpdate object to have breakingChange: false + assertEquals(false, connectionUpdateValues.get(0).getBreakingChange()); + + verify(schedulerHandler, times(0)).resetConnection(connectionId); + verify(schedulerHandler, times(0)).syncConnection(connectionId); + verify(connectionsHandler, times(2)).getDiff(any(), any(), any()); + verify(connectionsHandler, times(1)).updateConnection(any()); + } + @Test void testUpdateSchemaWithDiscoveryFromEmpty() { final AirbyteCatalog original = new AirbyteCatalog().streams(List.of());