-
Notifications
You must be signed in to change notification settings - Fork 24.5k
/
AsyncTwoPhaseIndexer.java
430 lines (373 loc) · 16.9 KB
/
AsyncTwoPhaseIndexer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexing;
import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
 * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
 * Only one background job can run simultaneously and {@link #onFinish} is called when the job
 * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
 * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
 * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()})
 * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
 * {@link #stop()} can be used to stop the background job without aborting the indexer.
 *
 * In a nutshell this is a two-phase engine: first it sends a query, then it indexes documents based on the response,
 * sends the next query, indexes, queries, indexes, ... until a condition lets the engine pause until the source
 * provides new input.
 *
 * @param <JobPosition> Type that defines a job position to be defined by the implementation.
 * @param <JobStats> Type of the statistics object that this indexer updates (invocations, pages, output documents,
 *                   search and indexing failures).
 */
public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
    private static final Logger logger = Logger.getLogger(AsyncTwoPhaseIndexer.class.getName());

    private final JobStats stats;

    // The reference handed to the constructor is stored as-is (not copied), so its creator shares this state.
    private final AtomicReference<IndexerState> state;
    private final AtomicReference<JobPosition> position;
    private final Executor executor;

    /**
     * Creates the indexer.
     *
     * @param executor        executor on which the background job is started by {@link #maybeTriggerAsyncJob(long)}
     * @param initialState    holder of the indexer state; it is used directly rather than copied, so state
     *                        transitions made by this indexer are visible through it
     * @param initialPosition the starting position of the indexer
     * @param jobStats        statistics object updated while jobs run
     */
    protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference<IndexerState> initialState,
                                   JobPosition initialPosition, JobStats jobStats) {
        this.executor = executor;
        this.state = initialState;
        this.position = new AtomicReference<>(initialPosition);
        this.stats = jobStats;
    }

    /**
     * Get the current state of the indexer.
     */
    public IndexerState getState() {
        return state.get();
    }

    /**
     * Get the current position of the indexer.
     */
    public JobPosition getPosition() {
        return position.get();
    }

    /**
     * Get the stats of this indexer.
     */
    public JobStats getStats() {
        return stats;
    }

    /**
     * Sets the internal state to {@link IndexerState#STARTED} if the previous state
     * was {@link IndexerState#STOPPED}. Setting the state to STARTED allows a job
     * to run in the background when {@link #maybeTriggerAsyncJob(long)} is called.
     * Any other state is left unchanged.
     *
     * @return The state of the indexer after the call: STARTED if it was stopped or already started,
     *         otherwise the unchanged state (e.g. INDEXING, STOPPING or ABORTING).
     */
    public synchronized IndexerState start() {
        state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED);
        return state.get();
    }

    /**
     * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
     * running in the background, {@link #onStop()} will be called when the background job
     * detects that the indexer is stopped.
     * If there is no job running when this function is called
     * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
     *
     * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
     */
    public synchronized IndexerState stop() {
        AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
        IndexerState currentState = state.updateAndGet(previousState -> {
            // updateAndGet may re-apply this function when its compare-and-set loses a race, so the flag is
            // recomputed on every attempt and ends up reflecting only the attempt that was actually applied
            wasStartedAndSetStopped.set(previousState == IndexerState.STARTED);
            if (previousState == IndexerState.INDEXING) {
                return IndexerState.STOPPING;
            } else if (previousState == IndexerState.STARTED) {
                return IndexerState.STOPPED;
            } else {
                return previousState;
            }
        });

        // the callback runs outside the update function so that it is invoked at most once
        if (wasStartedAndSetStopped.get()) {
            onStop();
        }
        return currentState;
    }

    /**
     * Sets the internal state to {@link IndexerState#ABORTING}. It returns false if
     * an async job is running in the background and in such case {@link #onAbort}
     * will be called as soon as the background job detects that the indexer is
     * aborted. If there is no job running when this function is called, it returns
     * true and {@link #onAbort()} will never be called.
     *
     * @return true if the indexer is aborted, false if a background job is running
     *         and abort is delayed.
     */
    public synchronized boolean abort() {
        IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING);
        return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED;
    }

    /**
     * Triggers a background job that builds the index asynchronously iff
     * there is no other job that runs and the indexer is started
     * ({@link IndexerState#STARTED}).
     *
     * @param now
     *            The current time in milliseconds (used to limit the job to
     *            complete buckets)
     * @return true if a job has been triggered, false otherwise
     * @throws IllegalStateException if the indexer is in a state this method does not recognise
     */
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        final IndexerState currentState = state.get();
        switch (currentState) {
            case INDEXING:
            case STOPPING:
            case ABORTING:
                logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running " +
                    "(with state [" + currentState + "])");
                return false;

            case STOPPED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped. Ignoring trigger.");
                return false;

            case STARTED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
                stats.incrementNumInvocations(1);

                if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                    // fire off the search. Note this is async, the method will return from here
                    executor.execute(() -> {
                        onStart(now, ActionListener.wrap(r -> {
                            nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
                        }, e -> {
                            finishAndSetState();
                            onFailure(e);
                        }));
                    });
                    logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
                    return true;
                } else {
                    logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
                    return false;
                }

            default:
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]");
        }
    }

    /**
     * Called to get the Id of the job, used for logging.
     *
     * @return a string with the id of the job
     */
    protected abstract String getJobId();

    /**
     * Called to process the response of the search phase in order to turn it into an {@link IterationResult}.
     *
     * @param searchResponse response from the search phase.
     * @return Iteration object to be passed to indexing phase.
     */
    protected abstract IterationResult<JobPosition> doProcess(SearchResponse searchResponse);

    /**
     * Called to build the next search request. Partial search results are always disabled on the returned
     * request before it is executed.
     *
     * @return SearchRequest to be passed to the search phase.
     */
    protected abstract SearchRequest buildSearchRequest();

    /**
     * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
     * internal state has been moved from {@link IndexerState#STARTED} to {@link IndexerState#INDEXING}.
     *
     * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
     * @param listener listener to call after done; a failure ends the job and is passed to {@link #onFailure(Exception)}
     */
    protected abstract void onStart(long now, ActionListener<Void> listener);

    /**
     * Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The search request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);

    /**
     * Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The bulk request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase);

    /**
     * Called periodically during the execution of a background job. Implementation
     * should persist the state somewhere and continue the execution asynchronously
     * using <code>next</code>.
     *
     * @param state
     *            The current state of the indexer
     * @param position
     *            The current position of the indexer
     * @param next
     *            Runnable for the next phase
     */
    protected abstract void doSaveState(IndexerState state, JobPosition position, Runnable next);

    /**
     * Called when a failure occurs in an async job causing the execution to stop.
     *
     * @param exc
     *            The exception
     */
    protected abstract void onFailure(Exception exc);

    /**
     * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
     * {@link IndexerState#STARTED}.
     *
     * @param listener listener to call after done
     */
    protected abstract void onFinish(ActionListener<Void> listener);

    /**
     * Called when the indexer is stopped. This is only called when the indexer is stopped
     * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
     * when the indexer's work is done.
     */
    protected void onStop() {
    }

    /**
     * Called when a background job detects that the indexer is aborted causing the
     * async execution to stop.
     */
    protected abstract void onAbort();

    /** Records a search failure, ends the job, saves its state and then reports {@code exc} via {@link #onFailure}. */
    private void finishWithSearchFailure(Exception exc) {
        stats.incrementSearchFailures();
        doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
    }

    /** Records an indexing failure, ends the job, saves its state and then reports {@code exc} via {@link #onFailure}. */
    private void finishWithIndexingFailure(Exception exc) {
        stats.incrementIndexingFailures();
        doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
    }

    /**
     * Moves the indexer out of the running state at the end of a job and returns the resulting state:
     * INDEXING becomes STARTED, STOPPING becomes STOPPED (then {@link #onStop()} is called),
     * ABORTING stays ABORTING (then {@link #onAbort()} is called) and STOPPED stays STOPPED.
     *
     * @return the state after the transition
     * @throws IllegalStateException if the indexer is in any other state
     */
    private IndexerState finishAndSetState() {
        AtomicBoolean callOnStop = new AtomicBoolean(false);
        AtomicBoolean callOnAbort = new AtomicBoolean(false);
        IndexerState updatedState = state.updateAndGet(prev -> {
            // updateAndGet may re-apply this function on contention, so it must stay side-effect free:
            // only remember which callback is due (reset on every attempt) and run it after the update
            callOnStop.set(false);
            callOnAbort.set(false);
            switch (prev) {
                case INDEXING:
                    // ready for another job
                    return IndexerState.STARTED;

                case STOPPING:
                    // must be started again
                    callOnStop.set(true);
                    return IndexerState.STOPPED;

                case ABORTING:
                    // abort and exit; the state stays ABORTING and the rest is up to onAbort()
                    callOnAbort.set(true);
                    return IndexerState.ABORTING;

                case STOPPED:
                    // No-op. Shouldn't really be possible to get here (should have to go through
                    // STOPPING first which will be handled) but is harmless to no-op and we don't
                    // want to throw exception here
                    return IndexerState.STOPPED;

                default:
                    // any other state is unanticipated at this point
                    throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
            }
        });

        // callbacks run exactly once, for the transition that was actually applied
        if (callOnStop.get()) {
            onStop();
        } else if (callOnAbort.get()) {
            onAbort();
        }
        return updatedState;
    }

    /**
     * Handles one search response: stops if the indexer left INDEXING, finishes the job when the iteration is
     * done, and otherwise bulk-indexes the produced documents before continuing with the next search.
     */
    private void onSearchResponse(SearchResponse searchResponse) {
        stats.markEndSearch();
        try {
            if (checkState(getState()) == false) {
                return;
            }

            // allowPartialSearchResults is set to false, so we should never see shard failures here
            assert (searchResponse.getShardFailures().length == 0);

            stats.incrementNumPages(1);
            IterationResult<JobPosition> iterationResult = doProcess(searchResponse);

            if (iterationResult.isDone()) {
                logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");

                position.set(iterationResult.getPosition());
                // execute finishing tasks; the job ends (and its state is saved) whether or not they succeed
                onFinish(ActionListener.wrap(
                    r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
                    e -> {
                        // previously this failure was dropped without a trace
                        logger.warn("Finishing tasks failed for job [" + getJobId() + "]", e);
                        doSaveState(finishAndSetState(), position.get(), () -> {});
                    }));
                return;
            }

            final List<IndexRequest> docs = iterationResult.getToIndex();
            final BulkRequest bulkRequest = new BulkRequest();
            docs.forEach(bulkRequest::add);

            // TODO this might be a valid case, e.g. if implementation filters
            assert bulkRequest.requests().size() > 0;

            stats.markStartIndexing();
            doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
                // TODO we should check items in the response and move after accordingly to
                // resume the failing buckets ?
                if (bulkResponse.hasFailures()) {
                    logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
                }
                stats.incrementNumOutputDocuments(bulkResponse.getItems().length);

                // check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
                if (checkState(getState()) == false) {
                    return;
                }

                JobPosition newPosition = iterationResult.getPosition();
                position.set(newPosition);

                onBulkResponse(bulkResponse, newPosition);
            }, this::finishWithIndexingFailure));
        } catch (Exception e) {
            finishWithSearchFailure(e);
        }
    }

    /**
     * Continues the job after a bulk request: every 50 pages the state is saved before the next search is issued.
     *
     * @param response    the bulk response (currently unused beyond the caller's checks)
     * @param newPosition the position reached after indexing this page
     */
    private void onBulkResponse(BulkResponse response, JobPosition newPosition) {
        stats.markEndIndexing();
        try {
            ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
            // TODO probably something more intelligent than every-50 is needed
            if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                doSaveState(IndexerState.INDEXING, newPosition, () -> {
                    nextSearch(listener);
                });
            } else {
                nextSearch(listener);
            }
        } catch (Exception e) {
            finishWithIndexingFailure(e);
        }
    }

    /** Builds the next search request with partial results disabled and executes it. */
    private void nextSearch(ActionListener<SearchResponse> listener) {
        stats.markStartSearch();
        // ensure that partial results are not accepted and cause a search failure
        SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false);
        doNextSearch(searchRequest, listener);
    }

    /**
     * Checks the {@link IndexerState} and returns false if the execution should be
     * stopped. For STOPPING the job is finished and its state saved; for ABORTING
     * {@link #onAbort()} is called.
     *
     * @throws IllegalStateException if the state is not INDEXING, STOPPING, STOPPED or ABORTING
     */
    private boolean checkState(IndexerState currentState) {
        switch (currentState) {
            case INDEXING:
                // normal state;
                return true;

            case STOPPING:
                logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
                doSaveState(finishAndSetState(), getPosition(), () -> {});
                return false;

            case STOPPED:
                return false;

            case ABORTING:
                logger.info("Requested shutdown of indexer for job [" + getJobId() + "]");
                onAbort();
                return false;

            default:
                // Anything other than indexing, aborting or stopping is unanticipated
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Indexer job encountered an illegal state [" + currentState + "]");
        }
    }
}