## Le threading

### 1. Généralités

Les __threads__ et __les  processus__ sont des instructions issues de code à executer

  1. Les __threads__ s'exécutent dans le même segment de mémoire unique. 

  2. Les __processus__ s'exécutent dans des tas de mémoire séparés

Les deux concepts servent à raccoursir le temps du traitement des tâches à effecuter


Les __processus__ sont beaucoup plus délicat quant à leur gestion car, ils ne partagent  pas le même contexte

Les __threads__ quant à eux ils représentent une situation délicate aussi à géer qui concerne les ressources partagées de point de vue accès et modification

Le verrou d'interprète global __(Global Interpreter Luck)__ dans __CPython__ a été créé en tant que __mutex__ pour l'empêcher de déroutement de traitement des ressources qui causerons probabelement des instabilités au niveau programme, système ou encore l'aboutissement  à des resultats incohérents 

__Exemple de code qui s'execute séquentiellement__

In [67]:
from random import randint as r
import datetime 

class test:
    def __init__(self,value=0):
        self.value = value
    
    def process(self):
        time.sleep(r(1,2))
        if(self.value[0] < 4):
            self.value[0] = self.value[0] + 1
            print(f'La valeur est {self.value[0]} \n')
        


In [70]:
valeur = [1]

t1 = datetime.datetime.now().second
task = test(valeur)
for i in range(1,4):
    task.process()
t2 = datetime.datetime.now().second
print(f'Ecart: {t2 - t1}')

La valeur est 2 

La valeur est 3 

La valeur est 4 

Ecart: 4


__Exemple de code qui s'execute d'une façon parallele__ (en utilisant des threads)

In [71]:
from random import randint as r
import datetime  
from threading import Thread

class test(Thread):
    def __init__(self,value):
        # Appel du constructeur de la classe Thread
        Thread.__init__(self)
        self.value = value
    
    #Méthode qui fait partie la classe Thread
    def run(self):
        time.sleep(r(1,2))
        if(self.value[0] < 4):
            self.value[0] = self.value[0] + 1
            print(f'La valeur est {self.value[0]} \n')



In [73]:
valeur = [1]

t1 = datetime.datetime.now().second

task1 = test(valeur)
task2 = test(valeur)
task3 = test(valeur)
task4 = test(valeur)

task1.start()
task2.start()
task3.start()
task4.start()

task1.join()
task2.join()
task3.join()
task4.join()

t2 = datetime.datetime.now().second
print(f'Ecart: {t2 - t1}')


La valeur est 2 
La valeur est 3 
La valeur est 4 



Ecart: 2


In [74]:
valeur = [1]

t1 = datetime.datetime.now().second

for i in range(1,4):
    t = test(valeur)
    t.start()
for j in range(1,4):
    t.join()
t2 = datetime.datetime.now().second
print(f'Ecart: {t2 - t1}')

La valeur est 2 

La valeur est 3 
La valeur est 4 


Ecart: 2


### 2. Sémaphore

In [2]:
from random import randint as r
import time
import threading 

class test(Thread):
    def __init__(self,value,identifier,semaphore):
        # Appel du constructeur de la classe Thread
        Thread.__init__(self)
        self.value = value
        self.semaphore = semaphore
        self.identifier = identifier
    
    #Méthode qui fait partie la classe Thread
    def run(self):
        while(self.value[0]<100):  
            time.sleep(r(1,2))
            print(f'Thread {self.identifier}: Entrer dans la zonne critique \n')
            self.semaphore.acquire()
            self.value[0] = self.value[0] + 1
            print(f'La valeur est {self.value[0]} \n')
            self.semaphore.release()
            print(f'Thread {self.identifier}: Sortir de la zonne critique \n')

NameError: name 'Thread' is not defined

In [None]:
semaphore = threading.Semaphore()            
valeur = [1]

for i in range(1,4):
    t = test(valeur,i,semaphore)
    t.start()

### 3. Mutex  ( Utilisation de Lock ou Verouillage)

