diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index c5eb017c85b3a..8bac37471b502 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -212,6 +212,11 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) new HashSet<>(connectionUpdate.getOperationIds())); configRepository.writeStandardSync(newConnection); + + if (featureFlags.usesNewScheduler()) { + temporalWorkerRunFactory.update(connectionUpdate); + } + return buildConnectionRead(connectionUpdate.getConnectionId()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index e99bcd781ead2..836ebfd378131 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -272,8 +272,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne connectionRead = connectionsHandler.updateConnection(connectionUpdate); if (needReset) { - // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. - temporalWorkerRunFactory.update(connectionUpdate); // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 1d596113c5e93..2ada8688995f5 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -64,6 +64,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class ConnectionsHandlerTest { @@ -293,8 +295,12 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, } - @Test - void testUpdateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testUpdateConnection(boolean useNewScheduler) throws JsonValidationException, ConfigNotFoundException, IOException { + when(featureFlags.usesNewScheduler()) + .thenReturn(useNewScheduler); + final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getStream().setName("azkaban_users"); catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users"); @@ -349,6 +355,10 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept assertEquals(expectedConnectionRead, actualConnectionRead); verify(configRepository).writeStandardSync(updatedStandardSync); + + if (useNewScheduler) { + verify(temporalWorkflowHandler).update(connectionUpdate); + } } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 244780c29b4ff..095c16778f99a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -589,6 +589,7 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); verify(schedulerHandler, times(0)).resetConnection(connectionId); verify(schedulerHandler, times(0)).syncConnection(connectionId); + verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(temporalWorkerRunFactory); orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId()); orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId());