<a href="https://colab.research.google.com/github/TonatiuhGonzalez/Concurrencia-Python/blob/master/Reto2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Tecnológico de Nacional de México campus Colima**
Maestría en Sistemas Computacionales

Tecnologías de programación

**Reto Práctico 2:** Primitivas de Sincronización

Elaborado por: Sabino Tonatiuh González Rodríguez

Profesora: Dra. Patricia Elizabeth Figueroa Millán

#**Lock**

Implementa un cierre de exclusión mutua para tareas asyncio. No es seguro en hilos.

Un cierre asyncio puede usarse para garantizar el acceso en exclusiva a un recurso compartido.

Un Lock se puede usar para proteger el acceso a un recurso compartido. Solamente el titular del bloqueo puede utilizar el recurso. Múltiples intentos de adquirir la cerradura se bloquearán de modo que solo haya un titular en cada momento.

In [None]:
import threading
import time
from random import randint
                    
class SharedCounter(object):
  
    def __init__(self, val = 0):
        self.lock = threading.Lock()
        self.counter = val
        
    def increment(self):
        print("Waiting for a lock")
        self.lock.acquire()
        try:
            print('Acquired a lock, counter value: ', self.counter)
            self.counter = self.counter + 1
        finally:
            print('Released a lock, counter value: ', self.counter)
            self.lock.release()

def task(c):
    # picking up a random number
    r = randint(1,5)
    # running increment for a random number of times
    for i in range(r):
      c.increment()
    print('Done')

if __name__ == '__main__':
    sCounter = SharedCounter()

    t1 = threading.Thread(target=task, args=(sCounter,))
    t1.start()
    
    t2 = threading.Thread(target=task, args=(sCounter,))
    t2.start()

    print('Waiting for worker threads')

Waiting for a lock
Acquired a lock, counter value:  0
Released a lock, counter value:  1
Waiting for a lock
Acquired a lock, counter value:  1
Released a lock, counter value:  2
Waiting for a lock
Acquired a lock, counter value:  2
Released a lock, counter value:  3
Waiting for a lock
Acquired a lock, counter value:  3
Released a lock, counter value:  4
Waiting for a lock
Acquired a lock, counter value:  4
Released a lock, counter value:  5
Done
Waiting for a lock
Acquired a lock, counter value:  5
Released a lock, counter value:  6
Waiting for a lock
Acquired a lock, counter value:  6
Released a lock, counter value:  7
Waiting for a lock
Acquired a lock, counter value:  7
Released a lock, counter value:  8
Waiting for a lock
Acquired a lock, counter value:  8
Released a lock, counter value:  9
Done
Waiting for worker threads


#**Event**

Un objeto de eventos. No es seguro en hilos.

Un evento asyncio puede usarse para notificar a múltiples tareas asyncio que ha ocurrido algún evento.

Un objeto Event administra una bandera interna que se puede establecer en true con el método set() y se restablece en false con el método clear(). El método wait() se bloquea hasta que la bandera se establece en true. El flag se establece en false inicialmente.

Un asyncio.Event se basa en threading.Event, y se utiliza para permitir que varios consumidores esperen a que suceda algo sin buscar un valor específico para ser asociado con la notificación.

In [None]:
import threading
import time

def task(event, timeout):
  print("Started thread but waiting for event...")
  # make the thread wait for event with timeout set
  event_set = event.wait(timeout)
  if event_set:
    print("Event received, releasing thread...")
  else:
    print("Time out, moving ahead without event...")
    
if __name__ == '__main__':
  # initializing the event object
  e = threading.Event()
  
  # starting the thread
  thread1 = threading.Thread(name='Event-Blocking-Thread', target=task, args=(e,4))
  thread1.start()
  # sleeping the main thread for 3 seconds
  time.sleep(3)
  # generating the event
  e.set()
  print("Event is set.")
  
  

Started thread but waiting for event...


  self.id = cmd_id


Event is set.
Event received, releasing thread...


#**Condition**

Un objeto Condition. No seguro en hilos.

Una tarea puede usar una condición primitiva de asyncio para esperar a que suceda algún evento y luego obtener acceso exclusivo a un recurso compartido.