In [None]:
Mutex est similaire à Sémaphore même le code est presque identique, cependant il y a quelques différences

    1. Mutex à la possibilité de tracer les threads par id
    2. Mutex permet de différencier les threads sur la base du critère de priorité
    3. Mutex est récursive la  ressource pourra être allouée plusieurs fois au même thread

In [29]:
from random import randint as r
import time
import threading 

class test(Thread):
    def __init__(self,value,identifier,lock):
        # Appel du constructeur de la classe Thread
        Thread.__init__(self)
        self.value = value
        self.lock = lock
        self.identifier = identifier
    
    #Méthode qui fait partie la classe Thread
    def run(self):
        while(self.value[0]<100):  
            time.sleep(r(1,2))
            print(f'Thread {self.identifier}: Entrer dans la zonne critique \n')
            self.lock.acquire()
            self.value[0] = self.value[0] + 1
            print(f'La valeur est {self.value[0]} \n')
            self.lock.release()
            print(f'Thread {self.identifier}: Sortir de la zonne critique \n')

In [None]:
lock = threading.Lock()            
valeur = [1]

for i in range(1,4):
    t = test(valeur,i,lock)
    t.start()

### 4. Utilisation de RLock (Verouillage multiple le plus par apport à                                 Sémaphore)

1. A la différence de __Lock__ verouillage simple, __RLock__ est un verrou réentrant.

2. La méthode __acquire()__ peut être appelée plusieurs fois par le même thread sans blocage. 

3. En même temps __release()__ doit être appelée le même nombre de fois pour déverrouiller la ressource.

In [34]:
from random import randint as r
import time
import threading 

class test(Thread):
    def __init__(self,value,identifier,lock):
        # Appel du constructeur de la classe Thread
        Thread.__init__(self)
        self.value = value
        self.lock = lock
        self.identifier = identifier
    
    #Méthode qui fait partie la classe Thread
    def run(self):
        while(self.value[0]<100):  
            time.sleep(r(1,2))
            print(f'Thread {self.identifier}: Entrer dans la zonne critique \n')
            # ça va pas générer une erreur mais cause une déstabilisation
            # en cas d'utilisation de Lock et non pas RLock
            self.lock.acquire()
            self.lock.acquire() 
            self.value[0] = self.value[0] + 1
            print(f'La valeur est {self.value[0]} \n')
            self.lock.release()
            self.lock.release()
            

In [None]:
 # ça va pas générer une erreur mais cause une déstabilisation
 # en cas d'utilisation de Lock et non pas RLock
lock = threading.Lock()            
valeur = [1]

for i in range(1,4):
    t = test(valeur,i,lock)
    t.start()

In [None]:
# ça va pas générer une erreur mais cause une déstabilisation
 # en cas d'utilisation de Lock et non pas RLock
lock = threading.RLock()            
valeur = [1]

for i in range(1,4):
    t = test(valeur,i,lock)
    t.start()

### 5. Utilisation de Conditions (wait/notifiy)

Parfois nous avons recourt à des cas plus complèxes qu'une simple incrémentaton ou de partage de section critique

On aura besoin d'un mécanisme de communication des informations outre que le partage de ressources, il faut que sa soit fiable

In [1]:
import threading
import time
import random
class Serveur(threading.Thread):
    def __init__(self, liste, condition):
        threading.Thread.__init__(self)
        self.liste = liste
        self.condition = condition
    
    def run(self):
        while True:
            element = random.randint(0, 256)
            self.condition.acquire()
            print (f'Verous pris par {self.name}')
            self.liste.append(element) 
            print (f'{element} est ajouté à la liste par {self.name}')
            print (f'Notification de la part du {self.name}')
            self.condition.notify()
            print (f'Condition est libérée par {self.name}') 
            self.condition.release()
            time.sleep(1)

