Skip to content

Commit

Permalink
Add checking for concurrent task execution
Browse files Browse the repository at this point in the history
It has been reported that in some environments a task was started
concurrently on two nodes. This commit adds an optional check for such
situations.
  • Loading branch information
mederly committed Dec 13, 2019
1 parent b710c51 commit 955f1a9
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 96 deletions.
Expand Up @@ -2513,8 +2513,8 @@
Execution status provides information about the task's overall high-level execution state.
It tells whether the task is running/runnable, waits for something or is done.
</xsd:documentation>
<xsd:appinfo>
<a:displayName>TaskType.executionStatus</a:displayName>
<xsd:appinfo>
<a:displayName>TaskType.executionStatus</a:displayName>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
Expand Down
Expand Up @@ -65,6 +65,7 @@ public class TaskManagerConfiguration {
// Names (keys) of configuration entries.
private static final String NODE_ALIVENESS_CHECK_INTERVAL = "nodeAlivenessCheckInterval";
private static final String NODE_ALIVENESS_TIMEOUT = "nodeAlivenessTimeout";
private static final String NODE_TIMEOUT_CONFIG_ENTRY = "nodeTimeout";
// Boolean entry; its value is loaded into the checkForTaskConcurrentExecution field.
private static final String CHECK_FOR_TASK_CONCURRENT_EXECUTION_CONFIG_ENTRY = "checkForTaskConcurrentExecution";
private static final String USE_JMX_CONFIG_ENTRY = "useJmx";
@Deprecated private static final String JMX_USERNAME_CONFIG_ENTRY = "jmxUsername";
@Deprecated private static final String JMX_PASSWORD_CONFIG_ENTRY = "jmxPassword";
Expand Down Expand Up @@ -102,6 +103,7 @@ public class TaskManagerConfiguration {
// Fallback values handed to the configuration getters together with the entry name.
private static final int NODE_ALIVENESS_CHECK_INTERVAL_DEFAULT = 120;
private static final int NODE_ALIVENESS_TIMEOUT_DEFAULT = 900; // node should be down for 900 seconds before declaring as dead in the repository -- this is to avoid marking node as down during its own startup
private static final int NODE_TIMEOUT_DEFAULT = 30;
// Fallback for the "checkForTaskConcurrentExecution" entry.
private static final boolean CHECK_FOR_TASK_CONCURRENT_EXECUTION_DEFAULT = false;
private static final boolean USE_JMX_DEFAULT = false;
@Deprecated private static final String JMX_USERNAME_DEFAULT = "midpoint";
@Deprecated private static final String JMX_PASSWORD_DEFAULT = "secret";
Expand Down Expand Up @@ -134,6 +136,7 @@ public class TaskManagerConfiguration {
private int nodeTimeout; // After what time should be node considered (temporarily) down.
private int nodeAlivenessTimeout; // After what time should be node considered (permanently) down and recorded as such in the repository.
private int nodeAlivenessCheckInterval; // How often to check for down nodes.
private boolean checkForTaskConcurrentExecution; // Value of the "checkForTaskConcurrentExecution" configuration entry (fallback: false).
private UseThreadInterrupt useThreadInterrupt;
private int waitingTasksCheckInterval;
private int stalledTasksCheckInterval;
Expand Down Expand Up @@ -310,6 +313,7 @@ void setBasicInformation(MidpointConfiguration masterConfig, OperationResult res
nodeAlivenessCheckInterval = c.getInt(NODE_ALIVENESS_CHECK_INTERVAL, NODE_ALIVENESS_CHECK_INTERVAL_DEFAULT);
nodeAlivenessTimeout = c.getInt(NODE_ALIVENESS_TIMEOUT, NODE_ALIVENESS_TIMEOUT_DEFAULT);
nodeTimeout = c.getInt(NODE_TIMEOUT_CONFIG_ENTRY, NODE_TIMEOUT_DEFAULT);
checkForTaskConcurrentExecution = c.getBoolean(CHECK_FOR_TASK_CONCURRENT_EXECUTION_CONFIG_ENTRY, CHECK_FOR_TASK_CONCURRENT_EXECUTION_DEFAULT);

useJmx = c.getBoolean(USE_JMX_CONFIG_ENTRY, USE_JMX_DEFAULT);
jmxUsername = c.getString(JMX_USERNAME_CONFIG_ENTRY, JMX_USERNAME_DEFAULT);
Expand Down Expand Up @@ -591,6 +595,10 @@ public int getNodeAlivenessCheckInterval() {
return nodeAlivenessCheckInterval;
}

/**
 * Returns the {@code checkForTaskConcurrentExecution} configuration flag.
 *
 * @return current value of the {@code checkForTaskConcurrentExecution} field
 */
public boolean isCheckForTaskConcurrentExecution() {
    return this.checkForTaskConcurrentExecution;
}

@SuppressWarnings("unused")
public int getQuartzNodeRegistrationCycleTime() {
return quartzNodeRegistrationCycleTime;
Expand Down
Expand Up @@ -6,86 +6,32 @@
*/
package com.evolveum.midpoint.task.quartzimpl;

import static com.evolveum.midpoint.schema.result.OperationResultStatus.IN_PROGRESS;
import static com.evolveum.midpoint.schema.result.OperationResultStatus.SUCCESS;
import static com.evolveum.midpoint.schema.result.OperationResultStatus.UNKNOWN;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.xml.datatype.Duration;
import javax.xml.datatype.XMLGregorianCalendar;

import com.evolveum.midpoint.common.configuration.api.ProfilingMode;
import com.evolveum.midpoint.common.crypto.CryptoUtil;
import com.evolveum.midpoint.prism.crypto.EncryptionException;
import com.evolveum.midpoint.prism.util.CloneUtil;
import com.evolveum.midpoint.schema.cache.CacheConfigurationManager;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.schema.util.TaskTypeUtil;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;

import com.evolveum.midpoint.common.LocalizationService;
import com.evolveum.midpoint.common.configuration.api.MidpointConfiguration;
import com.evolveum.midpoint.common.configuration.api.ProfilingMode;
import com.evolveum.midpoint.common.crypto.CryptoUtil;
import com.evolveum.midpoint.prism.ItemDefinition;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.PrismObjectDefinition;
import com.evolveum.midpoint.prism.crypto.EncryptionException;
import com.evolveum.midpoint.prism.crypto.Protector;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.polystring.PolyString;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.prism.query.builder.S_AtomicFilterEntry;
import com.evolveum.midpoint.prism.util.CloneUtil;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.api.RepoAddOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.repo.api.SystemConfigurationChangeDispatcher;
import com.evolveum.midpoint.repo.api.SystemConfigurationChangeListener;
import com.evolveum.midpoint.schema.GetOperationOptions;
import com.evolveum.midpoint.schema.RelationRegistry;
import com.evolveum.midpoint.schema.ResultHandler;
import com.evolveum.midpoint.schema.SchemaHelper;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.SearchResultMetadata;
import com.evolveum.midpoint.schema.SelectorOptions;
import com.evolveum.midpoint.repo.api.*;
import com.evolveum.midpoint.schema.*;
import com.evolveum.midpoint.schema.cache.CacheConfigurationManager;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.result.OperationResultStatus;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.schema.util.TaskTypeUtil;
import com.evolveum.midpoint.security.api.SecurityContextManager;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager;
import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation;
import com.evolveum.midpoint.task.quartzimpl.execution.ExecutionManager;
Expand All @@ -94,18 +40,40 @@
import com.evolveum.midpoint.task.quartzimpl.handlers.PartitioningTaskHandler;
import com.evolveum.midpoint.task.quartzimpl.work.WorkStateManager;
import com.evolveum.midpoint.task.quartzimpl.work.workers.WorkersManager;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.xml.datatype.Duration;
import javax.xml.datatype.XMLGregorianCalendar;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

import static com.evolveum.midpoint.schema.result.OperationResultStatus.*;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

/**
* Task Manager implementation using Quartz scheduler.
Expand Down Expand Up @@ -186,7 +154,7 @@ public class TaskManagerQuartzImpl implements TaskManager, BeanFactoryAware, Sys
// Use ONLY for those actions that need to work with these instances, e.g. when calling heartbeat() methods on them.
// For any other business please use LocalNodeManager.getLocallyRunningTasks(...).
// Maps task id -> task
private final HashMap<String,RunningTaskQuartzImpl> locallyRunningTaskInstancesMap = new HashMap<>();
private final Map<String,RunningTaskQuartzImpl> locallyRunningTaskInstancesMap = new ConcurrentHashMap<>();

private ExecutorService lightweightHandlersExecutor = Executors.newCachedThreadPool();

Expand Down

0 comments on commit 955f1a9

Please sign in to comment.