# Programmation concurrente en C++

Dans le cours **Programmes Coopérants** vous avez les bases de la programmation concurentes en Python à l'aide de la création de process ou de thread. Vous avez également vu comment il est possible de faire du calcul parallèle et donc exploiter les architecture des super calculateurs moderne via le protocol MPI. 

Dans ce cours nous allons reprendre une partie de ce qui a été abordé mais en nous focalisant sur l'utilisation du C++. Plus précisément nous allons voir comment faire de la programmation concurrente en C++ via deux aspects : (i) l'utilisation du multithread ; (ii) l'utilisation de modèle de programmation asynchrone. 

Alors si des puristes lisent ce notebook, ils pourraient être tenté de dire que le multi-thread ce n'est pas forcément du concurrent mais du parallèles et que de l'asynchrone ce n'est pas du concurrent... C'est une question de point de vue. Plus précisément ce que l'on va voir aujourd'hui c'est comment faire un programme qui va devoir effectuer un grand nombre de tâches (une tâches étant une série d'instruction) et les effectuer le plus rapidement possible en tirant au maximum parti des ressources matériel à disposition. 

## Rappel sur les *lambda* fonctions

Pour commencer faisons juste un petit rappel sur les fonctions anonymes ou *lambda* fonctions. En effet lorsque l'on fait de la programmation concurrentes en C++ les fonctions anonymes s'avèrent être extrèmement pratique pour se simplifier la vie. Nous allons donc rapidement faire un tour d'horizon des leurs définitions et utilisations. 

La syntaxe générale des fonctions anonymes est la suivante : 

```c++
[capture]( params ) -> ret { body }
```

* `params` représente la liste des paramètres d'entrée de votre fonction, donc une suite de paramètres nommés et typés, comme dans une fonction classique
* `ret` est le type de retour de votre fonction anonyme, vous pouvez ne pas le préciser il est alors automatiquement déduit si le mot clé `return est présent dans `body` sinon il est considéré comme `void`
* `body` le corp de votre fonction
* `capture` une liste de variables existantes dans le scope de déclaration de la fonction anonyme est devant être transmises au scope interne à la fonction.

Vous pouvez si vous le souhaitez stocker votre fonction anonyme dans une variables pour l'utiliser ensuite de la manière suivante :

```c++
auto f = [capture](params) -> ret { body }
```

Regardons tout de suite un exemple: 

```c++
#include <iostream>

int main{

    auto f = [](const std::string& msg){
        std::cout << msg << "\n";
    }

    f("coucou")

    return 0;
}
```


Pour expliquer cette histoire de `capture` regardons l'exemple suivant : 

```c++

double a=2.1;
auto f = [](){ std::cout << "a = " << a << "\n" ; };
// Compilation Error 'a' is not captured 
```

En effet ce code ne compile pas car `a` est bien définit mais n'est pas accessible depuis l'intérieur de la lambda fonction. Une solution me direz vous est alors de passer a comme arguments d'entrée de la fonction. Oui c'est vrai. Mais on peut également utiliser le `capture` de la lambda fonction pour capturer `a` dans le scope de la fonction. Cela donne : 

```c++

double a=2.1;
auto f = [a](){ std::cout << "a = " << a << "\n" ; };
```

En revanche avec cette approche, la variable `a` est en lecture seule au sein de la fonction anonyme. C'est-à-dire que l'on ne peut pas modifier la variable `a` dans la fonction. Si vous souhaitez pouvoir modifier la valeu de `a` il faut la capturer par référence. 

```c++

double a=2.1;
auto f = [&a](){ 
    std::cout << "a = " << a << "\n" ; 
    a = 1024.0;
    };
