Nadeem Mohammad edited this page Mar 31, 2017 · 29 revisions

Dexecutor is a very light weight framework to execute dependent/Independent tasks in a reliable way, to do this it provides the minimal API.

  1. An API to add nodes in the graph (addDependency, addIndependent, addAsDependentOnAllLeafNodes, addAsDependencyToAllInitialNodes Later two are the hybrid version of the first two)
  2. and the other to execute the nodes in order.

Install Dexecutor

Maven

<dependency>
  <groupId>com.github.dexecutor</groupId>
  <artifactId>dexecutor-core</artifactId>
  <version>LATEST_RELEASE</version>
</dependency>

Refer this for other options/build tools

Usage

Lets take a look at an example, here is the content of DexecutorNonTerminationTest, which would help you understand the API

    @Test
	public void testDependentTaskExecution() {

		ExecutorService executorService = newExecutor();

		try {
			DexecutorConfig<Integer, Integer> config = new DexecutorConfig<>(executorService, new SleepyTaskProvider());
			DefaultDexecutor<Integer, Integer> executor = new DefaultDexecutor<Integer, Integer>(config);
			executor.addDependency(1, 2);
			executor.addDependency(1, 2);
			executor.addDependency(1, 3);
			executor.addDependency(3, 4);
			executor.addDependency(3, 5);
			executor.addDependency(3, 6);
			executor.addDependency(2, 7);
			executor.addDependency(2, 9);
			executor.addDependency(2, 8);
			executor.addDependency(9, 10);
			executor.addDependency(12, 13);
			executor.addDependency(13, 4);
			executor.addDependency(13, 14);
			executor.addIndependent(11);

			executor.execute(ExecutionConfig.NON_TERMINATING);
			
			Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);
			assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
			assertThat(processedNodesOrder).size().isEqualTo(14);
			
		} finally {
			try {
				executorService.shutdownNow();
				executorService.awaitTermination(1, TimeUnit.SECONDS);
			} catch (InterruptedException e) {

			}
		}
	}
	
	private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
		List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
		result.add(new Node<Integer, Integer>(1));
		result.add(new Node<Integer, Integer>(2));
		result.add(new Node<Integer, Integer>(7));
		result.add(new Node<Integer, Integer>(9));
		result.add(new Node<Integer, Integer>(10));
		result.add(new Node<Integer, Integer>(8));
		result.add(new Node<Integer, Integer>(11));
		result.add(new Node<Integer, Integer>(12));
		result.add(new Node<Integer, Integer>(3));
		result.add(new Node<Integer, Integer>(13));
		result.add(new Node<Integer, Integer>(5));
		result.add(new Node<Integer, Integer>(6));
		result.add(new Node<Integer, Integer>(4));
		result.add(new Node<Integer, Integer>(14));
		return result;
	}

	private ExecutorService newExecutor() {
		return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
	}

	private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

		public Task<Integer, Integer> provideTask(final Integer id) {

			return new Task<Integer, Integer>() {

				private static final long serialVersionUID = 1L;

				public Integer execute() {
					if (id == 2) {
						throw new IllegalArgumentException("Invalid task");
					}
					return id;
				}
			};
		}
	}

As can be seen above, Dexecutor requires two things

  1. Instance of ExecutorService : API would use this Service to schedule tasks
  2. Instance of TaskProvider : API represents graph using just the basic information (could be task id), it consults the TaskProvider to provide the task when it comes to actual execution.

There are two phases

  1. Graph construction: When you say executor.addDependency(1, 2) it means tasks 1 should finish before task 2can start, executor.addIndependent(11) means as of now 11 does not depended on other nodes, but later time dependency to it can be added.
    executor.addDependency(1, 2);
    executor.addDependency(1, 3);
    executor.addDependency(3, 4);
    executor.addDependency(3, 5);
    executor.addDependency(3, 6);
    //executor.addDependency(10, 2); // cycle
    executor.addDependency(2, 7);
    executor.addDependency(2, 9);
    executor.addDependency(2, 8);
    executor.addDependency(9, 10);
    executor.addDependency(12, 13);
    executor.addDependency(13, 4);
    executor.addDependency(13, 14);
    executor.addIndependent(11);

Which would generate the following graph

dexecutor-graph.png

  1. Tasks execution

    executor.execute(ExecutionConfig.NON_TERMINATING);

Console Output

