In [15]:
#### Utilizzo class Thread

import threading

def func(id):
    print("Thread: ", id, " running\n")


if __name__ == "__main__":

    # creating thread
    t1 = threading.Thread(target=func, args=(1,))
    t2 = threading.Thread(target=func, args=(2,))
  
    # starting thread
    t1.start()
    t2.start()

    # wait until the thread finishes
    t1.join()
    t2.join()
    print("thread terminati")
    
    threads = list()
    
    for i in range(1,5):
        t = threading.Thread(target=func, args=("T"+str(i),))
        threads.append(t)
        t.start()
    
    """
        NOTA sul metodo enumerate, che ci permette di ottenere
        un oggetto enumerate che è una tupla contenente un contatore (che parte da 0 by default) 
        e i valori ottenuti iterando sopra l'oggetto target
    
        E.g.:
        seasons = ['Spring', 'Summer', 'Fall', 'Winter']
        list(enumerate(seasons))
        [(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
        list(enumerate(seasons, start=1))
        [(1, 'Spring'), (2, 'Summer'), (3, 'Fall'), (4, 'Winter')]
    """
    
    for index, t in enumerate(threads):
        print("joining thread index: ", index, "thread: ", t)

        t.join()
        



Thread:  1  running

Thread:  2  running

thread terminati
Thread:  T1  running

Thread: Thread:  T3  running

Thread:  T2  running

 T4  running

joining thread index:  0 thread:  <Thread(Thread-54, stopped 123145445109760)>
joining thread index:  1 thread:  <Thread(Thread-55, stopped 123145445109760)>
joining thread index:  2 thread:  <Thread(Thread-56, stopped 123145461899264)>
joining thread index:  3 thread:  <Thread(Thread-57, stopped 123145478688768)>


In [17]:
#### Utilizzo class Thread.local

import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-0s)%(message)s',)

def show(d):
    try:
        val = d.val
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def f(d):
    show(d)
    d.val = random.randint(1, 100)
    show(d)

if __name__ == '__main__':
    d = threading.local()
    show(d)
    d.val = 999
    show(d)

    for i in range(2):
        t = threading.Thread(target=f, args=(d,))
        t.start()



(MainThread)No value yet
(MainThread)value=999
(Thread-58)No value yet
(Thread-59)No value yet
(Thread-58)value=97
(Thread-59)value=96


In [11]:
# utilizzo classe Lock

import threading
from threading import Lock
import time
 
x = 10
 
def increment(increment_by,lock):
    """
        global in Python permette di modificare la variabile al di fuori
        dello scope corrente. E' utilizzato per le variabili globali.
        Di default x senza global è locale alla funzione.
        Una var. definite al di fuori di una funzione è automaticamente global.
        global serve solo all'interno delle funzioni per variabili definite
        all'esterno di esse.
    """
    global x
 
    """
        provare a rimuovere l'acquire e il release...far vedere che se togliessi
        time.sleep alla fine funzionerebbe lo stesso anche senza proteggere
        la sez. critica...questo perchè c'è il Global Interpreter Lock che viene
        rilasciato solo per thread I/O bound (e.g., sleep)
    """

    lock.acquire()
 
    local_counter = x
    local_counter += increment_by
 
    time.sleep(1)
 
    x = local_counter
    print(f'{threading.current_thread().name} increments x by {increment_by}, x: {x}')
 
    lock.release()
 
lock = Lock()
 
# creating threads
t1 = threading.Thread(target=increment, args=(5,lock))
t2 = threading.Thread(target=increment, args=(10,lock))
 
# starting the threads
t1.start()
t2.start()
 
# waiting for the threads to complete
t1.join()
t2.join()
 
print(f'The final value of x is {x}')

Thread-513 increments x by 5, x: 15
Thread-514 increments x by 10, x: 25
The final value of x is 25


In [3]:
### Uso della classe Semaphore

from threading import *
from time import sleep
from random import random

# creating thread instance where count = 3
obj = Semaphore(3)

def display(name):

    obj.acquire()
    
    value = random()
    sleep(value)
    print(f'Thread {name} got {value}')

    obj.release()

