elasticsearch doesn't honor queue size #28547

Closed
pznamensky opened this issue Feb 7, 2018 · 9 comments

Comments

@pznamensky

commented Feb 7, 2018

Elasticsearch version:
Version: 5.6.5, Build: 6a37571/2017-12-04T07:50:10.466Z, JVM: 1.8.0_161

JVM version:
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)

OS version:
Linux 4.9.58-18.el7.x86_64 #1 SMP x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:
Actual behavior:
The request count in the search queue is greater than queue_size.

curl -s "http://localhost:9200/_cat/thread_pool/search?v&h=id,host,name,active,rejected,completed,queue,queue_size"

id                     host                    name   active rejected completed queue queue_size
DEf4XLr0Su-KiwzbH3_XeA srv1                    search      0        0    202143     0          5
iBA0J8kETD-JBm5sc_B4lw srv2                    search      37    1013   6533777   769          5
dJ73fwGgSQuyodaH7bDwmQ srv3                    search      37    1366   6553343   213          5

Expected behavior
The request count in the search queue should never exceed queue_size. Requests that arrive after the queue has filled up should receive 429 (or 50x) errors.

Steps to reproduce:

  1. add thread_pool.search.queue_size: 5 to elasticsearch.yml on all nodes
  2. restart the nodes
  3. create an index and fill it with data
  4. run lots of heavy concurrent search requests (see the sketch after this list)
  5. the search queue will grow beyond the configured queue_size
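
For step 4, a minimal load-generator sketch using only the JDK; the node address http://localhost:9200, the index name "test", and the field name in the query are assumptions for this example, and any load tool or Elasticsearch client works just as well:

import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SearchLoad {
    public static void main(String[] args) {
        final int threads = 50;
        final int requestsPerThread = 1000;
        // a deliberately heavy query (leading wildcard); adjust field/value to your data
        final byte[] body = "{\"query\":{\"wildcard\":{\"field\":{\"value\":\"*value*\"}}}}"
                .getBytes(StandardCharsets.UTF_8);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < requestsPerThread; i++) {
                    try {
                        final HttpURLConnection conn = (HttpURLConnection)
                                new URL("http://localhost:9200/test/_search").openConnection();
                        conn.setRequestMethod("POST");
                        conn.setRequestProperty("Content-Type", "application/json");
                        conn.setDoOutput(true);
                        conn.getOutputStream().write(body);
                        // expect 200 while capacity remains, 429/5xx once tasks are rejected
                        conn.getResponseCode();
                        conn.disconnect();
                    } catch (Exception e) {
                        // ignore individual failures; the goal is only to generate load
                    }
                }
            }).start();
        }
    }
}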
@jasontedor

Member

commented Feb 7, 2018

I think this is a reporting problem only. The issue here is that we increment the size of the queued items, and then check if the capacity of the queue is exceeded. If the capacity of the queue is exceeded, we decrement the size, do not queue the item, and return to the caller that their offer to the queue was rejected. For the search thread pool, this leads to the thread pool task being rejected.

If you call /_cat/thread_pool between the increment of the size and the subsequent decrement of the size on an overstuffed queue, you could see the size of the queue being larger than its capacity. However, the task will be rejected. If you are concurrently submitting many search requests, I think it would be possible to see the size greatly exceed the capacity of the queue (as a single search request can lead to many search thread pool tasks).
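
For illustration, here is a minimal sketch of that increment-then-check pattern; SizeTrackingQueue and its methods are made-up names for this example, not the actual Elasticsearch internals:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: shows why a concurrent reader of size() can briefly
// observe a value larger than the capacity even though the offer is rejected.
class SizeTrackingQueue<E> {
    private final ConcurrentLinkedQueue<E> delegate = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    SizeTrackingQueue(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(E e) {
        // 1. optimistically bump the size first
        if (size.incrementAndGet() > capacity) {
            // 2. over capacity: undo the bump and reject the offer; a stats call
            //    racing between steps 1 and 2 sees size() > capacity
            size.decrementAndGet();
            return false;
        }
        delegate.add(e);
        return true;
    }

    int size() {
        return size.get(); // the value a /_cat/thread_pool-style reader reports
    }
}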

@jasontedor

Member

commented Feb 7, 2018

I submitted prematurely.

In short, I do not think there is a bug here. I think these tasks are correctly being rejected. Do you have evidence to the contrary?

@pznamensky

Author

commented Feb 7, 2018

We've got some graphs corresponding to this issue from a similar cluster.
As you can see, all search threads are exhausted and requests are accumulating in the queue, yet at the same time the thread pool rejection rate stays at about 1 per second, which is strange.
Also, which HTTP code is used when a task is rejected?
As I remember, there were no 429 codes, only 50x ones.
[graph: es_queue]

@jasontedor

Member

commented Feb 7, 2018

Here is a failing test case that supports my theory:

    public void testQueueSize() throws InterruptedException {
        final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
        final int size = 8;
        final int queueSize = 1;
        final long rejections = 256;

        ThreadPool threadPool = null;
        final Settings nodeSettings =
                Settings.builder()
                        .put("node.name", "testRejectedExecutionCounter")
                        .put("thread_pool." + threadPoolName + ".size", size)
                        .put("thread_pool." + threadPoolName + ".queue_size", queueSize)
                        .build();
        try {
            threadPool = new ThreadPool(nodeSettings);

            // these tasks will consume the thread pool causing further
            // submissions to queue
            final CountDownLatch latch = new CountDownLatch(size);
            final CountDownLatch block = new CountDownLatch(1);
            for (int i = 0; i < size; i++) {
                threadPool.executor(threadPoolName).execute(() -> {
                    try {
                        latch.countDown();
                        block.await();
                    } catch (InterruptedException e) {
                        fail(e.toString());
                    }
                });
            }

            // wait for the submitted tasks to be consumed by the thread
            // pool
            latch.await();

            // these tasks will fill the thread pool queue
            for (int i = 0; i < queueSize; i++) {
                threadPool.executor(threadPoolName).execute(() -> {});
            }

            // these tasks will be rejected
            AtomicLong counter = new AtomicLong();
            final ThreadPool capturedThreadPool = threadPool;
            final CountDownLatch rejectionLatch = new CountDownLatch(1);
            final List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < size * rejections; i++) {
                final Thread thread = new Thread(() -> {
                    try {
                        rejectionLatch.await();
                    } catch (InterruptedException e) {
                        fail(e.toString());
                    }
                    try {
                        capturedThreadPool.executor(threadPoolName).execute(() -> { });
                    } catch (EsRejectedExecutionException e) {
                        counter.incrementAndGet();
                    }

                });
                threads.add(thread);
                thread.start();
            }

            final AtomicLong maxQueueSize = new AtomicLong();
            final AtomicBoolean spin = new AtomicBoolean(true);
            final Thread queueSizeThread = new Thread(() -> {
                try {
                    rejectionLatch.await();
                } catch (InterruptedException e) {
                    fail(e.toString());
                }
                while (spin.get()) {
                    maxQueueSize.set(Math.max(maxQueueSize.get(), (long) stats(capturedThreadPool, threadPoolName).getQueue()));
                }
            });
            queueSizeThread.start();

            rejectionLatch.countDown();
            for (final Thread thread : threads) {
                thread.join();
            }
            spin.set(false);
            queueSizeThread.join();
            block.countDown();

            assertThat(counter.get(), equalTo(size * rejections));
            assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(size * rejections));

            assertThat("queue capacity exceeded", maxQueueSize.get(), lessThanOrEqualTo((long) queueSize));
        } finally {
            terminateThreadPoolIfNeeded(threadPool);
        }
    }

This test works by occupying the threads with tasks that I do not let complete. Then, I stuff the queue with tasks. After that, all additional tasks should be rejected. While I am submitting the tasks that will be rejected, I busy-poll the number of queued tasks.

If my explanation is correct, I expect this test to fail on the final assertion, the one asserting that the queue size never exceeded the configured capacity. However, the number of rejections is correct, as established by the preceding assertions: even though the reported queue size exceeded the capacity, the tasks were burned off and rejected rather than actually being queued and executed.

Here is the failing output:

java.lang.AssertionError: queue capacity exceeded
Expected: a value less than or equal to <1L>
     but: <3L> was greater than <1L>

This establishes that the size exceeding the capacity is merely a reporting artifact.

@jasontedor

Member

commented Feb 7, 2018

> Also, which HTTP code is used when a task is rejected?

@jasontedor

Member

commented Feb 8, 2018

Another important note here is that some search phases cannot be rejected. For example, the can-match search phase can never be rejected, so we force-put these phases onto the queue. This can also lead to the size exceeding the capacity.
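
For illustration, extending the same kind of size-tracking queue, a minimal sketch of such a forced path; SoftBoundedQueue and forcePut are made-up names for this example, not the actual Elasticsearch classes:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: the capacity is enforced in offer(), but deliberately
// bypassed by forcePut() for work that must never be rejected.
class SoftBoundedQueue<E> {
    private final ConcurrentLinkedQueue<E> delegate = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    SoftBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    // normal path: reject once the configured capacity is reached
    boolean offer(E e) {
        if (size.incrementAndGet() > capacity) {
            size.decrementAndGet();
            return false;
        }
        delegate.add(e);
        return true;
    }

    // forced path: no capacity check, so the reported size can end up
    // above the configured capacity
    void forcePut(E e) {
        size.incrementAndGet();
        delegate.add(e);
    }

    int size() {
        return size.get();
    }
}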

@jasontedor

Member

commented Feb 8, 2018

Looking at your graph (thank you!), I think that the queue is behaving correctly here. I opened #28557 to address the reporting problem that I spotted. Otherwise, I do not think there is an issue here, so I am going to close this one out.

jasontedor closed this Feb 8, 2018

@pznamensky

Author

commented Feb 8, 2018

@jasontedor, thanks for the explanation and fixing the metric.

@jasontedor

Member

commented Feb 9, 2018

Thanks for the report @pznamensky.