```


*Remarque* il existe une syntaxe particulière au niveau du capture permettant de capturer toutes les variables présentent dans le scope de définition de la fonction pour les injecter dans le scope interne de la fonction. Il s'agit de :

* `[=]` qui capture toutes les variables par copie
* `[&]` qui capture toutes les variables par références, permettant ainsi de les modifiers au sein de la fonction. 

*Remarque 2* bien évidemment ces notations qui ont l'air très pratiques d'utilisation sont plutôt à éviter car je suis sur que vous en conviendrez ce n'est pas très propre comme approche. Ca manque de classe !

## Programmation multi-thread

Après ce bref rappel au sujet des fonctions anonymes nous allons pouvoir entrer dans le vif du sujet. Nous allons donc commencer par voir comment faire de la programmation multi-thread en c++. 

Pour rappel le modèle de programmation multi-thread a pour principe d'exploiter au maximum l'architecture multi-coeurs des processeurs récent. Pour cela le programme principal va créer des threads qui vont s'exécuter de manière concurrente sur les différents coeurs de votre processeurs. Pour ceux qui auraient oublié un thread, également appelé processus léger, est un ensemble d'instuction machine regroupé au sein d'une pile d'exécution partageant sa mémoire avec le processus l'ayant créé. Formulé autrement des threads sont des blocs d'instruction c++ partageant entre eux la même mémoire. 

Depuis la norme 2011 du c++, c++11, l'utilisation des threads est fortement simplifiée. Il vous suffit d'inclure le header file correspondant 

```c++
#include <thread>
```

L'élément de base est la classe `std::thread` qui va nous permettre de créer un thread pour une fonction donnée.  

Considérons tout de suite l'exemple suivant d'une fonction `main` créant un `thread` chargé d'afficher dix fois le même message.

```c++
#include <thread>
#include <iostream>

int main(){
    std::thread t = std::thread([]()->void {
        for( int i=0; i<10; i++ ){
            std::cout << "Hello World from thread" << std::endl;
        }
    });

    std::cout << "Hello World from the main function " << std::endl;

    t.join();

    return 0;
}
```


On constate alors à l'exécution que le thread et le programme principale vive chacun leur vie. Bien evidémment il est possible de créer autant de thread qu'on le souhaite dans un programme.  

```c++
#include <thread>
#include <iostream>

int main(){
    const int n_threads {10};

    std::vector<std::thread> _threads;

    for( int i=0; i<n_threads; i++){
        _threads.push_back(std::thread([](const int& tid)->void {
                            for( int i=0; i<10; i++ ){
                                 std::cout << "Hello World from thread "<< tid << std::endl;
                            }
            }, i)
        );
    }

    std::cout << "Hello World from the main function " << std::endl;

    for(std::thread& t: _threads){
        t.join();
    }

    return 0;
}
```

En exécutant ce code plusieurs fois sur votre ordinateur portable vous allez alors voir apparaître quelque chose de potentiellement étrange... De temps quelques lignes de la sortie sont entremélées entre elles. Par exemple 

```
Hello World from thread 5Hello World from thread 7
Hello World from thread Hello World from thread 
Hello World from thread Hello World from thread 5
8
Hello World from thread 8
7
```

Bizarre non ? Un avis sur la question ? C'est un effet de ce que l'on appel un `race condition` ! Le principe est simple, nous avons deux threads qui accède simultanément à la même variables partagée et la modifie. C'est le risque lorsque l'on fait du multithread. Pour prévenir cela il est nécessaire de mettre en place des mécanismes de verrouillage. 
￼


### Utilisations des mutex

Un mutex, *Mutual Exclusion*, est un objet utilisé en programmation concurrente pour éviter que différents threads n'accèdent simulténament à des ressources partagées. Pour utiliser un `mutex` en c++ il vous suffit d'inclure le header file correspondant

```c++
#include <mutex>
```

L'objet de base est `std::mutex`, oui ça manque d'originalité tout ça je sais. Cet objet est extrèmement simple car il ne possède que deux méthodes `lock()` et `unlock()`. Comme le nom le laisse imaginer ces deux méthodes ont respectivement pour but de bloquer  et débloquer les threads. 

Reprenons tout de suite l'exemple précédent. 


```c++
#include <vector>
#include <thread>
#include <iostream>
#include <mutex>

std::mutex mtx;

int main(){
  const int n_threads {10};

  std::vector<std::thread> _threads;

  for( int i=0; i<n_threads; i++){
    _threads.push_back(std::thread([](const int& tid)->void {
          for( int i=0; i<10; i++ ){
            mtx.lock();
            std::cout << "Hello World from thread "<< tid << std::endl;
            mtx.unlock();
          }
	}, i)
      );
  }

  std::cout << "Hello World from the main function " << std::endl;

  for(std::thread& t: _threads){
    t.join();
  }

  return 0;
}
```

Afin d'assurer qu'il n'y a pas d'accès simultanés à la sorte standard, via le `std::cout`, on encadre donc la ligne `std::cout` par un appel à la méthode `lock()` entrainant ainsi l'arrêt des autres threads et ensuite un appel à `unlock` pour débloquer les threads. 

**Attention** l'utilisation d'un mutex permet en effet de résoudre les problèmes d'accès concurrents aux ressources partagées. En revanche cela a un prix, il s'agit de la performance. En effet le fait de bloquer/débloquer des threads prends un temps minime certe mais non nul. Donc si on répête cette opération de nombreuses foix cela va fortement dégrader les performances. 

**Attention** il faut faire très attention lorsque vous utilisez un mutex au fait qu'un `lock` doit toujours être associé à un `unlock` sinon votre programme va se bloquer définitivement. Cela peut s'avérer parfois délicat notamment quand on doit en plus gérer les exceptions. Pour faciliter cela il existe dans la librairie standard, l'objet `std::unique_lock<Mutex>`. Ce dernier permet de faire un `lock` et surtout de déverouiller le mutex à la sortie du context, i.e. sortie de fonction par exemple. Par exemple voici ci-dessous un usage où l'on ne fait pas le `unlock` manuellement :

```c++
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock

std::mutex mtx;           // mutex for critical section

void print_block (int n, char c) {
  std::unique_lock<std::mutex> lck (mtx);
  for (int i=0; i<n; ++i) { std::cout << c; }
  std::cout << '\n';
}

int main ()
{
  std::thread th1 (print_block,50,'*');
  std::thread th2 (print_block,50,'$');

  th1.join();
  th2.join();

  return 0;
}
```

Pour illustrer cela considérons par exemple le calcul de $\pi$. Une manière possible pour calculer $\pi$ est d'évaluer numériquement l'intégrale suivante :

$$ \pi = \int_{0}^{1} \frac{4}{1+x^2} $$

Le calcul séquentiel classique peut se faire de la manière suivante : 

```c++
#include <iostream> 

int main(){

    int nb_point = 100000000;
    double l=1./nb_point;

    double pi=0;
    for( int i=0; i<nb_point; i++){
        double x=l*(i+0.5);
        pi += l*( 4. / (1. + x*x ) );
    }

    std::cout << "PI = " << pi << std::endl;

    return 0;
}
```

Si vous lancez alors ce code et mesurez le temps d'exécution vous obtenez le résultat suivant : 

```bash 
0.84user 0.00system 0:00.85elapsed 98%CPU
```

Si l'on fait alors une version multi-thread naïve, c'est à dire avec plein de `lock`/`unlock`. Cela pourrait donner le code suivant : 

```c++
std::mutex mtx;

void pi_thread_worker(const uint& nbpoint, const uint tid, const uint nbthread, double& pi){
    double s = 0.;
    double l = 1./nbpoint;
    int start = tid*(nbpoint/nbthread);
    int stop = (tid+1)*(nbpoint/nbthread);
    if( tid == nbthread-1){
        stop += nbpoint%nbthread;
    }

    double x;
    for( int i=start; i<stop; i++){
        x = l * ( i + 0.5 );
        mtx.lock();
        pi += l * ( 4. / (1 + x*x ) );
        mtx.unlock();   
    }
}

int main(){
    if( argc == 1 ){
        std::cerr << "Specify the number of thread" << std::endl;
        return 1;
    }

    int nb_point = 100000000;
    int nb_thread = atoi(argv[1]);

    double pi=0;
    std::vector<std::thread> threads;
    for( int i=0; i<nb_thread; i++){
        threads.push_back( std::thread( pi_thread_worker, nb_point, i, nb_thread, std::ref(pi) ) );
    }

    for( int i=0; i<nb_thread; i++){
        threads[i].join();
    }

    return 0;
}
```

Si on lance alors ce code avec deux threads on obtient le résultat suivant : 

```bash 
11.56user 11.78system 0:11.84elapsed 197%CPU
```

On est donc plus de 10 fois plus lent que la version séquentiel !!!!! C'est normal en même temps, le code précédent c'est du grand n'importe quoi !!! En effet plutôt que de faire les `lock`/`unlock` dans la boucle il est préférable de créer une variable temporaire dans le thread. 

```c++
void pi_thread_worker(const uint& nbpoint, const uint tid, const uint nbthread, double& pi){
    double s = 0.;
    double l = 1./nbpoint;
    int start = tid*(nbpoint/nbthread);
    int stop = (tid+1)*(nbpoint/nbthread);
    if( tid == nbthread-1){
        stop += nbpoint%nbthread;
    }

    double x;
    double tmp=0;
    for( int i=start; i<stop; i++){
        x = l * ( i + 0.5 );
        tmp += l * ( 4. / (1 + x*x ) );
    }
    mtx.lock();
    pi += tmp;
    mtx.unlock():
}
```

Si on relance alors ce code, avec cette légère modification, toujours avec deux threads on obtient le résultat suivant : 
```bash 
0.95user 0.00system 0:00.49elapsed 195%CPU
```

On obtient donc dans ce cas un temps d'exécution diviser d'un facteur presque 2 par rapport à la version séquentiel. Comme quoi le langage ne fait pas tout le rigolo derrière le clavier a une part de responsabilité ... 

**Exercice** refaite le calcul de $\pi$ en multi-thread sans aucun mutex ! 

## Programmation asynchrone

Nous allons à présent voir une autre approche de programmation concurrente la programmation asynchrone. Le principe est de permettre l'exécution de tâches nécessitants un temps non négligeable en parallèle du fil d'exécution principale. Le modèle asynchrone a commencé a apparaître avec l'émergence des services web, base de données en lignes, ... Le principe est de permettre l'exécution en "tâche de fond" des fonctions consommatrice en temps mais pas en ressources, par exemple des requêtes sur une base de données, l'écriture ou la lecture de fichiers, ...  

Depuis la norme c++11 le C++ offre tout ce qu'il faut pour faire de la programmation asynchrone facilement. Tout ce dont vous avez besoin se trouve dans le header `future`

```c++
#include <future>
```

L'élément de base est la fonction `std::async` qui prend en argument : 

* Une politique d'exécution qui permet de controler le comportement asynchrone 
    * `std::launch::async` exécute la fonction de manière asynchrone dans un thread séparé
    * `std::launch::deferred` exécute la fonction de manière synchrone.
* Une fonction 
* Les arguments de la fonction 

Le prototype de cette fonction `std::async` est le suivant

```c++
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type> async (launch policy, Fn&& fn, Args&&... args);
```

On voit alors que cette fonction `async` nous renvoie un objet de type `std::future<T>`. L'objet `std::future<T>` est templaté par le type de retour de la fonction `fn` que vous donnez à votre `std::async`. Le `std::future` permet une encapsulation du résultat de la fonction asynchrone, permettant ainsi de toujours avoir le résultat a porté de main dans le programme principale.

Pour récupérer la valeur associée au `std::future` il suffit d'appeler la méthode `get()` qui vous renvoie alors la valeur retournée par votre fonction asynchrone. Si la fonction ne s'est pas encore exécutée ou n'est pas terminée au moment du `get()` l'appel à `get()` est bloquant et donc attends la fin de l'exécution de la fonction asynchrone. 

Voyons tout de suite un exemple : 

```c++
#include <iostream>
#include <future>

int main(){

  std::future<int> val = std::async(std::launch::async, []()-> int {
      std::cout	<< "Hello from future" << std::endl;
      return 1.;
    });


  std::cout << "Hello World from main" << std::endl;

  int value = val.get();
  std::cout << "Value = " << value << std::endl;
  return 0;

}
```

```bash 
$ g++ main.cpp -o main -lpthread
$ ./main 
Hello World from main
Hello from future
Value = 1
```

Pour voir de manière plus concrête l'intérêt de l'asynchrone regardons un exemple un peu plus parlant. Considérons un programme qui doit récupérer des informations dans des bases de données. Chaque requêtes à une base prends 5 secondes. Le code séquentiel classique pour simuler ce comportement est le suivant : 

```c++ 
#include <iostream>
#include <string>
#include <chrono>
#include <thread>

using namespace std::chrono;

std::string request_db(std::string req){
  std::this_thread::sleep_for(seconds(5));
  return "DB_" + req;
}


int main(){
  system_clock::time_point start = system_clock::now();

  std::string data1 = request_db("req db1");
  std::string data2 = request_db("req db2");

  auto end = system_clock::now();

  auto diff = duration_cast < std::chrono::seconds > (end - start).count();
  std::cout << "Total Time = " << diff << " Seconds" << std::endl;

  std::string data = data1 + " :: " + data2;
  std::cout << "Data = " << data << std::endl;

  return 0;
}
```

L'exécution de ce code donne alors comme résultat : 

```
Total Time = 10 Seconds
Data = DB_req db1 :: DB_req db2
```

Si maintenant on modifie très légèrement le code à coup de `std::async` on peut obtenir le code suivant : 

```c++ 
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>

using namespace std::chrono;

std::string request_db(std::string req){
  std::this_thread::sleep_for(seconds(5));
  return "DB_" + req;
}


int main(){
  system_clock::time_point start = system_clock::now();

  std::future<std::string> data1 = std::async(std::launch::async, request_db, "req db1");
  std::future<std::string> data2 = std::async(std::launch::async, request_db,"req db2");

  std::string data = data1.get() + " :: " + data2.get();

  auto end = system_clock::now();
  auto diff = duration_cast < std::chrono::seconds > (end - start).count();
  std::cout << "Total Time = " << diff << " Seconds" << std::endl;

  std::cout << "Data = " << data << std::endl;

  return 0;
}
```

On obtient alors le résultat suivant : 
```
Total Time = 5 Seconds
Data = DB_req db1 :: DB_req db2
```

Donc comme prévu le temps d'exécution est réduit à 5 secondes car les tâches se font simultanément !!! Wouaaaahhh oui je sais c'est génial. 

Pour finir ce premier tour d'horizon de la concurrence en C++ je vous propose de faire un exemple de gestin d'un ensemble de tâches. Il s'agit d'un système où l'on a une queue contenant les différents paramètres d'entrée, par exemple une requère à faire à une base de donnée, et des `workers` viennent piocher dans la queue font la requête et mettent le résultat dans une nouvelle `queue`.

Si vous êtes encore bien eveillé vous aurez peut-être remarqué que dans le scénario précédent on a besoin d'une `queue` mais il faut qu'elle soit un peu particulière puisque qu'elle va être partagée entre plusieurs fil d'exécution... Il faut donc que l'on s'occupe des problèmes d'accès concurrents. En d'autre mot quand on va faire un `pop()` par exemple et bien il faut être sur qu'un autre fil d'exécution ne fait pas un `pop()` exactement au même moment. Il nous faut donc une `queue` qui soit *thread-safe*.  Alors dans les cours Python vous avez l'habitude que je vous dise qu'il s'agit d'un langage merveilleux et donc que tout est déjà fait pour vous ... Et bien pas de chance en C++ c'est pas la même chose, néanmoins C++ reste un langage merveilleux pas de doute à ce sujet. Mais on vous prend moins la main donc pas de `queue` thread-safe sur étagère. Mais pas d'inquiétude vous allez voir c'est facile. 

Alors pour faire notre `queue` asynchrone, que l'on appelera avec beaucoup d'originalité `AsyncQueue` nous allons avoir besoin de trois choses : 

* La queue synchrone classique de la librairie standard `std::queue`
* Un mutex qui nous permettra de vérouiller la queue lorsque l'on opère dessus
* Une variable conditionnelle `std::condition_variable`, il s'agit d'un objet de la librairie standard qui permet de synchroniser les threads et de notifier des threads de certains évènements. 

Le fonctionnement attendu de la `AsyncQueue` est le suivant : lorsqu'un thread fait une action sur la queue (`push`, `pop` ou `empty`), un threads qui voudrait faire une action quelconque sur le queue est bloqué, un thread qui ne touche pas à la queue continu sont exécution. 
 
Ci-dessous un exemple d'implémentation de cette `AsyncQueue`, vous remarquerez l'usage de la méthode `wait` dans la méthode `pop` qui dans le cas où la queue est vide permet à un thread d'attendre qu'un autre thread fasse un `push` pour retourner un résultat.

```c++
#include <queue>
#include <mutex>
#include <condition_variable>


