
Refactor KibanaThreadPoolIT/SystemIndexThreadPoolTestCase for resiliency #108463

Merged (11 commits, May 14, 2024)
@@ -9,30 +9,74 @@
package org.elasticsearch.kibana;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexThreadPoolTestCase;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

public class KibanaThreadPoolIT extends SystemIndexThreadPoolTestCase {
/**
* Tests to verify that system indices are bypassing user-space thread pools
*
* <p>We can block thread pools by setting them to one thread and no queue, then submitting
* threads that wait on a countdown latch. This lets us verify that operations on system indices
* are being directed to other thread pools.</p>
*
* <p>When implementing this class, don't forget to override {@link ESIntegTestCase#nodePlugins()} if
* the relevant system index is defined in a plugin.</p>
 */

Review comment (Contributor): This Javadoc isn't accurate anymore, better to remove.
public class KibanaThreadPoolIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
}
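
Aside: with each pool shrunk to one thread and a one-slot queue, a single long-running task plus one queued task saturates it, and the 1KB indexing-pressure cap keeps write-path rejections easy to trigger. A minimal sanity check of the effective sizing could look like this (hypothetical helper, not part of this PR):

private void assertPoolSizing(String nodeName, String poolName) {
    // hypothetical helper: confirm the nodeSettings above took effect
    ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
    ThreadPool.Info info = threadPool.info(poolName);
    assertThat(info.getMax(), equalTo(1)); // one worker thread
    assertThat(info.getQueueSize().singles(), equalTo(1L)); // one queue slot
}
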

private static final String USER_INDEX = "user_index";

// For system indices that use ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, we'll want to
// block normal system index thread pools as well.
protected Set<String> threadPoolsToBlock() {
return Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);
}
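
Aside: for a system index whose descriptor declares ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, a test would likely also block the regular system pools, so that success proves requests land on the critical pools. A sketch of such an override (the pool names come from ThreadPool.Names; the subclass itself is hypothetical, not part of this PR):

@Override
protected Set<String> threadPoolsToBlock() {
    // also block the non-critical system pools; a critical system index
    // should keep working because it runs on the critical system pools
    return Set.of(
        ThreadPool.Names.GET,
        ThreadPool.Names.WRITE,
        ThreadPool.Names.SEARCH,
        ThreadPool.Names.SYSTEM_READ,
        ThreadPool.Names.SYSTEM_WRITE
    );
}
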

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Set.of(KibanaPlugin.class);
}

public void testKibanaThreadPool() {
public void testKibanaThreadPool() throws Exception {
List<String> kibanaSystemIndices = Stream.of(
KibanaPlugin.KIBANA_INDEX_DESCRIPTOR.getIndexPattern(),
KibanaPlugin.REPORTING_INDEX_DESCRIPTOR.getIndexPattern(),
@@ -61,4 +105,96 @@ public void testKibanaThreadPool() {
}
});
}

public void testBlockedThreadPoolsRejectUserRequests() throws Exception {

Review comment (Contributor): Nit: is this the right place for this test? This one is not Kibana specific, right?

Reply (Contributor, author): Yes, in a way this is a 'negative test case' for what the KibanaThreadPool test is doing. Previously it was part of a super class in the test framework; I'm not sure it is worth refactoring into its own test class at the moment.

Reply (Contributor): OK

assertAcked(client().admin().indices().prepareCreate(USER_INDEX));

runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

assertAcked(client().admin().indices().prepareDelete(USER_INDEX));
}

private void assertThreadPoolsBlocked() {

var e1 = expectThrows(
EsRejectedExecutionException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
var e3 = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(USER_INDEX)
.setQuery(QueryBuilders.matchAllQuery())
// Request times out if max concurrent shard requests is set to 1
.setMaxConcurrentShardRequests(usually() ? SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : randomIntBetween(2, 10))
.get()
);
assertThat(e3.getMessage(), containsString("all shards failed"));
}
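
Aside: the positive counterpart of these assertions is what testKibanaThreadPool exercises against the Kibana indices: with the same pools blocked, system-index traffic should still succeed because it is routed to dedicated system thread pools. A minimal sketch (method and index parameter hypothetical, not part of this PR):

private void assertSystemIndexOperationsSucceed(String systemIndex) {
    // a write that would be rejected on the blocked user-space write pool
    BulkResponse bulk = client().prepareBulk()
        .add(client().prepareIndex(systemIndex).setSource(Map.of("foo", "bar")))
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .get();
    assertFalse(bulk.buildFailureMessage(), bulk.hasFailures());
    // a search that would fail with "all shards failed" on a blocked search pool
    assertHitCount(client().prepareSearch(systemIndex).setQuery(QueryBuilders.matchAllQuery()), 1L);
}
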

protected void runWithBlockedThreadPools(Runnable runnable) throws Exception {
Phaser phaser = new Phaser();

// register this test's thread
phaser.register();

for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : threadPoolsToBlock()) {
blockThreadPool(threadPoolName, threadPool, phaser);
fillThreadPoolQueues(threadPoolName, threadPool);
}
}

assertBusy(
() -> assertThat(phaser.getArrivedParties(), equalTo(threadPoolsToBlock().size() * internalCluster().getNodeNames().length))
);
logger.debug("number of nodes " + internalCluster().getNodeNames().length);
logger.debug("number of parties arrived " + phaser.getArrivedParties());
phaser.arriveAndAwaitAdvance();
try {
runnable.run();
} finally {
phaser.arriveAndAwaitAdvance();
}
}
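
Aside: the phaser handshake is easier to follow in isolation. In phase 0 every blocker task arrives once it occupies a pool thread; the test waits for them, arrives, and the phase advances. In phase 1 the blockers park again until the test finishes and arrives a second time. A standalone sketch of the same protocol (hypothetical demo, not part of this PR):

import java.util.concurrent.Phaser;

public class PhaserHandshakeDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // register the coordinating (test) thread
        int workers = 3;
        phaser.bulkRegister(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // end of phase 0: worker is parked on its thread
                phaser.arriveAndAwaitAdvance(); // end of phase 1: worker released
            }).start();
        }
        while (phaser.getArrivedParties() < workers) {
            Thread.onSpinWait(); // wait until every worker has arrived at phase 0
        }
        phaser.arriveAndAwaitAdvance(); // advance past phase 0: all workers blocked
        System.out.println("workers parked; assertions would run here");
        phaser.arriveAndAwaitAdvance(); // advance past phase 1: release the workers
    }
}
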

private static void blockThreadPool(String threadPoolName, ThreadPool threadPool, Phaser phaser) {
ThreadPool.Info info = threadPool.info(threadPoolName);

Runnable waitAction = () -> {
// phase 0: arrive once this task occupies a pool thread, then wait for the test thread to confirm all pools are blocked
phaser.arriveAndAwaitAdvance();
// phase 1: stay parked until the test has finished its assertions
phaser.arriveAndAwaitAdvance();
};

Review comment (Contributor): Nit: maybe add a comment to indicate which step each of the arriveAndAwaitAdvance calls guards.


phaser.bulkRegister(info.getMax());

for (int i = 0; i < info.getMax(); i++) {
// we need to make sure that there is a task blocking a thread pool
// otherwise a queue might end up having a spot
Future<?> submit = null;
do {
try {
submit = threadPool.executor(threadPoolName).submit(waitAction);
} catch (EsRejectedExecutionException e) {
// another task transiently held the last slot; retry until accepted
}
} while (submit == null);
}
}
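
Aside: the submit loop retries because another test task can transiently occupy the last thread or queue slot, so a rejection here is not final. The same pattern factored into a reusable helper (hypothetical, not part of this PR) would serve fillThreadPoolQueues below as well:

private static void submitUntilAccepted(ThreadPool threadPool, String threadPoolName, Runnable task) {
    // keep resubmitting until the executor accepts the task;
    // EsRejectedExecutionException only signals transient saturation here
    Future<?> submitted = null;
    do {
        try {
            submitted = threadPool.executor(threadPoolName).submit(task);
        } catch (EsRejectedExecutionException e) {
            // retry
        }
    } while (submitted == null);
}
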

private static void fillThreadPoolQueues(String threadPoolName, ThreadPool threadPool) {
ThreadPool.Info info = threadPool.info(threadPoolName);

for (int i = 0; i < info.getQueueSize().singles(); i++) {
Future<?> submit = null;
do {
try {
submit = threadPool.executor(threadPoolName).submit(() -> {});
} catch (EsRejectedExecutionException e) {
// rejected because the queue is momentarily full; retry
}
} while (submit == null);
// we can't be sure that some other task won't get queued in a test cluster
}
}

}

This file was deleted. (Per the PR title and the review discussion above, the deleted file is the SystemIndexThreadPoolTestCase base class, whose logic now lives in KibanaThreadPoolIT.)