
Prevent concurrent tasks from parallelizing further #12569

Merged · 6 commits · Sep 20, 2023

Conversation

javanna
Contributor

@javanna javanna commented Sep 18, 2023

Concurrent search is currently applied once per search call, either when search is called, or when concurrent query rewrite happens. They generally don't happen within one another. There are situations in which we are going to introduce parallelism in places where there could be multiple inner levels of parallelism requested as each task could try to parallelize further. In these cases, with certain executor implementations, like ThreadPoolExecutor, we may deadlock as we are waiting for all tasks to complete but they are waiting for threads to free up to complete their execution.

This commit introduces a simple safeguard that makes sure that we only parallelize via the executor at the top-level invokeAll call. When each task tries to parallelize further, we just execute them directly instead of submitting them to the executor.
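The safeguard described above can be illustrated with a minimal sketch (class and member names here are illustrative, not Lucene's actual TaskExecutor): a per-thread counter tracks whether the current thread is already running a task, and if so, invokeAll runs its tasks directly on the calling thread instead of handing them to the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Minimal sketch of the safeguard: only the top-level invokeAll hands tasks to
// the executor; any nested invokeAll issued from within one of those tasks runs
// its tasks on the calling thread, so no thread ever blocks waiting for work
// that is queued behind it.
class SketchTaskExecutor {
  private static final ThreadLocal<Integer> numberOfRunningTasksInCurrentThread =
      ThreadLocal.withInitial(() -> 0);

  private final Executor executor;

  SketchTaskExecutor(Executor executor) {
    this.executor = executor;
  }

  <T> List<T> invokeAll(List<Callable<T>> callables) throws Exception {
    List<FutureTask<T>> tasks = new ArrayList<>();
    for (Callable<T> callable : callables) {
      tasks.add(new FutureTask<>(() -> {
        // record that this thread is currently executing a task
        numberOfRunningTasksInCurrentThread.set(numberOfRunningTasksInCurrentThread.get() + 1);
        try {
          return callable.call();
        } finally {
          numberOfRunningTasksInCurrentThread.set(numberOfRunningTasksInCurrentThread.get() - 1);
        }
      }));
    }
    for (FutureTask<T> task : tasks) {
      if (numberOfRunningTasksInCurrentThread.get() > 0) {
        task.run(); // nested call: execute directly, no executor hand-off
      } else {
        executor.execute(task); // top-level call: parallelize via the executor
      }
    }
    List<T> results = new ArrayList<>();
    for (FutureTask<T> task : tasks) {
      results.add(task.get()); // block until each task completes
    }
    return results;
  }
}
```

With a fixed pool of size 1, a task that calls invokeAll again completes on its own thread rather than deadlocking while waiting for a free worker.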

@javanna javanna marked this pull request as ready for review September 18, 2023 18:36
@javanna javanna added this to the 9.8.0 milestone Sep 18, 2023
Comment on lines 41 to 42
* This is to prevent deadlock with certain types of executors, as well as to limit the level of
* parallelism.
Contributor

I have a question here: since we are limiting the level of parallelism to 1, won't we be affecting executor implementations that would not deadlock and that want to make use of multi-level parallelism? Won't we be restricting those opportunities, given that in the future we may also want multi-level parallelism?

Contributor Author

That's a good question. I assumed that once we are in a concurrent task, parallelizing further may cause more harm than good, especially given the "block and wait" for all tasks to be completed. In practice, I wonder what other executor implementations are desirable to provide to the index searcher that are not thread pool based: say you have an executor that creates a new thread for each execute request (which does not seem like a good idea anyway), would we be OK allowing more than one level of parallelism? Is that a valid use case?

Contributor

I would remove the bit about limiting the level of parallelism. I don't think it's a goal, mostly a side effect of the logic to avoid deadlocks.

It's true that this might hurt executors that are not subject to deadlocks, but I would be very surprised if there were many users relying on it today since it can only happen when running a rewrite or a search from a rewrite or a search, which is not typical.

Contributor

So it seems that it's not super important based on current use cases, but maybe in the future there are cases where we would like to allow a higher level of parallelism? I agree with @jpountz that, based on the current scenario, it might be a good choice to remove this bit and run directly on the calling thread if it is an executor thread.

final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOException {
    for (Runnable task : tasks) {
        executor.execute(task);

final <T> List<T> invokeAll(Collection<Task<T>> tasks) throws IOException {
Contributor

Just a thought: to make use of this for concurrent search, should we somehow ensure the caller always uses TaskExecutor's invokeAll instead of directly submitting tasks to indexSearcher.getExecutor().execute()? Do we wish to enforce that somehow, or leave it up to the user to decide how they would like to implement it? I don't see any other usages of IndexSearcher#getExecutor in the codebase, though since it's a public API I'm not sure it would be right to deprecate it in this case. Appreciate any views on this.

Contributor Author

Good point: I'd consider deprecating getExecutor in favour of always going through the TaskExecutor.

*/
class TaskExecutor {
private static final ThreadLocal<Boolean> isConcurrentTask = ThreadLocal.withInitial(() -> false);
Contributor

Instead of a boolean, should we allow the user to also pass the level of parallelism (L) they would like to use, defaulting to 1 as in this case? i.e. if an executor implementation (safeguarded from the deadlock situation) wants to make use of further concurrency, it should be able to set that via an API in TaskExecutor?

Contributor Author

I would keep things simple and not allow this for now, but I'd like to hear what others think.

Contributor

I have a few problems:

  • it is a static ThreadLocal, which is fine as it is unlikely that several TaskExecutors use the same thread pool. But there could be problems if you create two different TaskExecutors with the same thread pool: in that case the TaskExecutors are no longer decoupled (one affects the other). It might not be a problem at all, but keep that in mind.
  • in the case of different TaskExecutors, one task would set the thread local to false in its finally block; this may cause deadlock in the other TaskExecutor using the same thread pool.
  • in addition, the name isConcurrentTask is misleading, as the idea is to prevent more concurrent tasks from being executed in the thread pool. It should maybe be called "runSameThread".

To also support higher parallelism than 1, I'd change this to ThreadLocal<Integer> and increment on starting a task and decrement in the finally. Then you could have logic like "run in the same thread if the current value >= parallelism". This would also prevent the issues above, because when entering the run method it is incremented and when exiting it is decremented, so different executors can't confuse each other.

In general I am not fully happy with using a ThreadLocal here at all. Would it not be better to pass around the Task instance and the task instance has a method to spawn a subtask? This would be similar to the fork/join framework, where RecursiveTask is used for exactly that.

IMHO, we should really switch to fork/join, as we need work stealing algorithms to prevent deadlocks.
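For reference, the fork/join alternative mentioned above looks roughly like this (a generic sketch, not Lucene code): a RecursiveTask spawns subtasks via fork(), and join() lets pool threads work-steal pending tasks instead of blocking, so nested parallelism cannot exhaust the pool.

```java
import java.util.concurrent.RecursiveTask;

// Sketch of the fork/join style: subtasks are forked recursively, and join()
// participates in work stealing, so even a pool with a single worker thread
// completes arbitrarily nested task trees without deadlocking.
class SumTask extends RecursiveTask<Long> {
  private final long[] values;
  private final int from, to; // half-open range [from, to)

  SumTask(long[] values, int from, int to) {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= 2) { // small slice: sum directly
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += values[i];
      }
      return sum;
    }
    int mid = (from + to) >>> 1;
    SumTask left = new SumTask(values, from, mid);
    SumTask right = new SumTask(values, mid, to);
    left.fork();                          // spawn the left half as a subtask
    return right.compute() + left.join(); // compute right inline, join steals if needed
  }
}
```

Running this on a ForkJoinPool with parallelism 1 still completes, which is exactly the deadlock-freedom property discussed here.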

Contributor Author

@javanna javanna Sep 19, 2023

I am aware of some of the subtleties of thread locals. I started with a solution that did not use them, but all in all it would do the same thing that thread locals do :) (e.g. keeping track of which thread runs what in a map).

I think that if we make sure that a single task executor is created, which is currently the case, we should be ok?

I am happy to address the naming as suggested, I was also not super happy with it.

I will play with the counter idea, though I don't think that allowing multiple levels of parallelism is required? Is that necessary in your opinion?

Would it not be better to pass around the Task instance and the task instance has a method to spawn a subtask?

I spent quite some time debating this with myself as well. The problem is that invokeAll does not have the current task. It may or may not be executed as part of a task. It looks like making the task available (without using thread locals!) would mean carrying the task around in many places, unless I am missing another way.

Thanks for all the feedback Uwe!

Contributor

Hi, the latest commit looks fine, as we at least no longer have the binary thread local.

Small suggestion: do we have a MutableInt class available in Lucene? It would make it easier to decrement/increment. An alternative is to use ThreadLocal<int[]> with a length-one array. This would also prevent autoboxing. Just initialize with:

private static final ThreadLocal<int[]> runSameThread = ThreadLocal.withInitial(() -> new int[1]);

and use like this:

final int[] counter = runSameThread.get();
counter[0]++;
try {
    ...
} finally {
    counter[0]--;
}

My problem was mainly if external code like Elasticsearch passes a shared thread pool to multiple IndexSearchers (like different indexes on same node using same "searcher" thread pool).
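The int[]-based counter suggested above can be sketched as a self-contained helper (class and method names are illustrative): the array is read once per invocation and mutated in place, so no Integer boxing happens on the hot path.

```java
// Sketch of the ThreadLocal<int[]> idea: a length-one array acts as a mutable
// per-thread nesting counter, fetched with a single ThreadLocal lookup.
class NestingCounter {
  private static final ThreadLocal<int[]> depth = ThreadLocal.withInitial(() -> new int[1]);

  /** Current nesting depth on the calling thread (0 when no tracked task is running). */
  static int currentDepth() {
    return depth.get()[0];
  }

  /** Runs the task while tracking nesting depth; returns the depth observed during the task. */
  static int runTracked(Runnable task) {
    final int[] counter = depth.get(); // one ThreadLocal read, no boxing
    counter[0]++;
    try {
      task.run();
      return counter[0]; // evaluated before the finally block decrements
    } finally {
      counter[0]--;
    }
  }
}
```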

Contributor

Would it help? I would expect this number to always be 0 or 1, maybe 2 in rare cases, so we'd use the JVM's cached Integer instances?
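For background on the boxing point above: Integer.valueOf, which autoboxing goes through, is required by the JLS to cache values in the range [-128, 127], so small counts like 0, 1 and 2 box to shared instances. A quick illustration (hypothetical helper, assuming the default autobox cache size):

```java
// Demonstrates the JVM's Integer cache: boxing the same small int twice yields
// the same object, while values above 127 allocate fresh instances by default.
class IntegerCacheDemo {
  static boolean boxesToSameInstance(int value) {
    Integer a = value; // autoboxing calls Integer.valueOf(value)
    Integer b = value;
    return a == b;     // reference equality: true only for cached values
  }
}
```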

Contributor

Of course. The idea was that we'd only read the thread local once to get the mutable instance. Also, to me the code looks better. It was only a suggestion...

Contributor

Does it make sense to move TaskExecutor under the org.apache.lucene.util package? That seems like a more suitable place to me for this sort of class, and there could be more potential usages of this class to achieve concurrency in various parts of the codebase.

Contributor Author

I am not entirely sure: one aspect is that I'd like to make sure that there is a single instance of TaskExecutor, which IndexSearcher creates. All other usages should go through the existing instance retrieved from the IndexSearcher. Does that seem like a reasonable expectation?

Contributor

IndexSearcher should own the TaskExecutor and all queries/collectors can use it.

As this would change public methods, why not move to work-stealing fork/join here?

Contributor Author

Which public methods require changing? As far as I understand, the visibility of TaskExecutor needs to go from package-private to public, but that's not a breaking change?

Contributor Author

why not move to work-stealing fork/join here?

There were concerns raised by others that it is not a good fit for I/O-intensive operations. Also, it's a bigger change, and this change looked acceptable to move forward with #12183.

Contributor

@uschindler uschindler left a comment

I am not happy with how the thread local is set up. It should be incrementing/decrementing instead of binary true/false.


Contributor

@jpountz jpountz left a comment

I left some comments. In general I like this approach better than walking the stack.


@javanna
Contributor Author

javanna commented Sep 20, 2023

I pushed new commits to address the latest review comments, thanks for all the input. This should be ready now.

Contributor

@jpountz jpountz left a comment

LGTM. It might be worth using CallerRunsPolicy with a small queue in tests sometimes, as this is an interesting case that will make tasks run in the current thread.
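A sketch of such a test setup (illustrative names, not an actual Lucene test): a ThreadPoolExecutor with a single worker, a capacity-1 queue, and CallerRunsPolicy. Once the worker is busy and the queue is full, a further execute() call runs the task on the submitting thread instead of rejecting it.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// With one worker and a capacity-1 queue: the first task occupies the worker,
// the second fills the queue, and the third is rejected and therefore executed
// by CallerRunsPolicy on the submitting (caller) thread.
class CallerRunsDemo {
  static Set<String> runTasks(int numTasks) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1,                              // exactly one worker thread
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1),       // tiny bounded queue
        new ThreadPoolExecutor.CallerRunsPolicy());
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    CountDownLatch done = new CountDownLatch(numTasks);
    for (int i = 0; i < numTasks; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(50); // keep the worker busy so the queue fills up
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        threadNames.add(Thread.currentThread().getName());
        done.countDown();
      });
    }
    done.await();
    pool.shutdown();
    return threadNames; // names of every thread that executed a task
  }
}
```

Submitting three tasks should show two distinct executing threads: the pool worker and the caller itself.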

@javanna
Contributor Author

javanna commented Sep 20, 2023

It might be worth using CallerRunsPolicy with a small queue in tests sometimes, as this is an interesting case that will make tasks run in the current thread.

Given that TaskExecutor runs tasks in the caller thread when the counter is above zero, I was thinking that we already cover the situation where we increment the counter for the caller thread. And that has no effect other than executing any further tasks in the caller thread.

@javanna javanna merged commit 937ebd4 into apache:main Sep 20, 2023
4 checks passed
javanna added a commit that referenced this pull request Sep 20, 2023

Co-authored-by: Adrien Grand <jpountz@gmail.com>
@javanna javanna removed this from the 9.8.0 milestone Sep 20, 2023
javanna added a commit that referenced this pull request Sep 20, 2023
javanna added a commit that referenced this pull request Sep 20, 2023
@javanna javanna added this to the 9.9.0 milestone Sep 20, 2023