En esencia, un objeto Condition combina la funcionalidad de un objeto Event y un objeto Lock. Es posible tener múltiples objetos Condition que compartan un mismo Lock, lo que permite coordinar el acceso exclusivo a un recurso compartido entre diferentes tareas interesadas en estados particulares de ese recurso compartido.

Una Condition funciona de manera similar a un Event excepto que en lugar de notificar a todas las co-rutinas en espera de el número de procesos en espera que han sido despertados se controla con un argumento para notify().

In [None]:
import threading
import time
from random import randint

class SomeItem:
  # init method
  def __init__(self):
    # initialize empty list
    self.list = []
  
  # add to list method for producer
  def produce(self, item):
    print("Adding item to list...")
    self.list.append(item)
    
  # remove item from list method for consumer
  def consume(self):
    print("consuming item from list...")
    item = self.list[0]
    print("Item consumed: ", item)
    self.list.remove(item)

def producer(si, cond):
    r = randint(1,5)
    # creating random number of items
    for i in range(1, r):
      print("working on item creation, it will take: " + str(i) + " seconds")
      time.sleep(i)
      print("acquiring lock...")
      cond.acquire()
      try:
        si.produce(i)
        cond.notify()
      finally:
        cond.release()
      
def consumer(si, cond):
    cond.acquire()
    while True:
      try:
        si.consume()
      except:
        print("No item to consume...")
        # wait with a maximum timeout of 10 sec
        val = cond.wait(10)
        if val:
          print("notification received about item production...")
          continue
        else:
          print("waiting timeout...")
          break
        
    cond.release()
    
if __name__=='__main__':
  # condition object
  cond = threading.Condition()
  # SomeItem object
  si = SomeItem()
  # producer thread
  p = threading.Thread(target=producer, args=(si,cond,))
  p.start()
  # consumer thread
  c = threading.Thread(target=consumer, args=(si,cond,))
  c.start()

  #print('Waiting for producer and consumer threads...')
  p.join()
  c.join()
  print("Done")

working on item creation, it will take: 1 seconds
consuming item from list...
No item to consume...
acquiring lock...
Adding item to list...
working on item creation, it will take: 2 seconds
notification received about item production...
consuming item from list...
Item consumed:  1
consuming item from list...
No item to consume...
acquiring lock...
Adding item to list...
working on item creation, it will take: 3 seconds
notification received about item production...
consuming item from list...
Item consumed:  2
consuming item from list...
No item to consume...
acquiring lock...
Adding item to list...
notification received about item production...
consuming item from list...
Item consumed:  3
consuming item from list...
No item to consume...
waiting timeout...
Done


#**Semaphore**

Un semáforo gestiona un contador interno que se reduce en cada llamada al método acquire() y se incrementa en cada llamada al método release(). El contador nunca puede bajar de cero, cuando acquire() encuentra que es cero, se bloquea, esperando hasta que alguna tarea llame a release().

El argumento opcional value proporciona el valor inicial para el contador interno (1 por defecto). Si el valor dado es menor que 0 se lanza una excepción ValueError.



In [None]:
# importing the modules  
from threading import *           
import time          
    
# creating thread instance where count = 3  
my_obj = Semaphore(4)          
    
# creating instance  
def show(the_name):  
      
    # calling acquire method  
    my_obj.acquire()                  
    for n in range(6):  
        print('Javatpoint, ', end = '')  
        time.sleep(1)  
        print(the_name)  
            
        # calling release method  
        my_obj.release()      
            
# creating multiple thread   
thread_1 = Thread(target = show , args = ('Thread 1',))  
thread_2 = Thread(target = show , args = ('Thread 2',))  
thread_3 = Thread(target = show , args = ('Thread 3',))  
thread_4 = Thread(target = show , args = ('Thread 4',))  
thread_5 = Thread(target = show , args = ('Thread 5',))  
thread_6 = Thread(target = show , args = ('Thread 6',))  
  
# calling the threads   
thread_1.start()  
thread_2.start()  
thread_3.start()  
thread_4.start()  
thread_5.start()  
thread_6.start()  

