Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Halt Indexer on Stop/Abort API #107792

Merged
merged 3 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/107792.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 107792
summary: Halt Indexer on Stop/Abort API
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,19 @@ protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
@Override
protected void onStart(long now, ActionListener<Boolean> listener) {
if (context.getTaskState() == TransformTaskState.FAILED) {
logger.debug("[{}] attempted to start while failed.", getJobId());
logger.debug("[{}] attempted to start while in state [{}].", getJobId(), TransformTaskState.FAILED.value());
listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", getJobId()));
return;
}

switch (getState()) {
case ABORTING, STOPPING, STOPPED -> {
logger.debug("[{}] attempted to start while in state [{}].", getJobId(), getState().value());
listener.onResponse(false);
return;
}
}

if (context.getAuthState() != null && HealthStatus.RED.equals(context.getAuthState().getStatus())) {
// AuthorizationState status is RED which means there was permission check error during PUT or _update.
listener.onFailure(
Expand Down Expand Up @@ -543,7 +551,9 @@ private void executeRetentionPolicy(ActionListener<Void> listener) {
private void finalizeCheckpoint(ActionListener<Void> listener) {
try {
// reset the page size, so we do not memorize a low page size forever
context.setPageSize(function.getInitialPageSize());
if (function != null) {
context.setPageSize(function.getInitialPageSize());
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
}
// reset the changed bucket to free memory
if (changeCollector != null) {
changeCollector.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ class MockedTransformIndexer extends TransformIndexer {

private TransformState persistedState;
private AtomicInteger saveStateListenerCallCount = new AtomicInteger(0);
private SearchResponse searchResponse = ONE_HIT_SEARCH_RESPONSE;
// used for synchronizing with the test
private CountDownLatch startLatch;
private CountDownLatch searchLatch;
private CountDownLatch doProcessLatch;

Expand Down Expand Up @@ -163,6 +165,10 @@ public CountDownLatch createCountDownOnResponseLatch(int count) {
return doProcessLatch = new CountDownLatch(count);
}

/**
 * Installs a fresh latch in {@code startLatch}; the mocked {@code onStart} waits on that latch
 * before delegating to the real implementation.
 *
 * @param count number of {@code countDown()} calls required to release a waiting thread
 * @return the newly created latch, so the calling test can release it
 */
public CountDownLatch createAwaitForStartLatch(int count) {
    final CountDownLatch latch = new CountDownLatch(count);
    startLatch = latch;
    return latch;
}

@Override
void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE);
Expand All @@ -188,14 +194,24 @@ void refreshDestinationIndex(ActionListener<Void> responseListener) {

@Override
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
if (searchLatch != null) {
maybeWaitOnLatch(searchLatch);
threadPool.generic().execute(() -> nextPhase.onResponse(searchResponse));
}

private static void maybeWaitOnLatch(CountDownLatch countDownLatch) {
if (countDownLatch != null) {
try {
searchLatch.await();
countDownLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE));
}

/**
 * Waits on {@code startLatch} (if one is set) and then delegates to the real {@code onStart}.
 * This lets a test change the indexer state while the indexer thread is parked here.
 *
 * @param now passed through unchanged to {@code super.onStart}
 * @param listener passed through unchanged to {@code super.onStart}
 */
@Override
protected void onStart(long now, ActionListener<Boolean> listener) {
// returns immediately when startLatch is null (e.g. createAwaitForStartLatch was never called)
maybeWaitOnLatch(startLatch);
super.onStart(now, listener);
}

@Override
Expand Down Expand Up @@ -259,6 +275,10 @@ void persistState(TransformState state, ActionListener<Void> listener) {
void validate(ActionListener<ValidateTransformAction.Response> listener) {
// the mock performs no validation: it completes the listener right away with a null response
listener.onResponse(null);
}

/**
 * Clears the canned search response, so that subsequent {@code doNextSearch} calls hand
 * {@code null} to the next phase instead of {@code ONE_HIT_SEARCH_RESPONSE}.
 * NOTE(review): presumably the indexer treats a null response as "no more data" and finishes the
 * checkpoint - confirm against the indexer implementation.
 */
void finishCheckpoint() {
    this.searchResponse = null;
}
}

class MockedTransformIndexerForStatePersistenceTesting extends TransformIndexer {
Expand Down Expand Up @@ -371,22 +391,7 @@ public void tearDownClient() {
}

public void testTriggerStatePersistence() {
TransformConfig config = new TransformConfig(
randomAlphaOfLength(10),
randomSourceConfig(),
randomDestConfig(),
null,
new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
null,
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null,
null,
null,
null
);
TransformConfig config = createTransformConfig();
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.INDEXING);

TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
Expand Down Expand Up @@ -452,22 +457,7 @@ public void testTriggerStatePersistence() {
}

public void testStopAtCheckpoint() throws Exception {
TransformConfig config = new TransformConfig(
randomAlphaOfLength(10),
randomSourceConfig(),
randomDestConfig(),
null,
new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
null,
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null,
null,
null,
null
);
TransformConfig config = createTransformConfig();

for (IndexerState state : IndexerState.values()) {
// skip indexing case, tested below
Expand Down Expand Up @@ -684,6 +674,130 @@ public void testStopAtCheckpoint() throws Exception {
}
}

/**
 * Given a started transform
 * And the indexer thread has not started yet (it is parked on the start latch inside onStart)
 * When a user calls _stop?force=false
 * Then the indexer thread should exit early:
 * the indexer ends up STOPPED and the last checkpoint is still -1
 * (NOTE(review): -1 is presumably the initial "no checkpoint yet" value - confirm).
 */
public void testStopBeforeIndexingThreadStarts() throws Exception {
var indexer = createMockIndexer(
createTransformConfig(),
new AtomicReference<>(IndexerState.STARTED),
null,
threadPool,
auditor,
null,
new TransformIndexerStats(),
new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))
);

// park the indexer thread in onStart as soon as it kicks off; it stays blocked until the latch is released
var startLatch = indexer.createAwaitForStartLatch(1);
assertEquals(IndexerState.STARTED, indexer.start());
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
// the state is already INDEXING although the indexer thread is still waiting on the latch
assertEquals(IndexerState.INDEXING, indexer.getState());

// stop the indexer, equivalent to _stop?force=false
assertEquals(IndexerState.STOPPING, indexer.stop());
assertEquals(IndexerState.STOPPING, indexer.getState());

// now let the indexer thread run; onStart is entered while the state is STOPPING
startLatch.countDown();

// the indexer thread finishes asynchronously, so poll until the final state is visible
assertBusy(() -> {
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(-1L));
});
}

/**
 * Given a started transform
 * And the indexer thread has not started yet (it is parked on the start latch inside onStart)
 * When a user calls _stop?force=true
 * Then the indexer thread should exit early:
 * the indexer stays in ABORTING and the last checkpoint is still -1
 * (NOTE(review): -1 is presumably the initial "no checkpoint yet" value - confirm).
 */
public void testForceStopBeforeIndexingThreadStarts() throws Exception {
var indexer = createMockIndexer(
createTransformConfig(),
new AtomicReference<>(IndexerState.STARTED),
null,
threadPool,
auditor,
null,
new TransformIndexerStats(),
new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))
);

// park the indexer thread in onStart as soon as it kicks off; it stays blocked until the latch is released
var startLatch = indexer.createAwaitForStartLatch(1);
assertEquals(IndexerState.STARTED, indexer.start());
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
// the state is already INDEXING although the indexer thread is still waiting on the latch
assertEquals(IndexerState.INDEXING, indexer.getState());

// stop the indexer, equivalent to _stop?force=true
// abort() is expected to return false here because the indexer thread is still in flight
assertFalse("Transform Indexer thread should still be running", indexer.abort());
assertEquals(IndexerState.ABORTING, indexer.getState());

// now let the indexer thread run; onStart is entered while the state is ABORTING
startLatch.countDown();

// the indexer thread finishes asynchronously, so poll; the state is expected to remain ABORTING
assertBusy(() -> {
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(-1L));
});
}

/**
 * Given a started transform
 * And the indexer thread has not started yet (it is parked on the start latch inside onStart)
 * When a user calls _stop?wait_for_checkpoint=true
 * Then the indexer thread should not exit early:
 * it completes the checkpoint (last checkpoint becomes 1) and only then ends up STOPPED.
 */
public void testStopWaitForCheckpointBeforeIndexingThreadStarts() throws Exception {
// kept in a local because the test flips shouldStopAtCheckpoint on it below
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
var indexer = createMockIndexer(
createTransformConfig(),
new AtomicReference<>(IndexerState.STARTED),
null,
threadPool,
auditor,
null,
new TransformIndexerStats(),
context
);

// park the indexer thread in onStart as soon as it kicks off; it stays blocked until the latch is released
var startLatch = indexer.createAwaitForStartLatch(1);
assertEquals(IndexerState.STARTED, indexer.start());
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
// the state is already INDEXING although the indexer thread is still waiting on the latch
assertEquals(IndexerState.INDEXING, indexer.getState());

// stop the indexer, equivalent to _stop?wait_for_checkpoint=true
context.setShouldStopAtCheckpoint(true);
// stopLatch is awaited below to make sure the setStopAtCheckpoint listener has completed
CountDownLatch stopLatch = new CountDownLatch(1);
countResponse(listener -> setStopAtCheckpoint(indexer, true, listener), stopLatch);

// now let the indexer thread run
// finishCheckpoint() makes the mocked search hand back null from now on
// NOTE(review): presumably a null response is what lets the checkpoint complete - confirm
indexer.finishCheckpoint();
startLatch.countDown();

// wait for all listeners
assertTrue("timed out after 5s", stopLatch.await(5, TimeUnit.SECONDS));

// there should be no listeners waiting
assertEquals(0, indexer.getSaveStateListenerCount());

// the save-state listener must have been called at most once by the indexing thread
// this is not exact, because we do not know _when_ the other thread persisted the flag
assertThat(indexer.getSaveStateListenerCallCount(), lessThanOrEqualTo(1));

// the indexer thread finishes asynchronously, so poll until the final state is visible
assertBusy(() -> {
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
assertThat(indexer.getLastCheckpoint().getCheckpoint(), equalTo(1L));
});
}

@TestIssueLogging(
value = "org.elasticsearch.xpack.transform.transforms:DEBUG",
issueUrl = "https://github.com/elastic/elasticsearch/issues/92069"
Expand Down Expand Up @@ -868,4 +982,23 @@ private MockedTransformIndexerForStatePersistenceTesting createMockIndexerForSta
indexer.initialize();
return indexer;
}

/**
 * Builds the randomized {@link TransformConfig} shared by the tests in this class: random source,
 * destination and pivot configs, a {@link TimeSyncConfig} on the {@code "timestamp"} field with a
 * one-second value, and {@code null} for every remaining constructor argument.
 *
 * @return a new randomized transform config
 */
private static TransformConfig createTransformConfig() {
    // Locals are declared in constructor-argument order so the random generators are invoked
    // in exactly the same sequence as when the expressions were passed inline.
    var id = randomAlphaOfLength(10); // NOTE(review): presumably the transform id - confirm
    var source = randomSourceConfig();
    var dest = randomDestConfig();
    var sync = new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1));
    var pivot = randomPivotConfig();
    // randomly absent free-text argument (NOTE(review): presumably the description - confirm)
    var optionalText = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000);

    return new TransformConfig(id, source, dest, null, sync, null, pivot, null, optionalText, null, null, null, null, null);
}
}