From 752950070eb6bedce5ac2ea68ebb4f7fbb6694bc Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 25 Feb 2022 12:09:23 -0800 Subject: [PATCH 1/3] Restore the temporal update call --- .../java/io/airbyte/server/handlers/ConnectionsHandler.java | 5 +++++ .../server/handlers/WebBackendConnectionsHandler.java | 2 -- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index c5eb017c85b3..8bac37471b50 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -212,6 +212,11 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) new HashSet<>(connectionUpdate.getOperationIds())); configRepository.writeStandardSync(newConnection); + + if (featureFlags.usesNewScheduler()) { + temporalWorkerRunFactory.update(connectionUpdate); + } + return buildConnectionRead(connectionUpdate.getConnectionId()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index e99bcd781ead..836ebfd37813 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -272,8 +272,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne connectionRead = connectionsHandler.updateConnection(connectionUpdate); if (needReset) { - // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. - temporalWorkerRunFactory.update(connectionUpdate); // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId()); From c9f14c9d5fbb61778a57237f80acb701fcdb9523 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 28 Feb 2022 09:28:16 -0800 Subject: [PATCH 2/3] Add test --- .../server/handlers/ConnectionsHandlerTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 1d596113c5e9..2ada8688995f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -64,6 +64,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class ConnectionsHandlerTest { @@ -293,8 +295,12 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, } - @Test - void testUpdateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testUpdateConnection(boolean useNewScheduler) throws JsonValidationException, ConfigNotFoundException, IOException { + when(featureFlags.usesNewScheduler()) + .thenReturn(useNewScheduler); + final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getStream().setName("azkaban_users"); catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users"); @@ -349,6 +355,10 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept assertEquals(expectedConnectionRead, actualConnectionRead); verify(configRepository).writeStandardSync(updatedStandardSync); + + if (useNewScheduler) { + verify(temporalWorkflowHandler).update(connectionUpdate); + } } @Test From a743383b13f93205550656cc3b317807c3ba85f2 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 28 Feb 2022 09:36:12 -0800 Subject: [PATCH 3/3] Update webbackend test --- .../server/handlers/WebBackendConnectionsHandlerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 244780c29b4f..095c16778f99 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -589,6 +589,7 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); verify(schedulerHandler, times(0)).resetConnection(connectionId); verify(schedulerHandler, times(0)).syncConnection(connectionId); + verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(temporalWorkerRunFactory); orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId()); orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId());