Skip to content

Commit

Permalink
Deactivate JobContextReaper by default
Browse files Browse the repository at this point in the history
Since it sometimes killed jobs that were not really stuck but just slow.
  • Loading branch information
mfussenegger authored and chaudum committed Dec 3, 2015
1 parent 38fa4cd commit 1b4fd79
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Changes for Crate
Unreleased
==========

- The ``jobs.keep_alive_timeout`` value can now be configured and the default
has been changed to 0 (deactivated) from ``5m``

2015/12/02 0.53.1
=================

Expand Down
22 changes: 22 additions & 0 deletions docs/configuration.txt
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,28 @@ Repositories are used to :ref:`backup <snapshot-restore>` a Crate cluster.

See also the :ref:`path.repo <conf-path-repo>` Setting.

Jobs
----

**jobs.keep_alive_timeout**
| *Runtime:* ``no``
| *Default:* ``0``

A time value that defines how long a job may appear idle before it is
automatically terminated.
If this is set to 0 (the default) jobs are never terminated.

The logic to determine if a job is idle isn't accurate and there might be
false positives.
For example a SELECT statement using a very slow scalar function might be
regarded as stuck and could be killed although it was active.

So be careful setting this to something other than 0.

The time value can be set either as long, double or string and using time
suffixes is possible. Valid suffixes are: (``ms``, ``s``, ``m``, ``h``,
``d``, ``w``)

.. _conf-cluster-settings:

Cluster Wide Settings
Expand Down
20 changes: 13 additions & 7 deletions sql/src/main/java/io/crate/jobs/JobContextService.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ public JobContextService(Settings settings,
this.statsTables = statsTables;
this.keepAlive = keepAlive;
this.reaperImpl = reaper;
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
reaperImpl.killHangingJobs(JobContextService.this.keepAlive, activeContexts.values());
}
}, reaperInterval);
if (keepAlive.micros() > 0) {
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
reaperImpl.killHangingJobs(JobContextService.this.keepAlive, activeContexts.values());
}
}, reaperInterval);
} else {
this.keepAliveReaper = null;
}
}

@Override
Expand All @@ -114,7 +118,9 @@ public void addListener(KillAllListener listener) {

@Override
protected void doClose() throws ElasticsearchException {
keepAliveReaper.cancel(false);
if (keepAliveReaper != null) {
keepAliveReaper.cancel(false);
}
}

public JobExecutionContext getContext(UUID jobId) {
Expand Down
12 changes: 11 additions & 1 deletion sql/src/main/java/io/crate/jobs/JobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
package io.crate.jobs;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;

public class JobModule extends AbstractModule {

private static final String JOB_KEEP_ALIVE = "jobs.keep_alive_timeout";
private final TimeValue keepAlive;

public JobModule(Settings settings) {
keepAlive = settings.getAsTime(JOB_KEEP_ALIVE, timeValueSeconds(0));
}

@Override
protected void configure() {
bind(JobContextService.class).asEagerSingleton();
bind(TimeValue.class).annotatedWith(JobContextService.JobKeepAlive.class).toInstance(timeValueMinutes(5));
bind(TimeValue.class).annotatedWith(JobContextService.JobKeepAlive.class).toInstance(keepAlive);
bind(TimeValue.class).annotatedWith(JobContextService.ReaperInterval.class).toInstance(timeValueMinutes(1));
bind(Reaper.class).to(LocalReaper.class).asEagerSingleton();

Expand Down
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/jobs/LocalReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void killHangingJobs(TimeValue maxKeepAliveTime, Collection<JobExecutionC
}
if ((time - lastAccessTime > maxKeepAliveTime.getMillis())) {
UUID id = context.jobId();
LOGGER.debug("closing job collect context [{}], time [{}], lastAccessTime [{}]",
LOGGER.debug("closing JobExecutionContext [{}], time [{}], lastAccessTime [{}]",
id, time, lastAccessTime);
try {
context.kill();
Expand Down
4 changes: 2 additions & 2 deletions sql/src/test/java/io/crate/jobs/JobContextServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ public void testKeepAliveExpiration() throws Exception {
testThreadPool,
mock(StatsTables.class),
new LocalReaper(testThreadPool),
timeValueMillis(0),
timeValueMillis(1));
timeValueMillis(1),
timeValueMillis(5));

final AbstractExecutionSubContextTest.TestingExecutionSubContext executionSubContext = new AbstractExecutionSubContextTest.TestingExecutionSubContext();

Expand Down

0 comments on commit 1b4fd79

Please sign in to comment.