Skip to content

Commit

Permalink
Change Set<Task> to List<Task> to lower contention
Browse files Browse the repository at this point in the history
Lightweight asynchronous tasks are shared objects that hold their
data in thread-unsafe prism objects. This is to be fixed somehow.

Using HashSet<Task> structures makes this problem even worse because
hashCode/equals are called repetitively in various threads that
access those sets. So here we replace sets with lists to treat
at least some of the concurrency issues (MID-5136).
  • Loading branch information
mederly committed Feb 15, 2019
1 parent 3dab79a commit db9db4e
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 23 deletions.
Expand Up @@ -1018,9 +1018,9 @@ void savePendingModifications(OperationResult parentResult) throws ObjectNotFoun

boolean isLightweightAsynchronousTask();

Set<? extends Task> getLightweightAsynchronousSubtasks();
Collection<? extends Task> getLightweightAsynchronousSubtasks();

Set<? extends Task> getRunningLightweightAsynchronousSubtasks();
Collection<? extends Task> getRunningLightweightAsynchronousSubtasks();

boolean lightweightHandlerStartRequested();

Expand Down
Expand Up @@ -357,7 +357,7 @@ void modifyTask(String oid, Collection<? extends ItemDelta> modifications, Opera
*
* @return tasks that currently run on this node.
*/
Set<Task> getLocallyRunningTasks(OperationResult parentResult);
Collection<Task> getLocallyRunningTasks(OperationResult parentResult);

/**
* Returns the local scheduler information.
Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.evolveum.midpoint.prism.ItemDefinition;
import com.evolveum.midpoint.prism.crypto.Protector;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.util.CloneUtil;
import com.evolveum.midpoint.repo.api.*;
import com.evolveum.midpoint.schema.*;
import com.evolveum.midpoint.task.api.*;
Expand Down Expand Up @@ -1879,7 +1880,7 @@ public String getNodeId() {
}

@Override
public Set<Task> getLocallyRunningTasks(OperationResult parentResult) {
public Collection<Task> getLocallyRunningTasks(OperationResult parentResult) {
return executionManager.getLocallyRunningTasks(parentResult);
}

Expand Down
Expand Up @@ -61,6 +61,7 @@
import javax.xml.datatype.XMLGregorianCalendar;
import javax.xml.namespace.QName;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import static com.evolveum.midpoint.prism.xml.XmlTypeConverter.createXMLGregorianCalendar;
Expand Down Expand Up @@ -108,9 +109,9 @@ public class TaskQuartzImpl implements Task {
* Lightweight asynchronous subtasks.
* Each task here is a LAT, i.e. transient and with assigned lightweight handler.
* <p>
* This must be synchronized, because interrupt() method uses it.
* This must be concurrent, because interrupt() method uses it.
*/
private Set<TaskQuartzImpl> lightweightAsynchronousSubtasks = Collections.synchronizedSet(new HashSet<>());
private Map<String, TaskQuartzImpl> lightweightAsynchronousSubtasks = new ConcurrentHashMap<>();
private Task parentForLightweightAsynchronousTask; // EXPERIMENTAL

/*
Expand Down Expand Up @@ -2588,7 +2589,8 @@ public Task createSubtask() {
public Task createSubtask(LightweightTaskHandler handler) {
TaskQuartzImpl sub = ((TaskQuartzImpl) createSubtask());
sub.setLightweightTaskHandler(handler);
lightweightAsynchronousSubtasks.add(sub);
assert sub.getTaskIdentifier() != null;
lightweightAsynchronousSubtasks.put(sub.getTaskIdentifier(), sub);
sub.parentForLightweightAsynchronousTask = this;
return sub;
}
Expand Down Expand Up @@ -2764,20 +2766,20 @@ public Future getLightweightHandlerFuture() {
}

@Override
public Set<? extends TaskQuartzImpl> getLightweightAsynchronousSubtasks() {
return Collections.unmodifiableSet(lightweightAsynchronousSubtasks);
public Collection<? extends TaskQuartzImpl> getLightweightAsynchronousSubtasks() {
return Collections.unmodifiableList(new ArrayList<>(lightweightAsynchronousSubtasks.values()));
}

@Override
public Set<? extends TaskQuartzImpl> getRunningLightweightAsynchronousSubtasks() {
public Collection<? extends TaskQuartzImpl> getRunningLightweightAsynchronousSubtasks() {
// beware: Do not touch task prism here, because this method can be called asynchronously
Set<TaskQuartzImpl> retval = new HashSet<>();
List<TaskQuartzImpl> retval = new ArrayList<>();
for (TaskQuartzImpl subtask : getLightweightAsynchronousSubtasks()) {
if (subtask.getExecutionStatus() == TaskExecutionStatus.RUNNABLE && subtask.lightweightHandlerStartRequested()) {
retval.add(subtask);
}
}
return Collections.unmodifiableSet(retval);
return Collections.unmodifiableList(retval);
}

@Override
Expand Down
Expand Up @@ -108,7 +108,7 @@ public boolean stopSchedulersAndTasks(Collection<String> nodeIdentifiers, long t

LOGGER.debug("{} task(s) found on nodes that are going down, stopping them.", taskInfoList.size());

Set<Task> tasks = new HashSet<>();
List<Task> tasks = new ArrayList<>();
for (ClusterStatusInformation.TaskInfo taskInfo : taskInfoList) {
try {
tasks.add(taskManager.getTask(taskInfo.getOid(), result));
Expand Down Expand Up @@ -225,7 +225,7 @@ public boolean stopAllTasksOnThisNodeAndWait(long timeToWait, OperationResult pa
result.addParam("timeToWait", timeToWait);

LOGGER.info("Stopping all tasks on local node");
Set<Task> tasks = localNodeManager.getLocallyRunningTasks(result);
Collection<Task> tasks = localNodeManager.getLocallyRunningTasks(result);
boolean retval = stopTasksRunAndWait(tasks, null, timeToWait, false, result);
result.computeStatus();
return retval;
Expand Down Expand Up @@ -544,7 +544,7 @@ public Set<String> getLocallyRunningTasksOids(OperationResult parentResult) {
return localNodeManager.getLocallyRunningTasksOids(parentResult);
}

public Set<Task> getLocallyRunningTasks(OperationResult parentResult) {
public Collection<Task> getLocallyRunningTasks(OperationResult parentResult) {
return localNodeManager.getLocallyRunningTasks(parentResult);
}

Expand Down Expand Up @@ -770,7 +770,7 @@ public Thread getTaskThread(String oid) {
public String getRunningTasksThreadsDump(OperationResult parentResult) {
OperationResult result = parentResult.createSubresult(ExecutionManager.DOT_CLASS + "getRunningTasksThreadsDump");
try {
Set<Task> locallyRunningTasks = taskManager.getLocallyRunningTasks(result);
Collection<Task> locallyRunningTasks = taskManager.getLocallyRunningTasks(result);
StringBuilder output = new StringBuilder();
for (Task task : locallyRunningTasks) {
try {
Expand All @@ -791,7 +791,7 @@ public String getRunningTasksThreadsDump(OperationResult parentResult) {
public String recordRunningTasksThreadsDump(String cause, OperationResult parentResult) throws ObjectAlreadyExistsException {
OperationResult result = parentResult.createSubresult(ExecutionManager.DOT_CLASS + "recordRunningTasksThreadsDump");
try {
Set<Task> locallyRunningTasks = taskManager.getLocallyRunningTasks(result);
Collection<Task> locallyRunningTasks = taskManager.getLocallyRunningTasks(result);
StringBuilder output = new StringBuilder();
for (Task task : locallyRunningTasks) {
try {
Expand Down Expand Up @@ -823,7 +823,7 @@ public String getTaskThreadsDump(String taskOid, OperationResult parentResult)
}
output.append("*** Root thread for task ").append(task).append(":\n\n");
addTaskInfo(output, localTask, rootThread);
for (Task subtask : new HashSet<>(localTask.getLightweightAsynchronousSubtasks())) {
for (Task subtask : localTask.getLightweightAsynchronousSubtasks()) {
TaskQuartzImpl subtaskImpl = (TaskQuartzImpl) subtask;
Thread thread = subtaskImpl.getLightweightThread();
output.append("** Information for lightweight asynchronous subtask ").append(subtask).append(":\n\n");
Expand Down
Expand Up @@ -54,6 +54,7 @@

import javax.xml.datatype.Duration;
import java.util.*;
import java.util.Objects;

@DisallowConcurrentExecution
public class JobExecutor implements InterruptableJob {
Expand Down Expand Up @@ -225,7 +226,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException {

static class GroupExecInfo {
int limit;
Set<Task> tasks = new HashSet<>();
Collection<Task> tasks = new ArrayList<>();

GroupExecInfo(Integer l) {
limit = l != null ? l : Integer.MAX_VALUE;
Expand All @@ -235,7 +236,9 @@ public void accept(Integer limit, Task task) {
if (limit != null && limit < this.limit) {
this.limit = limit;
}
this.tasks.add(task);
if (tasks.stream().noneMatch(t -> Objects.equals(t.getOid(), task.getOid()))) { // just for sure
tasks.add(task);
}
}

@Override
Expand All @@ -257,7 +260,7 @@ private boolean checkExecutionConstraints(TaskQuartzImpl task, OperationResult r
for (Map.Entry<String, GroupExecInfo> entry : groupMap.entrySet()) {
String group = entry.getKey();
int limit = entry.getValue().limit;
Set<Task> tasksInGroup = entry.getValue().tasks;
Collection<Task> tasksInGroup = entry.getValue().tasks;
if (tasksInGroup.size() >= limit) {
RescheduleTime rescheduleTime = getRescheduleTime(executionConstraints,
DEFAULT_RESCHEDULE_TIME_FOR_GROUP_LIMIT, task.getNextRunStartTime(result));
Expand Down
Expand Up @@ -484,11 +484,11 @@ Thread getLocalTaskThread(@NotNull String oid) {
*
* @return
*/
Set<Task> getLocallyRunningTasks(OperationResult parentResult) {
Collection<Task> getLocallyRunningTasks(OperationResult parentResult) {

OperationResult result = parentResult.createSubresult(LocalNodeManager.class.getName() + ".getLocallyRunningTasks");

Set<Task> retval = new HashSet<>();
List<Task> retval = new ArrayList<>();

for (String oid : getLocallyRunningTasksOids(result)) {
OperationResult result1 = result.createSubresult(LocalNodeManager.class.getName() + ".getLocallyRunningTask");
Expand Down

0 comments on commit db9db4e

Please sign in to comment.