From 355ba422b98a27f9d6257dd4b5316a27d205321f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Galland?= Date: Tue, 24 Mar 2020 23:30:47 +0100 Subject: [PATCH] [sre] Multiple agent spawning is now run in parallel for each agent. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close #987 Signed-off-by: Stéphane Galland --- .../configs/subconfigs/messages.properties | 2 +- .../executor/AbstractExecutorService.sarl | 35 +++ .../services/executor/ExecutorService.sarl | 41 ++- .../services/executor/JreExecutorService.sarl | 4 + .../lifecycle/AbstractLifecycleService.sarl | 2 +- .../executor/AbstractExecutorServiceTest.sarl | 235 +++++++++++++++++- 6 files changed, 313 insertions(+), 6 deletions(-) diff --git a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/boot/configs/subconfigs/messages.properties b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/boot/configs/subconfigs/messages.properties index 045c44db2b..217c440faa 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/boot/configs/subconfigs/messages.properties +++ b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/boot/configs/subconfigs/messages.properties @@ -35,5 +35,5 @@ ServicesConfigModule_4 = {true|false} ServicesConfigModuleProvider_0 = The configuration for the SRE services. ContextsConfigModule_0 = Specify the delay in milliseconds before testing if a space should be destroyed if it becomes empty; Default is {0}. -ServicesConfigModule_1 = delay +ContextsConfigModule_1 = delay ContextsConfigModuleProvider_0 = The configuration for the SRE space repository. \ No newline at end of file diff --git a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/AbstractExecutorService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/AbstractExecutorService.sarl index 4805b8ab07..ea917f82c2 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/AbstractExecutorService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/AbstractExecutorService.sarl @@ -222,4 +222,39 @@ abstract class AbstractExecutorService extends AbstractSreService implements Exe StreamSupport::stream(collection.spliterator, true).forEach(task.protectConsumer(logger)) } + /** Execute the given task in parallel but do not wait for the termination. + * + * @param task the task to run. + * @param nbExecutions number of runs + * @param runGroupSize the size of a parallel group + */ + protected final def executeInThreadsWithoutWaiting(task : SreRunnable, nbExecutions : int, runGroupSize : int) { + assert runGroupSize >= 1 + val es = executorService + if (runGroupSize > 1) { + val numberOfGroups = nbExecutions / runGroupSize + val rest = nbExecutions - numberOfGroups * runGroupSize + for (var i = 0; i < numberOfGroups; i++) { + es.execute [ + for (var j = 0; j < runGroupSize; j++) { + task.run + } + ] + } + if (rest > 0) { + es.execute [ + for (var j = 0; j < rest; j++) { + task.run + } + ] + } + } else { + for (var i = 0; i < nbExecutions; i++) { + es.execute [ + task.run + ] + } + } + } + } diff --git a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/ExecutorService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/ExecutorService.sarl index c99513c41b..f34ddd8f2c 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/ExecutorService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/ExecutorService.sarl @@ -208,7 +208,7 @@ interface ExecutorService extends Service { * @param task the task to submit. * @param thrownExceptions indicates if the exceptions in the given tasks are thrown forward by this function. * @param logger the logger to use for errors. - * @since 0.6.0 + * @since 0.6 */ def executeBlockingTasks(logger : Logger = null, thrownExceptions : boolean = false, task : Collection) @@ -225,7 +225,7 @@ interface ExecutorService extends Service { * @param logger the logger to use for errors. * @param collection the collection of elements to iterate on. * @param task the task to submit. Be sure that the task is synchronized on the given collection. - * @since 0.6.0 + * @since 0.6 */ def applyBlockingConsumer(logger : Logger = null, collection : Iterable, task : Consumer) with T @@ -262,9 +262,44 @@ interface ExecutorService extends Service { * @param runGroupSize the number of tasks to be run by a single thread. * @return the number of successful runs. * @throws InterruptedException when the function cannot wait for task termination. - * @since 0.5.0 + * @since 0.5 */ def executeBlockingTask(logger : Logger = null, nbExecutions : int, runGroupSize : int, task : Runnable) : int + /** + * Submit a single task multiple times to the executor service. + * + *

The same task instance will be submit to the executor service. + * + *

{@code runGroupSize} indicates how many number of times the task will be run on + * a single thread. + * + *

This function is equivalent to: + *


+	 * for(i in [ 1 .. (nbExecutions/runGroupSize) ])
+	 * do
+	 * execute({
+	 * for(j in [1..runGroupSize]) {
+	 * task.run
+	 * }
+	 * })
+	 * done
+	 * 
+ * + *

Caution: if a {@code task} is failing, the exception will be output on the logger. This function never fails. + * + *

According to the implementation of the service, the given task may be run in the same or separated thread + * than the one of the caller. + * + *

