In [None]:
#### Utilizzo class Thread

import threading

def func(id):
    """Announce on stdout that the thread labelled ``id`` is running.

    ``id`` may be any printable value; it is shown between the two literal
    fragments. Returns None.
    """
    # print() inserts a single space between arguments and appends a newline,
    # so the trailing "\n" in the last fragment yields one blank line.
    fragments = ("Thread: ", id, " running\n")
    print(*fragments)


if __name__ == "__main__":

    # Build two threads; each one calls func() with a numeric label.
    t1 = threading.Thread(target=func, args=(1,))
    t2 = threading.Thread(target=func, args=(2,))

    # Launch them: from here on they run concurrently with this cell.
    t1.start()
    t2.start()

    # Block until each of the two threads has finished.
    t1.join()
    t2.join()
    print("thread terminati")

    # Second round: four threads labelled "T1".."T4". Each is started as soon
    # as it is created and remembered in `threads` so it can be joined later.
    threads = []

    for i in range(1, 5):
        t = threading.Thread(target=func, args=(f"T{i}",))
        threads.append(t)
        t.start()

    # NOTE on enumerate(): it wraps an iterable and yields (counter, value)
    # tuples, with the counter starting at 0 unless `start` is given.
    #
    #   seasons = ['Spring', 'Summer', 'Fall', 'Winter']
    #   list(enumerate(seasons))
    #   [(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
    #   list(enumerate(seasons, start=1))
    #   [(1, 'Spring'), (2, 'Summer'), (3, 'Fall'), (4, 'Winter')]

    # Join in creation order, reporting each thread's position and repr first.
    for index, t in enumerate(threads):
        print("joining thread index: ", index, "thread: ", t)
        t.join()
        



In [None]:
#### Utilizzo della classe threading.local

import threading
import logging
import random

# Configure the root logger: emit DEBUG and above, and prefix every message
# with the name of the thread that logged it, wrapped in parentheses.
logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-0s)%(message)s',)

def show(d):
    """Log (at DEBUG level) the ``val`` attribute of ``d``, or that it is missing.

    ``d`` is any object; when reading ``d.val`` raises AttributeError the
    message 'No value yet' is logged instead of the value.
    """
    try:
        current = d.val
    except AttributeError:
        # The attribute has not been set on this object (yet).
        logging.debug('No value yet')
        return
    logging.debug('value=%s', current)

def f(d):
    """Thread body: show ``d.val``, store a random integer in it, show it again.

    The new value is drawn with ``random.randint(1, 100)``.
    """
    show(d)  # state as seen by the calling thread before the assignment
    drawn = random.randint(1, 100)
    d.val = drawn
    show(d)  # state after the assignment

if __name__ == '__main__':
    # threading.local() gives every thread its own, independent set of
    # attributes on the same object.
    d = threading.local()
    show(d)
    d.val = 999  # set for the main thread only
    show(d)

    # Each worker runs f(d): it starts without a `val` of its own, whatever
    # the main thread stored above.
    local_demo_threads = []
    for i in range(2):
        t = threading.Thread(target=f, args=(d,))
        local_demo_threads.append(t)
        t.start()

    # Wait for the workers, as the other threading cells do, so that all of
    # their log lines are emitted before this cell finishes.
    for t in local_demo_threads:
        t.join()



In [None]:
# utilizzo classe Lock

import threading
from threading import Lock
import time
 
x = 10  # shared counter updated by every thread that runs increment()

def increment(increment_by, lock):
    """Add ``increment_by`` to the module-level counter ``x`` under ``lock``.

    ``global`` is required because the function assigns to ``x``: without it
    the assignment would create a variable local to the function. A name
    defined outside any function is global automatically; ``global`` is only
    needed inside functions that rebind such a name.

    Parameters:
        increment_by: amount added to ``x``.
        lock: lock (context manager) shared by all threads calling this
            function; it protects the read-modify-write of ``x``.
    """
    global x

    # Exercise: remove the lock and observe the lost update. time.sleep(1)
    # widens the gap between reading and writing x, so an unprotected second
    # thread reads the stale value. Without the sleep the race still exists
    # but is rarely observed, because CPython hands over the GIL only at
    # blocking calls or after the interpreter's switch interval.
    #
    # `with lock` (instead of acquire()/release()) guarantees the lock is
    # released even when the body raises, so other threads cannot be left
    # blocked forever.
    with lock:
        local_counter = x
        local_counter += increment_by

        time.sleep(1)

        x = local_counter
        print(f'{threading.current_thread().name} increments x by {increment_by}, x: {x}')
 
# Single lock shared by both workers.
lock = Lock()

# One thread per increment amount; both receive the same lock object.
t1 = threading.Thread(target=increment, args=(5, lock))
t2 = threading.Thread(target=increment, args=(10, lock))

# Start both workers, then wait for both to finish.
for worker in (t1, t2):
    worker.start()

for worker in (t1, t2):
    worker.join()

print(f'The final value of x is {x}')

In [None]:
## Esempio di uso di RLock: un metodo di una classe acquisisce un lock e poi invoca un altro
## metodo che acquisisce lo stesso lock (lock rientrante)

class X:
    """Two integer fields, ``a`` and ``b``, guarded by one re-entrant lock."""

    def __init__(self):
        # An RLock may be acquired again by the thread that already holds it;
        # changeAandB() depends on this. Exercise: replace RLock with Lock and
        # watch changeAandB() block on the nested acquire.
        self.lock = threading.RLock()
        self.a = 1
        self.b = 2

    def changeA(self):
        """Increase ``a`` by one while holding the lock."""
        with self.lock:
            self.a += 1

    def changeB(self):
        """Add the current ``a`` to ``b`` while holding the lock."""
        with self.lock:
            self.b += self.a

    def changeAandB(self):
        """Run changeA() then changeB() as one step under the lock."""
        with self.lock:
            # Both calls re-acquire self.lock; a non-re-entrant lock would
            # block here.
            self.changeA()
            self.changeB()

# Exercise the three methods from the main thread.
# NOTE(review): this rebinds the notebook-global name `x`, which the Lock cell
# uses as its integer counter (`global x` in increment()).
x = X()
x.changeA()
x.changeB()
x.changeAandB()  # the nested acquire succeeds because the lock is an RLock

In [None]:
### Uso della classe Semaphore

from threading import *
from time import sleep
from random import random

# Counting semaphore initialised to 3: at most three threads can hold it at
# the same time; further acquirers block until one of them releases it.
obj = Semaphore(3)

def display(name):
    """Hold one semaphore slot while sleeping a random time, then report it.

    ``name`` is the label inserted into the printed message. The sleep lasts
    ``random()`` seconds, i.e. a float in [0.0, 1.0).
    """
    # `with obj` releases the slot even if the body raises; a bare
    # acquire()/release() pair would lose the slot for good in that case.
    with obj:
        value = random()
        sleep(value)
        print(f'Thread {name} got {value}')

if __name__ == '__main__':

    # Ten workers compete for the semaphore; each is started right after it
    # is created and kept in `threads` for the join phase.
    threads = []
    for i in range(10):
        t = Thread(target=display, args=(f'Thread-{i}',))
        threads.append(t)
        t.start()

    # Print each thread's repr, then block until that thread has finished.
    for thread in threads:
        print(thread)
        thread.join()


In [None]:
### utilizzo della classe Event

import threading as thd
import time

def task(event, timeout):
    """Wait on ``event`` for at most ``timeout`` seconds and report the outcome.

    Prints one line on entry and one line telling whether the event was set
    or the wait timed out. Returns None.
    """
    print("Started thread but waiting for event...")

    # wait() returns True if the event is (or becomes) set before the timeout
    # expires, False otherwise.
    if event.wait(timeout):
        print("Event received, releasing thread...")
        return
    print("Time out, moving ahead without event...")
    
if __name__ == '__main__':

    # Event shared between this cell and the worker thread; starts unset.
    e = thd.Event()

    # The worker waits on the event for at most 4 seconds.
    thread = thd.Thread(target=task, name='Event-Thread', args=(e, 4))
    thread.start()

    # Keep the main thread busy for 3 seconds, i.e. less than the worker's
    # timeout, so the worker is still waiting when the event is set.
    time.sleep(3)

    # Set the event, which releases the waiting worker.
    e.set()
    print("Event is set.")

    # Block until the worker has finished.
    thread.join()
    print("Event-Thread joined")
  



In [None]:
### Uso di multiprocessing/multiprocess callable object
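### NOTA: in Jupyter notebook multiprocessing può non funzionare (con lo start method "spawn" i processi figli non riescono a importare le funzioni definite nelle celle)
### installare multiprocess, che espone la stessa API (pip install multiprocess)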

import multiprocess as mp

def func():
    """Process body: print a fixed message and return None."""
    # NOTE(review): this redefines `func`, which the first cell declares with
    # an `id` parameter; re-run that cell before reusing its threads.
    print('Process running')


if __name__ == '__main__':

    # Create a child process whose body is the callable `func`.
    p = mp.Process(target = func)

    # Launch the child process.
    p.start()

    # Block until the child process has terminated.
    p.join()

    print("all joined")

In [None]:
### Uso di multiprocessing/multiprocess con class

import multiprocess as mp

class MyProcess(mp.Process):
    """Process subclass whose body is defined by overriding run()."""

    def run(self):
        """Code executed in the child process: print a fixed message."""
        print('Process running')

if __name__ == '__main__':

    # Instantiate the Process subclass; nothing runs yet.
    p = MyProcess()

    # Launch the child process, which runs MyProcess.run().
    p.start()

    # Block until the child process has terminated.
    p.join()

In [None]:
from multiprocess import Process, Lock

def f(l, i):
    """Print 'hello world' followed by ``i`` while holding the lock ``l``.

    The lock is released when the block exits, whether or not the print
    raises.
    """
    with l:
        print('hello world', i)

if __name__ == '__main__':
    # One lock shared by all children so that their prints do not interleave.
    lock = Lock()

    # Start ten children, keeping a reference to each so it can be joined.
    lock_demo_processes = []
    for num in range(10):
        child = Process(target=f, args=(lock, num))
        child.start()
        lock_demo_processes.append(child)

    # Join every child: the cell then ends only after all of them have
    # printed, and no finished-but-unjoined process is left behind.
    for child in lock_demo_processes:
        child.join()

In [None]:
### uso di multiprocess Pipe

from multiprocess import Process, Pipe

def parentData(parent):
    """Send the list ``['Hello']`` through the connection ``parent``, then close it."""
    greeting = ['Hello']
    parent.send(greeting)
    parent.close()
    
def childData(child):
    """Send the list ``['Bye']`` through the connection ``child``, then close it."""
    farewell = ['Bye']
    child.send(farewell)
    child.close()

if __name__ == '__main__':
    # Pipe() returns the two ends of a (by default two-way) pipe: whatever is
    # sent on one end is received on the other.
    parent_conn, child_conn = Pipe()
    # p1 writes ['Hello'] on the parent end ...
    p1 = Process(target=parentData, args=(parent_conn,))
    p1.start()
    # ... and p2 writes ['Bye'] on the child end.
    p2 = Process(target=childData, args=(child_conn,))
    p2.start()
    # Each recv() blocks until the message sent on the opposite end arrives.
    print(parent_conn.recv())
    print(child_conn.recv())   
    # Wait for both writer processes to terminate.
    p1.join()
    p2.join()


In [None]:
## uso di multiprocess Queue

from multiprocess import Process, Queue

def f(q):
    """Put the list ``[42, None, 'hello']`` on the queue ``q``."""
    payload = [42, None, 'hello']
    q.put(payload)

if __name__ == '__main__':
    # Queue shared between this process and the child.
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # get() blocks until the child has put its item; the item is read before
    # join(), so the child's queued data has been consumed by then.
    print(q.get())
    p.join()


In [None]:
### uso di multiprocess shared memory
from multiprocess import Process, Value, Array

def f(n, a):
    """Set ``n.value`` to 3.1415927 and negate every element of ``a`` in place.

    ``n`` must expose a writable ``value`` attribute; ``a`` must support
    ``len()`` and item get/set by integer index.
    """
    n.value = 3.1415927
    for position in range(len(a)):
        a[position] = -a[position]

if __name__ == '__main__':
    # Shared-memory objects: one double ('d') initialised to 0.0 and an array
    # of ten signed ints ('i') initialised to 0..9.
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    # The child process modifies both objects through f().
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    # Read the shared objects back after the child has finished.
    print(num.value)
    print(arr[:])


In [None]:
#### ESERCIZIO PROD_CONS con SEMAFORI con MULTITHREADING

import logging
import threading
import time
from random import randint

# Name prefixes for the consumer/producer threads (main() appends an index).
CONSUMER = 'Consumer'
PRODUCER = 'Producer'
# Number of consumer and producer threads started by main(). Each thread
# handles exactly one item, so the two counts are expected to match.
N_CONSUMERS = 10
N_PRODUCERS = 10
# Initial value of the `empty` semaphore, i.e. how many items may be in the
# queue at the same time.
QUEUE_SIZE = 5

# NOTE: logging.basicConfig() does nothing if the root logger already has
# handlers (e.g. it was configured by an earlier cell); on Python 3.8+
# force=True re-applies the configuration.
logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)

def get_an_available_item(queue):
    """Remove and return the oldest element (index 0) of the list ``queue``."""
    oldest = queue.pop(0)
    return oldest


def make_an_item_available(queue):
    """Append a random integer in [0, 100] to ``queue`` and return that integer."""
    produced = randint(0, 100)
    queue.append(produced)
    return produced


class consumerThread(threading.Thread):
    """Consumer for the semaphore-based bounded buffer.

    Each instance removes exactly one item from the shared ``queue`` and then
    terminates.
    """

    def __init__(self, mutex_C, empty, full, queue, name):
        """Store the shared synchronisation objects and name the thread.

        mutex_C -- mutual exclusion among consumers while an item is removed
        empty   -- semaphore released once after the item has been removed
        full    -- semaphore acquired before an item is removed
        queue   -- shared buffer passed to get_an_available_item()
        name    -- thread name (shown by the logging format)
        """
        super().__init__(name=name)
        self.queue = queue
        self.full = full
        self.empty = empty
        self.mutex_C = mutex_C

    def run(self):
        """Wait for an item, remove it under the consumer mutex, free a slot."""
        logging.debug('\t\t\tStarted')

        logging.debug('\t\t\tChecking full semaphore ...')

        # Blocks while `full` is 0, i.e. until a producer has released it.
        # (A threading.Semaphore counter never becomes negative: a consumer
        # that arrives first simply waits here.)
        self.full.acquire()

        # Take the mutex only AFTER `full`: holding the mutex while waiting
        # for an item would keep out consumers that could already proceed.
        with self.mutex_C:
            logging.debug('\t\t\tAcquired mutex')

            time.sleep(1.0)
            consumed = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', consumed)

            logging.debug('\t\t\tRelease mutex')

        # One slot is free again: wake a producer blocked on `empty`, if any.
        self.empty.release()

        logging.debug('\t\t\tReleased empty semaphore')

        # The same consumer written with condition variables (signal-and-
        # continue monitor) is the consumerThread class of the
        # "ESERCIZIO PROD_CONS con variabili condition" cell.


def produce_one_item(mutex_P, empty, full, queue):
    """Producer body: wait for a free slot, add one item, signal the consumers.

    mutex_P -- mutual exclusion among producers while an item is added
    empty   -- semaphore acquired before producing (counts free slots)
    full    -- semaphore released once after producing
    queue   -- shared buffer passed to make_an_item_available()
    """
    logging.debug('Started')

    logging.debug('Checking empty semaphore...')

    # Blocks while `empty` is 0, i.e. while no slot is free.
    empty.acquire()

    # Take the mutex only AFTER `empty`: holding the mutex while waiting for
    # a slot would keep out producers that could already proceed.
    with mutex_P:
        logging.debug('Acquired mutex')

        time.sleep(1.0)
        produced = make_an_item_available(queue)
        logging.debug('Item: %r', produced)

        logging.debug('Release mutex')

    # One more item is available: wake a consumer blocked on `full`, if any.
    full.release()

    logging.debug('Released full semaphore')

    # The same producer written with condition variables (signal-and-continue
    # monitor) is produce_one_item() in the
    # "ESERCIZIO PROD_CONS con variabili condition" cell.


def main():
    """Run the semaphore-based producer/consumer exercise to completion.

    Starts N_CONSUMERS consumerThread instances and N_PRODUCERS threads
    running produce_one_item(), all sharing one list and four semaphores,
    then waits for every thread to finish.
    """
    # Shared buffer: a plain list used as a FIFO queue.
    queue = []

    # Binary semaphores (initial value 1): mutual exclusion among producers
    # and, separately, among consumers.
    mutex_P = threading.Semaphore()
    mutex_C = threading.Semaphore()

    # `empty` starts at QUEUE_SIZE (that many items may be produced before a
    # producer blocks); `full` starts at 0 (nothing to consume at the start).
    empty = threading.Semaphore(QUEUE_SIZE)
    full = threading.Semaphore(0)

    # Consumers: threads created by subclassing Thread.
    consumers = []
    for i in range(N_CONSUMERS):
        ct = consumerThread(mutex_C, empty, full, queue, CONSUMER + str(i))
        ct.start()
        consumers.append(ct)

    # Producers: threads created from a callable.
    producers = []
    for i in range(N_PRODUCERS):
        pt = threading.Thread(
            target=produce_one_item,
            name=PRODUCER + str(i),
            args=(mutex_P, empty, full, queue),
        )
        pt.start()
        producers.append(pt)

    # Wait for all consumers first, then for all producers.
    for ct in consumers:
        ct.join()

    for pt in producers:
        pt.join()


if __name__ == '__main__':
    main()


In [None]:
#### ESERCIZIO PROD_CONS con variabili condition

import logging
import threading
import time
from random import randint

# Name prefixes for the consumer/producer threads (main() appends an index).
CONSUMER = 'Consumer'
PRODUCER = 'Producer'
# Number of consumer and producer threads started by main(). Each thread
# handles exactly one item, so the two counts are expected to match.
N_CONSUMERS = 10
N_PRODUCERS = 10
# Capacity of the shared queue, checked by a_space_is_available().
QUEUE_SIZE = 5

# NOTE: logging.basicConfig() does nothing if the root logger already has
# handlers (e.g. it was configured by an earlier cell); on Python 3.8+
# force=True re-applies the configuration.
logging.basicConfig(level=logging.DEBUG, format='[%(threadName)-0s] %(message)s',)


def an_item_is_available(queue):
    """Return True when ``queue`` contains at least one element, else False."""
    return len(queue) != 0

def a_space_is_available(queue):
    """Return True while ``queue`` holds fewer than ``QUEUE_SIZE`` elements.

    A strict ``<`` comparison is used instead of ``!=`` so that a queue that
    somehow exceeds ``QUEUE_SIZE`` is reported as full rather than as having
    free space.
    """
    return len(queue) < QUEUE_SIZE


def get_an_available_item(queue):
    """Remove and return the oldest element (index 0) of the list ``queue``."""
    oldest = queue.pop(0)
    return oldest


def make_an_item_available(queue):
    """Append a random integer in [0, 100] to ``queue`` and return that integer."""
    produced = randint(0, 100)
    queue.append(produced)
    return produced


class consumerThread(threading.Thread):
    """Consumer for the condition-variable based bounded buffer.

    Each instance removes exactly one item from the shared ``queue`` and then
    terminates.
    """

    def __init__(self, producer_cv, consumer_cv, queue, name):
        """Store the two condition variables and the shared queue; name the thread.

        producer_cv -- condition notified after an item has been removed
        consumer_cv -- condition this thread waits on while the queue is empty
        queue       -- shared buffer
        name        -- thread name (shown by the logging format)
        """
        super().__init__(name=name)
        self.queue = queue
        self.consumer_cv = consumer_cv
        self.producer_cv = producer_cv

    def run(self):
        """Wait until an item is available, remove it, notify one producer."""
        logging.debug('\t\t\tStarted')

        with self.consumer_cv:
            logging.debug('\t\t\tObtained lock')

            # Re-check the predicate after every wake-up: being notified does
            # not guarantee that an item is still there. wait() releases the
            # underlying lock while blocked and re-acquires it on return.
            while not an_item_is_available(self.queue):
                logging.debug('\t\t\tWaiting')
                self.consumer_cv.wait()  # nothing to consume yet

            time.sleep(1.0)
            consumed = get_an_available_item(self.queue)
            logging.debug('\t\t\tItem: %r', consumed)

            logging.debug('\t\t\tNotify')
            # A slot is free again: wake one producer waiting on producer_cv.
            self.producer_cv.notify()

        logging.debug('\t\t\tReleased lock')


def produce_one_item(producer_cv, consumer_cv, queue):
    """Producer body: wait for free space, add one item, notify one consumer.

    producer_cv -- condition this thread waits on while the queue is full
    consumer_cv -- condition notified after the item has been added
    queue       -- shared buffer
    """
    logging.debug('Started')

    with producer_cv:
        logging.debug('Obtained lock')

        # Re-check the predicate after every wake-up; wait() releases the
        # underlying lock while blocked and re-acquires it on return.
        while not a_space_is_available(queue):
            logging.debug('Waiting')
            producer_cv.wait()

        time.sleep(1.0)
        produced = make_an_item_available(queue)
        logging.debug('Item: %r', produced)

        logging.debug('Notify')
        # An item is available: wake one consumer waiting on consumer_cv.
        consumer_cv.notify()

    logging.debug('Released lock')


def main():
    """Run the condition-variable producer/consumer exercise to completion.

    Starts N_CONSUMERS consumerThread instances and N_PRODUCERS threads
    running produce_one_item(), all sharing one list and two condition
    variables, then waits for every thread to finish.
    """
    # Shared buffer: a plain list used as a FIFO queue.
    queue = []

    # Both condition variables are built on the SAME lock, so producers and
    # consumers exclude each other while they touch the queue.
    cv_lock = threading.Lock()
    producer_cv = threading.Condition(lock=cv_lock)
    consumer_cv = threading.Condition(lock=cv_lock)

    # Consumers: threads created by subclassing Thread.
    consumers = []
    for i in range(N_CONSUMERS):
        ct = consumerThread(producer_cv, consumer_cv, queue, CONSUMER + str(i))
        ct.start()
        consumers.append(ct)

    # Producers: threads created from a callable.
    producers = []
    for i in range(N_PRODUCERS):
        pt = threading.Thread(
            target=produce_one_item,
            name=PRODUCER + str(i),
            args=(producer_cv, consumer_cv, queue),
        )
        pt.start()
        producers.append(pt)

    # Wait for all consumers first, then for all producers.
    for ct in consumers:
        ct.join()

    for pt in producers:
        pt.join()


if __name__ == '__main__':
    main()


In [None]:
#### ESERCIZIO PROD_CONS con SEMAFORI con MULTIPROCESS

import logging
import multiprocess
from multiprocess import Queue
import time
from random import randint

# Name prefixes for the consumer/producer processes (main() appends an index).
CONSUMER = 'Consumer'
PRODUCER = 'Producer'
# Number of consumer and producer processes started by main(). Each process
# handles exactly one item, so the two counts are expected to match.
N_CONSUMERS = 10
N_PRODUCERS = 10
# Maximum size passed to Queue() in main().
QUEUE_SIZE = 1

# Logging is left disabled in this cell; the processes report with print().
#logging.basicConfig(level=logging.DEBUG, format='%(message)s',)

def get_an_available_item(queue):
    """Take one item from ``queue`` with ``queue.get()`` and return it."""
    fetched = queue.get()
    return fetched


def make_an_item_available(queue):
    """Put a random integer in [0, 100] on ``queue`` and return that integer."""
    produced = randint(0, 100)
    queue.put(produced)
    return produced


class consumerProcess(multiprocess.Process):
    """Consumer process: takes one item from the shared queue and reports it."""

    def __init__(self, queue, name):
        """Remember the shared ``queue`` and give the process the name ``name``."""
        super().__init__(name=name)
        self.queue = queue

    def run(self):
        """Sleep one second, fetch one item and print it with this process's PID."""
        time.sleep(1.0)
        item = get_an_available_item(self.queue)
        print(f'[PID: {multiprocess.current_process().pid}] \t\t\tConsumed Item: {item}\n')
        
            
        

def produce_one_item(queue):
    """Producer body: sleep one second, put one item on ``queue``, print it with the PID."""
    time.sleep(1.0)
    item = make_an_item_available(queue)
    print(f'[PID: {multiprocess.current_process().pid}] Produced Item: {item}\n')


def main():
    """Run the multiprocess producer/consumer exercise to completion.

    Starts N_CONSUMERS consumerProcess instances and N_PRODUCERS processes
    running produce_one_item(), all sharing one bounded Queue, then waits for
    every process to finish.
    """
    # Shared buffer: a multiprocess Queue bounded to QUEUE_SIZE items.
    queue = Queue(QUEUE_SIZE)

    #multiprocess.set_start_method("spawn")

    # Consumers: processes created by subclassing Process.
    consumers = []
    for i in range(N_CONSUMERS):
        ct = consumerProcess(queue, CONSUMER + str(i))
        ct.start()
        consumers.append(ct)

    # Producers: processes created from a callable.
    producers = []
    for i in range(N_PRODUCERS):
        pt = multiprocess.Process(
            target=produce_one_item,
            name=PRODUCER + str(i),
            args=(queue,),
        )
        pt.start()
        producers.append(pt)

    # Wait for all consumers first, then for all producers.
    for ct in consumers:
        ct.join()

    for pt in producers:
        pt.join()


if __name__ == '__main__':
    main()
