STORM-3355: Use supervisor.worker.shutdown.sleep.secs to set worker suicide delay to allow users to configure how long they're willing to wait for orderly shutdown
srdo committed Mar 15, 2019
1 parent ca3a17a commit a5aec8b
Showing 7 changed files with 17 additions and 13 deletions.
2 changes: 1 addition & 1 deletion conf/defaults.yaml
@@ -161,7 +161,7 @@ supervisor.run.worker.as.user: false
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
#how many seconds to sleep for before shutting down threads on worker
#How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill
supervisor.worker.shutdown.sleep.secs: 3
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
7 changes: 7 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
@@ -1041,6 +1041,13 @@ public class Config extends HashMap<String, Object> {
@isPositiveNumber
@NotNull
public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
/**
* How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill.
* If a worker fails to shut down gracefully within this delay, it will either suicide or be forcibly killed by the supervisor.
*/
@isInteger
@isPositiveNumber
public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
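Moving this key out of DaemonConfig and into Config makes it visible to client-side code such as the worker shutdown path below. As a minimal illustration of how a call site reads it (the class and method names here are invented; the real call sites appear later in this commit):

```java
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.utils.ObjectReader;

// Illustration only: the call sites touched by this commit read the key the
// same way; conf/defaults.yaml now guarantees a value of 3 unless overridden.
class ShutdownGraceExample {
    static int graceSecs(Map<String, Object> conf) {
        return ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
    }
}
```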
@@ -127,7 +127,9 @@ public static void main(String[] args) throws Exception {
Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
Integer.parseInt(portStr), workerId);
worker.start();
Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
}

public void start() throws Exception {
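Worker.main previously used Utils.addShutdownHookWithForceKillIn1Sec, which hard-coded a one second delay; the new Utils.addShutdownHookWithDelayedForceKill takes the configured delay instead. A rough sketch of the general pattern behind such a helper (this is not Storm's actual Utils implementation), using only standard JDK shutdown hooks:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the "delayed force kill" shutdown-hook pattern (NOT Storm's
// actual Utils code): run the graceful shutdown in one hook and halt the
// JVM from a second hook if it has not finished within the delay.
final class DelayedForceKillSketch {
    static void addShutdownHookWithDelayedForceKill(Runnable gracefulShutdown, int delaySecs) {
        Thread forceKill = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(delaySecs);  // give the graceful hook its grace period
                Runtime.getRuntime().halt(20);      // still alive: stop waiting and kill the JVM
            } catch (InterruptedException ignored) {
                // graceful shutdown completed first; nothing to do
            }
        });
        forceKill.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            gracefulShutdown.run();
            forceKill.interrupt();                  // cancel the pending force kill
        }));
        Runtime.getRuntime().addShutdownHook(forceKill);
    }
}
```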
6 changes: 4 additions & 2 deletions storm-client/src/jvm/org/apache/storm/task/IBolt.java
@@ -14,6 +14,7 @@

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.tuple.Tuple;

/**
@@ -63,8 +64,9 @@ public interface IBolt extends Serializable {
void execute(Tuple input);

/**
* Called when an IBolt is going to be shutdown. There is no guarantee that cleanup will be called, because the supervisor kill -9's
* worker processes on the cluster.
* Called when an IBolt is going to be shutdown. Storm will make a best-effort attempt to call this if the worker shutdown is orderly.
* The {@link Config#SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS} setting controls how long orderly shutdown is allowed to take.
* There is no guarantee that cleanup will be called if shutdown is not orderly, or if the shutdown exceeds the time limit.
*
* The one context where cleanup is guaranteed to be called is when a topology is killed when running Storm in local mode.
*/
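Under the relaxed contract, cleanup() is best treated as a best-effort, time-boxed hook: release external resources quickly enough to finish within supervisor.worker.shutdown.sleep.secs. A hypothetical bolt sketching that (the class name and output path are invented):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt: flush and close an external resource in cleanup(),
// keeping the work short enough to finish within the configured grace period.
public class FileWritingBolt extends BaseRichBolt {
    private transient BufferedWriter writer;
    private transient OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            writer = Files.newBufferedWriter(Paths.get("/tmp/bolt-output.txt")); // path is illustrative
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple input) {
        try {
            writer.write(input.getString(0));
            writer.newLine();
            collector.ack(input);
        } catch (IOException e) {
            collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        // Best effort only: this may never run if the worker is force killed
        // after supervisor.worker.shutdown.sleep.secs elapses.
        try {
            writer.close();
        } catch (IOException ignored) {
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams declared
    }
}
```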
7 changes: 0 additions & 7 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -772,13 +772,6 @@ public class DaemonConfig implements Validated {
@isStringOrStringList
public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";

/**
* How many seconds to sleep for before shutting down threads on worker.
*/
@isInteger
@isPositiveNumber
public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";

/**
* How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process.
* This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the
@@ -95,7 +95,7 @@ public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
this.staticState = new StaticState(localizer,
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
containerLauncher,
host,
@@ -508,7 +508,7 @@ void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throw
LOG.error("Error trying to kill {}", workerId, e);
}
}
int shutdownSleepSecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1);
int shutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
if (!containers.isEmpty()) {
Time.sleepSecs(shutdownSleepSecs);
}
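killWorkers follows the same idea on the supervisor side: ask every container to stop, sleep once for the configured grace period, then force kill whatever is still alive. A generic sketch of that pattern (WorkerHandle and its methods are invented; this is not Storm's actual Slot/Container code):

```java
import java.util.Collection;
import java.util.concurrent.TimeUnit;

// Generic sketch of the supervisor-side shutdown pattern (names are invented;
// this is not Storm's actual container code).
final class GracefulKillSketch {
    interface WorkerHandle {
        void requestGracefulShutdown(); // e.g. deliver SIGTERM
        boolean isAlive();
        void forceKill();               // e.g. deliver SIGKILL
    }

    static void killAll(Collection<WorkerHandle> workers, int shutdownSleepSecs) throws InterruptedException {
        for (WorkerHandle w : workers) {
            w.requestGracefulShutdown();
        }
        if (!workers.isEmpty()) {
            // one shared grace period, mirroring the Time.sleepSecs call above
            TimeUnit.SECONDS.sleep(shutdownSleepSecs);
        }
        for (WorkerHandle w : workers) {
            if (w.isAlive()) {
                w.forceKill();
            }
        }
    }
}
```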
