Skip to content

Commit

Permalink
[Connector API] Support cleaning up sync jobs when deleting a connect…
Browse files Browse the repository at this point in the history
…or (#107253)
  • Loading branch information
jedrazb authored and craigtaverner committed Apr 11, 2024
1 parent 54a9ece commit a983a1d
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 18 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/107253.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 107253
summary: "[Connector API] Support cleaning up sync jobs when deleting a connector"
area: Application
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
}
}
]
},
"params": {
"delete_sync_jobs": {
"type": "boolean",
"default": false,
"description": "Determines whether associated sync jobs are also deleted."
}
}
}
}
1 change: 1 addition & 0 deletions x-pack/plugin/ent-search/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(":test:framework")
testImplementation(project(':modules:lang-mustache'))
testImplementation(project(':modules:reindex'))

javaRestTestImplementation(project(path: xpackModule('core')))
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,80 @@ setup:
connector.get:
connector_id: test-connector-to-delete


---
"Delete Connector - deletes associated sync jobs":

- do:
connector_sync_job.post:
body:
id: test-connector-to-delete
job_type: full
trigger_method: on_demand
- do:
connector_sync_job.post:
body:
id: test-connector-to-delete
job_type: full
trigger_method: on_demand
- do:
connector_sync_job.post:
body:
id: test-connector-to-delete
job_type: full
trigger_method: on_demand

- do:
connector_sync_job.list:
connector_id: test-connector-to-delete

- match: { count: 3 }

- do:
connector.delete:
connector_id: test-connector-to-delete
delete_sync_jobs: true

- match: { acknowledged: true }


- do:
connector_sync_job.list:
connector_id: test-connector-to-delete

- match: { count: 0 }


---
"Delete Connector - doesn't associated sync jobs when delete_sync_jobs is false":

- do:
connector_sync_job.post:
body:
id: test-connector-to-delete
job_type: full
trigger_method: on_demand

- do:
connector_sync_job.list:
connector_id: test-connector-to-delete

- match: { count: 1 }

- do:
connector.delete:
connector_id: test-connector-to-delete
delete_sync_jobs: false

- match: { acknowledged: true }


- do:
connector_sync_job.list:
connector_id: test-connector-to-delete

- match: { count: 1 }

---
"Delete Connector - Connector does not exist":
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorStatusAction;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;

import java.time.Instant;
import java.util.Arrays;
Expand Down Expand Up @@ -253,12 +255,13 @@ public void getConnector(String connectorId, ActionListener<ConnectorSearchResul
}

