diff --git a/client/src/main/java/com/metamx/druid/BaseNode.java b/client/src/main/java/com/metamx/druid/BaseNode.java
index 7e75ea383473..dc45cd6eef87 100644
--- a/client/src/main/java/com/metamx/druid/BaseNode.java
+++ b/client/src/main/java/com/metamx/druid/BaseNode.java
@@ -30,7 +30,7 @@
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
-import com.metamx.druid.index.v1.serde.ComplexMetricRegistererer;
+import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
@@ -174,7 +174,7 @@ public T registerJacksonSubtype(NamedType... namedTypes)
}
@SuppressWarnings("unchecked")
- public T registerComplexMetric(ComplexMetricRegistererer registererer)
+ public T registerHandler(Registererer registererer)
{
registererer.register();
return (T) this;
diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java
index 665a2fbd4272..b8f1f5c4d319 100644
--- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java
+++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java
@@ -221,7 +221,7 @@ private void initializeDiscovery() throws Exception
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
- serviceDiscoveryConfig.getZkHosts(), lifecycle
+ serviceDiscoveryConfig, lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
new file mode 100644
index 000000000000..8b83b3c8cf94
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.initialization;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class CuratorConfig
+{
+ @Config("druid.zk.service.host")
+ public abstract String getZkHosts();
+
+ @Config("druid.zk.service.sessionTimeoutMs")
+ @Default("15000")
+ public abstract int getZkSessionTimeoutMs();
+}
diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
index 99aaf8de3a3a..1d2c5c12a111 100644
--- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java
+++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
@@ -69,7 +69,8 @@ public class Initialization
"druid.zk.paths.announcementsPath",
"druid.zk.paths.servedSegmentsPath",
"druid.zk.paths.loadQueuePath",
- "druid.zk.paths.masterPath"};
+ "druid.zk.paths.masterPath"
+ };
public static final String DEFAULT_ZPATH = "/druid";
public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle)
@@ -119,10 +120,12 @@ public static ZKPhoneBook createPhoneBook(
}
- /** Load properties.
+ /**
+ * Load properties.
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper.
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host=none then do not load properties from zookeeper.
+ *
* @return Properties ready to use.
*/
public synchronized static Properties loadProperties()
@@ -139,7 +142,9 @@ public synchronized static Properties loadProperties()
final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties");
if (stream == null) {
- log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now.");
+ log.info(
+ "runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."
+ );
} else {
log.info("Loading properties from runtime.properties");
try {
@@ -202,7 +207,7 @@ public String getZkHosts()
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination.");
}
// validate properties now that all levels of precedence are loaded
- if (! validateResolveProps(tmp_props)) {
+ if (!validateResolveProps(tmp_props)) {
log.error("Properties failed to validate, cannot continue");
throw new RuntimeException("Properties failed to validate");
}
@@ -231,14 +236,19 @@ public static Server makeJettyServer(ServerConfig config)
}
public static CuratorFramework makeCuratorFrameworkClient(
- String zkHosts,
+ CuratorConfig curatorConfig,
Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
- .connectString(zkHosts)
- .retryPolicy(new ExponentialBackoffRetry(1000, 30))
+ .connectString(curatorConfig.getZkHosts())
+ .retryPolicy(
+ new ExponentialBackoffRetry(
+ 1000,
+ 30
+ )
+ )
.build();
lifecycle.addHandler(
@@ -353,12 +363,15 @@ public static String makePropPath(String basePath)
return String.format("%s/%s", basePath, PROP_SUBPATH);
}
- /** Validate and Resolve Properties.
+ /**
+ * Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
* Check validity so that if druid.zk.paths.*Path props are set, all are set,
* if none set, then construct defaults relative to druid.zk.paths.base and add these
* to the properties chain.
+ *
* @param props
+ *
* @return true if valid zpath properties.
*/
public static boolean validateResolveProps(Properties props)
@@ -374,7 +387,9 @@ public static boolean validateResolveProps(Properties props)
final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath");
- if (!zpathValidateFailed) System.out.println("Effective zpath prefix=" + zpathEffective);
+ if (!zpathValidateFailed) {
+ System.out.println("Effective zpath prefix=" + zpathEffective);
+ }
// validate druid.zk.paths.*Path properties
//
@@ -403,22 +418,25 @@ public static boolean validateResolveProps(Properties props)
}
}
if (zpathOverridesNotAbs) {
- System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
- "the znode path must start with '/' (slash) ; problem overrides:");
+ System.err.println(
+ "When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
+ "the znode path must start with '/' (slash) ; problem overrides:"
+ );
System.err.print(sbErrors.toString());
}
if (zpathOverrideCount > 0) {
- if (zpathOverrideCount < SUB_PATH_PROPS.length + 1) {
+ if (zpathOverrideCount < SUB_PATH_PROPS.length) {
zpathValidateFailed = true;
- System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
- "all must be overridden together; missing overrides:");
+ System.err.println(
+ "When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
+ "all must be overridden together; missing overrides:"
+ );
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
String val = props.getProperty(SUB_PATH_PROPS[i]);
if (val == null) {
System.err.println(" " + SUB_PATH_PROPS[i]);
}
}
- if (propertiesZpathOverride == null) System.err.println(" " + "druid.zk.paths.propertiesPath");
} else { // proper overrides
// do not prefix with property druid.zk.paths.base
; // fallthru
@@ -435,13 +453,16 @@ public static boolean validateResolveProps(Properties props)
}
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
}
- return ! zpathValidateFailed;
+ return !zpathValidateFailed;
}
- /** Check znode zpath base for proper slash, no trailing slash.
- * @param zpathBase znode base path, if null then this method does nothing.
+ /**
+ * Check znode zpath base for proper slash, no trailing slash.
+ *
+ * @param zpathBase znode base path, if null then this method does nothing.
* @param errorMsgPrefix error context to use if errors are emitted, should indicate
* where the zpathBase value came from.
+ *
* @return true if validate failed.
*/
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)
diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
index e5a97bffdc63..62cbfe44eb9b 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
@@ -23,7 +23,7 @@
/**
*/
-public abstract class ServiceDiscoveryConfig
+public abstract class ServiceDiscoveryConfig extends CuratorConfig
{
@Config("druid.service")
public abstract String getServiceName();
@@ -31,9 +31,6 @@ public abstract class ServiceDiscoveryConfig
@Config("druid.port")
public abstract int getPort();
- @Config("druid.zk.service.host")
- public abstract String getZkHosts();
-
@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
}
diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
index 75352a25137c..719b2350bbf8 100644
--- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
+++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
@@ -36,12 +36,10 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
import java.util.Map;
/**
diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
similarity index 88%
rename from index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java
rename to index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
index c0dcade7ed93..f560dfdc1e60 100644
--- a/index-common/src/main/java/com/metamx/druid/index/v1/serde/ComplexMetricRegistererer.java
+++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
@@ -20,14 +20,14 @@
package com.metamx.druid.index.v1.serde;
/**
- * This is a "factory" interface for registering complex metrics in the system. It exists because I'm unaware of
+ * This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
* must be instantiatable via a no argument default constructor (the MR jobs on Hadoop use reflection to instantiate
* instances).
*
* The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult.
*/
-public interface ComplexMetricRegistererer
+public interface Registererer
{
public void register();
}
diff --git a/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java
index b168c4a790d4..d98abc1b670b 100644
--- a/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java
+++ b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java
@@ -36,9 +36,9 @@
/**
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
* is sorted, supports binary search index lookups. If input is not sorted, only supports array-like index lookups.
- *
+ *
* V1 Storage Format:
- *
+ *
* byte 1: version (0x1)
* byte 2 == 0x1 => allowReverseLookup
* bytes 3-6 => numBytesUsed
@@ -253,6 +253,9 @@ public String fromByteBuffer(ByteBuffer buffer, int numBytes)
@Override
public byte[] toBytes(String val)
{
+ if (val == null) {
+ return new byte[]{};
+ }
return val.getBytes(Charsets.UTF_8);
}
diff --git a/merger/pom.xml b/merger/pom.xml
index e5fc6f99f69e..c60bd7e2bd5b 100644
--- a/merger/pom.xml
+++ b/merger/pom.xml
@@ -19,7 +19,7 @@
-->
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0com.metamx.druiddruid-merger
@@ -178,6 +178,10 @@
easymocktest
+
+ com.netflix.curator
+ curator-test
+
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
index db5ff0594dad..859352c2a7da 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
@@ -88,7 +88,7 @@ protected MergeTask(final String dataSource, final List segments)
@Override
public boolean apply(@Nullable DataSegment segment)
{
- return segment == null || !segment.getDataSource().equals(dataSource);
+ return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
}
}
)
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index eaf000e5276b..fce83b8618ae 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -19,18 +19,26 @@
package com.metamx.druid.merger.coordinator;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
+import com.metamx.common.concurrent.ScheduledExecutors;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
-import com.metamx.druid.merger.common.TaskToolbox;
-import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
+import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
+import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
@@ -39,15 +47,32 @@
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import javax.annotation.Nullable;
import java.util.Comparator;
-import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
+ * The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
+ * are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
+ * changes in ZK.
+ *
+ * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling
+ * strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote
+ * task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy.
+ * The remote task runner periodically runs a check to see if any worker nodes have not had any work for a
+ * specified period of time. If so, the worker node will be terminated.
+ *
+ * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
+ * that were associated with the node.
*/
public class RemoteTaskRunner implements TaskRunner
{
@@ -55,277 +80,530 @@ public class RemoteTaskRunner implements TaskRunner
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
- private final TaskInventoryManager taskInventoryManager;
- private final IndexerZkConfig config;
+ private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
+ private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
+ private final ScalingStrategy strategy;
- private final ConcurrentHashMap monitors = new ConcurrentHashMap();
+ // all workers that exist in ZK
+ private final ConcurrentHashMap zkWorkers = new ConcurrentHashMap();
+ // all tasks that are assigned or need to be assigned
+ private final ConcurrentHashMap tasks = new ConcurrentHashMap();
+
+ private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet();
+ private final ConcurrentSkipListSet currentlyTerminating = new ConcurrentSkipListSet();
+ private final Object statusLock = new Object();
+
+ private volatile DateTime lastProvisionTime = new DateTime();
+ private volatile DateTime lastTerminateTime = new DateTime();
+ private volatile boolean started = false;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
- TaskInventoryManager taskInventoryManager,
- IndexerZkConfig config,
+ RemoteTaskRunnerConfig config,
CuratorFramework cf,
+ PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
- RetryPolicyFactory retryPolicyFactory
+ RetryPolicyFactory retryPolicyFactory,
+ ScalingStrategy strategy
)
{
this.jsonMapper = jsonMapper;
- this.taskInventoryManager = taskInventoryManager;
this.config = config;
this.cf = cf;
+ this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
+ this.strategy = strategy;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ try {
+ workerPathCache.getListenable().addListener(
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
+ {
+ if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ final Worker worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ log.info("New worker[%s] found!", worker.getHost());
+ addWorker(worker);
+ } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
+ final Worker worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ log.info("Worker[%s] removed!", worker.getHost());
+ removeWorker(worker.getHost());
+ }
+ }
+ }
+ );
+ workerPathCache.start();
+
+ // Schedule termination of worker nodes periodically
+ Period period = new Period(config.getTerminateResourcesDuration());
+ PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
+ final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
+
+ ScheduledExecutors.scheduleAtFixedRate(
+ scheduledExec,
+ new Duration(
+ System.currentTimeMillis(),
+ startTime
+ ),
+ config.getTerminateResourcesDuration(),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (currentlyTerminating.isEmpty()) {
+ if (zkWorkers.size() <= config.getMinNumWorkers()) {
+ return;
+ }
+
+ List thoseLazyWorkers = Lists.newArrayList(
+ FunctionalIterable
+ .create(zkWorkers.values())
+ .filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(@Nullable WorkerWrapper input)
+ {
+ return input.getRunningTasks().isEmpty()
+ && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
+ > config.getMaxWorkerIdleTimeMillisBeforeDeletion();
+ }
+ }
+ )
+ );
+
+ AutoScalingData terminated = strategy.terminate(
+ Lists.transform(
+ thoseLazyWorkers,
+ new Function()
+ {
+ @Override
+ public String apply(@Nullable WorkerWrapper input)
+ {
+ return input.getWorker().getHost();
+ }
+ }
+ )
+ );
+
+ if (terminated != null) {
+ currentlyTerminating.addAll(terminated.getNodeIds());
+ lastTerminateTime = new DateTime();
+ }
+ } else {
+ Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
+ if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
+ log.makeAlert(
+ "It has been %d millis since last scheduled termination but nodes remain",
+ durSinceLastTerminate.getMillis()
+ ).emit();
+ }
+
+ log.info(
+ "[%s] still terminating. Wait for all nodes to terminate before trying again.",
+ currentlyTerminating
+ );
+ }
+ }
+ }
+ );
+ started = true;
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
@LifecycleStop
public void stop()
{
- scheduledExec.shutdownNow();
+ try {
+ for (WorkerWrapper workerWrapper : zkWorkers.values()) {
+ workerWrapper.close();
+ }
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ finally {
+ started = false;
+ }
+ }
+
+ public boolean hasStarted()
+ {
+ return started;
+ }
+
+ public int getNumWorkers()
+ {
+ return zkWorkers.size();
}
@Override
- public void run(final Task task, final TaskContext taskContext, final TaskCallback callback)
+ public void run(Task task, TaskContext context, TaskCallback callback)
{
- run(task, taskContext, callback, retryPolicyFactory.makeRetryPolicy());
+ if (tasks.contains(task.getId())) {
+ throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
+ }
+ TaskWrapper taskWrapper = new TaskWrapper(
+ task, context, callback, retryPolicyFactory.makeRetryPolicy()
+ );
+ tasks.put(taskWrapper.getTask().getId(), taskWrapper);
+ assignTask(taskWrapper);
}
- private void run(
- final Task task,
- final TaskContext taskContext,
- final TaskCallback callback,
- final RetryPolicy retryPolicy
- )
+ private void assignTask(TaskWrapper taskWrapper)
{
- try {
- // If a worker is already running this task, check the status
- Map allRunningTasks = Maps.newHashMap();
- for (Worker worker : taskInventoryManager.getInventory()) {
- for (String taskId : worker.getTasks().keySet()) {
- allRunningTasks.put(taskId, worker);
- }
- }
+ WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
+
+ // If the task already exists, we don't need to announce it
+ if (workerWrapper != null) {
+ final Worker worker = workerWrapper.getWorker();
+ try {
+ log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId());
+
+ TaskStatus taskStatus = jsonMapper.readValue(
+ workerWrapper.getStatusCache()
+ .getCurrentData(
+ JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())
+ )
+ .getData(),
+ TaskStatus.class
+ );
- Worker workerRunningThisTask = allRunningTasks.get(task.getId());
- if (workerRunningThisTask != null) {
- // If the status is complete, just run the callback, otherwise monitor for the completion of the task
- if (!verifyStatusComplete(jsonMapper, workerRunningThisTask, task, callback)) {
- monitorStatus(jsonMapper, workerRunningThisTask, task, taskContext, callback, retryPolicy);
+ if (taskStatus.isComplete()) {
+ TaskCallback callback = taskWrapper.getCallback();
+ if (callback != null) {
+ callback.notify(taskStatus);
+ }
+ new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()).run();
}
- return;
}
-
- // Run the task if it does not currently exist
- Worker theWorker = getLeastCapacityWorker();
- monitorStatus(jsonMapper, theWorker, task, taskContext, callback, retryPolicy);
- announceTask(theWorker, task, taskContext);
- }
- catch (Exception e) {
- log.error(e, "Failed to dispatch task. Retrying");
- retryTask(task, taskContext, callback, retryPolicy);
+ catch (Exception e) {
+ log.error(e, "Task exists, but hit exception!");
+ retryTask(new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()), taskWrapper);
+ }
+ } else {
+ // Announce the task or retry if there is not enough capacity
+ workerWrapper = findWorkerForTask();
+ if (workerWrapper != null) {
+ announceTask(workerWrapper.getWorker(), taskWrapper);
+ } else {
+ retryTask(null, taskWrapper);
+ }
}
}
+ /**
+ * Retries a task that has failed.
+ *
+ * @param pre - A runnable that is executed before the retry occurs
+ * @param taskWrapper - a container for task properties
+ */
private void retryTask(
- final Task task,
- final TaskContext taskContext,
- final TaskCallback callback,
- final RetryPolicy retryPolicy
+ final Runnable pre,
+ final TaskWrapper taskWrapper
)
{
+ final Task task = taskWrapper.getTask();
+ final RetryPolicy retryPolicy = taskWrapper.getRetryPolicy();
+
+ log.info("Registering retry for failed task[%s]", task.getId());
+
if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
.emit();
- callback.notify(TaskStatus.failure(task.getId()));
-
return;
}
scheduledExec.schedule(
- new Callable