In [1]:
# Widen the notebook container to 80% of the page so long log lines are easier to read.
# display/HTML come from the public IPython.display module; importing display
# from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:80% !important; }</style>"))

In [1]:
%%loadFromPOM
<!-- log4j is pinned to 2.17.1: 2.16.0 is still affected by CVE-2021-45105 and CVE-2021-44832 -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.17.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.17.1</version>
</dependency>

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

In [2]:
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.lang.Thread;
import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.lang.Void;
import java.lang.Runnable;
import java.util.List;
import java.util.ArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configurator;

In [3]:
// Set the root logger threshold to DEBUG.
Configurator.setRootLevel(Level.DEBUG);
// Logger named "AsyncJava"; the cells below log through this variable.
Logger logger = LogManager.getLogger("AsyncJava");

# ListenableFuture\<T>
----------------

### ListenableFuture Execution Flow and Methods
* addListener
* cancel
* isCancelled
* isDone
* notify
* notifyAll
    
![ListenableFuture Execution Flow](static/LF.gif)

### Submitting a task

When submitting a long-running task (implemented as a <a href=https://www.geeksforgeeks.org/difference-between-callable-and-runnable-in-java/>Callable</a>) to an executor to be run on a separate thread, the executor immediately returns a Future. If you want a ListenableFuture, you must decorate a normal executor object with the **listeningDecorator** method from the **MoreExecutors** class. This ListenableFuture<T> object is a placeholder for the future result that will be populated by the asynchronously running task.
    
    


In [4]:
// The executor is used to submit asynchrounous jobs. It manages the threadpool and tasks execution
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));


// Submit Some Long Running Task To Be Executed Asynchronously
ListenableFuture<String> future = executor.submit(
   () -> {
         //Some Long running Task
         TimeUnit.MILLISECONDS.sleep(1000);
         logger.info("Executed Long Running Task On Thread {}\n", Thread.currentThread());
         return Thread.currentThread().getName();
        }
    
)

### ListenableFuture get()

This ListenableFuture<T> is used to get the result whenever it is available by calling ListenableFuture<T>.get(). If the asynchronously running task has finished by the time .get() is called, the result is returned immediately. However, if the result has not been produced yet, .get() blocks the calling thread (here, the main thread) until it is available. You can check for the result without blocking by calling the ListenableFuture<T>.isDone() method.
    
    
 

In [5]:

// Poll instead of blocking: keep reporting progress until isDone() says the task has finished.
while (true) {
    if (future.isDone()) {
        break;
    }
    logger.info("Async Task Is Still Running");
    TimeUnit.MILLISECONDS.sleep(300);
}

// The future is done at this point; log the worker thread name it produced next to the current thread.
logger.info(
    "The Async Task Was Running On Thread {}\nWe Are Running on Thread {}",
    future.get(),
    Thread.currentThread());

10:19:38.183 [IJava-executor-0] INFO  AsyncJava - Async Task Is Still Running
10:19:38.496 [IJava-executor-0] INFO  AsyncJava - Async Task Is Still Running
10:19:38.798 [IJava-executor-0] INFO  AsyncJava - Async Task Is Still Running
10:19:39.082 [pool-2-thread-1] INFO  AsyncJava - Executed Long Running Task On Thread Thread[pool-2-thread-1,5,main]

10:19:39.128 [IJava-executor-0] INFO  AsyncJava - The Async Task Was Running On Thread pool-2-thread-1
We Are Running on Thread Thread[IJava-executor-0,5,main]


### Adding A Listener Directly to the ListenableFuture

What makes a ListenableFuture<T> a ListenableFuture is the ability to add a listener to it. The listener will run when the future completes, either successfully or exceptionally. The registered listener runs on the executor passed to addListener, so there is no guarantee which thread of the pool it will run on. Also note that the listener is not handed the result of the future and cannot return a value of its own: it must implement the Runnable interface, which takes no arguments and has no return value.
    
![ListenableFuture Execution Flow](static/LFL.gif)

In [6]:
// CREATE LF BY SUBMITTING A TASK TO THE EXECUTOR
ListenableFuture<String> futureToAddListener = executor.submit(
    () -> {
            //Some long running task
            TimeUnit.MILLISECONDS.sleep(3000);
            logger.info("Async Task Finished!\n");
            
            return "Hello";
        }
    );


// ADD LISTENER TO THE ListenableFuture
futureToAddListener.addListener(
    () -> {
       logger.info("Listener Executed on Thread {}\n NOTE: The Listener Won't Necessarily Run On The Same Thread As The Submitted Task", Thread.currentThread());
    }
 ,executor);


//Get Result From Long Running Task
logger.info("{} World!\n", futureToAddListener.get());


10:19:42.210 [pool-2-thread-2] INFO  AsyncJava - Async Task Finished!

10:19:42.210 [IJava-executor-0] INFO  AsyncJava - Hello World!

10:19:42.211 [pool-2-thread-3] INFO  AsyncJava - Listener Executed on Thread Thread[pool-2-thread-3,5,main]
 NOTE: The Listener Won't Necessarily Run On The Same Thread As The Submitted Task


### Registering Callbacks to Handle Success and Failure

When you want to react to the result of a single async task without blocking on .get() until the task is done, you can register a callback with **Futures.addCallback**. The callback's onSuccess or onFailure method is invoked once the task completes.

```{java}
Futures.addCallback(

ListenableFuture<T>,   //LF you want to register a callback
FutureCallback<T>,    //The actual callback
Executor             //The executor to run the callback
)
```

In [19]:
// Submit a task that waits two seconds and then yields 10.
ListenableFuture<Integer> asyncTask = executor.submit(
() -> {
        logger.info("Performing some async task");
        TimeUnit.MILLISECONDS.sleep(2000);
        return 10;
});

// Register the success/failure handlers; `executor` is the executor they run on.
Futures.addCallback(asyncTask,
new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        logger.info(result + 30);
    }

    @Override
    public void onFailure(Throwable t) {
        // Report the failure instead of dropping it silently.
        logger.error("The async task failed", t);
    }
}, executor);


// The callback does not consume the result: get() still returns the task's own value.
logger.info("The async task result: {}",asyncTask.get());

10:23:10.447 [pool-2-thread-5] INFO  AsyncJava - Performing some async task
10:23:12.460 [pool-2-thread-5] INFO  AsyncJava - 40
10:23:12.460 [IJava-executor-1] INFO  AsyncJava - The async task result: 10


# Chaining Multiple Async Tasks


## Async Steps to Bake Cupcakes

It is possible to chain Async tasks such that they execute whatever logic you like upon completion. This allows the user to not have to use the blocking call of the .get() method. 

For example let's say we want to do the following tasks in order to make cupcakes. We need to

1. Preheat the oven 
    * This task can run while we are doing everything else
2. Collect the ingredients
    * Eggs, Milk, & Butter are in the Fridge and can be grabbed together (synchronously)
    * Flour, Sugar, Baking Soda, Icing are in the Pantry and can be grabbed together (synchronously)
    * Foil cups and the cupcake pan are in the same cupboard and can be grabbed together (synchronously)
3. Mix the ingredients together (requires all ingredients first)
4. Fill the cupcake pan and place in the oven
5. Bake for 45 minutes


### DAG

![Bake a Cake](static/Async_Chaining.jpg)



From the Directed Acyclic Graph (DAG) we can see that some steps are isolated from each other and can be run asynchronously (preheating the oven and gathering items). But there are some steps that depend on prior async tasks.

#### Step 1 - Preheat the oven


It will take a long time for the oven to preheat. So we can start it and then work on all our other tasks

<img src="static/preheatOven.png" height="20%" width="20%">

In [8]:
// Start preheating in the background; the future yields true once the 10 second wait is over.
ListenableFuture<Boolean> ovenPreheated = executor.submit(() -> {
    logger.info("Preheating Oven");
    // The oven needs a while to get up to temperature
    TimeUnit.MILLISECONDS.sleep(10000);
    return Boolean.TRUE;
});

// Meanwhile, carry on with other work on the main thread
logger.info("Doing other stuff on main thread");

10:19:44.369 [pool-2-thread-1] INFO  AsyncJava - Preheating Oven
10:19:44.379 [IJava-executor-0] INFO  AsyncJava - Doing other stuff on main thread


#### Step 2 - Collect the ingredients
Each step of collecting items from their respective locations will be their own Async Task / ListenableFuture

<img src="static/gatherItems.png" height="10%" width="10%">

In [9]:
/**
 * Builds the task for fetching a group of items that are kept in one place.
 *
 * The task logs where it is looking, waits one second before picking up each
 * item, waits one more second after the last item and then returns the items
 * in the order they were given.
 *
 * @param location where the items are kept; only used in the log message
 * @param names    the items to collect, in pick-up order
 * @return a callable producing the list of collected items
 */
java.util.concurrent.Callable<List<String>> gatherItems(String location, String... names) {
    return () -> {
        List<String> items = new ArrayList<>();

        logger.info("Getting items from the {}", location);
        for (String name : names) {
            TimeUnit.MILLISECONDS.sleep(1000);
            items.add(name);
        }
        // One last pause after the final item, as for every other item.
        TimeUnit.MILLISECONDS.sleep(1000);

        return items;
    };
}

// Each location is submitted as its own task, giving one ListenableFuture per trip.

// Items from the fridge
ListenableFuture<List<String>> fridgeItems =
    executor.submit(gatherItems("fridge", "Eggs", "Milk", "Butter"));

// Items from the pantry
ListenableFuture<List<String>> pantryItems =
    executor.submit(gatherItems("pantry", "Flour", "Sugar", "Baking Soda"));

// Items from the cupboard
ListenableFuture<List<String>> cupboardItems =
    executor.submit(gatherItems("cupboard", "Foil Cups", "Baking Pan"));


// Do some other stuff on main thread
logger.info("Doing other stuff on main thread");

10:19:44.452 [pool-2-thread-2] INFO  AsyncJava - Getting items from the fridge
10:19:44.467 [pool-2-thread-3] INFO  AsyncJava - Getting items from the pantry
10:19:44.481 [pool-2-thread-4] INFO  AsyncJava - Getting items from the cupboard
10:19:44.496 [IJava-executor-0] INFO  AsyncJava - Doing other stuff on main thread


#### Step 3 - Mixing the Items Together

The next process in the graph is taking all the items we collected from the fridge and the pantry, then mixing them together. To do this, we can utilize the **Futures.whenAllSucceed()** method to register an AsyncCallable function.

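When both async tasks complete successfully, their results are combined in the AsyncCallable, which itself runs asynchronously on the executor we pass in. **.callAsync()** expects a function that returns a ListenableFuture; if the combining function simply returns a plain value, use **.call()** with an ordinary Callable instead. Both variants run on the given executor and return a ListenableFuture.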

```{java}
Futures.whenAllSucceed(
    ListenableFuture<T>, ...     // 1 or more ListenableFutures
)
.callAsync(
    AsyncCallable<C>,     // AsyncCallable function
    Executor              // Executor
    )
    
 
returns ListenableFuture<C>
```



<img src="static/mixed.gif">


In [10]:
// Mix once both the fridge and the pantry futures have succeeded.
ListenableFuture<String> mixedIngredients = Futures
    .whenAllSucceed(fridgeItems, pantryItems)
    .callAsync(() -> {
        // Fridge items first, then pantry items; each future is read only when its turn comes.
        for (ListenableFuture<List<String>> source : List.of(fridgeItems, pantryItems)) {
            for (String ingredient : Futures.getDone(source)) {
                logger.info("Mixing {}\n", ingredient);
                TimeUnit.MILLISECONDS.sleep(300);
            }
        }
        // callAsync expects a future, so the finished value is wrapped in one.
        return Futures.immediateFuture("Batter");
    }, executor);

// Meanwhile, carry on with other work on the main thread
logger.info("Doing other stuff on main thread");

10:19:44.590 [IJava-executor-0] INFO  AsyncJava - Doing other stuff on main thread


#### Step 4 - Pouring batter in the cupcake pan

After getting the Foil cups and the cupcake pan as well as mixing the batter, we are ready to pour the batter in each cup. This step requires the completion of the "mixing the ingredients" task as well as the "getting items from cupboard" task. 

<img src="static/readybake.gif">

In [11]:
// Pour once the cupboard items have been fetched and the batter has been mixed.
ListenableFuture<String> readyToBake = Futures
    .whenAllSucceed(cupboardItems, mixedIngredients)
    .callAsync(() -> {
        logger.info("Pouring batter into each cup in the pan\n");
        TimeUnit.MILLISECONDS.sleep(5000);
        // callAsync expects a future, so the finished value is wrapped in one.
        return Futures.immediateFuture("Ready for Oven");
    }, executor);

// Meanwhile, carry on with other work on the main thread
logger.info("Doing other stuff on main thread");

10:19:44.669 [IJava-executor-0] INFO  AsyncJava - Doing other stuff on main thread


#### Step 5 - Bake the cupcakes


<img src="static/baked.gif">

In [12]:
// Bake once the oven has been preheated and the filled pan is ready.
ListenableFuture<String> bakedCupcakes = Futures.whenAllSucceed(ovenPreheated, readyToBake)
    .call(
        () -> {
            // Read the preheat result and act on it rather than fetching and discarding it.
            boolean ovenIsHot = Futures.getDone(ovenPreheated);
            if (!ovenIsHot) {
                throw new IllegalStateException("Oven is not preheated");
            }
            logger.info("Baking the cupcakes\n");
            TimeUnit.MILLISECONDS.sleep(5000);
            return "Done";
        }
        , executor);
    
// Do some other stuff on main thread
logger.info("Doing other stuff on main thread");

10:19:44.749 [IJava-executor-0] INFO  AsyncJava - Doing other stuff on main thread


In [13]:
// Fetch the result of the last step in the chain and log it.
logger.info("{}", bakedCupcakes.get());

10:19:48.497 [pool-2-thread-5] INFO  AsyncJava - Mixing Eggs

10:19:48.799 [pool-2-thread-5] INFO  AsyncJava - Mixing Milk

10:19:49.111 [pool-2-thread-5] INFO  AsyncJava - Mixing Butter

10:19:49.413 [pool-2-thread-5] INFO  AsyncJava - Mixing Flour

10:19:49.728 [pool-2-thread-5] INFO  AsyncJava - Mixing Sugar

10:19:50.030 [pool-2-thread-5] INFO  AsyncJava - Mixing Baking Soda

10:19:50.345 [pool-2-thread-5] INFO  AsyncJava - Pouring batter into each cup in the pan

10:19:55.359 [pool-2-thread-5] INFO  AsyncJava - Baking the cupcakes

10:20:00.372 [IJava-executor-0] INFO  AsyncJava - Done


### Overall Flow
The overall flow of the async tasks we executed above can be visualized here

<img src="static/tot.gif">

# Locks

In [14]:
/**
 * Singleton holding one shared counter.
 *
 * The only instance is obtained through {@link #getDataModel()}. Every read
 * and write of the counter goes through a method synchronized on that
 * instance, which is the contract the {@code @GuardedBy("this")} annotation
 * states.
 */
public class DataModel{
    // Created during class initialization, so concurrent first calls to
    // getDataModel() can never observe two different instances.
    private static final DataModel instance = new DataModel();

    // Instance field (not static) so that "this" really is the lock guarding it.
    @GuardedBy("this") private int counter;
    
    private DataModel(){
        counter = 0;
    }
    
    /** Returns the single shared instance. */
    public static DataModel getDataModel(){
        return instance;
    }
    
    /** Adds one to the counter. */
    public synchronized void incrementCounter(){
        counter++;
    }
    
    /** Subtracts one from the counter. */
    public synchronized void decrementCounter(){
        counter--;
    }
    
    /** Returns the current counter value. */
    public synchronized int getCounter(){
        return counter;
    }
}

/**
 * Worker that increments the shared DataModel counter ten times, pausing
 * before each increment.
 */
public class TaskDoer  {
    private final DataModel model;
    private final int identifier;
    
    /**
     * @param identifier number used to tell this worker apart in the log output
     */
    TaskDoer(int identifier){
        this.model = DataModel.getDataModel();
        this.identifier = identifier;
    }
    
    
    /**
     * Increments the shared counter ten times and logs the value after each
     * increment.
     *
     * The tasks in this notebook each build their own TaskDoer, so a lock on
     * the TaskDoer itself never makes two tasks wait for each other. The lock
     * is taken on the shared model instead, and only around the increment and
     * the read, so the logged value is the one this increment produced. The
     * pause happens outside the lock so the workers still interleave.
     *
     * @param sleep pause before each increment, in milliseconds
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    public void doTask(Integer sleep) throws InterruptedException{
        for (int i=0; i<10; i++){
            TimeUnit.MILLISECONDS.sleep(sleep);
            synchronized (model){
                model.incrementCounter();
                logger.info("Current value of counter: {} from task {}", this.model.getCounter(), this.identifier);
            }
        }
    }
    
}


// Two workers with different pauses, both updating the same shared counter.
ListenableFuture<String> task1 = executor.submit(() -> {
    TaskDoer firstWorker = new TaskDoer(1);
    firstWorker.doTask(300);
    return "Done";
});
ListenableFuture<String> task2 = executor.submit(() -> {
    TaskDoer secondWorker = new TaskDoer(2);
    secondWorker.doTask(500);
    return "Done";
});


// Wait for both workers to finish.
task2.get();
task1.get();

10:20:00.768 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 1 from task 1
10:20:00.974 [pool-2-thread-4] INFO  AsyncJava - Current value of counter: 2 from task 2
10:20:01.071 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 3 from task 1
10:20:01.384 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 4 from task 1
10:20:01.480 [pool-2-thread-4] INFO  AsyncJava - Current value of counter: 5 from task 2
10:20:01.687 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 6 from task 1
10:20:01.986 [pool-2-thread-4] INFO  AsyncJava - Current value of counter: 7 from task 2
10:20:02.001 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 8 from task 1
10:20:02.316 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 9 from task 1
10:20:02.489 [pool-2-thread-4] INFO  AsyncJava - Current value of counter: 10 from task 2
10:20:02.631 [pool-2-thread-3] INFO  AsyncJava - Current value of counter: 11 from task 1
10:20:02.945 [pool-

Done