11:50:38.449 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 1 node
11:50:38.453 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 1 
11:50:38.455 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 11 node
11:50:38.455 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 11 
11:50:38.455 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 12 node
11:50:38.455 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 12 
11:50:38.456 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 11
11:50:38.456 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 1
11:50:38.457 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 12
11:50:38.457 [pool-1-thread-2] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 11, Execution Done!
11:50:38.458 [pool-1-thread-1] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 1, Execution Done!
11:50:38.458 [pool-1-thread-3] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 12, Execution Done!
11:50:38.460 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 11 done, with status SUCCESS
11:50:38.461 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 1 done, with status SUCCESS
11:50:38.461 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 2 node
11:50:38.462 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 2 
11:50:38.462 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 3 node
11:50:38.462 [pool-1-thread-4] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 2
11:50:38.462 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 3 
11:50:38.463 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 12 done, with status SUCCESS
11:50:38.463 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 13 node
11:50:38.463 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 13 
11:50:38.465 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 13
11:50:38.465 [pool-1-thread-6] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 13, Execution Done!
11:50:38.465 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 13 done, with status SUCCESS
11:50:38.466 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 209 - node 4 depends on [3, 13]
11:50:38.466 [pool-1-thread-4] ERROR c.g.d.core.DefaultExecutionEngine$1.call 84 - Error Execution Task # 2
java.lang.IllegalArgumentException: Invalid task
	at com.github.dexecutor.core.DefaultDependentTasksExecutorNonTerminationTest$SleepyTaskProvider$1.execute(DefaultDependentTasksExecutorNonTerminationTest.java:97)
	at com.github.dexecutor.core.DefaultDependentTasksExecutorNonTerminationTest$SleepyTaskProvider$1.execute(DefaultDependentTasksExecutorNonTerminationTest.java:1)
	at com.github.dexecutor.core.task.LoggerTask.execute(LoggerTask.java:45)
	at com.github.dexecutor.core.DefaultExecutionEngine$1.call(DefaultExecutionEngine.java:79)
	at com.github.dexecutor.core.DefaultExecutionEngine$1.call(DefaultExecutionEngine.java:72)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
11:50:38.466 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 14 node
11:50:38.466 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 3
11:50:38.466 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 14 
11:50:38.467 [pool-1-thread-5] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 3, Execution Done!
11:50:38.467 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 2 done, with status ERRORED
11:50:38.467 [pool-1-thread-7] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 14
11:50:38.467 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 7 node
11:50:38.467 [pool-1-thread-7] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 14, Execution Done!
11:50:38.468 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 7 
11:50:38.468 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 9 node
11:50:38.468 [pool-1-thread-8] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 7
11:50:38.468 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 9 
11:50:38.468 [pool-1-thread-8] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 7, Execution Done!
11:50:38.469 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 8 node
11:50:38.469 [pool-1-thread-9] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 9
11:50:38.469 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 8 
11:50:38.469 [pool-1-thread-9] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 9, Execution Done!
11:50:38.469 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 3 done, with status SUCCESS
11:50:38.469 [pool-1-thread-10] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 8
11:50:38.470 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 4 node
11:50:38.470 [pool-1-thread-10] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 8, Execution Done!
11:50:38.470 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 4 
11:50:38.470 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 5 node
11:50:38.470 [pool-1-thread-11] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 4
11:50:38.470 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 5 
11:50:38.471 [pool-1-thread-11] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 4, Execution Done!
11:50:38.471 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 6 node
11:50:38.471 [pool-1-thread-12] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 5
11:50:38.471 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 6 
11:50:38.471 [pool-1-thread-12] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 5, Execution Done!
11:50:38.472 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 14 done, with status SUCCESS
11:50:38.472 [pool-1-thread-13] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 6
11:50:38.472 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 7 done, with status SUCCESS
11:50:38.472 [pool-1-thread-13] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 6, Execution Done!
11:50:38.472 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 9 done, with status SUCCESS
11:50:38.473 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doExecute 200 - Going to schedule 10 node
11:50:38.473 [main] DEBUG c.g.d.core.DefaultExecutionEngine.submit 93 - Received Task 10 
11:50:38.474 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 8 done, with status SUCCESS
11:50:38.474 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 4 done, with status SUCCESS
11:50:38.474 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 5 done, with status SUCCESS
11:50:38.474 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 6 done, with status SUCCESS
11:50:38.475 [pool-1-thread-14] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 44 - Executing Node # 10
11:50:38.475 [pool-1-thread-14] DEBUG c.g.dexecutor.core.task.LoggerTask.execute 46 - Node # 10, Execution Done!
11:50:38.475 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.doWaitForExecution 262 - Processing of node 10 done, with status SUCCESS
11:50:38.476 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.execute 151 - Total Time taken to process 14 jobs is 28 ms.
11:50:38.476 [main] DEBUG c.g.d.c.DefaultDependentTasksExecutor.execute 152 - Processed Nodes Ordering [11, 1, 12, 13, 2, 3, 14, 7, 9, 8, 4, 5, 6, 10]

More Examples

Refer this project for more complex example

You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.
Press h to open a hovercard with more details.