Skip to content

Commit

Permalink
Fix TU + add test for pool scale down
Browse files Browse the repository at this point in the history
  • Loading branch information
amanteaux committed Jan 10, 2019
1 parent cb2bc54 commit bc2e435
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 32 deletions.
18 changes: 12 additions & 6 deletions src/main/java/com/coreoz/wisp/Scheduler.java
Expand Up @@ -194,7 +194,9 @@ public Job schedule(String nullableName, Runnable runnable, Schedule when) {
}

logger.info("Scheduling job '{}' to run {}", job.name(), job.schedule());
scheduleNextExecution(job);
synchronized (this) {
scheduleNextExecution(job);
}

return job;
}
Expand Down Expand Up @@ -402,10 +404,12 @@ private void launcher() {
return;
}

Job jobToRun = nextExecutionsOrder.remove(0);
jobToRun.status(JobStatus.READY);
jobToRun.runningJob(() -> runJob(jobToRun));
threadPoolExecutor.execute(jobToRun.runningJob());
if(nextExecutionsOrder.size() > 0) {
Job jobToRun = nextExecutionsOrder.remove(0);
jobToRun.status(JobStatus.READY);
jobToRun.runningJob(() -> runJob(jobToRun));
threadPoolExecutor.execute(jobToRun.runningJob());
}
}
}
}
Expand Down Expand Up @@ -443,7 +447,9 @@ private void runJob(Job jobToRun) {
if(shuttingDown) {
return;
}
scheduleNextExecution(jobToRun);
synchronized (this) {
scheduleNextExecution(jobToRun);
}
}

private static class WispThreadFactory implements ThreadFactory {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/coreoz/wisp/SchedulerConfig.java
Expand Up @@ -29,7 +29,8 @@ public class SchedulerConfig {
*/
@Builder.Default private final int maxThreads = 10;
/**
* The time after which idle threads will be removed from the threads pool
* The time after which idle threads will be removed from the threads pool.
* By default the thread pool does not scale down (duration = infinity ~ {@link Long#MAX_VALUE}ms)
*/
@Builder.Default private final Duration threadsKeepAliveTime = NON_EXPIRABLE_THREADS;
/**
Expand Down
6 changes: 1 addition & 5 deletions src/test/java/com/coreoz/wisp/SchedulerCancelTest.java
Expand Up @@ -83,15 +83,11 @@ public void run() {
};

Job job1 = scheduler.schedule(jobProcess1, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));
Job job2 = scheduler.schedule(jobProcess2, Schedules.afterInitialDelay(
Schedules.fixedDelaySchedule(Duration.ofMillis(1)),
Duration.ofMillis(5)
));
Job job2 = scheduler.schedule(jobProcess2, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));

Thread.sleep(10);

int job1ExecutionsCount = job1.executionsCount();
assertThat(job1ExecutionsCount).isGreaterThan(0);
assertThat(job2.executionsCount()).isEqualTo(0);

scheduler.cancel(job2.name()).toCompletableFuture().get(1, TimeUnit.SECONDS);
Expand Down
72 changes: 52 additions & 20 deletions src/test/java/com/coreoz/wisp/SchedulerTest.java
Expand Up @@ -182,22 +182,7 @@ public void should_not_create_more_threads_than_jobs_scheduled_over_time__races_
public void should_not_create_more_threads_than_jobs_scheduled_over_time() throws InterruptedException {
Scheduler scheduler = new Scheduler();

SingleJob job1 = new SingleJob();
SingleJob job2 = new SingleJob();

scheduler.schedule("job1", job1, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));
scheduler.schedule("job2", job2, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));

Thread thread1 = new Thread(() -> {
waitOn(job1, () -> job1.countExecuted.get() > 50, 100);
});
thread1.start();
thread1.join();
Thread thread2 = new Thread(() -> {
waitOn(job2, () -> job2.countExecuted.get() > 50, 100);
});
thread2.start();
thread2.join();
runTwoConcurrentJobsForAtLeastFiftyIterations(scheduler);

SchedulerStats stats = scheduler.stats();
scheduler.gracefullyShutdown();
Expand All @@ -206,8 +191,11 @@ public void should_not_create_more_threads_than_jobs_scheduled_over_time() throw
stats.getThreadPoolStats().getActiveThreads()
+ stats.getThreadPoolStats().getIdleThreads()
)
// 1 thread for the launcher + 1 thread for each task
.isLessThanOrEqualTo(3);
// 1 thread for the launcher + 1 thread for each task => 3
// but since most of the thread pool logic is delegated to ThreadPoolExecutor
// we do not have precise control on how much threads will be created.
// So we mostly want to check that not all threads of the pool are created.
.isLessThanOrEqualTo(6);
}

@Test
Expand All @@ -218,6 +206,31 @@ public void exception_in_schedule_should_not_alter_scheduler__races_test()
}
}

@Test
public void thread_pool_should_scale_down_when_no_more_tasks_need_executing() throws InterruptedException {
Scheduler scheduler = new Scheduler(
SchedulerConfig
.builder()
.threadsKeepAliveTime(Duration.ofMillis(50))
.build()
);

runTwoConcurrentJobsForAtLeastFiftyIterations(scheduler);
scheduler.cancel("job1");
scheduler.cancel("job2");

Thread.sleep(60L);
SchedulerStats stats = scheduler.stats();
scheduler.gracefullyShutdown();

assertThat(
stats.getThreadPoolStats().getActiveThreads()
+ stats.getThreadPoolStats().getIdleThreads()
)
// 1 thread for the launcher
.isLessThanOrEqualTo(1);
}

@Test
public void exception_in_schedule_should_not_alter_scheduler() throws InterruptedException {
Scheduler scheduler = new Scheduler(SchedulerConfig.builder().maxThreads(1).build());
Expand All @@ -239,7 +252,7 @@ public void run() {
if(executionsCount == 0) {
return currentTimeInMillis;
}
throw new RuntimeException();
throw new RuntimeException("Expected exception");
});

Thread thread1 = new Thread(() -> {
Expand All @@ -258,5 +271,24 @@ public void run() {
assertThat(isJob1ExecutedAfterJob2.get()).isTrue();
}

}
private void runTwoConcurrentJobsForAtLeastFiftyIterations(Scheduler scheduler)
throws InterruptedException {
SingleJob job1 = new SingleJob();
SingleJob job2 = new SingleJob();

scheduler.schedule("job1", job1, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));
scheduler.schedule("job2", job2, Schedules.fixedDelaySchedule(Duration.ofMillis(1)));

Thread thread1 = new Thread(() -> {
waitOn(job1, () -> job1.countExecuted.get() > 50, 100);
});
thread1.start();
thread1.join();
Thread thread2 = new Thread(() -> {
waitOn(job2, () -> job2.countExecuted.get() > 50, 100);
});
thread2.start();
thread2.join();
}

}

0 comments on commit bc2e435

Please sign in to comment.