Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change where a connection is deleted #19096

Merged
merged 10 commits into from
Nov 9, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@
@Slf4j
public class ConnectionManagerUtils {

/**
* Send a cancellation to the workflow. It will swallow any exception and won't check if the
* workflow is already deleted when being cancel.
*/
public void deleteWorkflowIfItExist(final WorkflowClient client,
final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow =
client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
connectionManagerWorkflow.deleteConnection();
} catch (final Exception e) {
log.warn("The workflow is not reachable when trying to cancel it");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we log the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opps, I forget that. Will update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

}

/**
* Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,13 @@ public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connect
return connectionManagerWorkflow;
}

public void deleteConnection(final UUID connectionId) {
try {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId,
connectionManagerWorkflow -> connectionManagerWorkflow::deleteConnection);
} catch (final DeletedWorkflowException e) {
log.info("Connection {} has already been deleted.", connectionId);
}
/**
* This will cancel a workflow even if the connection is deleted already
*
* @param connectionId - connectionId to cancel
*/
public void forceDeleteWorkflow(final UUID connectionId) {
connectionManagerUtils.deleteWorkflowIfItExist(client, connectionId);
}

public void update(final UUID connectionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,12 @@ void migrateCalled() {

@Nested
@DisplayName("Test delete connection method.")
class DeleteConnection {
class ForceCancelConnection {

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow is in a running state.")
void testDeleteConnection() {
void testforceCancelConnection() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
Expand All @@ -349,54 +349,10 @@ void testDeleteConnection() {
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.deleteConnection(CONNECTION_ID);
temporalClient.forceDeleteWorkflow(CONNECTION_ID);

verify(workflowClient, Mockito.never()).newSignalWithStartRequest();
verify(mConnectionManagerWorkflow).deleteConnection();
}

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow is in an unexpected state")
void testDeleteConnectionInUnexpectedState() {
final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
when(mTerminatedConnectionManagerWorkflow.getState())
.thenThrow(new IllegalStateException(EXCEPTION_MESSAGE));
when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

temporalClient.deleteConnection(CONNECTION_ID);
verify(workflowClient).signalWithStart(mBatchRequest);

// Verify that the deleteConnection signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).deleteConnection();
}

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow has already been deleted")
void testDeleteConnectionOnDeletedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

temporalClient.deleteConnection(CONNECTION_ID);

verify(temporalClient).deleteConnection(CONNECTION_ID);
verifyNoMoreInteractions(temporalClient);
verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);
verify(mConnectionManagerWorkflow).cancelJob();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -254,11 +255,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

final DestinationHandler destinationHandler = new DestinationHandler(
configRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.File;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -169,11 +170,14 @@ public ConfigurationApi(final ConfigRepository configRepository,

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

schedulerHandler = new SchedulerHandler(
configRepository,
Expand Down Expand Up @@ -369,7 +373,7 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork
}

@Override
public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) {
public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,34 @@ public class ConnectionsHandler {
private final WorkspaceHelper workspaceHelper;
private final TrackingClient trackingClient;
private final EventRunner eventRunner;
private final ConnectionHelper connectionHelper;

@VisibleForTesting
ConnectionsHandler(final ConfigRepository configRepository,
final Supplier<UUID> uuidGenerator,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this.configRepository = configRepository;
this.uuidGenerator = uuidGenerator;
this.workspaceHelper = workspaceHelper;
this.trackingClient = trackingClient;
this.eventRunner = eventRunner;
this.connectionHelper = connectionHelper;
}

public ConnectionsHandler(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this(configRepository,
UUID::randomUUID,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

}

Expand Down Expand Up @@ -545,8 +550,9 @@ public boolean matchSearch(final DestinationSearch destinationSearch, final Dest
return (destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead));
}

public void deleteConnection(final UUID connectionId) {
eventRunner.deleteConnection(connectionId);
public void deleteConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
connectionHelper.deleteConnection(connectionId);
cgardens marked this conversation as resolved.
Show resolved Hide resolved
eventRunner.forceDeleteConnection(connectionId);
}

private ConnectionRead buildConnectionRead(final UUID connectionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ public void deleteSource(final SourceRead source)
final var workspaceIdRequestBody = new WorkspaceIdRequestBody()
.workspaceId(source.getWorkspaceId());

connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
final List<UUID> uuidsToDelete = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
.getConnections().stream()
.filter(con -> con.getSourceId().equals(source.getSourceId()))
.map(ConnectionRead::getConnectionId)
.forEach(connectionsHandler::deleteConnection);
.toList();

for (final UUID uuidToDelete : uuidsToDelete) {
connectionsHandler.deleteConnection(uuidToDelete);
}
Comment on lines +220 to +224
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could instead call .forEach instead of calling toList and then iterating over that list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is done on purpose in order to have the exception thrown by deleteConnection be thrown by the method (deleteConnection starts throwing in this PR).


final var spec = getSpecFromSourceId(source.getSourceId());
final var fullConfig = secretsRepositoryReader.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ public interface EventRunner {

ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset, final boolean runSyncImmediately);

void deleteConnection(final UUID connectionId);
void forceDeleteConnection(final UUID connectionId);

// TODO: Delete
@Deprecated(forRemoval = true)
void migrateSyncIfNeeded(final Set<UUID> connectionIds);

void update(final UUID connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public ManualOperationResult resetConnection(final UUID connectionId,
}

@Override
public void deleteConnection(final UUID connectionId) {
temporalClient.deleteConnection(connectionId);
public void forceDeleteConnection(final UUID connectionId) {
temporalClient.forceDeleteWorkflow(connectionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -98,6 +99,7 @@ class ConnectionsHandlerTest {
private WorkspaceHelper workspaceHelper;
private TrackingClient trackingClient;
private EventRunner eventRunner;
private ConnectionHelper connectionHelper;

private static final String PRESTO_TO_HUDI = "presto to hudi";
private static final String PRESTO_TO_HUDI_PREFIX = "presto_to_hudi";
Expand Down Expand Up @@ -173,7 +175,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
workspaceHelper = mock(WorkspaceHelper.class);
trackingClient = mock(TrackingClient.class);
eventRunner = mock(EventRunner.class);

connectionHelper = mock(ConnectionHelper.class);
when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId);
Expand All @@ -190,7 +192,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
Expand Down Expand Up @@ -831,10 +834,11 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep
}

@Test
void testDeleteConnection() {
void testDeleteConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
connectionsHandler.deleteConnection(connectionId);

verify(eventRunner).deleteConnection(connectionId);
verify(connectionHelper).deleteConnection(connectionId);
verify(eventRunner).startNewCancellation(connectionId);
}

@Test
Expand Down Expand Up @@ -904,7 +908,8 @@ void setUp() {
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ void testIncrementalSync() throws Exception {

}

@Disabled
@Test
@Order(14)
void testDeleteConnection() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;

private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal";
private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1;

private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
private static final String RECORD_METRIC_TAG = "record_metric";
Expand Down Expand Up @@ -179,9 +182,13 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
new RecordMetricInput(connectionUpdaterInput, Optional.of(FailureCause.CANCELED), OssMetricsRegistry.TEMPORAL_WORKFLOW_FAILURE, null));
}

if (workflowState.isDeleted()) {
final int dontDeleteInTemporal =
Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION);

if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION && workflowState.isDeleted()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not placed in the right place, if should be on like 195. I moved the PR back to a draft.

if (workflowState.isRunning()) {
log.info("Cancelling the current running job because a connection deletion was requested");
// This call is not needed anymore since this will be cancel using the the cancellation state
reportCancelled(connectionUpdaterInput.getConnectionId());
}
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
Expand Down Expand Up @@ -503,6 +510,7 @@ public void cancelJob() {
cancellableSyncWorkflow.cancel();
}

// TODO: Delete when the don't delete in temporal is removed
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public void deleteConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Map;

// TODO: Deleted when version is removed
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ void cancelNonRunning() throws InterruptedException {
Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity);
}

// TODO: delete when the signal method can be removed
@Test
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
Expand Down Expand Up @@ -533,7 +534,7 @@ void deleteSync() throws InterruptedException {
&& changedStateEvent.isValue())
.isEmpty();

Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any());
Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any());
}

@Test
Expand Down