template<typename T>
class AsyncQueue{
private:
    std::queue<T> _queue;
    std::mutex _mtx;
    std::condition_variable _notifier;

public:
    AsyncQueue()=default;
    AsyncQueue(const AsyncQueue&) = delete;
    AsyncQueue& operator=(const AsyncQueue&) = delete;

    bool empty(){
        std::unique_lock<std::mutex> lock(this->_mutex);
        bool ret = this->_queue.empty();
        mlock.unlock();
        return ret;
    }

    void push(const T& x){
        std::unique_lock<std::mutex> lock(this->_mutex);
        this->_queue.push(x);
        mlock.unlock();
        this->_notifier.notify_one();
    }

    T pop(){
        std::unique_lock<std::mutex> lock(this->_mutex);
        while(this->queue.empty()){
            this->_notifier.wait(mlock);
        }
        T val = this->_queue.front(x);
        this->_queue.pop();
        return val;
    }
}
```

Une fois que nous avons fait notre `AsyncQueue` nous avons fait le plus dur !! Le reste c'est facile. Alors juste pour le cosmétique nous allons commencer par faire une fonction `async_print` qui va s'assurer que l'on ait pas de chevauchement des lignes à l'affichage. 

```c++
void async_print(std::string x) {     // Thread safe print 
  static std::mutex mutex;
  std::unique_lock<std::mutex> locker(mutex);
  std::cout << x << "\n";
  locker.unlock();
}
```

Ensuite on fait notre fonction `worker` qui prend en entrée la queue contenant les données d'entrées et la queue initialement vide qui va nous permettre de stocker les résultats. Le principe de cette fonction est très simple, tant que la queue des entrées n'est pas vide on fait quelque chose. Le quelque chose en question dans ce cas étant de dormir pendant X secondes avec X le numéro du worker. 


```c++
void worker(AsyncQueue<int>& input, unsigned int id, AsyncQueue<std::string>& output) {
  while( ! input.empty() ){
    auto item = input.pop();
    std::ostringstream tmp;
    tmp << " " << item << " --> C" << id;
    async_print(tmp.str());
    std::this_thread::sleep_for(std::chrono::seconds(id));
    tmp.str("");
    tmp << "       " << item << " done " << "C" << id << " --->  results";
    async_print(tmp.str());
    tmp.str("");
    tmp << "done " << item;
    output.push( tmp.str() );
  }
}
```


Pour finir nous pouvons alors écrire notre `main` de la manière suivante par exemple.

```c++
int main()
{
  const int nbWorker {4};
  const int nbInput {14};

  AsyncQueue<int> q;
  AsyncQueue<std::string> results;

  for( int i=0; i<nbInput ; i++){
    q.push( i );
  }

  std::vector<std::future<void> > workers;
  for (int i = 0 ; i < nbWorker ; ++i) {
    std::future<void> w = std::async(std::launch::async, worker, std::ref(q), i + 1, std::ref(results));
    workers.push_back(std::move(w));
  }

  for (auto& w : workers) {
    c.get();
  }

  while(!results.empty()){
    std::cout << results.pop() << std::endl;
  }
}
```

L'exécution de ce code donne alors le résultat suivant : 

```
 1 --> Worker2
 0 --> Worker1
 2 --> Worker3
 3 --> Worker4
       0 done Worker1 --->  results
 4 --> Worker1
       1 done Worker2 --->  results
 5 --> Worker2
       4 done Worker1 --->  results
 6 --> Worker1
       2 done Worker3 --->  results
 7 --> Worker3
       6 done Worker1 --->  results
 8 --> Worker1
       5 done Worker2 --->  results
 9 --> Worker2
       3 done Worker4 --->  results
 10 --> Worker4
       8 done Worker1 --->  results
 11 --> Worker1
       11 done Worker1 --->  results
 12 --> Worker1
       9 done Worker2 --->  results
 13 --> Worker2
       7 done Worker3 --->  results
       12 done Worker1 --->  results
       10 done Worker4 --->  results
       13 done Worker2 --->  results
done 0
done 1
done 4
done 2
done 6
done 5
done 3
done 8
done 11
done 9
done 7
done 12
done 10
done 13
```