# Cómputo Concurrente

## Multiprocessing

El módulo `multiprocessing` de Python permite la creación, manipulación y sincronización de procesos, también ofrece concurrencia local como remota.

Ejemplo de motivación...

In [3]:
import time

def calc_cuad(numeros):
    print("calcula el cuadrado:")
    for n in numeros:
        time.sleep(0.2)
        print("cuadrado:", n * n)
        
def calc_cubo(numeros):
    print("calcula el cubo:")
    for n in numeros:
        time.sleep(0.2)
        print("cubo:", n * n * n)

nums = range(10)

t = time.time()
calc_cuad(nums)
calc_cubo(nums)


print("Finaliza ejecución")
print("Tiempo de ejecución", time.time()-t)


calcula el cuadrado:
cuadrado: 0
cuadrado: 1
cuadrado: 4
cuadrado: 9
cuadrado: 16
cuadrado: 25
cuadrado: 36
cuadrado: 49
cuadrado: 64
cuadrado: 81
calcula el cubo:
cubo: 0
cubo: 1
cubo: 8
cubo: 27
cubo: 64
cubo: 125
cubo: 216
cubo: 343
cubo: 512
cubo: 729
Finaliza ejecución
Tiempo de ejecución 4.028238296508789


Una manera sencilla de generar procesos en Python es por medio de la creación del objeto `Process` y llamarlo por medio del método `start()`.

In [9]:
import multiprocessing as mp

def tarea(nombre):
    print("Hola ", nombre)
    
if __name__ == '__main__':
    p = mp.Process(target=tarea, args=('Saul', ))
    p.start()
    p.join()


Hola  Saul


In [12]:
import multiprocessing as mp

def calc_cuad(numeros):
    print("calcula cuadrado de números")
    for n in numeros:
        time.sleep(0.2)
        print('cuadrado:', n * n )

def calc_cubo(numeros):
    print("calcula cubo de números")
    for n in numeros:
        time.sleep(0.2)
        print('cubo:', n * n * n)        
        
nums = range(10)

t = time.time()

p1 = mp.Process(target=calc_cuad, args=(nums,))
p2 = mp.Process(target=calc_cubo, args=(nums,))

p1.start()
p2.start()

p1.join()
p2.join()

print("Tiempo de ejecución: ", time.time()-t)
print("Finaliza ejecución")

calcula cuadrado de números
calcula cubo de números
cuadrado: 0
cubo: 0
cuadrado: 1
cubo: 1
cuadrado: 4
cubo: 8
cuadrado: 9
cubo: 27
cuadrado: 16
cubo: 64
cuadrado: 25
cubo: 125
cuadrado: 36
cubo: 216
cuadrado: 49
cubo: 343
cuadrado: 64
cubo: 512
cuadrado: 81
cubo: 729
Tiempo de ejecución:  2.114143133163452
Finaliza ejecución


## Identificadores pid, ppid

In [2]:
import multiprocessing as mp
import os

print("Nombre del proceso:", __name__)
print("Proceso padre:", os.getppid())
print("Proceso actual:", os.getpid())

Nombre del proceso: __main__
Proceso padre: 1586009
Proceso actual: 1588624


In [4]:
import multiprocessing as mp
import os

def info(titulo):
    print(titulo)
    print("Nombre del proceso:", __name__)
    print("Proceso padre:", os.getppid())
    print("Proceso actual:", os.getpid())
    
def f(nombre):
    info("Función f")
    print("Hola", nombre)
    print("----------")

    
info("Inicio")
p = mp.Process(target=f, args=("Valeriano",))
p.start()
p.join()

Inicio
Nombre del proceso: __main__
Proceso padre: 1586009
Proceso actual: 1588624
Función f
Nombre del proceso: __main__
Proceso padre: 1588624
Proceso actual: 1665113
Hola Valeriano
----------


## Ejercicio:

Crea tres procesos hijos, donde:
- El primero multiplique 3 números (a, b, c)
- El segundo sume (a, b, c)
- El tercero (a+b)/c
- Todos devolverán el nombre del proceso hijo, el id del proceso hijo, el id del proceso padre y el valor calculado.

In [16]:
import multiprocessing

Con el método `cpu_count()` se muestra el número de procesadores del sistema que se está utilizando.

In [17]:
multiprocessing.cpu_count()

4

El identificador del proceso actual se accede por medio de `current_process`:

```
print(multiprocessing.current_process().pid)

```


In [18]:
import time

def TareaHijo():
    print("Proceso HIJO con PID: {}".format(multiprocessing.current_process().pid))
    time.sleep(3)
    print("Fin del proceso hijo")

def main():
    print("Proceso Padre PID: {}".format(multiprocessing.current_process().pid))
    myProcess = multiprocessing.Process(target=TareaHijo)
    myProcess.start()
    myProcess.join()

# Se acostumbra usar la variable __name__
# para hacer la ejecución desde el progragrama
# principal, puede omitirse en los notebooks 
if __name__ == '__main__':
    main()

Proceso Padre PID: 2580461
Proceso HIJO con PID: 2598014
Fin del proceso hijo


Es posible asignar un nombre a un proceso hijo que ha sido creado, por medio del argumento `name` se asigna el nombre del proceso hijo.

In [19]:
def myProcess():   #CODIGO DEL HIJO
    print("Proceso con nombre: {}".format(multiprocessing.current_process().name))

def main():   #CODIGO DEL PADRE
    childProcess = multiprocessing.Process(target=myProcess, name='Proceso_LCD-CC')
    childProcess.start()
    childProcess.join()

main()

Proceso con nombre: Proceso_LCD-CC


In [25]:
from multiprocessing import Process, current_process
import time

def f1():
    pname = current_process().name
    print('Starting process %s...' % pname)  
    time.sleep(2)
    print('Exiting process %s...' % pname)

def f2():
    pname = current_process().name
    print('Starting process %s...' % pname)
    time.sleep(4)
    print('Exiting process %s...' % pname)
    
if __name__ == '__main__':
    p1 = Process(name='Worker 1', target=f1)
    p2 = Process(name='Worker 2', target=f2)
    p3 = Process(target=f1)
    p1.start()
    p2.start()
    p3.start()
    
    p1.join()
    p2.join()
    p3.join()

Starting process Worker 1...
Starting process Process-35...
Starting process Worker 2...
Exiting process Worker 1...
Exiting process Process-35...
Exiting process Worker 2...


Algunas veces es importante esperar los procesos que se están ejecutando hasta que terminen la tarea que están llevando a cabo. Por medio del método `join()`se implementa el mecanismo de espera has ta qu el proceso ha completado su tarea.

Un proceso que está en ejecución puede ser cancelado o interrumpido por medido de la función `terminate()`

In [26]:
def TareaProceso():
    proceso_actual = multiprocessing.current_process()
    print("Procesos Hijo PID: {}".format(proceso_actual.pid))
    time.sleep(20)
    proceso_actual = multiprocessing.current_process()
    print("Procesos Padre PID: {}".format(proceso_actual.pid))

# Codigo del padre
miProceso = multiprocessing.Process(target=TareaProceso)
miProceso.start()
#myProcess.join()

print("Proceso Padre ha terminado, termina el proceso main")
print("Terminando el proceso Hijo...")
miProceso.terminate()
print("Proceso Hijo ha terminado exitosamente")

Proceso Padre ha terminado, termina el proceso main
Terminando el proceso Hijo...
Proceso Hijo ha terminado exitosamente


### Ejercicio:
1. Vamos a crear 3 procesos los cuales tendrán nombre y código definido como `funP1`, `funP2`, `funP3`. Cada hijo escribirá su numbre, su `pid` y el `pid` del padre, además de hacer un cálculo sobre tres valores a, b y c. 
2. El proceso 1 calcula $a \times b + c$, el proceso 2 calcula $a \times b \times c$ y el proceso 3 calcula $(a \times b)/c$.
3. Crea un mecanismo para terminar alguno de los procesos de manera aleatoria.


##  Bloqueo Global (Global Interpreter Lock) 