/**
* Deletes the {@link Connector} in the underlying index.
* Deletes the {@link Connector} and the related instances of {@link ConnectorSyncJob} in the underlying index.
*
* @param connectorId The id of the connector object.
* @param listener The action listener to invoke on response/failure.
* @param connectorId The id of the {@link Connector}.
* @param shouldDeleteSyncJobs The flag indicating if {@link ConnectorSyncJob} should also be deleted.
* @param listener The action listener to invoke on response/failure.
*/
public void deleteConnector(String connectorId, ActionListener<DeleteResponse> listener) {
public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<DeleteResponse> listener) {

final DeleteRequest deleteRequest = new DeleteRequest(CONNECTOR_INDEX_NAME).id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Expand All @@ -269,7 +272,11 @@ public void deleteConnector(String connectorId, ActionListener<DeleteResponse> l
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(deleteResponse);
if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(client).deleteAllSyncJobsByConnectorId(connectorId, l.map(r -> deleteResponse));
} else {
l.onResponse(deleteResponse);
}
}));
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -35,16 +36,20 @@ private DeleteConnectorAction() {/* no instances */}
public static class Request extends ConnectorActionRequest implements ToXContentObject {

private final String connectorId;
private final boolean deleteSyncJobs;

private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id");
private static final ParseField DELETE_SYNC_JOB_FIELD = new ParseField("delete_sync_jobs");

public Request(StreamInput in) throws IOException {
super(in);
this.connectorId = in.readString();
this.deleteSyncJobs = in.readBoolean();
}

public Request(String connectorId) {
public Request(String connectorId, boolean deleteSyncJobs) {
this.connectorId = connectorId;
this.deleteSyncJobs = deleteSyncJobs;
}

@Override
Expand All @@ -62,40 +67,55 @@ public String getConnectorId() {
return connectorId;
}

public boolean shouldDeleteSyncJobs() {
return deleteSyncJobs;
}

@Override
public String[] indices() {
// When deleting a connector, corresponding sync jobs can also be deleted
return new String[] {
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(connectorId);
out.writeBoolean(deleteSyncJobs);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(connectorId, request.connectorId);
return deleteSyncJobs == request.deleteSyncJobs && Objects.equals(connectorId, request.connectorId);
}

@Override
public int hashCode() {
return Objects.hash(connectorId);
return Objects.hash(connectorId, deleteSyncJobs);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorId);
builder.field(DELETE_SYNC_JOB_FIELD.getPreferredName(), deleteSyncJobs);
builder.endObject();
return builder;
}

private static final ConstructingObjectParser<DeleteConnectorAction.Request, Void> PARSER = new ConstructingObjectParser<>(
"delete_connector_request",
false,
(p) -> new Request((String) p[0])
(p) -> new Request((String) p[0], (boolean) p[1])
);
static {
PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD);
PARSER.declareBoolean(constructorArg(), DELETE_SYNC_JOB_FIELD);
}

public static DeleteConnectorAction.Request parse(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(restRequest.param("connector_id"));

String connectorId = restRequest.param("connector_id");
boolean shouldDeleteSyncJobs = restRequest.paramAsBoolean("delete_sync_jobs", false);

DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(connectorId, shouldDeleteSyncJobs);
return channel -> client.execute(DeleteConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public TransportDeleteConnectorAction(
@Override
protected void doExecute(Task task, DeleteConnectorAction.Request request, ActionListener<AcknowledgedResponse> listener) {
String connectorId = request.getConnectorId();
connectorIndexService.deleteConnector(connectorId, listener.map(v -> AcknowledgedResponse.TRUE));
boolean shouldDeleteSyncJobs = request.shouldDeleteSyncJobs();
connectorIndexService.deleteConnector(connectorId, shouldDeleteSyncJobs, listener.map(v -> AcknowledgedResponse.TRUE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@

package org.elasticsearch.xpack.application.connector.syncjob;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
Expand All @@ -33,6 +36,9 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -58,6 +64,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -586,6 +593,37 @@ public void updateConnectorSyncJobError(String connectorSyncJobId, String error,
}
}

/**
* Deletes all {@link ConnectorSyncJob} documents that match a specific {@link Connector} id in the underlying index.
* Gracefully handles non-existent sync job index.
*
* @param connectorId The id of the {@link Connector} to match in the sync job documents.
* @param listener The action listener to invoke on response/failure.
*/
public void deleteAllSyncJobsByConnectorId(String connectorId, ActionListener<BulkByScrollResponse> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).setQuery(
new TermQueryBuilder(
ConnectorSyncJob.CONNECTOR_FIELD.getPreferredName() + "." + Connector.ID_FIELD.getPreferredName(),
connectorId
)
).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false));

client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures();
if (bulkDeleteFailures.isEmpty() == false) {
l.onFailure(
new ElasticsearchException(
"Error deleting sync jobs associated with connector ["
+ connectorId
+ "] "
+ bulkDeleteFailures.stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.joining("\n"))
)
);
}
l.onResponse(r);
}));
}

/**
* Listeners that checks failures for IndexNotFoundException and DocumentMissingException,
* and transforms them in ResourceNotFoundException, invoking onFailure on the delegate listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.stream.IntStream;

import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomCronExpression;
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;

Expand All @@ -66,6 +67,7 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {

@Before
public void setup() {
registerSimplifiedConnectorIndexTemplates(indicesAdmin());
this.connectorIndexService = new ConnectorIndexService(client());
}

Expand Down Expand Up @@ -104,11 +106,11 @@ public void testDeleteConnector() throws Exception {
}

String connectorIdToDelete = connectorIds.get(0);
DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete);
DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
assertThat(resp.status(), equalTo(RestStatus.OK));
expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorIdToDelete));

expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete));
expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete, false));
}

public void testUpdateConnectorConfiguration_FullConfiguration() throws Exception {
Expand Down Expand Up @@ -526,11 +528,11 @@ public void testUpdateConnectorApiKeyIdOrApiKeySecretId() throws Exception {
assertThat(updateApiKeyIdRequest.getApiKeySecretId(), equalTo(indexedConnector.getApiKeySecretId()));
}

private DeleteResponse awaitDeleteConnector(String connectorId) throws Exception {
private DeleteResponse awaitDeleteConnector(String connectorId, boolean deleteConnectorSyncJobs) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<DeleteResponse> resp = new AtomicReference<>(null);
final AtomicReference<Exception> exc = new AtomicReference<>(null);
connectorIndexService.deleteConnector(connectorId, new ActionListener<>() {
connectorIndexService.deleteConnector(connectorId, deleteConnectorSyncJobs, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
resp.set(deleteResponse);
Expand Down

0 comments on commit a983a1d

Please sign in to comment.