Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor KibanaThreadPoolIT/SystemIndexThreadPoolTestCase for resiliency #108463

Merged
merged 11 commits into from
May 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,66 @@
package org.elasticsearch.kibana;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexThreadPoolTestCase;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;

public class KibanaThreadPoolIT extends SystemIndexThreadPoolTestCase {
/**
* Tests to verify that system indices are bypassing user-space thread pools
*
* <p>We can block thread pools by setting them to one thread and 1 element queue, then submitting
* threads that wait on a phaser. This lets us verify that operations on system indices
* are being directed to other thread pools.</p>
*/
public class KibanaThreadPoolIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
}

private static final String USER_INDEX = "user_index";
// For system indices that use ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, we'll want to
// block normal system index thread pools as well.
private static final Set<String> THREAD_POOLS_TO_BLOCK = Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Set.of(KibanaPlugin.class);
}

public void testKibanaThreadPool() {
public void testKibanaThreadPoolByPassesBlockedThreadPools() throws Exception {
List<String> kibanaSystemIndices = Stream.of(
KibanaPlugin.KIBANA_INDEX_DESCRIPTOR.getIndexPattern(),
KibanaPlugin.REPORTING_INDEX_DESCRIPTOR.getIndexPattern(),
Expand Down Expand Up @@ -61,4 +97,108 @@ public void testKibanaThreadPool() {
}
});
}

public void testBlockedThreadPoolsRejectUserRequests() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: is this the right place for this test? This one is not Kibana specific, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in a way this is a 'negative test case' for what KibanaThreadPool test is doing.
previously it was part of a super class in a testframework, not sure if it is worthy at the moment to refactor it into its own test class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

assertAcked(client().admin().indices().prepareCreate(USER_INDEX));

runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

assertAcked(client().admin().indices().prepareDelete(USER_INDEX));
}

private void assertThreadPoolsBlocked() {

var e1 = expectThrows(
EsRejectedExecutionException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
var e3 = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(USER_INDEX)
.setQuery(QueryBuilders.matchAllQuery())
// Request times out if max concurrent shard requests is set to 1
.setMaxConcurrentShardRequests(usually() ? SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : randomIntBetween(2, 10))
.get()
);
assertThat(e3.getMessage(), containsString("all shards failed"));
}

protected void runWithBlockedThreadPools(Runnable runnable) throws Exception {
Phaser phaser = new Phaser();

// register this test's thread
phaser.register();

blockThreadPool(phaser);
phaser.arriveAndAwaitAdvance();
Copy link
Contributor Author

@pgomulka pgomulka May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is a new version of making sure all threadpools are blocked before the queues are being filled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch 🎉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me the key to fix the test was to block all, waiting for all to be blocked, and only then fill the queues (and proceed with the test), correct?
If so, maybe it's worth a couple of comments (to be honest, method names are already good, but since this was an edge case a comment on that would be a good addition)


fillQueues();

logger.debug("number of nodes " + internalCluster().getNodeNames().length);
logger.debug("number of parties arrived " + phaser.getArrivedParties());
try {
runnable.run();
} finally {
phaser.arriveAndAwaitAdvance();
}
}

private void blockThreadPool(Phaser phaser) {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : THREAD_POOLS_TO_BLOCK) {
blockThreadPool(threadPoolName, threadPool, phaser);
}
}
}

private void fillQueues() {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : THREAD_POOLS_TO_BLOCK) {
fillThreadPoolQueues(threadPoolName, threadPool);
}
}
}

private static void blockThreadPool(String threadPoolName, ThreadPool threadPool, Phaser phaser) {
ThreadPool.Info info = threadPool.info(threadPoolName);

Runnable waitAction = () -> {
phaser.arriveAndAwaitAdvance();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe add a comment to indicate which step each of the arriveAndAwait guards

phaser.arriveAndAwaitAdvance();
};

phaser.bulkRegister(info.getMax());

for (int i = 0; i < info.getMax(); i++) {
// we need to make sure that there is a task blocking a thread pool
// otherwise a queue might end up having a spot
do {
try {
threadPool.executor(threadPoolName).execute(waitAction);
break;
} catch (EsRejectedExecutionException e) {
// if exception was thrown when submitting, retry.
}
} while (true);
}
}

private static void fillThreadPoolQueues(String threadPoolName, ThreadPool threadPool) {
ThreadPool.Info info = threadPool.info(threadPoolName);

for (int i = 0; i < info.getQueueSize().singles(); i++) {
try {
threadPool.executor(threadPoolName).execute(() -> {});
} catch (EsRejectedExecutionException e) {
// we can't be sure that some other task won't get queued in a test cluster
// but the threadpool's thread is already blocked
}
}
}

}

This file was deleted.