El paquete `multiproccessing` permite generar procesos. La ejecució *concurrente* la lleva acabo el intérprete de Python (escrito en C `CPython`). El paquete ofrece concurrencia tanto local como remota, por medio del Interpretre de bloqueo global (GIL. Debido a esto, el módulo `multiprocessing` permite al programador aprovechar al máximo múltiples procesadores en una máquina determinada. La ejecución de multiples procesos creados con `multiprocessing` es posible tanto en Unix como en Windows.

Es el mecanismo utilizado por el intérprete CPython para garantizar que solo un subproceso ejecute el código de bytes (*bytecode*) de Python a la vez. Esto simplifica la implementación de CPython al hacer que el modelo de objetos esté implícitamente seguro contra el acceso concurrente.
El GIL funciona como un mecanismo de sincronización en la ejecución de procesos (e hilos) tal que únicamente un proceso (o hilo) puede ejecutarse a la vez, aún en un procesador multi-core.

<img src="gil.png">

## Esperar la ejecución de procesos

La clase `Process` del módulo `multiprocessing` provee el método `join()` como una forma de esperar que un proceso creado finalice su tarea y salga de su ejecución.

No obstante, a veces se requiere **crear procesos que corran en silencio (_background_) y no bloquear el proceso principal hasta que finalicen**. Esta especificación es comunmente utilizada cuando el proceso principal no tiene la certeza de interrumpir el proceso en un momento específico o cuando al finalizar el proceso principal no se tengan afectaciones por no terminar otro proceso.

Estos proceso se llaman **Proceso demonio** (_daemon processes)_. Por medio del atributo `daemon` del método `Process` se crea un proceso de este tipo. El valor por defecto del atributo `daemon` es `False`, se define a `True` para crear el proceso demonio.

In [4]:
from multiprocessing import Process, current_process
import time

def f1():
    p = current_process()
    print('Starting process %s, ID %s...' % (p.name, p.pid))
    time.sleep(4)
    print('Exiting process %s, ID %s...' % (p.name, p.pid))

def f2():
    p = current_process()
    print('Starting process %s, ID %s...' % (p.name, p.pid))
    time.sleep(2)
    print('Exiting process %s, ID %s...' % (p.name, p.pid))

if __name__ == '__main__':
    p1 = Process(name='Worker 1', target=f1)
    p1.daemon = True
    p2 = Process(name='Worker 2', target=f2)
    
    p1.start()
    time.sleep(1)
    p2.start()

Starting process Worker 1, ID 48064...
Starting process Worker 2, ID 48067...
Exiting process Worker 2, ID 48067...
Exiting process Worker 1, ID 48064...


El proceso `p1` es un proceso demonio y el proceso `p2` es un proceso convencional. La ejecución del primer proceso es más larga que la del segundo, por lo que es muy posible que el proceso `p2` finalice antes del proceso `p1`. Debido a que `p1` es un proceso demonio, el proceso principal deberá finalizar antes que el proceso `p1` termine.

Otra forma de usar los procesos demonios consiste en esperar su ejecución un periodo de tiempo específico antes de terminarlos prematuramente. Las combinación de los procesos demonio con el método `join()` permite determinar un tiempo específico de espera antes de la finalización de un proceso. Por medio del paso de un argumento (número de segundos de espera) al método `join()` de un proceso demonio, se crea una ventana de tiempo de ejecución del proceso después del cual se finalizará haya o no terminado su tarea. 

In [3]:
if __name__ == '__main__':
    p1 = Process(name='Worker 1', target=f1)
    p1.daemon = True
    p2 = Process(name='Worker 2', target=f2)
    
    p1.start()
    time.sleep(1)
    p2.start()
    
    p1.join(1)
    print('Whether Worker 1 is still alive:', p1.is_alive())
    p2.join()


Starting process Worker 1, ID 47908...
Starting process Worker 2, ID 47911...
Whether Worker 1 is still alive: True
Exiting process Worker 2, ID 47911...
Exiting process Worker 1, ID 47908...


Vemos que el proceso `p1` permanece en ejecución y continua vivo el tiempo que se ha indicado en proceso principal.

### Finalización de procesos

El método `terminate()` ofrece una forma rápida de terminar un proceso. Es importante mencionar que los procesos hijos que del proceso que termina no son finalizados al realizar la llamada a `terminate()`, estos procesos se denominarán **procesos huérfanos**.

Aunque finalizar un proceso puede ser mal visto, algunas veces es necesario si existe algún problema al realizar tareas de _comunicación interprocesos_.

Al utlizar `terminate()` es importante hacer la llamada a `join()` también debido a que no es inmediata la actualización del estado `alive` del proceso.

## Comunicación entre procesos

La principal forma de comunicación entre procesos se lleva acabo por medio de tuberías _pipe_ y colas _queue_.
entre diferentes procesos. Específicamente, brindan opciones de transmisión de mensajes para facilitar comunicación entre procesos: tuberías para conexiones entre dos procesos y
colas para múltiples productores y consumidores.

Veremos el uso de colas, específicamente la clase `Queue` de
el módulo `multiprocessing`. La implementación de la clase `Queue` es segura para subprocesos y procesos.

Se prefiere el uso de una cola de mensajes para la comunicación entre procesos en lugar de compartir recursos ya que si ciertos procesos manejan mal la memoria y la corrompen habrá numerosos elementos indeseables y consecuencias impredecibles.
Sin embargo, si un proceso no pudo manejar su mensaje correctamente, otros elementos de la cola permanecerán intactos. 

El siguiente diagrama representa el diferencias en la arquitectura entre el uso de una cola de mensajes y los recursos compartidos (específicamente memoria) para la comunicación entre procesos:

<img src="queue.png">

Para manejar el objeto `Queue` necesitamos usar dos métodos principales:
- `get()` regresa el siguiente item de la cola
- `put()` agrega un item a la cola

Un ejemplo de implementación de una cola es la siguiente:

In [30]:
from multiprocessing import Process, Queue

def worker(num, q):
    print("Se pone en la cola: ",  num * num)
    q.put(num*num)
    
if __name__ == '__main__':
    my_queue = Queue()
    p = Process(target=worker, args=(5, my_queue))
    p.start()
    p.join()
    
    print("Se lee de la cola: ", my_queue.get())

Se pone en la cola:  25
Se lee de la cola:  25


Otro ejemplo de implementación de una cola por medio de uso de clases es la siguiente:

In [33]:
import multiprocessing
    
class MyWorker():
    def __init__(self, x):
        self.x = x

    def process(self):
        pname = multiprocessing.current_process().name
        print('Starting process %s for number %i...' % (pname, self.x))

def work(q):
        worker = q.get()
        worker.process()

if __name__ == '__main__':
    my_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=work, args=(my_queue,))
    p.start()
    my_queue.put(MyWorker(5))
    my_queue.close()
    my_queue.join_thread()
    p.join()
    print('Done.')

Starting process Process-17 for number 5...
Done.


**Ejercicio:** Extiende el ejercicio para que dos procesos hijos lean el valor que el proceso padre pone en la cola.