Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change where a connection is deleted #19096

Merged
merged 10 commits into from
Nov 9, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -254,11 +255,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

final DestinationHandler destinationHandler = new DestinationHandler(
configRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.File;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -169,11 +170,14 @@ public ConfigurationApi(final ConfigRepository configRepository,

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

schedulerHandler = new SchedulerHandler(
configRepository,
Expand Down Expand Up @@ -369,7 +373,7 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork
}

@Override
public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) {
public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,34 @@ public class ConnectionsHandler {
private final WorkspaceHelper workspaceHelper;
private final TrackingClient trackingClient;
private final EventRunner eventRunner;
private final ConnectionHelper connectionHelper;

@VisibleForTesting
ConnectionsHandler(final ConfigRepository configRepository,
final Supplier<UUID> uuidGenerator,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this.configRepository = configRepository;
this.uuidGenerator = uuidGenerator;
this.workspaceHelper = workspaceHelper;
this.trackingClient = trackingClient;
this.eventRunner = eventRunner;
this.connectionHelper = connectionHelper;
}

public ConnectionsHandler(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this(configRepository,
UUID::randomUUID,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

}

Expand Down Expand Up @@ -545,8 +550,9 @@ public boolean matchSearch(final DestinationSearch destinationSearch, final Dest
return (destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead));
}

public void deleteConnection(final UUID connectionId) {
eventRunner.deleteConnection(connectionId);
public void deleteConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
connectionHelper.deleteConnection(connectionId);
cgardens marked this conversation as resolved.
Show resolved Hide resolved
eventRunner.startNewCancellation(connectionId);
}

private ConnectionRead buildConnectionRead(final UUID connectionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ public void deleteSource(final SourceRead source)
final var workspaceIdRequestBody = new WorkspaceIdRequestBody()
.workspaceId(source.getWorkspaceId());

connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
final List<UUID> uuidsToDelete = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
.getConnections().stream()
.filter(con -> con.getSourceId().equals(source.getSourceId()))
.map(ConnectionRead::getConnectionId)
.forEach(connectionsHandler::deleteConnection);
.toList();

for (final UUID uuidToDelete : uuidsToDelete) {
connectionsHandler.deleteConnection(uuidToDelete);
}
Comment on lines +220 to +224
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could instead call .forEach instead of calling toList and then iterating over that list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is done on purpose in order to have the exception thrown by deleteConnection be thrown by the method (deleteConnection starts throwing in this PR).


final var spec = getSpecFromSourceId(source.getSourceId());
final var fullConfig = secretsRepositoryReader.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -98,6 +99,7 @@ class ConnectionsHandlerTest {
private WorkspaceHelper workspaceHelper;
private TrackingClient trackingClient;
private EventRunner eventRunner;
private ConnectionHelper connectionHelper;

private static final String PRESTO_TO_HUDI = "presto to hudi";
private static final String PRESTO_TO_HUDI_PREFIX = "presto_to_hudi";
Expand Down Expand Up @@ -173,7 +175,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
workspaceHelper = mock(WorkspaceHelper.class);
trackingClient = mock(TrackingClient.class);
eventRunner = mock(EventRunner.class);

connectionHelper = mock(ConnectionHelper.class);
when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId);
Expand All @@ -190,7 +192,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
Expand Down Expand Up @@ -831,10 +834,11 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep
}

@Test
void testDeleteConnection() {
void testDeleteConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
connectionsHandler.deleteConnection(connectionId);

verify(eventRunner).deleteConnection(connectionId);
verify(connectionHelper).deleteConnection(connectionId);
verify(eventRunner).startNewCancellation(connectionId);
}

@Test
Expand Down Expand Up @@ -904,7 +908,8 @@ void setUp() {
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ void testIncrementalSync() throws Exception {

}

@Disabled
@Test
@Order(14)
void testDeleteConnection() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;

private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal";
private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1;

private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
private static final String RECORD_METRIC_TAG = "record_metric";
Expand Down Expand Up @@ -179,14 +182,20 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
new RecordMetricInput(connectionUpdaterInput, Optional.of(FailureCause.CANCELED), OssMetricsRegistry.TEMPORAL_WORKFLOW_FAILURE, null));
}

if (workflowState.isDeleted()) {
if (workflowState.isRunning()) {
log.info("Cancelling the current running job because a connection deletion was requested");
reportCancelled(connectionUpdaterInput.getConnectionId());
final int dontDeleteInTemporal =
Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION);

if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION) {
if (workflowState.isDeleted()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can combine these two if statements into one one to avoid one layer of nesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (workflowState.isRunning()) {
log.info("Cancelling the current running job because a connection deletion was requested");
// This call is not needed anymore since this will be cancel using the the cancellation state
reportCancelled(connectionUpdaterInput.getConnectionId());
}
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
deleteConnectionBeforeTerminatingTheWorkflow();
return;
}
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
deleteConnectionBeforeTerminatingTheWorkflow();
return;
}

// this means that the current workflow is being cancelled so that a reset can be run instead.
Expand Down Expand Up @@ -503,6 +512,7 @@ public void cancelJob() {
cancellableSyncWorkflow.cancel();
}

// TODO: Delete when the don't delete in temporal is removed
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public void deleteConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Map;

// TODO: Deleted when version is removed
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ void cancelNonRunning() throws InterruptedException {
Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity);
}

// TODO: delete when the signal method can be removed
@Test
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
Expand Down Expand Up @@ -533,7 +534,7 @@ void deleteSync() throws InterruptedException {
&& changedStateEvent.isValue())
.isEmpty();

Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any());
Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any());
}

@Test
Expand Down