Skip to content

Commit

Permalink
Bmoric/restore update with temporal (#10713)
Browse files Browse the repository at this point in the history
Restore the missing update call to temporal.

It was making the update of a schedule to not be effective immediately.
  • Loading branch information
benmoriceau authored and etsybaev committed Mar 5, 2022
1 parent 1e1306e commit 082737d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate)
new HashSet<>(connectionUpdate.getOperationIds()));

configRepository.writeStandardSync(newConnection);

if (featureFlags.usesNewScheduler()) {
temporalWorkerRunFactory.update(connectionUpdate);
}

return buildConnectionRead(connectionUpdate.getConnectionId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (needReset) {
// todo (cgardens) - temporalWorkerRunFactory CANNOT be here.
temporalWorkerRunFactory.update(connectionUpdate);

// todo (cgardens) - temporalWorkerRunFactory CANNOT be here.
temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ConnectionsHandlerTest {

Expand Down Expand Up @@ -293,8 +295,12 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException,

}

@Test
void testUpdateConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testUpdateConnection(boolean useNewScheduler) throws JsonValidationException, ConfigNotFoundException, IOException {
when(featureFlags.usesNewScheduler())
.thenReturn(useNewScheduler);

final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();
catalog.getStreams().get(0).getStream().setName("azkaban_users");
catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users");
Expand Down Expand Up @@ -349,6 +355,10 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
assertEquals(expectedConnectionRead, actualConnectionRead);

verify(configRepository).writeStandardSync(updatedStandardSync);

if (useNewScheduler) {
verify(temporalWorkflowHandler).update(connectionUpdate);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx
final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId());
verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(temporalWorkerRunFactory);
orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId());
orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId());
Expand Down

0 comments on commit 082737d

Please sign in to comment.