Navigation Menu

Skip to content

Commit

Permalink
[sre] Multiple agent spawning is now run in parallel for each agent.
Browse files Browse the repository at this point in the history
close #987

Signed-off-by: Stéphane Galland <galland@arakhne.org>
  • Loading branch information
gallandarakhneorg committed Mar 24, 2020
1 parent ef5d6d5 commit 355ba42
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 6 deletions.
Expand Up @@ -35,5 +35,5 @@ ServicesConfigModule_4 = {true|false}
ServicesConfigModuleProvider_0 = The configuration for the SRE services.

ContextsConfigModule_0 = Specify the delay in milliseconds before testing if a space should be destroyed if it becomes empty; Default is {0}.
ServicesConfigModule_1 = delay
ContextsConfigModule_1 = delay
ContextsConfigModuleProvider_0 = The configuration for the SRE space repository.
Expand Up @@ -222,4 +222,39 @@ abstract class AbstractExecutorService extends AbstractSreService implements Exe
StreamSupport::stream(collection.spliterator, true).forEach(task.protectConsumer(logger))
}

/** Execute the given task in parallel but do not wait for the termination.
*
* @param task the task to run.
* @param nbExecutions number of runs
* @param runGroupSize the size of a parallel group
*/
protected final def executeInThreadsWithoutWaiting(task : SreRunnable, nbExecutions : int, runGroupSize : int) {
assert runGroupSize >= 1
val es = executorService
if (runGroupSize > 1) {
val numberOfGroups = nbExecutions / runGroupSize
val rest = nbExecutions - numberOfGroups * runGroupSize
for (var i = 0; i < numberOfGroups; i++) {
es.execute [
for (var j = 0; j < runGroupSize; j++) {
task.run
}
]
}
if (rest > 0) {
es.execute [
for (var j = 0; j < rest; j++) {
task.run
}
]
}
} else {
for (var i = 0; i < nbExecutions; i++) {
es.execute [
task.run
]
}
}
}

}
Expand Up @@ -208,7 +208,7 @@ interface ExecutorService extends Service {
* @param task the task to submit.
* @param thrownExceptions indicates if the exceptions in the given tasks are thrown forward by this function.
* @param logger the logger to use for errors.
* @since 0.6.0
* @since 0.6
*/
def executeBlockingTasks(logger : Logger = null, thrownExceptions : boolean = false, task : Collection<Runnable>)

Expand All @@ -225,7 +225,7 @@ interface ExecutorService extends Service {
* @param logger the logger to use for errors.
* @param collection the collection of elements to iterate on.
* @param task the task to submit. Be sure that the task is synchronized on the given collection.
* @since 0.6.0
* @since 0.6
*/
def applyBlockingConsumer(logger : Logger = null, collection : Iterable<T>, task : Consumer<? super T>) with T

Expand Down Expand Up @@ -262,9 +262,44 @@ interface ExecutorService extends Service {
* @param runGroupSize the number of tasks to be run by a single thread.
* @return the number of successful runs.
* @throws InterruptedException when the function cannot wait for task termination.
* @since 0.5.0
* @since 0.5
*/
def executeBlockingTask(logger : Logger = null, nbExecutions : int, runGroupSize : int,
task : Runnable) : int

/**
* Submit a single task multiple times to the executor service.
*
* <p>The same task instance will be submit to the executor service.
*
* <p>{@code runGroupSize} indicates how many number of times the task will be run on
* a single thread.
*
* <p>This function is equivalent to:
* <pre><code>
* for(i in [ 1 .. (nbExecutions/runGroupSize) ])
* do
* execute({
* for(j in [1..runGroupSize]) {
* task.run
* }
* })
* done
* </code></pre>
*
* <p>Caution: if a {@code task} is failing, the exception will be output on the logger. This function never fails.
*
* <p>According to the implementation of the service, the given task may be run in the same or separated thread
* than the one of the caller.
*
* <p>If an exception occurs into the given consume, the exception is logged. It is never thrown by this function.
*
* @param logger the logger to use for errors.
* @param task the task to submit.
* @param nbExecutions the number of times the task must be run, usually greater than 1.
* @param runGroupSize the number of tasks to be run by a single thread.
* @since 0.11
*/
def executeNotBlockingTask(logger : Logger = null, nbExecutions : int, runGroupSize : int, task : Runnable)

}
Expand Up @@ -189,6 +189,10 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl
executeInThreadsAndWait(task.protectRunnable(logger), nbExecutions, runGroupSize)
}

def executeNotBlockingTask(logger : Logger, nbExecutions : int, runGroupSize : int, task : Runnable) {
executeInThreadsWithoutWaiting(task.protectRunnable(logger), nbExecutions, runGroupSize)
}

def executeAsap(logger : Logger, task : Runnable) : Future<?> {
executorService.submit(task.protectRunnable(logger))
}
Expand Down
Expand Up @@ -277,7 +277,7 @@ abstract class AbstractLifecycleService extends AbstractSreService implements Li
}
]
if (nbAgents > 1) {
this.executor.executeBlockingTask(loggingService.kernelLogger, nbAgents,
this.executor.executeNotBlockingTask(loggingService.kernelLogger, nbAgents,
this.lifecycleConfig.agentSpawningCountPerThread, agentCreator)
} else {
this.executor.executeAsap(loggingService.kernelLogger, agentCreator)
Expand Down
Expand Up @@ -313,7 +313,6 @@ abstract class AbstractExecutorServiceTest<T extends AbstractExecutorService> {
exception.assertSame(capturedException.value)
}


@Test
@DisplayName("executeBlockingTask 1 task")
def executeBlockingTask_noException_1task {
Expand Down Expand Up @@ -548,4 +547,238 @@ abstract class AbstractExecutorServiceTest<T extends AbstractExecutorService> {
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 1 task")
def executeNotBlockingTask_noException_1task {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 1, 100, run)

this.executor.verify(1.times).execute(any)

run.verify(1.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 4 tasks with 1 task per group")
def executeNotBlockingTask_noException_4tasks_1member {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 1, run)

this.executor.verify(4.times).execute(any)

run.verify(4.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 4 tasks with 2 tasks per group")
def executeNotBlockingTask_noException_4tasks_2members {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 2, run)

this.executor.verify(2.times).execute(any)

run.verify(4.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 4 tasks with 3 tasks per group")
def executeNotBlockingTask_noException_4tasks_3members {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 3, run)

this.executor.verify(2.times).execute(any)

run.verify(4.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 4 tasks with 4 tasks per group")
def executeNotBlockingTask_noException_4tasks_4members {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 4, run)

this.executor.verify(1.times).execute(any)

run.verify(4.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 4 tasks with 5 tasks per group")
def executeNotBlockingTask_noException_4tasks_5members {
var run = typeof(Runnable).mock
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeBlockingTask(this.logger, 4, 5, run)

this.executor.verify(1.times).execute(any)

run.verify(4.times).run

verifyNoMoreInteractions(this.logger)
}

@Test
@DisplayName("executeNotBlockingTask 1 erroneous task")
def executeNotBlockingTask_exception_1task {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 1, 100, run)

this.executor.verify(1.times).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(1.times).log(capturedLevel.capture, any, capturedException.capture)
Level::SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 4 erroneous task with 1 task per group")
def executeNotBlockingTask_exception_4tasks_1member {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 1, run)

this.executor.verify(4.times).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture)
Level::SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 4 erroneous task with 2 tasks per group")
def executeNotBlockingTask_exception_4tasks_2members {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 2, run)

this.executor.verify(2.times).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture)
Level::SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 4 erroneous task with 3 tasks per group")
def executeNotBlockingTask_exception_4tasks_3members {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeBlockingTask(this.logger, 4, 3, run)

this.executor.verify(2.times).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture)
Level::SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 4 erroneous task with 3 tasks per group")
def executeNotBlockingTask_exception_4tasks_4members {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 4, run)

this.executor.verify(1.times).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture)
Level::SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

@Test
@DisplayName("executeNotBlockingTask 4 erroneous task with 5 tasks per group")
def executeNotBlockingTask_exception_4tasks_5members {
var exception = typeof(RuntimeException).mock
var run : Runnable = [throw exception]
doAnswer([
(it.getArgument(0) as Runnable).run
return null
]).when(this.executor).execute(any)

this.service.executeNotBlockingTask(this.logger, 4, 5, run)

verify(this.executor, times(1)).execute(any)

var capturedLevel = ArgumentCaptor::forClass(typeof(Level))
var capturedException = ArgumentCaptor::forClass(typeof(Throwable))
this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture)
Level.SEVERE.assertSame(capturedLevel.value)
exception.assertSame(capturedException.value)
}

}

0 comments on commit 355ba42

Please sign in to comment.