
Fix race condition in KubernetesTaskRunner when task is added to the map#14643

Merged
kfaraz merged 6 commits into apache:master from YongGang:fix-race-task
Jul 27, 2023

Conversation

@YongGang
Contributor

YongGang commented Jul 24, 2023

Description

There seems to be a multi-threading issue introduced into KubernetesTaskRunner by #14435.
The following exception was thrown under high load:

org.apache.druid.java.util.common.ISE: Task [partial_dimension_cardinality_xxx] disappeared
	at org.apache.druid.k8s.overlord.KubernetesTaskRunner.doTask(KubernetesTaskRunner.java:167) ~[?:?]
	at org.apache.druid.k8s.overlord.KubernetesTaskRunner.runTask(KubernetesTaskRunner.java:151) ~[?:?]
	at org.apache.druid.k8s.overlord.KubernetesTaskRunner.lambda$null$0(KubernetesTaskRunner.java:138) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]

The task is added to the tasks map from the main thread, while doTask (called by runTask) checks for the task's existence from a pool thread, causing a race condition, as shown in the following code:

return tasks.computeIfAbsent(
    task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
).getResult();
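The bad interleaving can be forced deterministically. The sketch below is a hypothetical stand-alone reproduction (String stands in for the work item; a latch forces the ordering), not the actual runner code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class RaceDemo {
    // Returns what the worker thread saw when it looked up the task.
    static String workerView() {
        ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch workerDone = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();

        tasks.computeIfAbsent("task-1", k -> {
            // The mapping function submits the worker, as the runner did.
            exec.submit(() -> {
                seen.set(tasks.get("task-1")); // the doTask()-side lookup
                workerDone.countDown();
            });
            // Force the bad interleaving: let the worker finish its lookup
            // before computeIfAbsent publishes the entry. (Blocking inside a
            // mapping function is a bad idea in real code; this is only to
            // make the race deterministic.)
            try {
                workerDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "work-item";
        });
        exec.shutdown();
        return seen.get(); // null: entry not yet visible to the worker
    }

    public static void main(String[] args) {
        System.out.println("worker saw: " + workerView());
    }
}
```

The worker's get returns null because ConcurrentHashMap only publishes the entry after the mapping function returns, which is exactly the window the runner hit under load.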

In this PR, we change the KubernetesWorkItem constructor so that the TaskStatus future can be set via a method after the instance has been initialized.

Release note

  • Fix a race condition in the Kubernetes task runner.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz
Contributor

kfaraz commented Jul 24, 2023

I feel that re-introducing locks is a step backwards. If the changes in #14435 seem to be causing trouble, we should just revert that commit rather than introduce a new kind of locking. This would also make sense given the upcoming release of Druid 27.


Alternatively,

IIUC, the problem here is that while one thread is in the middle of adding the work item using tasks.computeIfAbsent, the executor has already picked it up for running.

There are two easy ways to avoid that (unless I am missing something):

Option 1: Set result in KubernetesWorkItem only after work item has been added to map:

@Override
public ListenableFuture<TaskStatus> run(Task task)
{
  final KubernetesWorkItem workItem = tasks.computeIfAbsent(
      task.getId(), k -> new KubernetesWorkItem(task)
  );
  workItem.setResultIfRequired(
      exec.submit(() -> runTask(task))
  );
  return workItem.getResult();
}

This requires creating a new method synchronized void setResultIfRequired() inside KubernetesWorkItem.
Something similar would need to be done for joinAsync method too.
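A rough sketch of what that could look like (names follow the suggestion above; a plain Future<String> stands in for Guava's ListenableFuture<TaskStatus>, so this is an illustration, not the Druid class):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch of the proposed setResultIfRequired idea.
class WorkItemSketch {
    private Future<String> result;

    synchronized void setResultIfRequired(Future<String> future) {
        if (result == null) {
            result = future; // first caller wins; later calls are ignored
        }
    }

    synchronized Future<String> getResult() {
        if (result == null) {
            throw new IllegalStateException("Result future has not been set");
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        WorkItemSketch item = new WorkItemSketch();
        item.setResultIfRequired(CompletableFuture.completedFuture("SUCCESS"));
        System.out.println(item.getResult().get());
    }
}
```

Synchronizing both methods on the work item (not on the whole map) keeps the critical section tiny: the worker either sees the future already set or blocks only for the duration of the assignment.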

Option 2: Start doTask only after computeIfAbsent has finished.

private TaskStatus runTask(Task task)
  {
    final AtomicReference<TaskStatus> taskStatus = new AtomicReference<>();
    tasks.compute(
        task.getId(), (taskId, workItem) -> {
          taskStatus.set(doTask(task, workItem, true));
          return workItem;
        }
    );
    return taskStatus.get();
  }

Something similar would have to be done for joinTask method too.
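The property option 2 leans on — ConcurrentHashMap.compute on a key cannot run its remapping function while another thread's computeIfAbsent on the same key is still in flight — can be checked with a small sketch (String stand-ins; the latch and sleep only stage the interleaving):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class ComputeDemo {
    // What the worker's remapping function observed for the work item.
    static String workerView() {
        ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();
        CountDownLatch insertStarted = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                insertStarted.await(); // start only once insertion is underway
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Blocks until the other thread's computeIfAbsent has completed,
            // so workItem is guaranteed to be the inserted value, never null.
            tasks.compute("task-1", (taskId, workItem) -> {
                seen.set(workItem);
                return workItem;
            });
        });
        worker.start();

        tasks.computeIfAbsent("task-1", k -> {
            insertStarted.countDown();
            try {
                Thread.sleep(100); // widen the race window; demo only
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "work-item";
        });

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("worker saw: " + workerView());
    }
}
```

The trade-off, as noted, is that the entire doTask would run inside the remapping function, blocking any other access to that key for its whole duration.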


I personally prefer option 1 as it doesn't unnecessarily block an executor thread (even if only for a little while) and is logically simpler to follow.
@YongGang , @georgew5656 , what do you think?

@YongGang
Contributor Author

The problem with option 1 is that the constructor of KubernetesWorkItem calls the superclass constructor as well. And even if we opt to change both the KubernetesWorkItem and TaskRunnerWorkItem classes to add a setResultIfRequired method, it is still not ideal and error-prone, as developers need to remember to call setResultIfRequired right after the instance is created.

public class KubernetesWorkItem extends TaskRunnerWorkItem
{
  public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
  {
    super(task.getId(), statusFuture);
    this.task = task;
  }

In this case I prefer to revert the last commit. Though synchronizing on tasks in all the operations seems like overkill, as it's already a ConcurrentHashMap, it makes the class more thread-safe (in theory).

@kfaraz
Contributor

kfaraz commented Jul 24, 2023

I agree, we should not update the super class. The result field is not serializable and is there in the super class just for convenience. You can pass a null to it.

Since we have a specific use case, it is okay to override the default behaviour. You would need to maintain the result field at the KubernetesWorkItem level and override the getResult method to return this instead of the one in the super class.

If KubernetesWorkItem does not take a future in its constructor, then users would naturally know they have to set it. You can add Javadocs to that effect and throw an exception in getResult if it is not set. So I am not sure the error-prone concern is really valid.

@kfaraz
Contributor

kfaraz commented Jul 24, 2023

Though synchronizing on tasks in all the operations seems like overkill, as it's already a ConcurrentHashMap, it makes the class more thread-safe (in theory).

Yeah, it is certainly thread-safe (even in practice) to have everything be synchronized under the same lock but it also limits performance. A single bad call can potentially block everything.

We are trying to revisit all usages of locks in the Druid code base, and the original PR #14435 was in this same vein of improving performance.

@kfaraz kfaraz modified the milestone: 27.0 Jul 24, 2023
@georgew5656
Contributor

georgew5656 commented Jul 24, 2023

@kfaraz @YongGang I think it makes sense to revert removing the locks, since this is a valid synchronization use case (the main thread inserting into the map has to happen before the worker thread reads from the map).

I think we can keep the non-synchronized getRunningTasks/getKnownTasks/getPendingTasks from the other PR, though.

Edit: hmm, just saw your other comment, let me look at that solution first.

I think solution 1 makes sense. There is still concurrent access where the main thread is trying to set result while the worker thread is trying to get/set kubernetesPeonLifecycle, but AFAIK that should not cause any issues.

I am wondering if there will be an issue with having a work item with no result future attached to it, though? That might be the reason the constructor is the way it is. This could be tested out, I think.

@georgew5656
Contributor

It seems to me like there are some methods in TaskQueue that assume that statusFuture will not be null. If we want to go with option 1, I think we need to filter getKnownTasks and getPendingTasks to check that result is not null.

@georgew5656
Contributor

georgew5656 commented Jul 24, 2023

After thinking about this some more, I feel like it's simpler to just leave the synchronized block in for run/joinAsync (run only happens once when tasks are run, and joinAsync only happens once when the overlord is restarted), and in doTask (which only happens once in worker threads). We can leave getPendingTasks, getKnownTasks, and getRunningTasks unsynchronized, and I think that will prevent most of the lock contention.

I think this is a safer solution for Druid 27, and we can maybe investigate removing synchronization entirely after doing some more scale testing to see how much it helps.

@kfaraz what do you think?
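As a runnable analog of this approach (String stands in for Task/KubernetesWorkItem, so this is only a sketch of the idea, not the Druid code), both the inserting side and the worker's lookup take the tasks monitor, which rules out the missing-entry window:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class SyncFixDemo {
    // What the worker thread saw when it looked up the task: always the
    // inserted work item, because both sides take the same monitor.
    static String workerView() {
        ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        AtomicReference<Future<String>> worker = new AtomicReference<>();

        synchronized (tasks) {
            // run()-side: insertion and submission happen under the monitor.
            tasks.computeIfAbsent("task-1", k -> {
                worker.set(exec.submit(() -> {
                    // doTask()-side lookup: blocks here until run() releases
                    // the monitor, by which time the entry is in the map.
                    synchronized (tasks) {
                        return tasks.get("task-1");
                    }
                }));
                return "work-item";
            });
        }

        try {
            return worker.get().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker saw: " + workerView());
    }
}
```

The read-only listing methods can stay unsynchronized, as discussed above, since they tolerate a slightly stale view of the map.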

@kfaraz
Contributor

kfaraz commented Jul 25, 2023

I feel like it's simpler to just leave the synchronized block in for run/joinAsync

@georgew5656 , @YongGang, if you prefer having the synchronized for now, then we can proceed with it. 👍🏻

i think this is a safer solution for druid 27 and we can maybe investigate this change to remove synchronization entirely after doing some more scale testing to see how much it helps.

This is not a blocker for Druid 27 as the bug is in a contrib extension. But yes, it would be better to have it addressed.

we can maybe investigate this change to remove synchronization entirely after doing some more scale testing to see how much it helps.

Removing this synchronization might not help performance much, as the critical section is very small and threads would not be blocked for long. The primary reason I did not prefer it was to have homogeneity in the code: it gets confusing to have synchronization in some places and not in others. And if we have synchronization in all places, then it does start affecting performance.

@YongGang
Contributor Author

I feel like it's simpler to just leave the synchronized block in for run/joinAsync

@georgew5656 , @YongGang, if you prefer having the synchronized for now, then we can proceed with it. 👍🏻

If we agree on this, then I will discard the changes in this PR (the option 1 solution) and work on partially reverting #14435.

@YongGang
Contributor Author

Done. Please have a look @georgew5656 @kfaraz

Comment on lines 143 to 144
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run))));
return tasks.get(task.getId()).getResult();
Contributor

this can be a single chained statement. Also it would look more readable if the arguments to computeIfAbsent were on a different line as in the original method.

Contributor Author

Updated.

throw new ISE("Task [%s] disappeared", task.getId());
}
if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
Contributor

Should we maybe just return a failed TaskStatus here instead of throwing an exception? The exception thrown by this method may or may not be handled by the calling code, but there's no point depending on that if we already know the reason for the task failure.

We should do the same thing in the catch block too.

But this doesn't need to be done as a part of this PR, just wanted to call it out.

Contributor Author

I looked at what other TaskRunners do so we can have consistent behavior (throw an error or return a task failure).
In ThreadingTaskRunner, the code seems similar to what we have here.

Contributor

Makes sense, we can revisit this later.

try {
Task task = adapter.toTask(job);
tasks.add(Pair.of(task, joinAsync(task)));
restoredTasks.add(Pair.of(task, runOrJoinTask(task, false)));
Contributor

I preferred the original separation between joinTask and runTask. Passing a boolean is cryptic and makes the code less readable.

Contributor Author

Updated. My thinking was that since we added a synchronized block (a lock in the previous commit), it's better to have a single place/method holding this complexity. But I agree this may make the code less readable.

Contributor

Yeah, that is why I didn't give this feedback originally. But upon reading the code again, it felt cleaner to have them separate.

}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected ListenableFuture<TaskStatus> runOrJoinTask(Task task, boolean run)
Contributor

It was better to have this as two separate methods, seemed more readable and easy to understand.

Contributor

georgew5656 commented Jul 26, 2023

Yeah, agree. We can just put the synchronized blocks back in (revert parts of that previous PR I made to remove them) without changing any function names.

Contributor Author

Done. My comment from above:

My thinking was that since we added a synchronized block (a lock in the previous commit), it's better to have a single place/method holding this complexity. But I agree this may make the code less readable.

Contributor

kfaraz left a comment

Thanks a lot for your patience with the back and forth on this PR, @YongGang ! 🙂

@kfaraz
Contributor

kfaraz commented Jul 27, 2023

Some checks were skipped, re-triggering them.

@kfaraz kfaraz closed this Jul 27, 2023
@kfaraz kfaraz reopened this Jul 27, 2023
@kfaraz kfaraz merged commit 9b88b78 into apache:master Jul 27, 2023
@YongGang YongGang deleted the fix-race-task branch August 2, 2023 17:30
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023
FrankChen021 pushed a commit that referenced this pull request Feb 3, 2025
…map (#14643)

Changes:
- Fix race condition in KubernetesTaskRunner introduced by #14435 
- Perform addition and removal from map inside a synchronized block
- Update tests
GabrielCWT pushed a commit to GabrielCWT/druid that referenced this pull request Sep 9, 2025
…map (apache#14643)

Changes:
- Fix race condition in KubernetesTaskRunner introduced by apache#14435 
- Perform addition and removal from map inside a synchronized block
- Update tests