
Generic threads deadlock related to ILMHistoryStore indexing #68468

Closed
DaveCTurner opened this issue Feb 3, 2021 · 24 comments · Fixed by #91238
Labels
>bug :Data Management/ILM+SLM Index and Snapshot lifecycle management Team:Data Management Meta label for data/management team

Comments

@DaveCTurner
Contributor

DaveCTurner commented Feb 3, 2021

Elasticsearch version (bin/elasticsearch --version): 7.10.1

Plugins installed: Cloud

JVM version (java -version): 15.0.1+9

OS version (uname -a if on a Unix-like system): Cloud

Description of the problem including expected versus actual behavior:

All generic threads are waiting with the following stack trace:

at java.util.concurrent.locks.LockSupport.park(Ljava/lang/Object;)V (LockSupport.java:211)                                                                                                             
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Ljava/util/concurrent/locks/AbstractQueuedSynchronizer$Node;IZZZJ)I (AbstractQueuedSynchronizer.java:714)                             
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(I)V (AbstractQueuedSynchronizer.java:937)                                                                                             
at java.util.concurrent.locks.ReentrantLock$Sync.lock()V (ReentrantLock.java:153)                                                                                                                      
at java.util.concurrent.locks.ReentrantLock.lock()V (ReentrantLock.java:322)                                                                                                                           
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(Lorg/elasticsearch/action/DocWriteRequest;)V (BulkProcessor.java:379)                                                                       
at org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/DocWriteRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; (BulkProcessor.java:361)                                   
at org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/index/IndexRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; (BulkProcessor.java:347)                                
at org.elasticsearch.xpack.ilm.history.ILMHistoryStore.lambda$putAsync$0(Lorg/elasticsearch/action/index/IndexRequest;Lorg/elasticsearch/xpack/ilm/history/ILMHistoryItem;)V (ILMHistoryStore.java:150)
at org.elasticsearch.xpack.ilm.history.ILMHistoryStore$$Lambda$8091+0x0000000801eda370.run()V (Unknown Source)                                                                                         
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run()V (ThreadContext.java:678)                                                                                    
at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1130)                                                                 
at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:630)                                                                                                                 
at java.lang.Thread.run()V (Thread.java:832)                                                                                                                                                           

Meanwhile one of the scheduler threads is holding the lock on which they're waiting, and is blocked here:

jdk.internal.misc.Unsafe.park(ZJ)V (Native Method)                                                                                                                     
java.util.concurrent.locks.LockSupport.park(Ljava/lang/Object;)V (LockSupport.java:211)                                                                                
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Ljava/util/concurrent/locks/AbstractQueuedSynchronizer$Node;IZZZJ)I (AbstractQueuedSynchronizer.java:714)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(I)V (AbstractQueuedSynchronizer.java:1046)                                            
java.util.concurrent.Semaphore.acquire()V (Semaphore.java:318)                                                                                                         
org.elasticsearch.action.bulk.BulkRequestHandler.execute(Lorg/elasticsearch/action/bulk/BulkRequest;J)V (BulkRequestHandler.java:59)                                   
org.elasticsearch.action.bulk.BulkProcessor.execute(Lorg/elasticsearch/action/bulk/BulkRequest;J)V (BulkProcessor.java:454)                                            
org.elasticsearch.action.bulk.BulkProcessor.execute()V (BulkProcessor.java:463)                                                                                        
org.elasticsearch.action.bulk.BulkProcessor.access$400(Lorg/elasticsearch/action/bulk/BulkProcessor;)V (BulkProcessor.java:54)                                         
org.elasticsearch.action.bulk.BulkProcessor$Flush.run()V (BulkProcessor.java:503)                                                                                      
org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun()V (Scheduler.java:213)                                                                              
org.elasticsearch.common.util.concurrent.AbstractRunnable.run()V (AbstractRunnable.java:37)                                                                            
java.util.concurrent.Executors$RunnableAdapter.call()Ljava/lang/Object; (Executors.java:515)                                                                           
java.util.concurrent.FutureTask.run()V (FutureTask.java:264)                                                                                                           
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run()V (ScheduledThreadPoolExecutor.java:304)                                                     
java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1130)                                    
java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:630)                                                                                    
java.lang.Thread.run()V (Thread.java:832)                                                                                                                              

The semaphore on which it is waiting is held, apparently, by another ongoing flush. I haven't chased this any further but I could believe that the ongoing flush needs a generic thread to make progress.

Steps to reproduce:

Unknown.

Provide logs (if relevant):

Not available, but I can share a heap dump privately.

Workaround:

The deadlocked node must be restarted; it will not recover on its own. If the issue persists then the problematic component can be disabled by setting indices.lifecycle.history_index_enabled: false in the elasticsearch.yml file on each master-eligible node and then restarting them all.

@DaveCTurner DaveCTurner added >bug :Data Management/ILM+SLM Index and Snapshot lifecycle management needs:triage Requires assignment of a team area label labels Feb 3, 2021
@elasticmachine elasticmachine added the Team:Data Management Meta label for data/management team label Feb 3, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-core-features (Team:Core/Features)

@DaveCTurner DaveCTurner removed the needs:triage Requires assignment of a team area label label Feb 3, 2021
@jakelandis
Contributor

jakelandis commented Feb 4, 2021

It looks like there is an incorrect locking pattern here that the ILM history store configuration is exacerbating.

Watcher and the ILM history store both use the BulkProcessor, which can build up requests for a given time period, size, or number of events before attempting to send off those requests. The ILM history store is most likely to send requests via the time period, due to this configuration:

            .setBulkActions(100)
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(1)

This means "flush" every 5 seconds, 100 documents, or 5MB which ever comes first and make sure that there is only 1 outstanding bulk request (else block the submitting thread). The internal data structures for adding documents to the bulk request are not thread safe, so the intent is to lock just before adding to the bulk request and unlock just after adding to the bulk request. This is a very fast operation and should minimal impact on performance. After the lock is released (in the internalAdd method) it will send the request if the number of documents is over 100, or the size is over 5MB, but "flush" is handled by scheduled thread execution. Due to the way ILM history store is configured is almost always going to use "flush" by scheduled thread execution way to send the request (we don't expect 100+ or 5MB worth of ILM history every 5s).

    private void internalAdd(DocWriteRequest<?> request) {
        //bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
        //once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
        lock.lock();
        try {
            ensureOpen();
            bulkRequest.add(request);
            bulkRequestToExecute = newBulkRequestIfNeeded();
        } finally {
            lock.unlock();
        }
        //execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
        if (bulkRequestToExecute != null) {
            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
        }
    }

So, for the document count or size paths, things should be fine... and since document count is the primary execution path for Watcher, it makes sense that we don't see this for Watcher (on this version) even though it also uses the BulkProcessor (with a different instance and config).

The problem is that the Flush runnable uses the same lock as internalAdd... and its call into execute() is NOT fast. When the concurrency is set to one it is blocking and has to wait for the execute() method to finish before it unlocks the lock. execute() is actually doing the work of calling the index action, and it's OK that that work is blocking since it is configured to do so with a concurrency of 1 (it only blocks the scheduler thread), but the mistake here is that it also holds the lock used by internalAdd. The fix here should be to ensure that the Flush runnable [1] does not hold the lock for the entire duration of the execute(..) method, only long enough to mutate the internal data structures, and calls the two-argument execute(..) outside of the lock, similar to how internalAdd unlocks and then calls execute.

Note - there are actually 2 execute(..) methods... one is safe to call without a lock and the other is not (there are comments in the code). I believe this can be compounded by a retry with incremental backoff in the case of a failure (holding the lock even longer).

[1]

class Flush implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            } finally {
                lock.unlock();
            }
        }
    }
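
For comparison, a minimal sketch of the fix described above could look like the following (a sketch only, not the actual change; the executionIdGen and bulkRequestSupplier field names are assumptions based on the rest of the class): swap out the in-progress request while holding the lock, then dispatch it after unlocking, mirroring internalAdd.

class Flush implements Runnable {
        @Override
        public void run() {
            BulkRequest bulkRequestToExecute = null;
            long executionId = -1;
            lock.lock();
            try {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                // swap the shared reference for a fresh request while holding the lock...
                bulkRequestToExecute = bulkRequest;
                executionId = executionIdGen.incrementAndGet();
                bulkRequest = bulkRequestSupplier.get();
            } finally {
                lock.unlock();
            }
            // ...and send the captured request outside the lock, so threads stuck in
            // internalAdd are never blocked behind a slow bulk execution
            execute(bulkRequestToExecute, executionId);
        }
    }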

I think the only way to reproduce this would be to put a long sleep() (longer than 5 seconds) just after the execute() method in the Flush runnable [1], set the ILM polling interval to 5 seconds, create an ILM policy to roll over every 1 document, and feed it documents with something like the Logstash generator. Eventually, I think it will deadlock.

On a side note... this whole class could use a complete rewrite to be less lock-y.

@jonathanl-telenav

jonathanl-telenav commented Jun 21, 2021

I've run into this before. Batch processing queues are subtly difficult. The only way I have been able to get this kind of code to work is by hiding all the locking functionality and creating a new abstraction (let me know if you find issues I can fix, this particular pair of classes hasn't got any real testing yet):

StateWatcher.java
StateMachine.java

@jonathanl-telenav

I made a few changes to this code so you might be able to use or adapt it:

Batcher.java

@original-brownbear
Member

This one is pretty serious now. With the improvements to ILM, which trigger a much higher rate of ILM task completion than was previously possible, this reliably reproduces a deadlock in our many-shards benchmarking after a couple of hours.

fcofdez added a commit to fcofdez/elasticsearch that referenced this issue May 17, 2022
This commit releases the BulkProcessor lock after a new bulk request
is created during flush, instead of waiting until the request is
dispatched. This prevents a deadlock when the GENERIC threadpool
is saturated with tasks adding new items to the BulkProcessor while
waiting to obtain the lock; this can prevent the original
BulkRequest from making progress in certain scenarios, as some
retry operations might be dispatched on the GENERIC threadpool.

Closes elastic#68468
@jaymode
Member

jaymode commented Jul 27, 2022

Bump to see if any movement can be made here. This one can negatively affect clusters in ESS and make things difficult to operate/fix.

@dakrone
Member

dakrone commented Jul 28, 2022

@jaymode we discussed this a bit more today within the DM team and @joegallo is going to have a look at #86862 to try to move it along.

@masseyke
Member

I was able to reproduce this (or at least two variations of it, though there's no way to know for sure whether it's what customers are seeing) in a couple of new tests in ILMHistoryStoreTests. The basics are:

  1. Submit a batch that fails with a TOO_MANY_REQUESTS failure in order to force a retry.
  2. Sleep for 5 seconds in the thread doing the retry before doing any work.
  3. Submit another batch that fails with an Error.

The reason that this causes a deadlock is that the first batch grabs the BulkRequestHandler semaphore and does not let go of it in between retries, but does let go of the BulkProcessor lock. The 2nd batch then gets the BulkProcessor lock but cannot get the BulkRequestHandler semaphore. Then the 1st batch wakes up, tries to get the BulkProcessor lock, and can't.

If I change the BulkRequestHandler code so that the semaphore is acquired and released by the rerun consumer, then my tests pass. But it's not entirely clear to me what exactly that semaphore is meant to protect (based on Jake's comment I think it's just the client call in the consumer?), so I'm not sure what other problems this might unleash.
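
To make the cycle concrete, here is a stripped-down, self-contained illustration of the inverted acquisition order (this is not the real BulkProcessor code; the class and variable names are made up):

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderDeadlockDemo {
    // stand-ins for BulkProcessor's lock and BulkRequestHandler's semaphore
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Semaphore semaphore = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        Thread firstBatch = new Thread(() -> {
            semaphore.acquireUninterruptibly();      // held across retries in the old code
            sleep(1000);                             // simulate the backoff before a retry
            lock.lock();                             // the retry needs the lock again -> blocks
            try { /* re-add the failed docs */ } finally { lock.unlock(); }
            semaphore.release();
        });
        Thread secondBatch = new Thread(() -> {
            sleep(500);                              // let the first batch grab the semaphore
            lock.lock();                             // the add()/flush path takes the lock first...
            try {
                semaphore.acquireUninterruptibly();  // ...then waits for the in-flight slot -> blocks
                semaphore.release();
            } finally {
                lock.unlock();
            }
        });
        firstBatch.start();
        secondBatch.start();
        firstBatch.join();                           // with this timing, neither thread ever finishes
        secondBatch.join();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}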

@DaveCTurner
Contributor Author

I'm not sure we can fix this in a satisfactory way just by adjusting the locking discipline. Today we block threads when overloaded which kind of works as a backpressure mechanism, but if we move away from that then we need to think about how else the system will behave when overloaded. There's currently no facility for shedding load (e.g. dropping docs) so effectively we have an unbounded queue somewhere, and that's going to cause trouble.

@masseyke
Member

We could go the other way then, right? Don't release the BulkProcessor lock during retries. That would slow things down when we have retries, but it sounds like (1) that's probably a good thing and (2) it's better than a deadlock.

@masseyke
Member

Also for what it's worth, the PR at #86862 doesn't fix my test.

@DaveCTurner
Contributor Author

We could go the other way then right?

Unclear. I think that would still mean we could block every single GENERIC thread, and that seems risky given the chances that something on which they're waiting might need one of those threads.

@original-brownbear
Member

There's currently no facility for shedding load (e.g. dropping docs) so effectively we have an unbounded queue somewhere, and that's going to cause trouble.

++ The choice here, no matter what we do, is between eventually running out of memory or blocking cluster state updates, unless we drop things eventually. Given that we can't block CS updates, we simply have to introduce some load-shedding solution here in my opinion.

@masseyke
Member

Well on the bright side I've got something that reproduces it (although it's definitely possible that this is not the path we have seen in production) at https://github.com/elastic/elasticsearch/compare/main...masseyke:fix/ilm-history-deadlock?expand=1. So we can try whatever we do against that.

@masseyke
Member

OK after looking at this some more, I think we have a handful of problems here that we might want to address separately:

  1. We have an unbounded number of BulkRequests that can be in flight in the BulkRequestHandler::execute method if (and only if) those are retries.
  2. We have a deadlock situation, where two different threads each have one lock and want another. This also only happens if there are bulk retries. This is artificially preventing the number of BulkRequests from # 1 from getting too large right now, but in maybe the most unpleasant way imaginable.
  3. In the "normal" (non-retry) case we only allow at most as many BulkRequests to be in flight as there are threads in the pool, and each BulkRequest has a bound on the number of documents allowed in it. And calls to BulkProcessor::add are throttled because they require the BulkProcessor lock, which is owned by the thread(s) executing the updates. So while there's no unbounded queue in this case, BulkProcessor users could wait an unbounded amount of time to acquire the BulkProcessor lock, and to them it might not be much better than a deadlock.

For # 1 we could pretty easily start keeping track of the number of BulkRequests in flight, and if there are too many (some configurable number) throw an EsRejectedExecutionException (not sure if you had something different in mind for "load shedding"). I don't really know how that would impact client code, but at that point the cluster is in pretty bad shape so maybe it's the best we can do.

For # 2 I think we can just change BulkRequestHandler::execute to acquire and release the semaphore within the consumer's accept method. That makes it so that it's not possible that we ever try to acquire the BulkProcessor lock while we have the BulkRequestHandler semaphore (i.e. to get the locks in the wrong order). The downside of doing that alone is that if we get into a situation where we're doing a bunch of retries, we can wind up with an unbounded number of retries. So we'd probably want to do that in conjunction with solving problem # 1.

I'm not sure if # 3 is a big problem in practice. We could start rejecting things if we can't get the BulkProcessor lock within some amount of time, or queue things up and start rejecting things once the queue is full. But that seems like the lowest priority of the 3 to deal with. Maybe once we have 1 and 2 solved we can stop requiring that we own the BulkProcessor lock in order to execute updates (and only use that for the add), and solve this one the same way we do # 1.
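
To illustrate the idea behind the fix for # 2, here is a minimal sketch (hypothetical names, not the actual BulkRequestHandler change) of acquiring the permit inside each individual attempt instead of around the whole retry loop:

import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

class PerAttemptThrottle<R> {
    private final Semaphore inFlight;

    PerAttemptThrottle(int concurrentRequests) {
        this.inFlight = new Semaphore(concurrentRequests);
    }

    /**
     * Wraps a single-attempt sender (request + completion callback) so that the permit
     * is acquired when an attempt starts and released when that attempt completes.
     */
    BiConsumer<R, Runnable> wrap(BiConsumer<R, Runnable> singleAttempt) {
        return (request, onCompletion) -> {
            inFlight.acquireUninterruptibly();           // taken per attempt...
            singleAttempt.accept(request, () -> {
                inFlight.release();                      // ...released when this attempt completes
                onCompletion.run();
            });
        };
    }
}

With this shape the permit is never held across the backoff between retries, so a retry can no longer hold the semaphore while it waits for the BulkProcessor lock.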

@DaveCTurner
Contributor Author

That sounds mostly right. I'm not convinced this only applies to retries: I think the deadlock described in the OP is because there's an ongoing (scheduled) flush which needs a GENERIC thread to proceed but all the GENERIC threads are blocked waiting for that flush to finish, and that can happen even without retries.

However I'm +1 on tracking the number (or total size) of the requests in flight and throwing some kind of exception to reject work when it reaches some bound.

Moreover I think if we had such tracking we should be able to do what we need, safely, without any blocking (locks/semaphores/...) at all.

@masseyke
Member

masseyke commented Oct 3, 2022

I haven't been able to reproduce a deadlock without retries. The reason is that (I believe, but will double-check) the first try happens on the same thread that the add happens on (the generic thread), right? So as long as we're always acquiring the locks in the same order (which we currently do unless there's a retry) there's no way to deadlock, is there?

@repantis

Hi @joegallo, @masseyke, what would be the best way to resolve this issue? We had another red cluster seemingly due to this problem. Many thanks to you and everyone else that has worked on this so far.

@masseyke
Member

Sorry I missed your question @repantis. We're actively looking at this one. The team is discussing it on Monday with the hope of tackling it during our upcoming tech debt week. You didn't happen to get a hot_threads dump from the customer who recently ran into this did you?

@masseyke
Member

Here's what I'm proposing (which I think is in line with what @DaveCTurner suggested):
We change the code to put a bounded queue in the middle. The BulkProcessor lock is only needed to make changes to the BulkRequest while we're building it up. When it is time (whether by doc count, size, time, or explicit flush) to start a new BulkRequest, it puts the completed BulkRequest onto this new queue. If the queue is already full, we throw an EsRejectedExecutionException.
Another runnable periodically checks the queue, and if it is able to pull off a BulkRequest it schedules it (on a generic thread?) to be loaded. We still have a semaphore here to make sure that no more than X BulkRequests are in flight, but that semaphore is held only while the single BulkRequest attempt is made (vs. how currently it is held for all retries). Any retries are bundled into BulkRequests and put back on the queue to be handled like anything else.
This way:

  • We are no longer doing any slow work while we hold the BulkProcessor lock
  • We no longer have an unbounded queue of BulkRequests to be retried
  • We no longer require a thread to obtain multiple locks, and as far as I know nothing is blocking so we would no longer ever run into the case where we would get stuck if we could not get another thread (I'd feel a lot better if we could reproduce the problem the way customers have though)
  • We still protect BulkRequests so that multiple threads can build them up
  • We still enforce a limit on the number of BulkRequests that can be in flight to the server

As a related thing we can do -- now that BulkProcessor.add() no longer has the risk of being a slow blocking operation, we can change ILMHistoryStore.putAsync to not schedule a task on another thread. I don't think that this would have been a problem in practice, but that's actually another unbounded queue we currently have (there's no bound on the number of tasks we can give the scheduler at once). This changes it so that BulkProcessor's current BulkRequest is effectively a blocking queue (we could make that a rejecting queue instead, but at least for ILMHistoryStore I don't think that will be needed?).
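
For illustration, a rough sketch of that shape (hypothetical names, not the real BulkProcessor or the eventual implementation):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

class BoundedBulkDispatcher<R> {
    private final BlockingQueue<R> pending;   // completed bulk requests waiting to be sent
    private final Semaphore inFlight;         // caps concurrent requests to the server
    private final Consumer<R> sender;         // e.g. a wrapper around a single client bulk call

    BoundedBulkDispatcher(int queueCapacity, int maxInFlight, Consumer<R> sender) {
        this.pending = new ArrayBlockingQueue<>(queueCapacity);
        this.inFlight = new Semaphore(maxInFlight);
        this.sender = sender;
    }

    /** Called (briefly, under the build lock) when a request is ready; rejects instead of blocking. */
    void enqueue(R completedRequest) {
        if (pending.offer(completedRequest) == false) {
            // in Elasticsearch this would be an EsRejectedExecutionException
            throw new IllegalStateException("bulk queue is full, rejecting request");
        }
    }

    /** Run periodically by a scheduler thread; never blocks indefinitely. */
    void drainOnce() {
        R request;
        while ((request = pending.poll()) != null) {
            if (inFlight.tryAcquire() == false) {
                pending.offer(request);       // best-effort re-queue, try again on the next tick
                return;
            }
            try {
                sender.accept(request);       // a single attempt only
            } catch (RuntimeException retryable) {
                pending.offer(request);       // retries go back through the same bounded queue
            } finally {
                inFlight.release();           // the permit is held only for this one attempt
            }
        }
    }
}

Callers get an immediate rejection instead of blocking when the queue is full, and a failed attempt goes back through the same bounded queue rather than being retried while holding a permit.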

@DaveCTurner
Contributor Author

All sounds reasonable apart from this bit:

This changes it so that BulkProcessor's current BulkRequest is effectively a blocking queue (we could make that a rejecting queue instead, but at least for ILMHistoryStore I don't think that will be needed?).

A blocking queue sounds like there's still a risk of deadlock here, although the risk may be smaller (or at least different) after these proposed changes. Of course this depends on the details of the implementation, particularly regarding how work is allocated to threads, but given the variety of threadpools involved in indexing and the way these things change over time I think in the limit we should opt to drop work rather than block any threads.

@repantis

Sorry I missed your question @repantis. We're actively looking at this one. The team is discussing it on Monday with the hope of tackling it during our upcoming tech debt week. You didn't happen to get a hot_threads dump from the customer who recently ran into this did you?

Great to hear, @masseyke, thank you for confirming. @original-brownbear had asked for a thread dump but we didn't capture it while the issue was occurring unfortunately.

@masseyke
Member

A blocking queue sounds like there's still a risk of deadlock here, although the risk may be smaller (or at least different) after these proposed changes. Of course this depends on the details of the implementation, particularly regarding how work is allocated to threads, but given the variety of threadpools involved in indexing and the way these things change over time I think in the limit we should opt to drop work rather than block any threads.

I think I agree with you. Right now we have an unbounded queue there instead (the queue of things in the scheduler scheduled to be added to the BulkRequest). I don't think we have enormous volume here and we get rid of them pretty quickly? But maybe it would be safest to put in a bounded rejecting queue of DocWriteRequests. So we'd have a queue of DocWriteRequests waiting to be bundled into a BulkRequest, and then a queue of BulkRequests waiting to be sent to the server. And we'd throw EsRejectedExecutionExceptions if either queue were full.

@DaveCTurner
Contributor Author

Yes, I would expect us to set limits that almost never trip when everything else is working as expected. The limits are more for dealing with a backlog caused by some other breakage.

elasticsearchmachine pushed a commit that referenced this issue Jan 9, 2023
We have been seeing deadlocks in ILMHistoryStore in production (#68468).
The deadlock appears to be caused by the fact that BulkProcessor uses
two locks (`BulkProcessor.lock` and `BulkRequestHandler.semaphore`) and
holds onto the latter lock for an indefinite amount of time.

This PR avoids deadlock by using a new BulkProcessor2 that does not
require that both locks be held at once, and drastically shortens the
amount of time that either lock needs to be held. It does this by adding
a new queue (in the Retry2 class). Note that we have left the original
BulkProcessor in place, and have cloned/modified it into BulkProcessor2.
BulkProcessor2 for now is only used by ILMHistoryStore but will likely
replace BulkProcessor altogether in the near future. 

The flow in the original BulkProcessor is like this:

- ILMHistoryStore adds IndexRequests to BulkProcessor asynchronously.
- BulkProcessor acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count) then it calls BulkRequestHandler to load that BulkRequest to the server.
- BulkRequestHandler must acquire a token from its semaphore to do this.
- It calls Client::bulk from the current thread.
- If it fails, it keeps the semaphore lock while it retries (possibly multiple times).

The flow in the new BulkProcessor2:

- ILMHistoryStore adds IndexRequests to BulkProcessor synchronously (since this part is very fast now).
- BulkProcessor acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count) then it calls Retry2::withBackoff to attempt to load the bulk a fixed number of times.
- If the number of bytes already in flight to Elasticsearch is higher than a configured number, or if Elasticsearch is too busy, the listener is notified with an EsRejectedExecutionException.
- Either way, control returns immediately and the BulkProcessor lock is released.

We are no longer using a semaphore to throttle how many concurrent
requests can be sent to Elasticsearch at once. And there is no longer
any blocking. Instead we throttle the total number of bytes in flight to
Elasticsearch (approximate), and allow Elasticsearch to throw an
EsRejectedExecutionException if it thinks there are too many concurrent
requests.

Closes #50440 Closes #68468
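
For reference, the byte-based throttle described above boils down to something like the following (a loose sketch with hypothetical names; see the BulkProcessor2 and Retry2 classes referenced above for the real implementation):

import java.util.concurrent.atomic.AtomicLong;

class BytesInFlightThrottle {
    private final long maxBytesInFlight;
    private final AtomicLong bytesInFlight = new AtomicLong();

    BytesInFlightThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Returns true if the request may be sent; the caller must call release(bytes) once it completes. */
    boolean tryAcquire(long requestBytes) {
        if (bytesInFlight.addAndGet(requestBytes) > maxBytesInFlight) {
            bytesInFlight.addAndGet(-requestBytes);
            return false;   // the caller notifies its listener with a rejection instead of blocking
        }
        return true;
    }

    void release(long requestBytes) {
        bytesInFlight.addAndGet(-requestBytes);
    }
}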