From 082737d26b62d0f6089376bfb924dbb7269715d4 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 28 Feb 2022 11:18:35 -0800 Subject: [PATCH] Bmoric/restore update with temporal (#10713) Restore the missing update call to temporal. It was making the update of a schedule to not be effective immediately. --- .../server/handlers/ConnectionsHandler.java | 5 +++++ .../handlers/WebBackendConnectionsHandler.java | 2 -- .../server/handlers/ConnectionsHandlerTest.java | 14 ++++++++++++-- .../handlers/WebBackendConnectionsHandlerTest.java | 1 + 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index c5eb017c85b3a6..8bac37471b502e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -212,6 +212,11 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) new HashSet<>(connectionUpdate.getOperationIds())); configRepository.writeStandardSync(newConnection); + + if (featureFlags.usesNewScheduler()) { + temporalWorkerRunFactory.update(connectionUpdate); + } + return buildConnectionRead(connectionUpdate.getConnectionId()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index e99bcd781ead22..836ebfd3781314 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -272,8 +272,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne connectionRead = connectionsHandler.updateConnection(connectionUpdate); if (needReset) { - // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. - temporalWorkerRunFactory.update(connectionUpdate); // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 1d596113c5e93c..2ada8688995f57 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -64,6 +64,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class ConnectionsHandlerTest { @@ -293,8 +295,12 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, } - @Test - void testUpdateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testUpdateConnection(boolean useNewScheduler) throws JsonValidationException, ConfigNotFoundException, IOException { + when(featureFlags.usesNewScheduler()) + .thenReturn(useNewScheduler); + final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getStream().setName("azkaban_users"); catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users"); @@ -349,6 +355,10 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept assertEquals(expectedConnectionRead, actualConnectionRead); verify(configRepository).writeStandardSync(updatedStandardSync); + + if (useNewScheduler) { + verify(temporalWorkflowHandler).update(connectionUpdate); + } } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 244780c29b4ff8..095c16778f99a0 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -589,6 +589,7 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); verify(schedulerHandler, times(0)).resetConnection(connectionId); verify(schedulerHandler, times(0)).syncConnection(connectionId); + verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(temporalWorkerRunFactory); orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId()); orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId());