Skip to content

Commit

Permalink
fix(ec2): Revise shutdown behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ansoncfit committed Nov 27, 2018
1 parent eb649b1 commit d7b6869
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public AnalysisWorkerController (AnalystWorker analystWorker) {
}

public Object handleSinglePoint (Request request, Response response) throws IOException {
// Record the fact that this worker is busy so it will not shut down
analystWorker.lastSinglePointTime = System.currentTimeMillis();
// This header will cause the Spark Framework to gzip the data automatically if requested by the client.
// FIXME I'm not seeing this on the wire, is the client asking for gzipped responses?
response.header("Content-Encoding", "gzip");
Expand Down
97 changes: 46 additions & 51 deletions src/main/java/com/conveyal/r5/analyst/cluster/AnalystWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,23 @@ public class AnalystWorker implements Runnable {
/** In the type of tests described above, this is how often the worker will fail to return a result for a task. */
public static final int TESTING_FAILURE_RATE_PERCENT = 20;

/** The minimum amount of time (in minutes) that this worker should stay alive after processing a single-point task. */
public static final int SINGLE_KEEPALIVE_MINUTES = 20;
/** How long (in minutes) a worker stays alive after, respectively: starting up (preload window), starting a regional task, and starting a single-point task. */
static final int PRELOAD_KEEPALIVE_MINUTES = 90;
static final int REGIONAL_KEEPALIVE_MINUTES = 2;
static final int SINGLE_KEEPALIVE_MINUTES = 60;

/** The minimum amount of time (in minutes) that this worker should stay alive after processing a regional job task. */
public static final int REGIONAL_KEEPALIVE_MINUTES = 10;
/** Clock time (milliseconds since epoch) at which the worker should be considered to have finished preloading */
long preloadActivityExpiration = 0;

/** Clock time (milliseconds since epoch) at which the worker should be considered idle on regional work */
long regionalActivityExpiration = 0;

/** Clock time (milliseconds since epoch) at which the worker should be considered idle on single-point work */
long singleActivityExpiration = 0;

/** Whether this worker should shut down automatically when idle. */
public final boolean autoShutdown;

long startupTime, nextShutdownCheckTime;

public static final Random random = new Random();

/** The common root of all API URLs contacted by this worker, e.g. http://localhost:7070/api/ */
Expand Down Expand Up @@ -186,18 +192,6 @@ public static HttpClient makeHttpClient () {
/** Information about the EC2 instance (if any) this worker is running on. */
EC2Info ec2info;

/**
* The time the last single point task was processed, in milliseconds since the epoch.
* Used to check if the machine should be shut down.
*/
protected long lastSinglePointTime = 0;

/**
* The time the last regional task was processed, in milliseconds since the epoch.
* Used to check if the machine should be shut down.
*/
protected long lastRegionalTaskTime = 0;

/** If true Analyst is running locally, do not use internet connection and remote services such as S3. */
private boolean workOffline;

Expand Down Expand Up @@ -257,9 +251,8 @@ public AnalystWorker(Properties config, TransportNetworkCache transportNetworkCa
this.networkPreloader = new NetworkPreloader(transportNetworkCache);
this.autoShutdown = Boolean.parseBoolean(config.getProperty("auto-shutdown", "false"));

// Consider shutting this worker down every 10 minutes, starting 30 minutes after it started up.
startupTime = System.currentTimeMillis();
nextShutdownCheckTime = startupTime + 30 * 60 * 1000;
// Keep the worker alive for an initial window to prepare for analysis
preloadActivityExpiration = System.currentTimeMillis() + PRELOAD_KEEPALIVE_MINUTES * 60 * 1000;

// Discover information about what EC2 instance / region we're running on, if any.
// If the worker isn't running in Amazon EC2, then region will be unknown so fall back on a default, because
Expand All @@ -275,36 +268,35 @@ public AnalystWorker(Properties config, TransportNetworkCache transportNetworkCa
}

/**
* Consider shutting down if enough time has passed.
* The strategy depends on the fact that we are billed by the hour for EC2 machines.
* We only need to consider shutting them down once an hour. Leaving them alive for the rest of the hour provides
* immediate compute capacity in case someone starts another job.
* So a few minutes before each hour of uptime has elapsed, we check how long the machine has been idle.
* Shut down if enough time has passed since certain events (startup or handling an analysis request). When EC2
* billing was in hourly increments, the worker would only consider shutting down every 60 minutes. But EC2
* billing is now by the second, so we check more frequently (during regular polling).
*/
public void considerShuttingDown() {
long now = System.currentTimeMillis();
if (now > nextShutdownCheckTime && autoShutdown) {
// Check again exactly ten minutes later (note that billing is now in second increments)
nextShutdownCheckTime += 10 * 60 * 1000;
if (now > lastSinglePointTime + (SINGLE_KEEPALIVE_MINUTES * 60 * 1000) &&
now > lastRegionalTaskTime + (REGIONAL_KEEPALIVE_MINUTES * 60 * 1000)) {
LOG.info("Machine has been idle for at least {} minutes (single point) and {} minutes (regional), " +
"shutting down.", SINGLE_KEEPALIVE_MINUTES, REGIONAL_KEEPALIVE_MINUTES);
// Stop accepting any new single-point requests while shutdown is happening.
// TODO maybe actively tell the broker this worker is shutting down.
sparkHttpService.stop();
try {
Process process = new ProcessBuilder("sudo", "/sbin/shutdown", "-h", "now").start();
process.waitFor();
} catch (Exception ex) {
LOG.error("Unable to terminate worker", ex);
// TODO email us or something
} finally {
System.exit(0);
}

// Shut down the worker if it has done some work and is now idle, or if it has never successfully done any work
// and the time allowed for preloading has now passed.
boolean shutdown = ((singleActivityExpiration != 0 || regionalActivityExpiration != 0) &&
now > singleActivityExpiration && now > regionalActivityExpiration) ||
(singleActivityExpiration == 0 && regionalActivityExpiration == 0 && now > preloadActivityExpiration);

if (shutdown) {
LOG.info("Machine has been idle for at least {} minutes (single point) and {} minutes (regional), " +
"shutting down.", SINGLE_KEEPALIVE_MINUTES , REGIONAL_KEEPALIVE_MINUTES);
// Stop accepting any new single-point requests while shutdown is happening.
// TODO maybe actively tell the broker this worker is shutting down.
sparkHttpService.stop();
try {
Process process = new ProcessBuilder("sudo", "/sbin/shutdown", "-h", "now").start();
process.waitFor();
} catch (Exception ex) {
LOG.error("Unable to terminate worker", ex);
// TODO email us or something
} finally {
System.exit(0);
}
}

}

/**
Expand Down Expand Up @@ -352,7 +344,7 @@ public void run() {
if (tasks == null || tasks.isEmpty()) {
// Either there was no work, or some kind of error occurred.
// Sleep for a while before polling again, adding a random component to spread out the polling load.
considerShuttingDown();
if (autoShutdown) {considerShuttingDown();}
int randomWait = random.nextInt(POLL_MAX_RANDOM_WAIT);
LOG.info("Polling the broker did not yield any regional tasks. Sleeping {} + {} sec.", POLL_WAIT_SECONDS, randomWait);
sleepSeconds(POLL_WAIT_SECONDS + randomWait);
Expand Down Expand Up @@ -395,8 +387,6 @@ public void sleepSeconds (int seconds) {
protected byte[] handleOneSinglePointTask (TravelTimeSurfaceTask task)
throws WorkerNotReadyException, ScenarioApplicationException, IOException {

// Record the fact that the worker is busy so it won't shut down.
lastSinglePointTime = System.currentTimeMillis();
LOG.info("Handling single-point task {}", task.toString());

// Get all the data needed to run one analysis task, or at least begin preparing it.
Expand All @@ -416,6 +406,10 @@ protected byte[] handleOneSinglePointTask (TravelTimeSurfaceTask task)
networkId = task.graphId;
TransportNetwork transportNetwork = networkLoaderState.value;

// After the AsyncLoader has reported all required data are ready for analysis, advance the shutdown clock to
// reflect that the worker is performing single-point work.
singleActivityExpiration = System.currentTimeMillis() + SINGLE_KEEPALIVE_MINUTES * 60 * 1000;

// Perform the core travel time computations.
TravelTimeComputer computer = new TravelTimeComputer(task, transportNetwork, gridCache);
OneOriginResult oneOriginResult = computer.computeTravelTimes();
Expand Down Expand Up @@ -452,8 +446,6 @@ protected byte[] handleOneSinglePointTask (TravelTimeSurfaceTask task)
*/
protected void handleOneRegionalTask(RegionalTask task) {

// Record the fact that the worker is busy so it won't shut down.
lastRegionalTaskTime = System.currentTimeMillis();
LOG.info("Handling regional task {}", task.toString());

// If this worker is being used in a test of the task redelivery mechanism. Report most work as completed
Expand Down Expand Up @@ -491,6 +483,9 @@ protected void handleOneRegionalTask(RegionalTask task) {
saveStaticSiteMetadata(task, transportNetwork);
}

// Advance the shutdown clock to reflect that the worker is performing regional work.
regionalActivityExpiration = System.currentTimeMillis() + REGIONAL_KEEPALIVE_MINUTES * 60 * 1000;

// Perform the core travel time and accessibility computations.
TravelTimeComputer computer = new TravelTimeComputer(task, transportNetwork, gridCache);
OneOriginResult oneOriginResult = computer.computeTravelTimes();
Expand Down

0 comments on commit d7b6869

Please sign in to comment.