In [1]:


#### Utilizzo class Thread

import threading

def func(id):
    print("Thread: ", id, " running\n")


if __name__ == "__main__":

    # creating thread
    t1 = threading.Thread(target=func, args=(1,))
    t2 = threading.Thread(target=func, args=(2,))
  
    # starting thread
    t1.start()
    t2.start()

    # wait until the thread finishes
    t1.join()
    t2.join()
    print("thread terminati")
    
    threads = list()
    
    for i in range(1,5):
        t = threading.Thread(target=func, args=("T"+str(i),))
        threads.append(t)
        t.start()
    
    """
        NOTA sul metodo enumerate, che ci permette di ottenere
        un oggetto enumerate che è una tupla contenente un contatore (che parte da 0 by default) 
        e i valori ottenuti iterando sopra l'oggetto target
    
        E.g.:
        seasons = ['Spring', 'Summer', 'Fall', 'Winter']
        list(enumerate(seasons))
        [(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
        list(enumerate(seasons, start=1))
        [(1, 'Spring'), (2, 'Summer'), (3, 'Fall'), (4, 'Winter')]
    """
    
    for index, t in enumerate(threads):
        print("joining thread index: ", index, "thread: ", t)

        t.join()
        



Thread:  1  running

Thread:  2  running

thread terminati
Thread:  T1  running

Thread:  T2  running

Thread:  T3  running

Thread:  T4  running

joining thread index:  0 thread:  <Thread(Thread-7 (func), stopped 8560)>
joining thread index:  1 thread:  <Thread(Thread-8 (func), stopped 2824)>
joining thread index:  2 thread:  <Thread(Thread-9 (func), stopped 14832)>
joining thread index:  3 thread:  <Thread(Thread-10 (func), stopped 2000)>


In [1]:
#### Utilizzo class Thread.local

import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-0s)%(message)s',)

def show(d):
    try:
        val = d.val
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def f(d):
    show(d)
    d.val = random.randint(1, 100)
    show(d)

if __name__ == '__main__':
    d = threading.local()
    show(d)
    d.val = 999
    show(d)

    for i in range(2):
        t = threading.Thread(target=f, args=(d,))
        t.start()



(MainThread)No value yet
(MainThread)value=999
(Thread-4 (f))No value yet
(Thread-5 (f))No value yet
(Thread-4 (f))value=52
(Thread-5 (f))value=91


In [2]:
# utilizzo classe Lock

"""
Il codice utilizza la classe Lock dal modulo threading di Python per 
sincronizzare l'accesso a una risorsa condivisa, 
in questo caso la variabile x e gli attributi a e b dell'istanza della classe X.

"""

import threading
from threading import Lock
import time
 
x = 10
 
def increment(increment_by,lock):
    """
        global in Python permette di modificare la variabile al di fuori
        dello scope corrente. E' utilizzato per le variabili globali.
        Di default x senza global è locale alla funzione.
        Una var. definita al di fuori di una funzione è automaticamente global.
        global serve solo all'interno delle funzioni per variabili definite
        all'esterno di esse.
    """
    global x
 
    """
        provare a rimuovere l'acquire e il release...far vedere che se togliessi
        time.sleep alla fine funzionerebbe lo stesso anche senza proteggere
        la sez. critica...questo perchè c'è il Global Interpreter Lock che viene
        rilasciato solo per thread I/O bound (e.g., sleep)
    """

    lock.acquire()
 
    local_counter = x
    local_counter += increment_by
 
    time.sleep(1)
 
    x = local_counter
    print(f'{threading.current_thread().name} increments x by {increment_by}, x: {x}')
 
    lock.release()
 
lock = Lock()
 
# creating threads
t1 = threading.Thread(target=increment, args=(5,lock))
t2 = threading.Thread(target=increment, args=(10,lock))
 
# starting the threads
t1.start()
t2.start()
 
# waiting for the threads to complete
t1.join()
t2.join()
 
print(f'The final value of x is {x}')

Thread-5 (increment) increments x by 5, x: 15
Thread-6 (increment) increments x by 10, x: 25
The final value of x is 25


In [3]:
## Esempio di uso del RLock, ovvero metodo di una classe che usa un lock su cui c'è un lock di un'altro
## metodo che viene invocato

import threading

"""
Viene definita una classe X che contiene due attributi, a e b, e un oggetto Lock
"""

class X:
    def __init__(self):
        self.a = 1
        self.b = 2
        self.lock = threading.RLock() # provare a cambiare RLock in Lock e vedere cosa accade...

    """ 
    I metodi changeA e changeB modificano rispettivamente gli attributi a e b 
    dell'istanza di X, utilizzando il blocco with self.lock 
    per sincronizzare l'accesso ai dati condivisi
    """
    def changeA(self):
        with self.lock:
            self.a = self.a + 1

    def changeB(self):
        with self.lock:
            self.b = self.b + self.a

    """ 
    il metodo changeAandB chiama i metodi changeA e changeB 
    all'interno di un unico blocco with self.lock, garantendo che 
    entrambi i cambiamenti avvengano in modo atomico e sincronizzato.
    """
    
    def changeAandB(self):
        # you can use chanceA and changeB thread-safe!
        with self.lock:
            self.changeA() # a usual lock would block at here
            self.changeB()

x = X()
x.changeA()
x.changeB()
x.changeAandB()

In [2]:
### Uso della classe Semaphore

from threading import *
from time import sleep
from random import random

# creating thread instance where count = 3
obj = Semaphore(3)

def display(name):

    obj.acquire()
    
    value = random()
    sleep(value)
    print(f'Thread {name} got {value}')

    obj.release()

if __name__ == '__main__':

    threads = []

    # creating and starting multiple threads
    for i in range(10):
        t = Thread(target = display , args = ('Thread-' + str(i),))
        threads.append(t)
        t.start()

    # wait for the threads to complete
    for thread in threads:
        print(thread)
        thread.join()


<Thread(Thread-6 (display), started 13200)>
Thread Thread-1 got 0.3406285184017147
Thread Thread-0 got 0.44935265302860317
<Thread(Thread-7 (display), stopped 18620)>
<Thread(Thread-8 (display), started 9176)>
Thread Thread-2 got 0.5232672697162241
<Thread(Thread-9 (display), started 15556)>
Thread Thread-4 got 0.29802353136493964
Thread Thread-5 got 0.2616326484238263
Thread Thread-7 got 0.011593078327443607
Thread Thread-3 got 0.5628929937073931
<Thread(Thread-10 (display), stopped 12532)>
<Thread(Thread-11 (display), stopped 2348)>
<Thread(Thread-12 (display), started 6416)>
Thread Thread-9 got 0.1632440061270437
Thread Thread-6 got 0.338474400850155
<Thread(Thread-13 (display), stopped 15716)>
<Thread(Thread-14 (display), started 15200)>
Thread Thread-8 got 0.29955275519038616
<Thread(Thread-15 (display), stopped 5016)>


In [4]:
### utilizzo della classe Event

import threading as thd
import time

#Definisce una funzione task che prende due parametri: 
# - event (un oggetto Event) 
# - timeout (il tempo massimo di attesa per l'evento).

def task(event, timeout):
   print("Started thread but waiting for event...")

   # make the thread wait for event with timeout set
   event_set = event.wait(timeout)

   if event_set:
     print("Event received, releasing thread...")
   else:
     print("Time out, moving ahead without event...")
    
if __name__ == '__main__':

    # initializing the event object
    e = thd.Event()
  
    # starting the thread
    thread = thd.Thread(name='Event-Thread', target=task, args=(e,4))
    thread.start()

    # sleeping the main thread for 3 seconds
    time.sleep(3)

    # generating the event
    e.set()
    print("Event is set.")

    # wait for the thread to complete
    thread.join()
    print("Event-Thread joined")
  



Started thread but waiting for event...
Event is set.Event received, releasing thread...

Event-Thread joined


In [5]:
### Uso di multiprocessing/multiprocess callable object
### NOTA: in jupiter notebook multiprocessing non funziona
### installare multiprocess che è la stessa cosa (pip install multiprocess)

import multiprocess as mp

def func():
   print ('Process running')
   return


if __name__ == '__main__':

    # creating process
    p = mp.Process(target = func)

    # starting process
    p.start()

    # wait until the process finishes
    p.join()

    print("all joined")

"""
NOTA: Il motivo per cui non vedi "Process running" stampato potrebbe essere dovuto a un problema di sincronizzazione delle stampe tra processi. 
In Python, quando si utilizzano processi multipli con il modulo multiprocessing, ogni processo ha il proprio spazio di stampa separato. 
Pertanto, le stampe effettuate all'interno del processo figlio non saranno visibili direttamente nel processo genitore, 
a meno che non si utilizzino metodi specifici per comunicare tra i processi.
"""

all joined


In [6]:
### Uso di multiprocessing/multiprocess con class

import multiprocess as mp

class MyProcess(mp.Process):

    def run(self):
       print ('Process running')
       return

if __name__ == '__main__':

    # creating process
    p = MyProcess()

    # starting process
    p.start()

    # wait until the process finishes
    p.join()

In [7]:
from multiprocess import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

In [8]:
### uso di multiprocess Pipe

"""
Una Pipe in Python è un meccanismo di comunicazione bidirezionale che permette lo scambio di dati tra processi. 
È costituita da due endpoint, uno per il processo genitore e uno per il processo figlio. 
I processi possono scrivere dati in un endpoint e leggerli dall'altro. 
La classe Pipe del modulo multiprocessing crea una Pipe e restituisce due oggetti connessione che rappresentano i due endpoint.

"""

from multiprocess import Process, Pipe

def parentData(parent):
    parent.send(['Hello'])
    parent.close()
    
def childData(child):
    child.send(['Bye'])
    child.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=parentData, args=(parent_conn,))
    p1.start()
    p2 = Process(target=childData, args=(child_conn,))
    p2.start()
    print(parent_conn.recv()) #il parent riceve Bye
    print(child_conn.recv())  #il child riceve Hello 
    p1.join()
    p2.join()


['Bye']
['Hello']


In [9]:
## uso di multiprocess Queue

from multiprocess import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()


[42, None, 'hello']


In [1]:
### uso di multiprocess shared memory

# Value viene utilizzata per creare una variabile condivisa tra i processi 
# Array viene utilizzata per creare un array condiviso tra i processi.
from multiprocess import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])


3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


In [None]:
#### ESERCIZIO PROD_CONS con SEMAFORI con MULTITHREADING

#avere una struttura dati per la comunicazione interprocesso non è nuovo, 
#lo avete già usato in sistemi operativi quando dovevate implementare prod/cons con code di messaggo


import logging
import threading
import time
from random import randint

CONSUMER = 'Consumer'
PRODUCER = 'Producer'
N_CONSUMERS = 10
N_PRODUCERS = 10
QUEUE_SIZE = 5

# La coda la implementiamo attraverso una lista, quindi q = [], 
# perché in multithreading non c'è un concetto che astrae la coda come in multiprocess

## Logging, non è essenziale ma se lo imparate ad usare è meglio. 
logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)

def get_an_available_item(queue):
    return queue.pop(0)


def make_an_item_available(queue):
    item = randint(0, 100)
    queue.append(item)

    return item

# Usiamo approccio estensione classe thread
class consumerThread(threading.Thread):
    
    def __init__(self, mutex_C, empty, full, queue, name): #devo passare il mutex che uso per disciplinare i consumatori e i semafori per la cooperazione

        threading.Thread.__init__(self, name=name) #ricordate che ustilizza la lista di default
        self.mutex_C = mutex_C
        self.empty = empty
        self.full = full
        self.queue = queue

    # Ora devo sovrascrivere funzione run

    def run(self):
        logging.debug('\t\t\tStarted')

        logging.debug('\t\t\tChecking full semaphore ...')

        ####
        self.full.acquire() ### full == -1 se entra per primo il consumatore

        ### mutex.acquire()
        with self.mutex_C: ### entrerò se mutex>=0, lockare il mutex prima dell'acquire di empty è un errore perchè non consentiamo consumazioni di elementi consumabili
            logging.debug('\t\t\tAcquired mutex')
        
            time.sleep(1.0)
            item = get_an_available_item(self.queue) #sarà un metodo che possiamo implementare direttamente anche fuori (si trova all'inizio del codice)
            logging.debug('\t\t\tItem: %r', item)

            logging.debug('\t\t\tRelease mutex')
            
        ## mutex.release()
            
        self.empty.release() ### andrò a risvegliare i prod. che sono in attesa
        
        logging.debug('\t\t\tReleased empty semaphore')

        ## CONSUMER CON VAR. COND e monitor signale and continue
        """
        with self.consumer_cv:
            logging.debug('\t\t\tObtained lock')
        
            while not an_item_is_available(self.queue):
                logging.debug('\t\t\tWaiting')
                self.consumer_cv.wait() ## non posso consumare perchè non c'è spazio disp.
        
            time.sleep(1.0)
            item = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', item)

            logging.debug('\t\t\tNotify')
            self.producer_cv.notify() ### notifico i produttori che sono sospesi
        """


def produce_one_item(mutex_P, empty, full, queue):
    logging.debug('Started')

    logging.debug('Checking empty semaphore...')

    empty.acquire() ### empty = 4 se sono il primo prod ad entrare

    with mutex_P: # lockare il mutex prima dell'acquire di empty è un errore perchè non consentiamo produzione di elementi producibili
        logging.debug('Acquired mutex')

        time.sleep(1.0)
        item = make_an_item_available(queue)
        logging.debug('Item: %r', item)


        logging.debug('Release mutex')
        
    full.release() ## avviserò i consumatori che sono in attesa, che possono consumare

    logging.debug('Released full semaphore')

    ## PRODUCER CON VAR. COND e monitor signal and continue
    """
    with producer_cv:
        logging.debug('Obtained lock')

        while not a_space_is_available(queue):
            logging.debug('Waiting')
            producer_cv.wait()

        time.sleep(1.0)
        item = make_an_item_available(queue)
        logging.debug('Item: %r', item)


        logging.debug('Notify')
        consumer_cv.notify()
    """


def main():
    ##Questo main sarà una funzione che non e' veramente il main, ma utilizzeremo la tecnica del name per eseguire questa funzione
    
    # generating the queue, coda fatta con una list
    queue = [] ##viene definita nello scope globale. e' un oggetto condiviso tra i thread, multiprocess non si fa più, quando creeremo process, ci sarà una copia (una fork) dal processo padre ai processi figli, quindi questo oggetto sarà copiato e poi sarà indipendente

    # Se volessi usare le var cond
    """
    cv_lock = threading.Lock()
    producer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la procuder_cv, non posso usare un RLock
    consumer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la consumer_cv, non posso usare un RLock
    """

    # Creiamo i semafori per gestire il problem prod/cons multipli
    #NOTA: usiamo i semafori, me ne servono 4, 2 per disciplinare la competizione tra produttori e consumatori (che sono in realtà dei mutex), 2 che devono gestire la cooperazione (uno settato alla dim, e uno a 0)
    #devo prima produrre e poi faccio il lock, perché potete produrre in maniera concorrente. Questa cosa viene risolta introducendo il buffer di stato
    
    mutex_P = threading.Semaphore() ### = 1 mutua esclusione tra i diversi prod durante la produzione
    mutex_C = threading.Semaphore() ### = 1 mutua esclusione tra i diversi cons durante la consumazione
    
    empty = threading.Semaphore(QUEUE_SIZE) ### semaforo per la produzione, inizializzato a QUEUE_SIZE (N produttori possono produrre)
    full = threading.Semaphore(0) ### semaforo per la consumazione, inizializzato a 0 (NON POSSO CONSUMARE ALL'inizio)

    consumers = []
    producers = []

    # generating the consumers
    for i in range (N_CONSUMERS):
        
        name=CONSUMER+str(i)

        # creazione del thread con estensione della classe Thread
        ct = consumerThread(mutex_C, empty, full, queue, name)
        ct.start()

        consumers.append(ct)


    # generating the producers
    for i in range (N_PRODUCERS):

        # creazione del thread con callable object (func)
        pt = threading.Thread(target=produce_one_item, name=PRODUCER+str(i),
                                args=(mutex_P, empty, full, queue),)

        pt.start()

        producers.append(pt)

    
    # waiting consumers termination
    for i in range (N_CONSUMERS):

        consumers[i].join()


    # waiting producers termination
    for i in range (N_PRODUCERS):

        producers[i].join()



if __name__ == '__main__':
    main()


In [None]:
#### ESERCIZIO PROD_CONS con variabili condition

import logging
import threading
import time
from random import randint

CONSUMER = 'Consumer'
PRODUCER = 'Producer'
N_CONSUMERS = 10
N_PRODUCERS = 10
QUEUE_SIZE = 5

logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)


def an_item_is_available(queue):
    return not (len(queue) == 0)

def a_space_is_available(queue):
    return not (len(queue) == QUEUE_SIZE)


def get_an_available_item(queue):
    return queue.pop(0)


def make_an_item_available(queue):
    item = randint(0, 100)
    queue.append(item)

    return item


