# Java Threads and Concurrency Utilities

## Chapter 8
## Additional Concurrency Utilities

This chapter completes my tour of the concurrency utilities by introducing concurrent collections, atomic variables, the Fork/Join Framework, and completion services.


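Java’s Collections Framework provides interfaces such as List, Set, and Map. ArrayList, TreeSet, HashMap, and the other classes that implement these interfaces are not thread-safe. However, you can make them thread-safe by using the synchronized wrapper methods located in the java.util.Collections class. For example, you can pass an ArrayList instance to Collections.synchronizedList() to obtain a thread-safe variant of ArrayList. These wrappers funnel every call through a single lock, which limits performance under contention, and code that iterates over a wrapped collection must still hold that lock manually to avoid nondeterministic behavior such as a ConcurrentModificationException.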
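A minimal sketch of the wrapper approach (the class name SyncListDemo and its method are hypothetical, not from the original examples): four threads append to a list returned by Collections.synchronizedList(). Each add() is thread-safe on its own, but, as the Collections Javadoc requires, the final iteration is wrapped in a synchronized block on the list itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; not part of the original examples.
public class SyncListDemo {
    // Appends 1..perThread from each of `threads` threads, then sums the list.
    public static long fillAndSum(int threads, int perThread) throws InterruptedException {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 1; i <= perThread; i++) {
                    list.add(i); // each add() acquires the wrapper's lock internally
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        long sum = 0;
        // Iteration is a sequence of calls, so the caller must hold the wrapper's lock.
        synchronized (list) {
            for (int value : list) {
                sum += value;
            }
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fillAndSum(4, 1000)); // 4 * (1000 * 1001 / 2) = 2002000
    }
}
```

The single lock is what makes the wrapper safe, and it is also the bottleneck: every add() from every thread waits on the same monitor.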

The following list offers a short sample of concurrency-oriented collection types that you’ll find in the java.util.concurrent package:
* BlockingQueue is a subinterface of java.util.Queue that also supports blocking operations that wait for the queue to become nonempty before retrieving an element and wait for space to become available in the queue before storing an element. Each of the ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, and SynchronousQueue classes implements this interface directly. The LinkedBlockingDeque and LinkedTransferQueue classes implement this interface via BlockingQueue subinterfaces.
* ConcurrentMap is a subinterface of java.util.Map that declares additional indivisible putIfAbsent(), remove(), and replace() methods. The ConcurrentHashMap class (the concurrent equivalent of java.util.HashMap) and the ConcurrentSkipListMap class implement this interface. ConcurrentNavigableMap is a subinterface of ConcurrentMap that adds java.util.NavigableMap’s sorted-map navigation operations, and ConcurrentSkipListMap implements it.
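Besides the indefinitely blocking put() and take(), BlockingQueue declares timed variants. A minimal sketch (the class name TimedQueueDemo is hypothetical): on a full ArrayBlockingQueue, offer(e, timeout, unit) waits up to the timeout for space and returns false if none appears; on an empty queue, poll(timeout, unit) waits and returns null.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original examples.
public class TimedQueueDemo {
    // Exercises the timed offer()/poll() variants on a one-element queue.
    public static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer("a", 50, TimeUnit.MILLISECONDS);  // space available: true at once
        boolean second = queue.offer("b", 50, TimeUnit.MILLISECONDS); // full: waits about 50 ms, then false
        String taken = queue.poll(50, TimeUnit.MILLISECONDS);          // "a", returned at once
        String none = queue.poll(50, TimeUnit.MILLISECONDS);           // empty: waits about 50 ms, then null
        return first + " " + second + " " + taken + " " + none;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // true false a null
    }
}
```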

## Using BlockingQueue and ArrayBlockingQueue
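BlockingQueue’s Javadoc outlines the core of a producer-consumer application that’s much simpler than the equivalent application shown in Chapter 3. The following cells share a one-element ArrayBlockingQueue between a producer that puts the letters A through Z and a consumer that takes them until it receives Z.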

In [1]:
BlockingQueue<Character> bq;
bq = new ArrayBlockingQueue<Character>(1);
ExecutorService executor = Executors.newFixedThreadPool(2);

In [2]:
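Runnable producer = () -> {
    for (char ch = 'A'; ch <= 'Z'; ch++) {
        try {
            bq.put(ch); // blocks while the queue is full
            System.out.printf("%c produced by producer.%n", ch);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop producing
            return;
        }
    }
};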

In [3]:
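Runnable consumer = () -> {
    char ch = '\0';
    do {
        try {
            ch = bq.take(); // blocks while the queue is empty
            System.out.printf("%c consumed by consumer.%n", ch);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop consuming
            return;
        }
    } while (ch != 'Z');
};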

In [4]:
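executor.execute(producer);
executor.execute(consumer);
// shutdown() (unlike shutdownNow()) doesn't interrupt the running tasks; it lets them finish.
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS); // wait until both tasks are done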

A produced by producer.
A consumed by consumer.
B produced by producer.
B consumed by consumer.
C produced by producer.
C consumed by consumer.
D produced by producer.
D consumed by consumer.
E produced by producer.
E consumed by consumer.
F produced by producer.
G produced by producer.
F consumed by consumer.
G consumed by consumer.
H consumed by consumer.
H produced by producer.
I produced by producer.
J produced by producer.
I consumed by consumer.
J consumed by consumer.
K produced by producer.
L produced by producer.
K consumed by consumer.
L consumed by consumer.
M consumed by consumer.
M produced by producer.
N produced by producer.
O produced by producer.
N consumed by consumer.
O consumed by consumer.
P produced by producer.
Q produced by producer.
P consumed by consumer.
Q consumed by consumer.
R consumed by consumer.
R produced by producer.
S produced by producer.
T produced by producer.
S consumed by consumer.
T consumed by consumer.
U consumed by consumer.
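U produced by producer.

Some letters, such as H, M, and R, are reported as consumed before they are reported as produced. The queue never hands out an element before put() has stored it; each message is printed only after put() or take() returns, so the two threads’ printf() calls can interleave in either order.

### Blocking queue using wait() and notifyAll()

In [5]: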
public class BlockingQueueManMade<T> {
    private final int size;      // maximum number of items
    private final List<T> items; // guarded by this object's monitor

    public BlockingQueueManMade(int size) {
        this.size = size;
        this.items = new ArrayList<>();
    }

    public synchronized void put(T item) throws InterruptedException {
        // Re-check the condition after every wakeup; spurious wakeups are possible.
        while (items.size() == size) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake threads waiting in take() for the queue to become nonempty
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove(0);
        notifyAll(); // wake threads waiting in put() for free space
        return item;
    }
}

In [6]:
BlockingQueueManMade<Integer> blockingQueue = new BlockingQueueManMade<>(1);

In [7]:
Runnable producer = () -> {
    String name = Thread.currentThread().getName();
    try {
        for (int n = 0; n < 10; n++) {
            Integer i = (int) (Math.random() * 10); // random value from 0 to 9
            blockingQueue.put(i);                   // blocks while the queue is full
            System.out.println(name + " Producing:-> " + i + " at " + System.nanoTime());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status and stop
    }
};

Runnable consumer = () -> {
    String name = Thread.currentThread().getName();
    try {
        for (int n = 0; n < 10; n++) {
            Integer take = blockingQueue.take(); // blocks while the queue is empty
            System.out.println(name + " Consuming:<- " + take + " at " + System.nanoTime());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status and stop
    }
};

In [8]:
ExecutorService executor = Executors.newFixedThreadPool(2);

In [9]:
// Submit the Runnables directly: the pool's worker threads run them, so wrapping
// them in Thread objects (which would never be started) is unnecessary.
executor.execute(producer);
executor.execute(consumer);
executor.shutdown(); // let both tasks finish, then release the pool threads

pool-2-thread-1 Producing:-> 8 at 31848475216201
pool-2-thread-1 Producing:-> 1 at 31848517126999
pool-2-thread-2 Consuming:<- 8 at 31848517123356
pool-2-thread-2 Consuming:<- 1 at 31848518677342
pool-2-thread-1 Producing:-> 8 at 31848518694103
pool-2-thread-2 Consuming:<- 8 at 31848519218958
pool-2-thread-1 Producing:-> 9 at 31848519668466
pool-2-thread-2 Consuming:<- 9 at 31848520434282
pool-2-thread-1 Producing:-> 0 at 31848520997805
pool-2-thread-2 Consuming:<- 0 at 31848521491862
pool-2-thread-1 Producing:-> 7 at 31848521957319
pool-2-thread-2 Consuming:<- 7 at 31848522407420
pool-2-thread-1 Producing:-> 5 at 31848522869835
pool-2-thread-2 Consuming:<- 5 at 31848523480106
pool-2-thread-1 Producing:-> 7 at 31848524010833
pool-2-thread-2 Consuming:<- 7 at 31848524586268
pool-2-thread-1 Producing:-> 1 at 31848525123306
pool-2-thread-2 Consuming:<- 1 at 31848525670243
pool-2-thread-1 Producing:-> 2 at 31848526323742
pool-2-thread-2 Consuming:<- 2 at 31848527451696


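### Blocking queue using Lock and Condition

A ReentrantLock can have several Condition objects, each with its own wait set. Here put() waits on notFull and signals notEmpty, while take() does the reverse, so a single signal() wakes a thread of the right kind; a queue built on one object monitor, where producers and consumers share a wait set, generally has to use notifyAll() instead.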

In [10]:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

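// Named LockBasedBlockingQueue so that it doesn't shadow java.util.concurrent.BlockingQueue,
// which the earlier cells use.
public class LockBasedBlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<T>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // signaled when space frees up
    private final Condition notEmpty = lock.newCondition(); // signaled when an item arrives

    public LockBasedBlockingQueue(int capacity) {
        this.capacity = capacity;
    }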

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();
            }
            queue.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
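Condition also supports timed waits. As a sketch, assuming we want timed offer() and poll() methods like the ones BlockingQueue declares, the hypothetical TimedLockQueue below follows the same pattern as the Lock/Condition queue above but loops on awaitNanos(), which returns an estimate of the remaining wait time (zero or less once the timeout has elapsed).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class; a sketch of timed operations, not part of the original examples.
public class TimedLockQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public TimedLockQueue(int capacity) {
        this.capacity = capacity;
    }

    // Waits up to the timeout for space; returns false if the queue is still full.
    public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.size() == capacity) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                nanos = notFull.awaitNanos(nanos); // remaining time after waking
            }
            queue.add(element);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the timeout for an element; returns null if the queue is still empty.
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (nanos <= 0L) {
                    return null; // timed out
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedLockQueue<Integer> q = new TimedLockQueue<>(1);
        System.out.println(q.offer(1, 20, TimeUnit.MILLISECONDS)); // true
        System.out.println(q.offer(2, 20, TimeUnit.MILLISECONDS)); // false: still full after 20 ms
        System.out.println(q.poll(20, TimeUnit.MILLISECONDS));     // 1
        System.out.println(q.poll(20, TimeUnit.MILLISECONDS));     // null
    }
}
```

Reusing the remaining time on each loop iteration keeps the total wait bounded by the original timeout even when a thread is woken early and finds the condition still unsatisfied.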

## Learning More About ConcurrentHashMap
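The ConcurrentHashMap class behaves like HashMap but has been designed to work in multithreaded contexts without the need for explicit synchronization. Retrieval operations generally don’t block, updates support a high level of concurrency, and iterators are weakly consistent (they never throw ConcurrentModificationException). Unlike HashMap, it doesn’t accept null keys or values.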

In [11]:
ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
map.put(1, 1);
map

{1=1}

In [12]:
map.put(1, 2);
map

{1=2}

In [13]:
map.putIfAbsent(1, 3);
map

{1=2}

In [14]:
map.replace(1, 4);
map

{1=4}
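putIfAbsent() and replace() cover single conditional updates. For read-modify-write updates such as counting, ConcurrentHashMap’s merge() applies the update atomically for each key, whereas a get() followed by a put() from several threads could lose increments. A minimal sketch, with a hypothetical WordCounter class and made-up input, in which four threads count the same words concurrently:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original examples.
public class WordCounter {
    public static Map<String, Integer> count(List<String> words, int threads) throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (String w : words) {
                    // Inserts 1 for a new key or atomically adds 1 to the existing count.
                    counts.merge(w, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a");
        System.out.println(count(words, 4).get("a")); // 12: three occurrences counted by each of four threads
    }
}
```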

## Atomic Variables

Java 5 provided the ability to create efficient nonblocking algorithms by introducing the java.util.concurrent.atomic package. According to this package’s JDK documentation, java.util.concurrent.atomic provides a small toolkit of classes that support lock-free, thread-safe operations on single variables.

The classes in the java.util.concurrent.atomic package extend the notion of volatile values, fields, and array elements with an atomic conditional update operation, compareAndSet(), so that external synchronization isn’t required. In other words, you get atomic read-modify-write operations on a single variable, together with the memory semantics associated with volatile variables, without using locks.

Some of the classes located in java.util.concurrent.atomic are described here:
* AtomicBoolean: A boolean value that may be updated atomically.
* AtomicInteger: An int value that may be updated atomically.
* AtomicIntegerArray: An int array whose elements may be updated atomically.
* AtomicLong: A long value that may be updated atomically.
* AtomicLongArray: A long array whose elements may be updated atomically.
* AtomicReference: An object reference that may be updated atomically.
* AtomicReferenceArray: An object reference array whose elements may be updated atomically.
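The atomic classes are built around compareAndSet(expected, update), which stores update only if the current value still equals expected. A minimal sketch of the usual retry loop, using a hypothetical BoundedCounter that allows at most `limit` successful increments without any lock:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original examples.
public class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    public BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Increments the counter unless it has reached the limit, using a lock-free CAS loop.
    public boolean tryIncrement() {
        while (true) {
            int current = count.get();
            if (current >= limit) {
                return false;
            }
            // Succeeds only if no other thread changed the value since get(); otherwise retry.
            if (count.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Runs `threads` threads that each attempt `attemptsPerThread` increments; returns the successes.
    public static int successes(int threads, int attemptsPerThread, int limit) throws InterruptedException {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger granted = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < attemptsPerThread; i++) {
                    if (counter.tryIncrement()) {
                        granted.incrementAndGet();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return granted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 8 threads x 100 attempts = 800 attempts, but only 500 can succeed.
        System.out.println(successes(8, 100, 500)); // 500
    }
}
```

A failed compareAndSet() means another thread won the race, so the loop simply rereads the value and tries again; no thread ever blocks.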

In [15]:
class ID {
    private static volatile long nextID = 1;

    static synchronized long getNextID() {
        return nextID++; // read-modify-write guarded by the class's intrinsic lock
    }
}

Although the code is properly synchronized (and visibility is accounted for), the intrinsic lock associated with synchronized can hurt performance under heavy thread contention. Furthermore, liveness problems such as deadlock can occur. Listing 8-3 shows you how to avoid these problems by replacing synchronized with an atomic variable.

In [16]:
import java.util.concurrent.atomic.AtomicLong;

class ID {
    private static final AtomicLong nextID = new AtomicLong(1);

    static long getNextID() {
        return nextID.getAndIncrement(); // returns the current value and adds 1, atomically
    }
}

Listing 8-3. Returning Unique IDs in a Thread-Safe Manner via AtomicLong

I’ve converted nextID from a long to an AtomicLong instance, initializing this object to 1. I’ve also refactored the getNextID() method to call AtomicLong’s getAndIncrement() method, which increments the AtomicLong instance’s internal long integer variable by 1 and returns the previous value in one indivisible step. There is no explicit synchronization.
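To check that no ID is handed out twice, a sketch (hypothetical IdCheck class, which reproduces the AtomicLong-based generator as a nested class so that the block is self-contained) draws IDs from several threads into a concurrent set and compares the set’s size with the number of calls:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; not part of the original examples.
public class IdCheck {
    // Same approach as the ID class above: one indivisible getAndIncrement() per call.
    static class ID {
        private static final AtomicLong nextID = new AtomicLong(1);

        static long getNextID() {
            return nextID.getAndIncrement();
        }
    }

    public static int distinctIds(int threads, int perThread) throws InterruptedException {
        Set<Long> seen = ConcurrentHashMap.newKeySet(); // thread-safe Set backed by a ConcurrentHashMap
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(ID.getNextID());
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctIds(4, 10_000)); // 40000: every call returned a new ID
    }
}
```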

The java.util.concurrent.atomic package also includes the DoubleAccumulator, DoubleAdder, LongAccumulator, and LongAdder classes, added in Java 8, which address a scalability problem that arises when many threads update a single count, sum, or other value. These classes “internally employ contention-reduction techniques that provide huge throughput improvements as compared to atomic variables. This is made possible by relaxing atomicity guarantees in a way that is acceptable in most applications.”
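A minimal sketch of two of these classes (the class name AdderDemo is hypothetical): LongAdder suits a counter that many threads increment and that is read rarely through sum(); LongAccumulator generalizes the idea to any associative, side-effect-free function, here Long::max with identity Long.MIN_VALUE. Per their Javadoc, sum() and get() are not atomic snapshots while updates are in flight, which appears to be the relaxed guarantee the quotation refers to, so the sketch reads them only after every thread has been joined.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class; not part of the original examples.
public class AdderDemo {
    // Returns {total increments, largest value accumulated}.
    public static long[] run(int threads, int perThread) throws InterruptedException {
        LongAdder hits = new LongAdder();
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    hits.increment();                          // contention is spread across internal cells
                    max.accumulate((long) id * perThread + i); // keeps the largest value seen
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        // Read only after join(): the results are exact when no updates are in flight.
        return new long[] { hits.sum(), max.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(4, 1000);
        System.out.println(r[0] + " " + r[1]); // 4000 3999
    }
}
```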

## Fork/Join Framework

There is always a need for code to execute faster. Historically, this need was met by increasing microprocessor speeds and/or by supporting multiple processors. However, somewhere around 2003, microprocessor speeds stopped increasing because of natural limits. To compensate, processor manufacturers started to add multiple processing cores to their processors, to increase speed through massive parallelism.

Java supports concurrency via its low-level threading features and higher-level concurrency utilities such as thread pools. However, dividing work among threads doesn’t by itself make full use of the available processor/core resources. For example, suppose you’ve created a sorting algorithm that divides an array into two halves, assigns one thread to sort each half, and merges the results after both threads finish.

Let’s assume that each thread runs on a different processor. Because different amounts of element reordering may occur in each half of the array, it’s possible that one thread will finish before the other thread and must wait before the merge can happen. In this case, a processor resource is wasted.

This problem (along with the related problems of verbose, hard-to-read code) can be solved by recursively breaking a task into subtasks and combining their results. The subtasks run in parallel and tend to finish at about the same time; their results are then merged and passed back up to the previous level of subtasks. Hardly any processor time is wasted through waiting, and the recursive code is less verbose and (usually) easier to understand. Java provides the Fork/Join Framework to implement this scenario.

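Fork/Join consists of a special executor service and thread pool, ForkJoinPool. The executor service makes a task available to the framework, and this task is broken into smaller tasks that are forked (executed by different threads) from the pool. A task waits until it’s joined (its subtasks finish). Tasks are ForkJoinTask subclasses, usually RecursiveTask<V>, whose compute() method returns a result, or RecursiveAction, whose compute() method returns nothing.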

Fork/Join uses work stealing to minimize thread contention and overhead. Each worker thread in the pool has its own double-ended queue (deque). A worker pushes the subtasks it creates onto one end of its deque and takes its next task from that same end, so it processes its own work in last-in, first-out (LIFO) order. When its deque is empty, the worker steals a task from the opposite end of another worker’s deque. Stealing is infrequent because the tasks at that end are the oldest, and because a problem is divided into ever-smaller subproblems, the oldest tasks tend to be the largest, so a single steal gives the thief a lot of work. You start by giving a task to one worker, which keeps dividing it into smaller tasks; eventually all of the workers have something to do, with minimal synchronization.

## Fork/Join – RecursiveTask

In [17]:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinAdd extends RecursiveTask<Long> {

    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long threshold = 10_000;

    public ForkJoinAdd(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinAdd(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {

        int length = end - start;
        if (length <= threshold) {
            return add();
        }

        ForkJoinAdd firstTask = new ForkJoinAdd(numbers, start, start + length / 2);
        firstTask.fork(); //start asynchronously

        ForkJoinAdd secondTask = new ForkJoinAdd(numbers, start + length / 2, end);

        Long secondTaskResult = secondTask.compute();
        Long firstTaskResult = firstTask.join();

        return firstTaskResult + secondTaskResult;

    }

    private long add() {
        long result = 0;
        for (int i = start; i < end; i++) {
            result += numbers[i];
        }
        return result;
    }

    public static long startForkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinAdd(numbers);
        return ForkJoinPool.commonPool().invoke(task); // shared pool; avoids creating a new pool per call
    }

}

In [18]:
System.out.println(ForkJoinAdd.startForkJoinSum(1_000_000));

500000500000
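For comparison, a parallel stream performs a similar split-and-combine sum; in current OpenJDK implementations its work runs on ForkJoinPool.commonPool(). A minimal sketch (the class name ParallelSumDemo is hypothetical) that checks the result against the closed form n(n + 1)/2:

```java
import java.util.stream.LongStream;

// Hypothetical demo class; not part of the original examples.
public class ParallelSumDemo {
    // The stream framework splits the range into chunks, sums them on worker
    // threads, and combines the partial sums.
    public static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println(parallelSum(n));  // 500000500000
        System.out.println(n * (n + 1) / 2); // 500000500000, the closed form
    }
}
```

The explicit RecursiveTask version gives direct control over the threshold and the split, while the stream version leaves those decisions to the library.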


## Fork/Join – RecursiveAction

In [19]:
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

public class ForkJoinFibonacci extends RecursiveAction {

    private static final long threshold = 10;
    private volatile long number;

    public ForkJoinFibonacci(long number) {
        this.number = number;
    }

    public long getNumber() {
        return number;
    }

    @Override
    protected void compute() {
        long n = number;
        if (n <= threshold) {
            number = fib(n);
        } else {
            ForkJoinFibonacci f1 = new ForkJoinFibonacci(n - 1);
            ForkJoinFibonacci f2 = new ForkJoinFibonacci(n - 2);
            ForkJoinTask.invokeAll(f1, f2);
            number = f1.number + f2.number;
        }
    }

    private static long fib(long n) {
        if (n <= 1) return n;
        else return fib(n - 1) + fib(n - 2);
    }

}

In [20]:
ForkJoinFibonacci task = new ForkJoinFibonacci(50);
ForkJoinPool.commonPool().invoke(task); // shared pool; avoids creating a new pool per call

System.out.println(task.getNumber());

12586269025
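The Fibonacci example stores its result in a field so that it fits RecursiveAction, which returns no value. RecursiveAction is more commonly used for side effects on shared data. A sketch, with a hypothetical ScaleTask that doubles every element of an array in place and stops splitting once a slice has at most THRESHOLD elements (an arbitrary cutoff):

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class; not part of the original examples.
public class ScaleTask extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // arbitrary cutoff for sequential work
    private final long[] data;
    private final int start;
    private final int end; // exclusive

    public ScaleTask(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                data[i] *= 2; // each task owns a disjoint slice, so no locking is needed
            }
            return;
        }
        int mid = (start + end) >>> 1;
        // Fork both halves and wait for both to finish.
        invokeAll(new ScaleTask(data, start, mid), new ScaleTask(data, mid, end));
    }

    // Returns a doubled copy of the input, leaving the input unchanged.
    public static long[] doubled(long[] input) {
        long[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool.commonPool().invoke(new ScaleTask(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        Arrays.setAll(values, i -> i);
        long[] result = doubled(values);
        System.out.println(result[0] + " " + result[9_999]); // 0 19998
    }
}
```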


## Completion Services
A CompletionService decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers submit tasks for execution. Consumers take completed tasks and process their results in the order they complete. A CompletionService can, for example, be used to manage asynchronous I/O, in which tasks that perform reads are submitted in one part of a program or system and then acted upon in a different part of the program when the reads complete, possibly in a different order than they were requested.
Typically, a CompletionService relies on a separate Executor to actually execute the tasks, in which case the CompletionService only manages an internal completion queue. The ExecutorCompletionService class provides an implementation of this approach.

Memory consistency effects: Actions in a thread prior to submitting a task to a CompletionService happen-before actions taken by that task, which in turn happen-before actions following a successful return from the corresponding take().

In [21]:
class Worker implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        // Do some work and return a result; here, just the completion time.
        return System.currentTimeMillis();
    }
}

In [22]:
ExecutorService ex = Executors.newCachedThreadPool();
CompletionService<Long> cs = new ExecutorCompletionService<Long>(ex);
cs.submit(new Worker());
cs.submit(new Worker());
cs.submit(new Worker());

for (int i = 0; i < 3; i++) {
    long l = cs.take().get(); // take() blocks until some task has completed
    // utilize the result
    System.out.println(l);
}
ex.shutdown(); // release the pool's threads

1594667656581
1594667656692
1594667656763
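The three Worker tasks above are interchangeable, so their output doesn’t reveal which order take() follows. A sketch (hypothetical CompletionOrderDemo class) holds each task behind its own CountDownLatch and opens the latches in reverse submission order; because only one task can finish at a time, the order is deterministic and shows that take() follows completion order rather than submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the original examples.
public class CompletionOrderDemo {
    public static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3); // all three tasks run at once
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        CountDownLatch[] gates = { new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1) };
        String[] names = { "first", "second", "third" };
        try {
            // Submit in the order first, second, third; each task blocks until its gate opens.
            for (int i = 0; i < 3; i++) {
                final int k = i;
                cs.submit(() -> {
                    gates[k].await();
                    return names[k];
                });
            }
            List<String> order = new ArrayList<>();
            // Open the gates in reverse order, taking each result as soon as it completes.
            for (int i = 2; i >= 0; i--) {
                gates[i].countDown();
                order.add(cs.take().get()); // blocks until the next completed task is available
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // [third, second, first]
    }
}
```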


## Summary
This chapter completed my tour of the concurrency utilities by introducing concurrent collections, atomic variables, the Fork/Join Framework, and completion services.

A concurrent collection is a high-performance, highly scalable collection type located in the java.util.concurrent package. It overcomes the ConcurrentModificationException and performance problems of the synchronized-wrapper collections.

An atomic variable is an instance of a class that encapsulates a single variable and supports lock-free, thread-safe operations on that variable, for example, AtomicInteger.

The Fork/Join Framework consists of a special executor service and thread pool. The executor service makes a task available to the framework, and this task is broken down into smaller tasks that are forked (executed by different threads) from the pool. A task waits until it’s joined (its subtasks finish).

A completion service is an implementation of the CompletionService<V> interface that decouples the production of new asynchronous tasks (a producer) from the consumption of the results of completed tasks (a consumer). V is the type of a task result.