Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Data Frame] moves failure state transition for MT safety #45676

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,8 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
} else {
finishAndSetState();
}
}, e -> {
finishAndSetState();
onFailure(e);
}));
},
this::finishWithFailure));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
Expand Down Expand Up @@ -250,8 +248,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
/**
* Called when a failure occurs in an async job causing the execution to stop.
*
* This is called before the internal state changes from the state in which the failure occurred.
*
* @param exc The exception that caused the async job to stop
*/
protected abstract void onFailure(Exception exc);

Expand Down Expand Up @@ -279,12 +278,19 @@ protected void onStop() {

/**
* Failure path for a search: increments the search-failure counter, reports the exception
* through {@link #onFailure(Exception)}, and hands the state returned by
* {@code finishAndSetState()} plus the current position to {@code doSaveState}.
*
* NOTE(review): this span is a rendered diff hunk in which removed and added lines are
* interleaved. The first {@code doSaveState} call (whose callback invokes {@code onFailure})
* looks like the removed line, and the direct {@code onFailure} call followed by the
* {@code doSaveState} call with an empty callback looks like its replacement. Confirm against
* the merged file.
*
* @param exc The exception raised by the failed search
*/
private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
// Report the failure before finishAndSetState() runs, i.e. before the state changes.
onFailure(exc);
// Nothing left to do once the state has been saved, hence the empty callback.
doSaveState(finishAndSetState(), position.get(), () -> {});
}

/**
* Failure path for indexing: increments the indexing-failure counter, reports the exception
* through {@link #onFailure(Exception)}, and hands the state returned by
* {@code finishAndSetState()} plus the current position to {@code doSaveState}.
*
* NOTE(review): this span is a rendered diff hunk in which removed and added lines are
* interleaved. The first {@code doSaveState} call (whose callback invokes {@code onFailure})
* looks like the removed line, and the direct {@code onFailure} call followed by the
* {@code doSaveState} call with an empty callback looks like its replacement. Confirm against
* the merged file.
*
* @param exc The exception raised by the failed indexing step
*/
private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
// Report the failure before finishAndSetState() runs, i.e. before the state changes.
onFailure(exc);
// Nothing left to do once the state has been saved, hence the empty callback.
doSaveState(finishAndSetState(), position.get(), () -> {});
}

/**
* Generic failure path: reports the exception through {@link #onFailure(Exception)} and then
* calls {@code finishAndSetState()}.
*
* The order is deliberate: {@code onFailure} is documented as being called before the internal
* state changes, so it must run before {@code finishAndSetState()}. Unlike
* {@code finishWithSearchFailure} and {@code finishWithIndexingFailure}, this method increments
* no failure counter and does not call {@code doSaveState}.
*
* @param exc The exception that stopped the execution
*/
private void finishWithFailure(Exception exc) {
// Notify first, while the state is still the one in which the failure occurred.
onFailure(exc);
finishAndSetState();
}

private IndexerState finishAndSetState() {
Expand Down Expand Up @@ -390,8 +396,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
nextSearch(listener);
} catch (Exception e) {
finishAndSetState();
onFailure(e);
finishWithFailure(e);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
Expand All @@ -32,7 +31,6 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45664")
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

private final List<String> failureTransforms = new ArrayList<>();
Expand All @@ -45,6 +43,7 @@ public void setClusterSettings() throws IOException {
addFailureRetrySetting.setJsonEntity(
"{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
client().performRequest(addFailureRetrySetting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ public synchronized void stop(boolean force) {

@Override
public synchronized void triggered(Event event) {
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
// Ignore if event is not for this job
if (event.getJobName().equals(schedulerJobName()) == false) {
return;
}

// Ignore if event is not for this job
if (event.getJobName().equals(schedulerJobName()) == false) {
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -163,15 +164,24 @@ private static class NonEmptyRollupIndexer extends RollupIndexer {
// Hooks supplied through the constructor. The call sites of the first three are outside this
// view; presumably they stand in for search execution, bulk execution and failure handling
// in the test indexer. TODO confirm against the overridden methods.
final Function<SearchRequest, SearchResponse> searchFunction;
final Function<BulkRequest, BulkResponse> bulkFunction;
final Consumer<Exception> failureConsumer;
// Invoked by doSaveState with the state and position being saved, before the continuation runs.
final BiConsumer<IndexerState, Map<String, Object>> saveStateCheck;
// Not assigned by the constructors; presumably created by newLatch(int), whose body is not visible here.
private CountDownLatch latch;

/**
* Convenience constructor for callers that do not need to observe state saves: delegates to
* the full constructor with a no-op {@code saveStateCheck}.
*
* @param executor        forwarded unchanged to the full constructor
* @param job             forwarded unchanged to the full constructor
* @param initialState    forwarded unchanged to the full constructor
* @param initialPosition forwarded unchanged to the full constructor
* @param searchFunction  forwarded unchanged to the full constructor
* @param bulkFunction    forwarded unchanged to the full constructor
* @param failureConsumer forwarded unchanged to the full constructor
*/
NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer) {
this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {});
}

/**
* Full constructor: passes the executor, job, initial state and initial position to the
* superclass constructor and stores the four hooks in the corresponding fields.
*
* @param executor        passed to the superclass constructor
* @param job             passed to the superclass constructor
* @param initialState    passed to the superclass constructor
* @param initialPosition passed to the superclass constructor
* @param searchFunction  stored in {@code searchFunction}
* @param bulkFunction    stored in {@code bulkFunction}
* @param failureConsumer stored in {@code failureConsumer}
* @param saveStateCheck  stored in {@code saveStateCheck}; {@code doSaveState} calls it with the
*                        state and position being saved
*/
NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer,
BiConsumer<IndexerState, Map<String, Object>> saveStateCheck) {
super(executor, job, initialState, initialPosition);
this.searchFunction = searchFunction;
this.bulkFunction = bulkFunction;
this.failureConsumer = failureConsumer;
this.saveStateCheck = saveStateCheck;
}

private CountDownLatch newLatch(int count) {
Expand Down Expand Up @@ -209,6 +219,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
/**
* Test implementation of state saving: nothing is persisted. It checks the state, lets the
* test observe the save through {@code saveStateCheck}, and then runs the continuation
* synchronously on the calling thread.
*
* @param state    the state being saved; asserted to be STARTED, INDEXING or STOPPED
* @param position the position being saved; passed to {@code saveStateCheck} unchanged
* @param next     continuation run after the check
*/
@Override
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
// Only these three states are expected to reach a save (checked only when assertions are enabled).
assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
// Let the test inspect the save before the indexer continues.
saveStateCheck.accept(state, position);
next.run();
}

Expand Down Expand Up @@ -758,14 +769,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]"));
};

BiConsumer<IndexerState, Map<String, Object>> doSaveStateCheck = (indexerState, position) -> {
isFinished.set(true);
};

final ExecutorService executor = Executors.newFixedThreadPool(1);
try {

NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
searchFunction, bulkFunction, failureConsumer);
searchFunction, bulkFunction, failureConsumer, doSaveStateCheck);
final CountDownLatch latch = indexer.newLatch(1);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
Expand Down