Javatpoint, Javatpoint, Javatpoint, Javatpoint, 

#**BoundedSemaphore**

Un objeto semáforo delimitado. No es seguro en hilos.

BoundedSemaphore es una versión de la clase Semaphore que lanza una excepción ValueError en release() si aumenta el contador interno por encima del valor inicial.

In [None]:
import threading
from time import sleep
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)
# clase que administra la conexión de base de datos
class DBManager:
    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.active_connections = []

    def connect(self, name):
        with self.lock:
            self.active_connections.append(name)
            print(f'Conexiones activas: {self.active_connections}')
            sleep(0.1)
    
    def disconnect(self, name):
        with self.lock:
            self.active_connections.remove(name)
            print(f'Conexiones activas: {self.active_connections}')
            sleep(0.1)

# método que simula la conexión a la base de datos
def handle_request(manager, semaphore,breaker):
    logging.debug('Esperando por conexión')
    semaphore.acquire()
    name = threading.current_thread().name
    manager.connect(name)
    sleep(1)
    manager.disconnect(name)
    semaphore.release()

    if(breaker):
        semaphore.release()

# creación de semáforo
semaphore = threading.BoundedSemaphore(3)

# creación de administrador de base de datos
manager = DBManager()

# crear hilos
for i in range(10):
    threading.Thread(target=handle_request, args=(manager, semaphore, i == 9),name="hilo "+str(i)).start()

# join 
for i in threading.enumerate():
    if i.name.startswith("hilo"):
        i.join()


Conexiones activas: ['hilo 0']
Conexiones activas: ['hilo 0', 'hilo 1']
Conexiones activas: ['hilo 0', 'hilo 1', 'hilo 2']
Conexiones activas: ['hilo 1', 'hilo 2']
Conexiones activas: ['hilo 1', 'hilo 2', 'hilo 3']
Conexiones activas: ['hilo 1', 'hilo 3']
Conexiones activas: ['hilo 3']
Conexiones activas: ['hilo 3', 'hilo 5']
Conexiones activas: ['hilo 3', 'hilo 5', 'hilo 4']
Conexiones activas: ['hilo 5', 'hilo 4']
Conexiones activas: ['hilo 5', 'hilo 4', 'hilo 6']
Conexiones activas: ['hilo 4', 'hilo 6']
Conexiones activas: ['hilo 6']
Conexiones activas: ['hilo 6', 'hilo 7']
Conexiones activas: ['hilo 6', 'hilo 7', 'hilo 8']
Conexiones activas: ['hilo 7', 'hilo 8']
Conexiones activas: ['hilo 7', 'hilo 8', 'hilo 9']
Conexiones activas: ['hilo 8', 'hilo 9']
Conexiones activas: ['hilo 9']


Exception in thread hilo 9:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-24-2cf4ab292ad7>", line 38, in handle_request
  File "/usr/lib/python3.8/threading.py", line 489, in release
    raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times


Conexiones activas: []


#**Barrier**

Un objeto barrera. No es seguro para los hilos.

Una barrera es una primitiva de sincronización simple que permite bloquear hasta que un número determinado de tareas estén esperando en ella. Las tareas pueden esperar en el método wait() y se bloquearían hasta que el número especificado de tareas terminen de esperar en wait(). En ese momento todas las tareas en espera se desbloquearían simultáneamente.

async with puede utilizarse como alternativa a la espera en wait().

La barrera puede ser reutilizada cualquier número de veces.

In [None]:
import threading
import time

def start_server():
  # starting server
  print("starting the server...")
  # do some startup work
  time.sleep(2)
  

def server(b):
    start_server()
    b.wait()
    print("Server is ready.")

def client(b):
    print("waiting for server getting ready...")
    b.wait()
    print("sending request to server...")

if __name__=='__main__':
  
  b = threading.Barrier(2, timeout=5)
  # server thread
  s = threading.Thread(target=server, args=(b,))
  s.start()
  # client thread
  c = threading.Thread(target=client, args=(b,))
  c.start()

  s.join()
  c.join()
  print("Done")

starting the server...
waiting for server getting ready...
Server is ready.
sending request to server...
Done
