# Wykład 13 - Wielowątkowość - `Executor`

- Niskopoziomowe elementy jak `synchronized`, `wait()`, `notify()` są skomplikowane w zastosowaniu
- Duże poleganie na `synchronized` może doprowadzić do zbytniego obciążenia
- Zazwyczaj niezbędne są wysokopoziomowe konstrukty

Do Javy wprowadzono framework `java.util.concurent`:
- `Executor`
- `atomic` - klasy zapewniające wsparcie dla programowania **lock-free** na pojedynczych zmiennych
- `locks` - typ dostarczający funkcjonalność **lock-and-wait**

<img src="https://dz2cdn1.dzone.com/storage/temp/13048369-1581789944647.png" width="650"/>

## 13.1 `Executor`

Pozwala na wykonanie `Runnable` przez wyrażenie. Wykonanie odbywa się na
- aktualnym wątku
- nowym wątku
- na wątku z puli (`ThreadPool`)

In [None]:
public interface Executor {
  void execute(Runnable);
}

In [None]:
Executor executor = ...; 
executor.execute(new RunnableTask());

Ograniczenia:
- Działa tylko z obiektami `Runnable` - metoda `run` nie zwraca wartości, więc nie ma łatwej możliwości dostarczenia wartości zwrócionej do obiektu wywołującego
- Nie dostarcza metod śledzenia postępów wykonania
- Nie dostarcza metod anulowania zadania
- Nie dostarcza metod służących ustaleniu czy `Runnable` zakończył działania
- Nie dostarcza metod umożliwiających obsługę kolekcji `Runnable`
- Nie dostarcza metod umożliwiających przerwanie wykonania

## 13.2 `ExecutorService`

`ExecutorService` dostarcza odpowiednie metody do wykonania powyższych działań
- `boolean awaitTermination(long timeout, TimeUnit unit)` - oczekuj zakończenia wszystkich zadań, przekroczenia czasu oczekiwania lub przerwania
- `<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)` - wykonaj wszystkie `Callable` w kolekcji i zwróć `Future` (przechowuje status i rezultat)
- `<T> T invokeAny(Collection<? extends Callable<T>> tasks)` - wykonaj i zwróć rezultat dowolnego zakończonego sukcesem (nie rzucającego wyjątku) zadania
- `boolean isShutdown()` - zwróć `true` gdy `Executor` został zamknięty
- `boolean isTerminated()` - zwróć `true` gdy wszystkie zadania zostały wykonane
- `void shutdown()` - zainicjuj uporządkowane zamknięcie, w którym wcześniej przesłane zadania są wykonywane - żadne nowe zadanie nie zostanie zaakceptowane
- `<T> Future<T> submit(Callable<T> task)` - zgłoś `Callable` do wykonania i wróć `Future`
- `Future<?> submit(Runnabletask)` - zgłoś `Runnable` do wykonania i wróć `Future`
- `<T> Future<T> submit(Runnable task, T result)` - zgłoś `Runnable` do wykonania i wróć `Future` oraz wynik

`TimeUnit` - granulacja czasu:
- `TimeUnit.SECONDS`
- `TimeUnit.MICROSECONDS`
- `TimeUnit.MILLISECONDS`
- `TimeUnit.NANOSECONDS`
- `TimeUnit.DAYS`
- `TimeUnit.HOURS`

## 13.3 `Callable`

In [None]:
public interface Runnable {
    public void run();
}

public interface Callable<V> {
    V call() throws Exception;
}

`Callable` - interfejs generyczny z pojedynczą metodą zwracającą wartość `V`.

## 13.4 `Future`

`Future` - interfejs reprezentujący wynik asynchronicznego wykonania.

Metody:
- `boolean cancel(boolean mayInterruptIfRunning)` - spróbuj przerwać wykonanie zadania
- `V get()` - Czekaj (jeżeli to konieczne) na zakończenie zadania i zwróć wartość
- `V get(long timeout, TimeUnit unit)` - Czekaj przez określony czas na zakończenie zadania i zwróć wartość jeżeli jest dostępna
- `boolean isCancelled()` - sprawdź czy zadanie zostało przerwane przed zakończeniem
- `boolean isDone()` - sprawdź czy zadanie zostało zakończone

In [2]:
// java
import java.math.BigInteger;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    final static int sum = 17;

    public static void main() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<BigInteger> callable;
        callable = new Callable<BigInteger>() {
            @Override
            public BigInteger call() {
                BigInteger result = BigInteger.ZERO;
                for (int i = 0; i <= sum; i++) {
                    result = result.add(BigInteger.valueOf(sum));
                }
                return result;
            }
        };

        Future<BigInteger> taskFuture = executor.submit(callable);
        try {
            while (!taskFuture.isDone())
                System.out.println("waiting");
            System.out.println(taskFuture.get());
        } catch (
                ExecutionException ee) {
            System.err.println("task threw an exception");
            ee.printStackTrace();
        } catch (
                InterruptedException ie) {
            System.err.println("interrupted while waiting");
        }
        executor.shutdown();
    }
}
Main.main()

waiting
306


In [6]:
// kotlin
import java.math.BigInteger
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future


val sum = 17
fun main() {
    val executor = Executors.newSingleThreadExecutor()
    val callable: Callable<BigInteger> = Callable {
        var result = BigInteger.ZERO
        for (i in 0..sum) {
            result = result.add(BigInteger.valueOf(sum.toLong()))
        }
        result
    }
    val taskFuture = executor.submit(callable)
    try {
        while (!taskFuture.isDone) println("waiting")
        println(taskFuture.get())
    } catch (ee: ExecutionException) {
        System.err.println("task threw an exception")
        ee.printStackTrace()
    } catch (ie: InterruptedException) {
        System.err.println("interrupted while waiting")
    }
    executor.shutdown()
}
main()