If an exception occurs into the given consume, the exception is logged. It is never thrown by this function. + * + * @param logger the logger to use for errors. + * @param task the task to submit. + * @param nbExecutions the number of times the task must be run, usually greater than 1. + * @param runGroupSize the number of tasks to be run by a single thread. + * @since 0.11 + */ + def executeNotBlockingTask(logger : Logger = null, nbExecutions : int, runGroupSize : int, task : Runnable) + } diff --git a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/JreExecutorService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/JreExecutorService.sarl index 5526e8a3e1..a237a765f5 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/JreExecutorService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/executor/JreExecutorService.sarl @@ -189,6 +189,10 @@ class JreExecutorService extends AbstractExecutorService implements PreReleasabl executeInThreadsAndWait(task.protectRunnable(logger), nbExecutions, runGroupSize) } + def executeNotBlockingTask(logger : Logger, nbExecutions : int, runGroupSize : int, task : Runnable) { + executeInThreadsWithoutWaiting(task.protectRunnable(logger), nbExecutions, runGroupSize) + } + def executeAsap(logger : Logger, task : Runnable) : Future { executorService.submit(task.protectRunnable(logger)) } diff --git a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/lifecycle/AbstractLifecycleService.sarl b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/lifecycle/AbstractLifecycleService.sarl index 8a10fa8878..2b38c6807c 100644 --- a/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/lifecycle/AbstractLifecycleService.sarl +++ b/sre/io.janusproject/io.janusproject.plugin/src/main/sarl/io/sarl/sre/services/lifecycle/AbstractLifecycleService.sarl @@ -277,7 +277,7 @@ abstract class AbstractLifecycleService extends AbstractSreService implements Li } ] if (nbAgents > 1) { - this.executor.executeBlockingTask(loggingService.kernelLogger, nbAgents, + this.executor.executeNotBlockingTask(loggingService.kernelLogger, nbAgents, this.lifecycleConfig.agentSpawningCountPerThread, agentCreator) } else { this.executor.executeAsap(loggingService.kernelLogger, agentCreator) diff --git a/sre/io.janusproject/io.janusproject.tests/src/test/sarl/io/sarl/sre/tests/units/services/executor/AbstractExecutorServiceTest.sarl b/sre/io.janusproject/io.janusproject.tests/src/test/sarl/io/sarl/sre/tests/units/services/executor/AbstractExecutorServiceTest.sarl index 3c2435ec52..d43565e2be 100644 --- a/sre/io.janusproject/io.janusproject.tests/src/test/sarl/io/sarl/sre/tests/units/services/executor/AbstractExecutorServiceTest.sarl +++ b/sre/io.janusproject/io.janusproject.tests/src/test/sarl/io/sarl/sre/tests/units/services/executor/AbstractExecutorServiceTest.sarl @@ -313,7 +313,6 @@ abstract class AbstractExecutorServiceTest { exception.assertSame(capturedException.value) } - @Test @DisplayName("executeBlockingTask 1 task") def executeBlockingTask_noException_1task { @@ -548,4 +547,238 @@ abstract class AbstractExecutorServiceTest { exception.assertSame(capturedException.value) } + @Test + @DisplayName("executeNotBlockingTask 1 task") + def executeNotBlockingTask_noException_1task { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 1, 100, run) + + this.executor.verify(1.times).execute(any) + + run.verify(1.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 4 tasks with 1 task per group") + def executeNotBlockingTask_noException_4tasks_1member { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 1, run) + + this.executor.verify(4.times).execute(any) + + run.verify(4.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 4 tasks with 2 tasks per group") + def executeNotBlockingTask_noException_4tasks_2members { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 2, run) + + this.executor.verify(2.times).execute(any) + + run.verify(4.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 4 tasks with 3 tasks per group") + def executeNotBlockingTask_noException_4tasks_3members { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 3, run) + + this.executor.verify(2.times).execute(any) + + run.verify(4.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 4 tasks with 4 tasks per group") + def executeNotBlockingTask_noException_4tasks_4members { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 4, run) + + this.executor.verify(1.times).execute(any) + + run.verify(4.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 4 tasks with 5 tasks per group") + def executeNotBlockingTask_noException_4tasks_5members { + var run = typeof(Runnable).mock + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeBlockingTask(this.logger, 4, 5, run) + + this.executor.verify(1.times).execute(any) + + run.verify(4.times).run + + verifyNoMoreInteractions(this.logger) + } + + @Test + @DisplayName("executeNotBlockingTask 1 erroneous task") + def executeNotBlockingTask_exception_1task { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 1, 100, run) + + this.executor.verify(1.times).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(1.times).log(capturedLevel.capture, any, capturedException.capture) + Level::SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + + @Test + @DisplayName("executeNotBlockingTask 4 erroneous task with 1 task per group") + def executeNotBlockingTask_exception_4tasks_1member { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 1, run) + + this.executor.verify(4.times).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture) + Level::SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + + @Test + @DisplayName("executeNotBlockingTask 4 erroneous task with 2 tasks per group") + def executeNotBlockingTask_exception_4tasks_2members { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 2, run) + + this.executor.verify(2.times).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture) + Level::SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + + @Test + @DisplayName("executeNotBlockingTask 4 erroneous task with 3 tasks per group") + def executeNotBlockingTask_exception_4tasks_3members { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeBlockingTask(this.logger, 4, 3, run) + + this.executor.verify(2.times).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture) + Level::SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + + @Test + @DisplayName("executeNotBlockingTask 4 erroneous task with 3 tasks per group") + def executeNotBlockingTask_exception_4tasks_4members { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 4, run) + + this.executor.verify(1.times).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture) + Level::SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + + @Test + @DisplayName("executeNotBlockingTask 4 erroneous task with 5 tasks per group") + def executeNotBlockingTask_exception_4tasks_5members { + var exception = typeof(RuntimeException).mock + var run : Runnable = [throw exception] + doAnswer([ + (it.getArgument(0) as Runnable).run + return null + ]).when(this.executor).execute(any) + + this.service.executeNotBlockingTask(this.logger, 4, 5, run) + + verify(this.executor, times(1)).execute(any) + + var capturedLevel = ArgumentCaptor::forClass(typeof(Level)) + var capturedException = ArgumentCaptor::forClass(typeof(Throwable)) + this.logger.verify(4.times).log(capturedLevel.capture, any, capturedException.capture) + Level.SEVERE.assertSame(capturedLevel.value) + exception.assertSame(capturedException.value) + } + }