# Multiprocessing

## Proceso como objeto

En el siguiente ejemplo se muestra cómo crear un proceso por medio de la definición de la clase `MyProcess` que hereda de la clase base `multiprocessing.Process`. La clase creada incluye un constructor y un método `run`.

In [2]:
import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self):
        super(MyProcess,self).__init__()
        
    def run(self):
        print("Child Process: {}".format(multiprocessing.current_process().pid))

def main():
    print("Main Process PID: {}".format(multiprocessing.current_process().pid))
    myProcess = MyProcess()
    myProcess.start()
    myProcess.join()
main()

Main Process PID: 437932
Child Process: 441664


## Comunicación entre procesos
Consideremos el siguiente ejemplo donde el proceso padre quiere consultar el resultado de una operación que ejecutó el proceso hijo:

In [3]:
import multiprocessing as mp
import time
import os

In [6]:
num_res = []


def calc_cuad(numeros):
    global num_res
    for n in numeros:
        print('cuadrado:', n**2)
        num_res.append(n*n)


nums = range(10)

t = time.time()
p1 = mp.Process(target=calc_cuad, args=(nums,))


p1.start()
p1.join()

print('Tiempo de ejecución:', time.time()-t)
print('Resultado del proceso:', num_res)
print('Finaliza ejecución')

cuadrado: 0
cuadrado: 1
cuadrado: 4
cuadrado: 9
cuadrado: 16
cuadrado: 25
cuadrado: 36
cuadrado: 49
cuadrado: 64
cuadrado: 81
Tiempo de ejecución: 0.09141945838928223
Resultado del proceso: []
Finaliza ejecución


Los procesos tienen su propio espacio de memoria. Así, las variables del programa no se comparten entre procesos. Es necesario crear comunicación entre procesos (ICP).


## Memoria compartida
Cuando se realiza una programación concurrente, generalmente es mejor evitar el uso de recursos compartidos en la medida de lo posible. Esto es particularmente cierto cuando se utilizan múltiples procesos. Sin embargo, si se necesita utilizar algunos datos compartidos, el multiprocesamiento proporciona un medio para hacerlo.



Los datos se pueden almacenar en un mapa de memoria compartida usando `Array` o `Value`.

In [7]:
# Sugar syntax
nums = [3,4,5]
for idx, i in enumerate(nums):
    print(idx, i)

0 3
1 4
2 5


In [8]:
nums = range(4)
for idx, i in enumerate(nums):
    print(idx, i)

0 0
1 1
2 2
3 3


Se pueden compartir datos del proceso hijo al proceso padre por medio del método `Array` de multiprocessing.

In [15]:
def calc_cuad(numeros,result):
    for idx, n in enumerate(numeros):
        result[idx] = n*n
    print('Resultado del proceso:', result[:])
        
nums = range(10)

t = time.time()
result = mp.Array('i',10) #(integers, 10 elementos)
print(list(result))
p1 = mp.Process(target=calc_cuad, args=(nums,result))

p1.start()
p1.join()

print('Resultado fuera del proceso:', result[:])

print('Tiempo de ejecución:', time.time()-t)
print('Finaliza ejecución')