class consumerThread(threading.Thread):
    
    def __init__(self, producer_cv, consumer_cv, queue, name):

        threading.Thread.__init__(self, name=name)
        self.producer_cv = producer_cv
        self.consumer_cv = consumer_cv
        self.queue = queue

    def run(self):
        logging.debug('\t\t\tStarted')

        with self.consumer_cv:
            logging.debug('\t\t\tObtained lock')
        
            while not an_item_is_available(self.queue):
                logging.debug('\t\t\tWaiting')
                self.consumer_cv.wait() ## non posso consumare perchè non c'è spazio disp.
        
            time.sleep(1.0)
            item = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', item)

            logging.debug('\t\t\tNotify')
            self.producer_cv.notify() ### notifico i produttori che sono sospesi

        logging.debug('\t\t\tReleased lock')


def produce_one_item(producer_cv, consumer_cv, queue):
    logging.debug('Started')
    
    with producer_cv:
        logging.debug('Obtained lock')

        while not a_space_is_available(queue):
            logging.debug('Waiting')
            producer_cv.wait()

        time.sleep(1.0)
        item = make_an_item_available(queue)
        logging.debug('Item: %r', item)


        logging.debug('Notify')
        consumer_cv.notify()

    logging.debug('Released lock')


def main():
    
    # generating the queue 
    queue = []

    # generating the condition variable
    cv_lock = threading.Lock()
    producer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la procuder_cv, non posso usare un RLock
    consumer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la consumer_cv, non posso usare un RLock

    consumers = []
    producers = []

    # generating the consumers
    for i in range (N_CONSUMERS):
        
        name=CONSUMER+str(i)

        ct = consumerThread(producer_cv, consumer_cv, queue, name)
        ct.start()

        consumers.append(ct)


    # generating the producers
    for i in range (N_PRODUCERS):

        pt = threading.Thread(
            target=produce_one_item,
            name=PRODUCER+str(i),
            args=(producer_cv, consumer_cv, queue),
        )

        pt.start()

        producers.append(pt)

    
    # waiting consumers termination
    for i in range (N_CONSUMERS):

        consumers[i].join()


    # waiting producers termination
    for i in range (N_PRODUCERS):

        producers[i].join()



if __name__ == '__main__':
    main()


In [None]:
#### ESERCIZIO PROD_CONS con SEMAFORI con MULTIPROCESS

import logging
import multiprocess
from multiprocess import Queue
import time
from random import randint

CONSUMER = 'Consumer'
PRODUCER = 'Producer'
N_CONSUMERS = 10
N_PRODUCERS = 10
QUEUE_SIZE = 1

#logging.basicConfig(level=logging.DEBUG, format='%(message)s',)

def get_an_available_item(queue):
    return queue.get()


def make_an_item_available(queue):
    item = randint(0, 100)
    queue.put(item)

    return item


class consumerProcess(multiprocess.Process):
    
    def __init__(self, queue, name):

        multiprocess.Process.__init__(self, name=name)
        self.queue = queue

    def run(self):
     
            time.sleep(1.0)
            item = get_an_available_item(self.queue)
            print(f'[PID: {multiprocess.current_process().pid}] \t\t\tConsumed Item: {item}\n')
        
            
        

def produce_one_item(queue):
    
        time.sleep(1.0)
        item = make_an_item_available(queue)
        print(f'[PID: {multiprocess.current_process().pid}] Produced Item: {item}\n')


def main():
    
    # generating the queue, coda fatta con una list
    queue = Queue(QUEUE_SIZE)

    consumers = []
    producers = []
    #multiprocess.set_start_method("spawn")
    
    # generating the consumers
    for i in range (N_CONSUMERS):
        
        name=CONSUMER+str(i)

        # creazione del process con estensione della classe Process
        ct = consumerProcess(queue, name)
        ct.start()

        consumers.append(ct)


    # generating the producers
    for i in range (N_PRODUCERS):

        # creazione del process con callable object (func)
        pt = multiprocess.Process(target=produce_one_item, name=PRODUCER+str(i),
                                args=(queue,))

        pt.start()

        producers.append(pt)    

    
    # waiting consumers termination
    for i in range (N_CONSUMERS):

        consumers[i].join()


    # waiting producers termination
    for i in range (N_PRODUCERS):

        producers[i].join()



if __name__ == '__main__':
    main()
