diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b6a2539cab..a29243ec6b 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -219,6 +219,10 @@ under the License.
org.apache.helix.examples.ExampleProcess
start-helix-participant
+
+ org.apache.helix.tools.commandtools.ExampleParticipant
+ start-participants
+
org.apache.helix.tools.LocalZKServer
start-standalone-zookeeper
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..0b1b81fc25
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -0,0 +1,77 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceOnlineOfflineStateModelFactory.class);
+
+ public BrokerResourceOnlineOfflineStateModelFactory() {
+
+ }
+
+ public static String getStateModelDef() {
+ return "BrokerResourceOnlineOfflineStateModel";
+ }
+
+ public StateModel createNewStateModel(String resourceName) {
+ return new BrokerResourceOnlineOfflineStateModelFactory.BrokerResourceOnlineOfflineStateModel();
+ }
+
+ @StateModelInfo(
+ states = {"{'OFFLINE','ONLINE', 'DROPPED'}"},
+ initialState = "OFFLINE"
+ )
+ public class BrokerResourceOnlineOfflineStateModel extends StateModel {
+ public BrokerResourceOnlineOfflineStateModel() {
+ }
+
+ @Transition(
+ from = "OFFLINE",
+ to = "ONLINE"
+ )
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ LOGGER.info("Become Online from Offline");
+ }
+
+ @Transition(
+ from = "ONLINE",
+ to = "OFFLINE"
+ )
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ LOGGER.info("Become Offline from Online");
+
+ }
+
+ @Transition(
+ from = "OFFLINE",
+ to = "DROPPED"
+ )
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ LOGGER.info("Become Dropped from Offline");
+ }
+
+ @Transition(
+ from = "ONLINE",
+ to = "DROPPED"
+ )
+ public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+ LOGGER.info("Become Dropped from Online");
+ }
+
+ @Transition(
+ from = "ERROR",
+ to = "OFFLINE"
+ )
+ public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+ LOGGER.info("Become Offline from Error");
+ }
+ }
+}
+
diff --git a/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..30a23d8ca7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/SegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,53 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory {
+ public SegmentOnlineOfflineStateModelFactory() {
+ }
+
+ @Override
+ public StateModel createNewStateModel(String partitionName) {
+ final SegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel = new SegmentOnlineOfflineStateModel();
+ return SegmentOnlineOfflineStateModel;
+ }
+
+ @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+ public static class SegmentOnlineOfflineStateModel extends StateModel {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOnlineOfflineStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ LOGGER.info("Become Online from Offline");
+ }
+
+ // Remove segment from InstanceDataManager.
+ // Still keep the data files in local.
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ LOGGER.info("Become Offline from Online");
+
+ }
+
+ // Delete segment from local directory.
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ LOGGER.info("Become Dropped from Offline");
+
+ }
+
+ @Transition(from = "ONLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+ LOGGER.info("Become Dropped from Online");
+
+ }
+ }
+
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 1590039e08..04f4b09956 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -22,8 +22,10 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,9 +81,13 @@ public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clu
LOG.debug("Workflow context is created for " + workflow);
}
+ Set finalStates = new HashSet<>(Arrays.asList(
+ new TaskState[] { TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED,
+ TaskState.FAILED, TaskState.TIMED_OUT
+ }));
// Only generic workflow get timeouted and schedule rebalance for timeout. Will skip the set if
// the workflow already got timeouted. Job Queue will ignore the setup.
- if (!workflowCfg.isJobQueue() && !TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())) {
+ if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
// If timeout point has already been passed, it will not be scheduled
scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
new file mode 100644
index 0000000000..308507598f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ExampleParticipant.java
@@ -0,0 +1,207 @@
+package org.apache.helix.tools.commandtools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.BrokerResourceOnlineOfflineStateModelFactory;
+import org.apache.helix.examples.LeaderStandbyStateModelFactory;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.examples.SegmentOnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.HelixManagerShutdownHook;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExampleParticipant {
+ private static final Logger LOG = LoggerFactory.getLogger(ExampleParticipant.class);
+
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String instances = "instances";
+ public static final String help = "help";
+ public static final String transDelay = "transDelay";
+
+ private final String zkConnectString;
+ private final String clusterName;
+ private final String instanceName;
+ private HelixManager manager;
+
+ private StateModelFactory stateModelFactory;
+ private final int delay;
+
+ public ExampleParticipant(String zkConnectString, String clusterName, String instanceName,
+ int delay) {
+ this.zkConnectString = zkConnectString;
+ this.clusterName = clusterName;
+ this.instanceName = instanceName;
+ this.delay = delay;
+ }
+
+ public void start() throws Exception {
+ manager =
+ HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+ zkConnectString);
+
+ // genericStateMachineHandler = new StateMachineEngine();
+ // genericStateMachineHandler.registerStateModelFactory(stateModelType,
+ // stateModelFactory);
+
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("MasterSlave",
+ new MasterSlaveStateModelFactory(this.instanceName, delay));
+ stateMach.registerStateModelFactory("OnlineOffline",
+ new OnlineOfflineStateModelFactory(this.instanceName, delay));
+ stateMach.registerStateModelFactory("LeaderStandby",
+ new LeaderStandbyStateModelFactory(this.instanceName, delay));
+ stateMach.registerStateModelFactory("BrokerResourceOnlineOfflineStateModel",
+ new BrokerResourceOnlineOfflineStateModelFactory());
+ stateMach.registerStateModelFactory("SegmentOnlineOfflineStateModel",
+ new SegmentOnlineOfflineStateModelFactory());
+
+ manager.connect();
+ manager.getMessagingService()
+ .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), stateMach);
+ }
+
+ public void stop() {
+ manager.disconnect();
+ }
+
+ public HelixManager getManager() {
+ return manager;
+ }
+
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions() {
+ Option helpOption =
+ OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
+ .create();
+
+ Option zkServerOption =
+ OptionBuilder.withLongOpt(zkServer).withDescription("Provide zookeeper address").create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption =
+ OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option instancesOption =
+ OptionBuilder.withLongOpt(instances).withDescription("Provide instance names, separated by ':").create();
+ instancesOption.setArgs(1);
+ instancesOption.setRequired(true);
+ instancesOption.setArgName("Instance names (Required)");
+
+ Option transDelayOption =
+ OptionBuilder.withLongOpt(transDelay).withDescription("Provide state trans delay").create();
+ transDelayOption.setArgs(1);
+ transDelayOption.setRequired(false);
+ transDelayOption.setArgName("Delay time in state transition, in MS");
+
+ OptionGroup optionGroup = new OptionGroup();
+ optionGroup.addOption(zkServerOption);
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(clusterOption);
+ options.addOption(instancesOption);
+ options.addOption(transDelayOption);
+
+ options.addOptionGroup(optionGroup);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions) {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + ExampleParticipant.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ try {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe) {
+ System.err.println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int delay = 0;
+
+ CommandLine cmd = processCommandLineArgs(args);
+ String zkConnectString = cmd.getOptionValue(zkServer);
+ String clusterName = cmd.getOptionValue(cluster);
+ String instanceNames = cmd.getOptionValue(instances);
+ List hosts = Arrays.asList(instanceNames.split(":"));
+
+ if (cmd.hasOption(transDelay)) {
+ try {
+ delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+ if (delay < 0) {
+ throw new Exception("delay must be positive");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ delay = 0;
+ }
+ }
+
+ System.out.println("Starting Instances with ZK:" + zkConnectString + ", cluster: " + clusterName
+ + ", instances: " + hosts);
+
+ for (String instanceName : hosts) {
+ System.out.println("Starting Instance:" + instanceName);
+ ExampleParticipant process =
+ new ExampleParticipant(zkConnectString, clusterName, instanceName, delay);
+ process.start();
+ System.out.println("Started Instance:" + instanceName);
+ Runtime.getRuntime().addShutdownHook(new HelixManagerShutdownHook(process.getManager()));
+ }
+
+ Thread.currentThread().join();
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
index c1f7060cb1..0229943400 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
@@ -12,6 +12,7 @@
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -81,4 +82,20 @@ public void testJobQueueNotApplyTimeout() throws InterruptedException {
// Add back the config
_jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
}
+
+ @Test
+ public void testWorkflowTimeoutWhenWorkflowCompleted() throws InterruptedException {
+ String workflowName = TestHelper.getTestMethodName();
+ _jobBuilder.setWorkflow(workflowName);
+ _jobBuilder.setJobCommandConfigMap(Collections.emptyMap());
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+ .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(0).build())
+ .addJob(JOB_NAME, _jobBuilder).setExpiry(2000L);
+
+ _driver.start(workflowBuilder.build());
+ // Pause the queue
+ Thread.sleep(2500);
+ Assert.assertNull(_driver.getWorkflowConfig(workflowName));
+ Assert.assertNull(_driver.getJobContext(workflowName));
+ }
}