-
Notifications
You must be signed in to change notification settings - Fork 24.3k
/
TransportDeleteTransformAction.java
177 lines (162 loc) · 8.02 KB
/
TransportDeleteTransformAction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
public class TransportDeleteTransformAction extends AcknowledgedTransportMasterNodeAction<Request> {
private static final Logger logger = LogManager.getLogger(TransportDeleteTransformAction.class);
private final TransformConfigManager transformConfigManager;
private final TransformAuditor auditor;
private final Client client;
@Inject
public TransportDeleteTransformAction(
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
Client client
) {
super(
DeleteTransformAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
Request::new,
indexNameExpressionResolver,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.transformConfigManager = transformServices.getConfigManager();
this.auditor = transformServices.getAuditor();
this.client = client;
}
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
final boolean transformIsRunning = TransformTask.getTransformTask(request.getId(), state) != null;
if (transformIsRunning && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException(
"Cannot delete transform [" + request.getId() + "] as the task is running. Stop the task first",
RestStatus.CONFLICT
)
);
return;
}
// <3> Delete transform config
ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(
unusedAcknowledgedResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
logger.info("[{}] deleted transform", request.getId());
auditor.info(request.getId(), "Deleted transform.");
listener.onResponse(AcknowledgedResponse.of(r));
}, listener::onFailure)),
listener::onFailure
);
// <2> Delete destination index if requested
ActionListener<StopTransformAction.Response> stopTransformActionListener = ActionListener.wrap(unusedStopResponse -> {
if (request.isDeleteDestIndex()) {
deleteDestinationIndex(parentTaskId, request.getId(), request.timeout(), deleteDestIndexListener);
} else {
deleteDestIndexListener.onResponse(null);
}
}, listener::onFailure);
// <1> Stop transform if it's currently running
stopTransform(transformIsRunning, parentTaskId, request.getId(), request.timeout(), stopTransformActionListener);
}
private void stopTransform(
boolean transformIsRunning,
TaskId parentTaskId,
String transformId,
TimeValue timeout,
ActionListener<StopTransformAction.Response> listener
) {
if (transformIsRunning == false) {
listener.onResponse(null);
return;
}
StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(transformId, true, true, timeout, true, false);
stopTransformRequest.setParentTask(parentTaskId);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, listener);
}
private void deleteDestinationIndex(
TaskId parentTaskId,
String transformId,
TimeValue timeout,
ActionListener<AcknowledgedResponse> listener
) {
// <3> Check if the error is "index not found" error. If so, just move on. The index is already deleted.
ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(listener::onResponse, e -> {
if (e instanceof IndexNotFoundException) {
listener.onResponse(AcknowledgedResponse.TRUE);
} else {
listener.onFailure(e);
}
});
// <2> Delete destination index
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
transformConfigAndVersion -> {
TransformConfig config = transformConfigAndVersion.v1();
String destIndex = config.getDestination().getIndex();
DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
deleteDestIndexRequest.timeout(timeout);
deleteDestIndexRequest.setParentTask(parentTaskId);
executeWithHeadersAsync(
config.getHeaders(),
TRANSFORM_ORIGIN,
client,
TransportDeleteIndexAction.TYPE,
deleteDestIndexRequest,
deleteDestIndexListener
);
},
listener::onFailure
);
// <1> Fetch transform configuration
transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}