Example : Reacting to Asynchronous Events with a Stream of CompletableFutures

johnmcclean-aol edited this page Feb 2, 2015 · 7 revisions

CompletableFutures can observe asynchronous events

You can use JDK8's exceptionally powerful CompletableFuture class to act as an Observer for asynchronously received data.

e.g.

CompletableFuture<Integer> future1 = new CompletableFuture<>();
...
future1.complete(100);
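
As a rough illustration of the observer idea using only the JDK, the sketch below registers a callback on a future before any value exists, then completes it. The class name `ObserverSketch` and the method `observe` are made up for this example and are not part of SimpleReact.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class, not part of SimpleReact.
class ObserverSketch {

    // Registers a callback first, then supplies the value later.
    static String observe(int value) {
        CompletableFuture<Integer> future1 = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("nothing yet");

        // The callback is only stored here: future1 has no value at this point.
        future1.thenAccept(it -> seen.set("received " + it));

        // Completing the future runs the registered callback on this thread.
        future1.complete(value);
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observe(100)); // prints "received 100"
    }
}
```

In a real application the call to `complete` would typically come from whatever code receives the asynchronous event, such as a message listener or I/O callback.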

SimpleReact is a Stream of CompletableFutures

This means we can externally construct a Stream of CompletableFutures and pass that to SimpleReact e.g.

Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();
	
 Stage<String> convertedToStrings = new SimpleReact()
			         .fromStream(queue.stream())
				 .<String>then(it ->  it + "*");

With helper method buildQueueOfAsyncEvents

private Queue<CompletableFuture<Integer>> buildQueueOfAsyncEvents() {
	CompletableFuture<Integer> future1 = new CompletableFuture<>();
	CompletableFuture<Integer> future2 = new CompletableFuture<>();
	CompletableFuture<Integer> future3 = new CompletableFuture<>();
	Queue<CompletableFuture<Integer>> queue = new ConcurrentLinkedQueue<>(Arrays.asList(future1,future2,future3));
	return queue;
}

N.B. Because nothing has completed these CompletableFutures yet, calling block here will pause the current thread indefinitely.
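
The same hazard can be seen with a plain JDK future. The sketch below waits on a future with a timeout instead of waiting forever, which is one way to notice that nothing is supplying data. The class `NeverCompletedSketch` and the method `timesOut` are hypothetical names for this example; it does not use SimpleReact's block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical example class, not part of SimpleReact.
class NeverCompletedSketch {

    // Returns true if the wait timed out, i.e. nobody completed the future in time.
    static boolean timesOut(CompletableFuture<Integer> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // An untimed get() or join() on this future would never return.
        System.out.println(timesOut(new CompletableFuture<>(), 50));           // prints "true"
        System.out.println(timesOut(CompletableFuture.completedFuture(1), 50)); // prints "false"
    }
}
```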

Data will flow asynchronously when future.complete(T result) is called

Data will flow from the initial CompletableFutures to the String conversion stage when the complete method on each CompletableFuture object is called.

Example :

 CompletableFuture<Integer> future1 = new CompletableFuture<>();
  
 ...

 future1.complete(100);

  ...

  new SimpleReact().fromStream(Arrays.asList(future1).stream())
       .<String>then(it ->  it + "*"); //it=100 will result in 100*
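
For comparison, here is a minimal JDK-only sketch of the same flow, in which a dependent stage stays pending until its source future is completed. It uses `thenApply` in place of SimpleReact's `then`; the class `DataFlowSketch` and the method `flow` are hypothetical names, and this is not a description of how SimpleReact is implemented.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class, not part of SimpleReact.
class DataFlowSketch {

    static String flow(int value) {
        CompletableFuture<Integer> source = new CompletableFuture<>();

        // The conversion stage is attached before any data exists.
        CompletableFuture<String> converted = source.thenApply(it -> it + "*");

        // Nothing has flowed yet, so the dependent stage is still pending.
        if (converted.isDone()) {
            throw new IllegalStateException("stage completed before its source");
        }

        // Completing the source pushes the value through the conversion stage.
        source.complete(value);
        return converted.join();
    }

    public static void main(String[] args) {
        System.out.println(flow(100)); // prints "100*"
    }
}
```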

The flow can be initialized by another thread or SimpleReact data flow

e.g. Given the following setup

	Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();

	Stage<String> convertedToStrings = new SimpleReact()
							.fromStream(queue.stream())
							.<String>then(it ->  it + "*");
							

	convertedToStrings.stream().forEach(f -> assertFalse(f.isDone()));

We can generate another reactive flow to complete each future

	new SimpleReact(new ForkJoinPool(3)).react( ()-> 100, ()->200, ()->400)
                .then( it-> sleep(it))
                .then(it -> queue.poll().complete(it));

With helper method sleep

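 private Integer sleep(Integer it) {
	try {
		Thread.sleep(it); // sleep is static, so call it on Thread directly
	} catch (InterruptedException e) {
		Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
	}
	return it;
}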

We can now block safely on the convertedToStrings stage

Because the initial futures are now populated asynchronously, the reactive dataflow will complete, so calling block is safe.

 Queue<CompletableFuture<Integer>> queue = buildQueueOfAsyncEvents();
	
	Stage<String> convertedToStrings = new SimpleReact()
							.fromStream(queue.stream())
							.<String>then(it ->  it + "*");
							

	convertedToStrings.stream().forEach(f -> assertFalse(f.isDone()));
	
	new SimpleReact(new ForkJoinPool(3)).react( ()-> 100, ()->200, ()->400)
             .then( it-> sleep(it))
             .then(it -> queue.poll().complete(it));
	
	List<String> result = convertedToStrings.block();
	
	assertThat(result.size(),is(3));
	assertThat(result,hasItem("400*"));
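
To make the moving parts explicit, the following sketch reproduces the shape of this example with JDK classes only: a queue of empty futures, a conversion stage attached to each, and a separate thread pool that completes them. It is an approximation under stated assumptions, not SimpleReact's implementation. `ExternalCompletionSketch` and `run` are hypothetical names, a fixed thread pool stands in for the second reactive flow, and `join` stands in for block.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical example class, not part of SimpleReact.
class ExternalCompletionSketch {

    static List<String> run() {
        // Three futures that nobody has completed yet.
        Queue<CompletableFuture<Integer>> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 3; i++) {
            queue.add(new CompletableFuture<Integer>());
        }

        // Attach the String conversion stage to each pending future.
        List<CompletableFuture<String>> converted = queue.stream()
                .map(f -> f.thenApply(it -> it + "*"))
                .collect(Collectors.toList());

        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Other threads take futures off the queue and complete them.
            for (int value : new int[] {100, 200, 400}) {
                pool.execute(() -> queue.poll().complete(value));
            }
            // Waiting is safe here because every future will be completed.
            return converted.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Which value lands in which future depends on thread scheduling.
        System.out.println(run());
    }
}
```

Because the pool threads race to poll the queue, the order of the results is not fixed, which is why the assertions above check only the size and the presence of "400*".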