if __name__ == '__main__':

    threads = []

    # creating and starting multiple threads
    for i in range(10):
        t = Thread(target = display , args = ('Thread-' + str(i),))
        threads.append(t)
        t.start()

    # wait for the threads to complete
    for thread in threads:
        print(thread)
        thread.join()


<Thread(Thread-18, started 123145631920128)>
Thread Thread-2 got 0.38436540834349897
Thread Thread-0 got 0.46918017097846965
<Thread(Thread-19, started 123145648709632)>
Thread Thread-1 got 0.7664924704650358
<Thread(Thread-20, stopped 123145665499136)>
<Thread(Thread-21, started 123145682288640)>
Thread Thread-5 got 0.28634592246906465
Thread Thread-6 got 0.045360035552779565
Thread Thread-7 got 0.05433158105652669
Thread Thread-4 got 0.8245272446821077
Thread Thread-3 got 0.9603268231430493
<Thread(Thread-22, stopped 123145699078144)>
<Thread(Thread-23, stopped 123145715867648)>
<Thread(Thread-24, stopped 123145732657152)>
<Thread(Thread-25, stopped 123145749446656)>
<Thread(Thread-26, started 123145766236160)>
Thread Thread-9 got 0.08629569133834392
Thread Thread-8 got 0.7187034918741149
<Thread(Thread-27, stopped 123145783025664)>


In [1]:
### utilizzo della classe Event

import threading as thd
import time

def task(event, timeout):
   print("Started thread but waiting for event...")

   # make the thread wait for event with timeout set
   event_set = event.wait(timeout)

   if event_set:
     print("Event received, releasing thread...")
   else:
     print("Time out, moving ahead without event...")
    
if __name__ == '__main__':

    # initializing the event object
    e = thd.Event()
  
    # starting the thread
    thread = thd.Thread(name='Event-Thread', target=task, args=(e,4))
    thread.start()

    # sleeping the main thread for 3 seconds
    time.sleep(3)

    # generating the event
    e.set()
    print("Event is set.")

    # wait for the thread to complete
    thread.join()
    print("Event-Thread joined")
  



Started thread but waiting for event...
Event is set.Event received, releasing thread...

Event-Thread joined


In [2]:
### Uso di multiprocessing/multiprocess callable object
### NOTA: in jupiter notebook multiprocessing non funziona
### installare multiprocess che è la stessa cosa (pip install multiprocess)

import multiprocess as mp

def func():
   print ('Process running')
   return


if __name__ == '__main__':

    # creating process
    p = mp.Process(target = func)

    # starting process
    p.start()

    # wait until the process finishes
    p.join()

    print("all joined")

Process running
all joined


In [32]:
### Uso di multiprocessing/multiprocess con class

import multiprocess as mp

class MyProcess(mp.Process):

    def run(self):
       print ('Process running')
       return

if __name__ == '__main__':

    # creating process
    p = MyProcess()

    # starting process
    p.start()

    # wait until the process finishes
    p.join()

Process running


In [3]:
from multiprocess import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

hello world 0
hello world1 
hello world 2
hello world 3
hello world 4


hello world 5
hello world 6
hello world 7
hello world 8
hello world 9


In [4]:
### uso di multiprocess Pipe

from multiprocess import Process, Pipe

def parentData(parent):
    parent.send(['Hello'])
    parent.close()
    
def childData(child):
    child.send(['Bye'])
    child.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=parentData, args=(parent_conn,))
    p1.start()
    p2 = Process(target=childData, args=(child_conn,))
    p2.start()
    print(parent_conn.recv())
    print(child_conn.recv())   
    p1.join()
    p2.join()


['Bye']
['Hello']


In [36]:
## uso di multiprocess Queue

from multiprocess import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()


[42, None, 'hello']


In [1]:
### uso di multiprocess shared memory
from multiprocess import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])


0.0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/ldesi/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/ldesi/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>


In [6]:
#### ESERCIZIO PROD_CONS con SEMAFORI

import logging
import threading
import time
from random import randint

CONSUMER = 'Consumer'
PRODUCER = 'Producer'
N_CONSUMERS = 10
N_PRODUCERS = 10
QUEUE_SIZE = 5

logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)

def get_an_available_item(queue):
    return queue.pop(0)


def make_an_item_available(queue):
    item = randint(0, 100)
    queue.append(item)

    return item


class consumerThread(threading.Thread):
    
    def __init__(self, mutex, empty, full, queue, name):

        threading.Thread.__init__(self, name=name)
        self.mutex = mutex
        self.empty = empty
        self.full = full
        self.queue = queue

    def run(self):
        logging.debug('\t\t\tStarted')

        logging.debug('\t\t\tChecking full semaphore ...')

        ####
        self.full.acquire() ### full == -1 se entra per primo il consumatore

        ### mutex.acquire()
        with self.mutex: ### entrerò se mutex>=0
            logging.debug('\t\t\tAcquired mutex')
        
            time.sleep(1.0)
            item = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', item)

            logging.debug('\t\t\tRelease mutex')
            
        ## mutex.release()
            
        self.empty.release() ### andrò a risvegliare i prod. che sono in attesa
        
        logging.debug('\t\t\tReleased empty semaphore')


def produce_one_item(mutex, empty, full, queue):
    logging.debug('Started')

    logging.debug('Checking empty semaphore...')

    empty.acquire() ### empty = 4 se sono il primo prod ad entrare

    with mutex:
        logging.debug('Acquired mutex')

        time.sleep(1.0)
        item = make_an_item_available(queue)
        logging.debug('Item: %r', item)


        logging.debug('Release mutex')
        
    full.release() ## avviserò i consumatori che sono in attesa, che possono consumare
    logging.debug('Released full semaphore')


def main():
    
    # generating the queue, coda fatta con una list
    queue = [] 

    # generating the condition variable
    mutex = threading.Semaphore() ### =1 mutua esclusione tra i diversi prod e cons
    empty = threading.Semaphore(QUEUE_SIZE) ### semaforo per la produzione, inizializzato a QUEUE_SIZE
    full = threading.Semaphore(0) ### semaforo per la consumazione, inizializzato a 0

    consumers = []
    producers = []

    # generating the consumers
    for i in range (N_CONSUMERS):
        
        name=CONSUMER+str(i)

        ct = consumerThread(mutex, empty, full, queue, name)
        ct.start()

        consumers.append(ct)


    # generating the producers
    for i in range (N_PRODUCERS):

        pt = threading.Thread(target=produce_one_item, name=PRODUCER+str(i),
                                args=(mutex, empty, full, queue),)

        pt.start()

        producers.append(pt)

    
    # waiting consumers termination
    for i in range (N_CONSUMERS):

        consumers[i].join()


    # waiting producers termination
    for i in range (N_PRODUCERS):

        producers[i].join()



if __name__ == '__main__':
    main()


[Consumer0] 			Started
[Consumer1] 			Started
[Consumer2] 			Started
[Consumer3] 			Started
[Consumer4] 			Started
[Consumer4] 			Checking full semaphore ...
[Consumer6] 			Started
[Consumer0] 			Checking full semaphore ...
[Consumer7] 			Started
[Consumer8] 			Started
[Consumer9] 			Started
[Consumer1] 			Checking full semaphore ...
[Producer0] Started
[Producer1] Started
[Producer2] Started
[Producer3] Started
[Producer4] Started
[Producer5] Started
[Consumer2] 			Checking full semaphore ...
[Producer6] Started
[Producer7] Started
[Consumer3] 			Checking full semaphore ...
[Producer8] Started
[Producer9] Started
[Consumer5] 			Started
[Consumer6] 			Checking full semaphore ...
[Consumer7] 			Checking full semaphore ...
[Consumer8] 			Checking full semaphore ...
[Consumer9] 			Checking full semaphore ...
[Producer0] Checking empty semaphore...
[Producer1] Checking empty semaphore...
[Producer2] Checking empty semaphore...
[Producer3] Checking empty semaphore...
[Producer4] Checking em

In [1]:
#### ESERCIZIO PROD_CONS con variabili condition

import logging
import threading
import time
from random import randint

CONSUMER = 'Consumer'
PRODUCER = 'Producer'
N_CONSUMERS = 10
N_PRODUCERS = 10
QUEUE_SIZE = 5

logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)


def an_item_is_available(queue):
    return not (len(queue) == 0)

def a_space_is_available(queue):
    return not (len(queue) == QUEUE_SIZE)


def get_an_available_item(queue):
    return queue.pop(0)


def make_an_item_available(queue):
    item = randint(0, 100)
    queue.append(item)

    return item


class consumerThread(threading.Thread):
    
    def __init__(self, producer_cv, consumer_cv, queue, name):

        threading.Thread.__init__(self, name=name)
        self.producer_cv = producer_cv
        self.consumer_cv = consumer_cv
        self.queue = queue

    def run(self):
        logging.debug('\t\t\tStarted')

        with self.consumer_cv:
            logging.debug('\t\t\tObtained lock')
        
            while not an_item_is_available(self.queue):
                logging.debug('\t\t\tWaiting')
                self.consumer_cv.wait() ## non posso consumare perchè non c'è spazio disp.
        
            time.sleep(1.0)
            item = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', item)

            logging.debug('\t\t\tNotify')
            self.producer_cv.notify() ### notifico i produttori che sono sospesi

        logging.debug('\t\t\tReleased lock')


def produce_one_item(producer_cv, consumer_cv, queue):
    logging.debug('Started')
    
    with producer_cv:
        logging.debug('Obtained lock')

        while not a_space_is_available(queue):
            logging.debug('Waiting')
            producer_cv.wait()

        time.sleep(1.0)
        item = make_an_item_available(queue)
        logging.debug('Item: %r', item)


        logging.debug('Notify')
        consumer_cv.notify()

    logging.debug('Released lock')


def main():
    
    # generating the queue 
    queue = []

    # generating the condition variable
    cv_lock = threading.Lock()
    producer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la procuder_cv, non posso usare un RLock
    consumer_cv = threading.Condition(lock=cv_lock) # uso un Lock per la consumer_cv, non posso usare un RLock

    consumers = []
    producers = []

    # generating the consumers
    for i in range (N_CONSUMERS):
        
        name=CONSUMER+str(i)

        ct = consumerThread(producer_cv, consumer_cv, queue, name)
        ct.start()

        consumers.append(ct)


    # generating the producers
    for i in range (N_PRODUCERS):

        pt = threading.Thread(
            target=produce_one_item,
            name=PRODUCER+str(i),
            args=(producer_cv, consumer_cv, queue),
        )

        pt.start()

        producers.append(pt)

    
    # waiting consumers termination
    for i in range (N_CONSUMERS):

        consumers[i].join()


    # waiting producers termination
    for i in range (N_PRODUCERS):

        producers[i].join()



if __name__ == '__main__':
    main()


[Consumer0] 			Started
[Consumer1] 			Started
[Consumer2] 			Started
[Consumer0] 			Obtained lock
[Consumer3] 			Started
[Consumer4] 			Started
[Consumer5] 			Started
[Consumer6] 			Started
[Consumer7] 			Started
[Consumer8] 			Started
[Consumer9] 			Started
[Producer0] Started
[Producer1] Started
[Producer2] Started
[Producer3] Started
[Producer4] Started
[Producer5] Started
[Consumer0] 			Waiting
[Producer6] Started
[Producer7] Started
[Producer8] Started
[Producer9] Started
[Consumer1] 			Obtained lock
[Consumer1] 			Waiting
[Consumer2] 			Obtained lock
[Consumer2] 			Waiting
[Consumer3] 			Obtained lock
[Consumer3] 			Waiting
[Consumer4] 			Obtained lock
[Consumer4] 			Waiting
[Consumer5] 			Obtained lock
[Consumer5] 			Waiting
[Consumer6] 			Obtained lock
[Consumer6] 			Waiting
[Consumer7] 			Obtained lock
[Consumer7] 			Waiting
[Consumer8] 			Obtained lock
[Consumer8] 			Waiting
[Consumer9] 			Obtained lock
[Consumer9] 			Waiting
[Producer0] Obtained lock
[Producer0] Item: 83
[Pr