[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Resultado del proceso: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Resultado fuera del proceso: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Tiempo de ejecución: 0.04576921463012695
Finaliza ejecución


Tambien es podible compartir valores unitarios del proceso hijo al proceso padre por medio del método `Value`.

In [18]:
def calc_cuad(numeros,result,val):
    
    val.value = 5.35
    for idx, n in enumerate(numeros):
        result[idx] = n*n
    print('Resultado del proceso:', result[:])
        
nums = range(10)

t = time.time()
result = mp.Array('i',10) #(integers, 10 elementos)
val = mp.Value('d',0.0) #(float, vale 0)

print(list(result))
p1 = mp.Process(target=calc_cuad, args=(nums,result,val))

p1.start()
p1.join()

print('Resultado fuera del proceso:', result[:])
print('Resultado fuera del proceso:', val.value)
print('Tiempo de ejecución:', time.time()-t)
print('Finaliza ejecución')

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Resultado del proceso: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Resultado fuera del proceso: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Resultado fuera del proceso: 5.35
Tiempo de ejecución: 0.042021751403808594
Finaliza ejecución



## Pipes

In [22]:
from multiprocessing import Pipe, Process

def f(conn):
    conn.send(['Hello world'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

['Hello world']


`Pipe()` devuelve dos objetos de conexión que representan los dos extremos de la tubería. Cada objeto de conexión tiene métodos `send()` y `recv()`. Aquí creamos un proceso que imprime `Hola mundo` de cadena y entonces comparte los datos.

In [23]:
from multiprocessing import Pipe, Process

def worker(conn):
    print(conn.recv())
    conn.send('sent from child process')
    conn.close()
    
conn1, conn2 = Pipe()
process = Process(target=worker, args=(conn2,))
process.start()

conn1.send('sent from main process')
print(conn1.recv())
process.join()


sent from main process
sent from child process


Ejercicio: Extiende el código para que dos procesos escriban al padre cada uno, una cadena de caracteres.

In [43]:
def worker(conn):
    print(conn.recv())
    pname = multiprocessing.current_process().name
    conn.send('sent from child process {}'.format(pname))
    conn.close()

    
conn1, conn2 = Pipe()
conn1.send('sent from main process')

process = Process(target=worker, args=(conn2,),name= 'Worker 1')
process.start()    
process.join()
conn1.send('Terminó el proceso 1')

process2 = Process(target=worker, args=(conn2,),name= 'Worker 2')
process2.start()    
process2.join()

print(conn1.recv())
print(conn1.recv())


sent from main process
Terminó el proceso 1
sent from child process Worker 1
sent from child process Worker 2


In [41]:
from multiprocessing import Process, Pipe

nombres =['Carlos', 'Renata', 'Rebeca', 'Sandra', 'END']

def send_msg(conn, msgs):
    for msg in msgs:
        time.sleep(1)
        print(f'mandando {msg}')
        
        conn.send(msg)
    conn.close()

def recv_msgs(conn):
    while 1:
        msg = conn.recv()
        #time.sleep(1)
        if msg =='END':
            break
        print(msg)
        
parent_conn, child_conn = multiprocessing.Pipe()


p1 = Process(target=send_msg, args=(parent_conn, nombres))
p2 = Process(target = recv_msgs, args=(child_conn,))

p1.start()
p2.start()


p1.join()
p2.join()



mandando Carlos
Carlos
mandando Renata
Renata
mandando Rebeca
Rebeca
mandando Sandra
Sandra
mandando END


In [36]:
from multiprocessing import Process, Pipe
def funcion1(conn):
    conn.send(['hello'])
    conn.close
def funcion2(conn):
    conn.send(['world'])
    conn.close
if __name__ =='__main__':
    parent_conn, child_conn =Pipe()
    p1=Process(target=funcion1,args=(child_conn,))
    p2=Process(target=funcion2,args=(child_conn,))
    p1.start()
    p2.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    p1.join()
    p2.join()

['hello']
['world']


## Queue

In [46]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42,None,'hello'])
    q.put(['bye'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()
    print(q.get())

[42, None, 'hello']
['bye']


In [None]:
def calc_cuad(numeros, q):
    for n in numeros:
        q.put(n*n)
        
if __name__ == '__main__':
    
    nums = range(10)
    q = Queue()
    p = Process(target=calc_cuad, args=(nums,q))
    
    p.start()
    p.join()

In [None]:
import os, sys
import multiprocessing

class ChildProcess(multiprocessing.Process):
    def __init__(self, pipein):
        super(ChildProcess, self).__init__()
        self.pipein=pipein
        
    def run(self):
        print('Intentando iniciar el pipe')
        self.pipein = os.fdopen(self.pipein,'w')
        self.pipein.write('Mi código es MX-317')
        self.pipein.close()
        
def main():
    pipeout,pipein = os.pipe()
    
    child= ChildProcess(pipein)
    child.start()
    child.join()
    
    os.close(pipein)
    pipeout = os.fdopen(pipeout)
    
    pipeContent = pipeout.read()
    print('Pipe: {}'.format(pipeContent))

if __name__ == '__main__':
    main()

## Bloqueo

Existen recursos los cuales no se deben acceder al mismo tiempo por dos procesos o más; por lo que es necesario proteger o bloquear el acceso a estos recursos compartidos: memoria, archivos o base de datos.

In [18]:
import time
import multiprocessing


def deposit(balance):
    for i in range(10000):
        time.sleep(0.0001)
        balance.value = balance.value + 1


def withdraw(balance):
    for i in range(10000):
        time.sleep(0.0001)
        balance.value = balance.value - 1


if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    d = multiprocessing.Process(target=deposit, args=(balance,))
    w = multiprocessing.Process(target=withdraw, args=(balance,))
    d.start()
    w.start()
    d.join()
    w.join()
    print(balance.value)

219


In [23]:
import time
import multiprocessing


def deposit(balance, lock):
    for i in range(10000):
        time.sleep(0.00001)
        lock.acquire()
        balance.value = balance.value + 1
        lock.release()


def withdraw(balance, lock):
    for i in range(10000):
        time.sleep(0.00001)
        lock.acquire()
        balance.value = balance.value - 1
        lock.release()


if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    lock = multiprocessing.Lock()
    d = multiprocessing.Process(target=deposit, args=(balance,lock))
    w = multiprocessing.Process(target=withdraw, args=(balance,lock))
    d.start()
    w.start()
    d.join()
    w.join()
    print(balance.value)

200