waiting
waiting
waiting
306


## 13.5 `Executors`

- `newFixedThreadPool` - tworzy `ThreadPool` który wielokrotnie wykorzystuje określoną liczbę wątków z nieograniczonej współdzielonej kolejki.

```java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
```

- `newSingleThreadExecutor` - tworzy `Executor` wykorzystujący pojedynczy wątek roboczy operujący na nieograniczonej kolejce

```java
public static ExecutorService newSingleThreadExecutor() {
    return new DelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new DelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
```

- `newCachedThreadPool` - tworzy `ThreadPool` tworzący nowe wątki gdy są potrzebne, używający już utworzonych wątków gdy są dostępne

```java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
```

- `newSingleThreadScheduledExecutor` - tworzy `Executor` który może zaplanować wykonanie zadań po określonym opóźnieniu lub periodycznie

```java
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
```

- `newScheduledThreadPool` - tworzy `ThreadPool` mogący zaplanować wykonanie zadań po określonym opóźnieniu lub periodycznie

```java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
                                              ThreadFactory threadFactory)
```

In [6]:
// java

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    private static BigInteger calculatedSum = BigInteger.ZERO;
    private static final int NUM_THREADS = 10;

    static class Sum implements Callable<BigInteger> {
        long from, to;
        BigInteger localSum = BigInteger.ZERO;

        public Sum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public BigInteger call() {
            for (long i = from; i <= to; i++)
                localSum = localSum.add(BigInteger.valueOf(i));
            return localSum;
        }
    }

    public static void main() {
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
        List<Future<BigInteger>> summationTasks = new ArrayList<>();

        long n = 1000000L;
        long taskNum = n / 10; // 10 tasks

        for (int i = 0; i < NUM_THREADS; i++) {
            long fromInInnerRange = (taskNum * i) + 1;
            long toInInnerRange = taskNum * (i + 1);

            System.out.printf("Wątek dla sumy w zakresie %d - %d %n", fromInInnerRange, toInInnerRange);
            Callable<BigInteger> summationTask = new Sum(fromInInnerRange, toInInnerRange);
            Future<BigInteger> futureSum = executorService.submit(summationTask);
            summationTasks.add(futureSum);
        }

        for (Future<BigInteger> partialSum : summationTasks) {
            try {
                calculatedSum = calculatedSum.add(partialSum.get());
            } catch (CancellationException | ExecutionException | InterruptedException exception) {
                exception.printStackTrace();
            }
        }
        System.out.printf("Suma = %d", calculatedSum);
        executorService.shutdown();
    }
}

Main.main()

Wątek dla sumy w zakresie 1 - 100000 
Wątek dla sumy w zakresie 100001 - 200000 
Wątek dla sumy w zakresie 200001 - 300000 
Wątek dla sumy w zakresie 300001 - 400000 
Wątek dla sumy w zakresie 400001 - 500000 
Wątek dla sumy w zakresie 500001 - 600000 
Wątek dla sumy w zakresie 600001 - 700000 
Wątek dla sumy w zakresie 700001 - 800000 
Wątek dla sumy w zakresie 800001 - 900000 
Wątek dla sumy w zakresie 900001 - 1000000 
Suma = 1000001000000

In [3]:
// kotlin

import java.math.BigInteger
import java.util.concurrent.*


var calculatedSum = BigInteger.ZERO
val NUM_THREADS = 10
fun main() {
    val executorService = Executors.newFixedThreadPool(NUM_THREADS)
    val summationTasks: MutableList<Future<BigInteger>> = ArrayList()
    val n = 1000000L
    val taskNum = n / 10 // 10 tasks
        
    for (i in 0 until NUM_THREADS) {
        val fromInInnerRange = taskNum * i + 1
        val toInInnerRange = taskNum * (i + 1)
        System.out.printf("Wątek dla sumy w zakresie %d - %d %n", fromInInnerRange, toInInnerRange)
        val summationTask: Callable<BigInteger> = Sum(fromInInnerRange, toInInnerRange)
        val futureSum = executorService.submit(summationTask)
        summationTasks.add(futureSum)
    }
    
    for (partialSum in summationTasks) {
        try {
            calculatedSum = calculatedSum.add(partialSum.get())
        } catch (exception: CancellationException) {
            exception.printStackTrace()
        } catch (exception: ExecutionException) {
            exception.printStackTrace()
        } catch (exception: InterruptedException) {
            exception.printStackTrace()
        }
    }
    
    System.out.printf("Suma = %d", calculatedSum)
    executorService.shutdown()
}

internal class Sum(var from: Long, var to: Long) : Callable<BigInteger> {
    var localSum: BigInteger = BigInteger.ZERO
    override fun call(): BigInteger {
        for (i in from..to) localSum = localSum.add(BigInteger.valueOf(i))
        return localSum
    }
}
main()

Wątek dla sumy w zakresie 1 - 100000 
Wątek dla sumy w zakresie 100001 - 200000 
Wątek dla sumy w zakresie 200001 - 300000 
Wątek dla sumy w zakresie 300001 - 400000 
Wątek dla sumy w zakresie 400001 - 500000 
Wątek dla sumy w zakresie 500001 - 600000 
Wątek dla sumy w zakresie 600001 - 700000 
Wątek dla sumy w zakresie 700001 - 800000 
Wątek dla sumy w zakresie 800001 - 900000 
Wątek dla sumy w zakresie 900001 - 1000000 
Suma = 500000500000