In [2]:
import threading
import random
class Client(threading.Thread):
    def __init__(self, integers, condition):
        threading.Thread.__init__(self)
        self.liste = liste
        self.condition = condition
    
    def run(self):
        while True:
            self.condition.acquire()
            print (f'Verous pris par { self.name}')
            while True:
                if self.liste:
                    element = self.liste.pop()
                    print (f'{element} extrait du {self.name}')
                    break
                print (f'Attente de la part de  {self.name}')
                self.condition.wait()
            print (f'Condition est libérée par  {self.name}') 
            self.condition.release()

In [None]:
liste = []
condition = threading.Condition()
s = Serveur(liste,condition)
c = Client(liste,condition)
s.start()
c.start()
s.join()
c.join()

### 6. Utilisation des queues (Alternative plus riche que condition)

Queues représente 4 état d'échange inter thread représentés par 4 méthodes de file d'attente suivantes:<br>

__put:__ met un thread dans la file d'attente.<br>
__get:__ supprime et renvoie un thread vers la file d'attente.<br>
__task_done:__ doit être appelé chaque fois qu'un thread a été traité.<br>
__join:__ Bloque un thread jusqu'à ce que tous les threads précedents délibèrent les ressources acquises 

In [None]:
class Server(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer) 
            print (f'{integer}  ajouté à {self.name}')
            time.sleep(1)

In [None]:
class Client(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue 
    def run(self):
        while True:
            integer = self.queue.get()
            print (f'{integer}  extrait par {self.name}') 
            self.queue.task_done()

### 7. Les événements 

In [None]:
class Client(threading.Thread):
    def __init__(self, liste, event):
      
        threading.Thread.__init__(self)
        self.liste = liste
        self.event = event
    def run(self):
        while True:
            self.event.wait()
            try:
                element = self.integers.pop()
                print (f'{element} extrait par {self.name}')
            except IndexError:
                # catch pop on empty list
                time.sleep(1)

In [None]:
class Serveur(threading.Thread):
    def __init__(self, liste, event):
        threading.Thread.__init__(self)
        self.liste = liste
        self.event = event
    
    def run(self):
        while True:
            element = random.randint(0, 256)
            self.liste.append(integer) 
            print (f'{element} ajouté à la liste par {self.name}')
            print (f'{self.event} set by {self.name}')
            self.event.set()
            self.event.clear()
            print 'event cleared by %s' % self.name
            time.sleep(1)

## 8. Le coroutine

### 8.1.Généralités

Les mots clés:<br><br>

a. __asyncio__ c'est la coroutine API

b. __async/await__ sont les mots clé dans le concpet de coroutine

Les axes:<br>

a. __La programmation parallèle__ (Répartition/segementation des tâches sur les coeurs du processeur)<br>

b. __La collaboration d'exécution__ Execution concurentielle d'une même tâche à travers des Threads 


### 8.2. Le coroutine en pratique

In [8]:
%%writefile c:\temp\threadtest.py
import threading
import asyncio
from time import perf_counter  

valeur = [1]

async def incremente(valeur):
    valeur[0]+=1
    print(valeur)
    
async def main():
    await asyncio.gather( 
          incremente(valeur),
          incremente(valeur),
          incremente(valeur),
          incremente(valeur),
          incremente(valeur)
    ) 

if __name__ =='__main__':
    debut = perf_counter() 
    asyncio.run(main())
    fin = perf_counter()
    ecart = float(fin) - float(debut) 
    print(f'{ecart:0.002f}')

Overwriting c:\temp\threadtest.py


 ### 8.2.1 Le pattern Chaining en coroutine

In [9]:
%%writefile c:\temp\chaining.py
import threading
import asyncio
from time import perf_counter  

valeur = [1] 

async def incremente(valeur):
    valeur[0]+=1
    print(valeur)
    return valeur

async def incrementeprime(valeur):
    valeur[0]+=2
    print(valeur)
    return valeur

async def incrementeseconde(valeur):
    valeur[0]+=2
    print(valeur)
    return valeur

async def chaine():
    v1  = await incremente(valeur)
    v2  = await incrementeprime(v1)
    v3  = await incrementeseconde(v2)

async def main():
    await chaine()
    

if __name__ == '__main__':
        asyncio.run(main())

Overwriting c:\temp\chaining.py
