# 02d Paralelismo

El paralelismo es la capacidad de ejecutar varias tareas simultáneamente, aprovechando que la mayoría de ordenadores actuales tienen una arquitectura de varios núcleos. Hay dos modelos principales de paralelismo:

* **Paralelismo por procesos:** Hay varias copias del mismo programa (proceso) ejecutándose a la vez, de modo que si abres el Administrador de tareas, verás que `Python` aparece varias veces.
* **Paralelismo por "hilos" (threads):** Hay un único proceso, que tiene varios conjuntos de código (threads) ejecutándose a la vez.

En general, el paralelismo por procesos es mejor para situaciones que requieren mucho uso de CPU, como cálculos matemáticos, mientras que el paralelismo por threads es mejor cuando la ejecución está limitada por operaciones de lectura y escritura. Además, la implementación estándar de Python, CPython, limita el número de threads ejecutándose simultáneamente a 1 (GIL), por lo que el paralelismo por threads no ofrece absolutamente ninguna ventaja en tareas de CPU.

In [1]:
# Código para importar Temporizador desde utils.py, no es importante

import os, sys
dir2 = os.path.abspath('')
dir1 = os.path.dirname(dir2)
if not dir1 in sys.path: sys.path.append(dir1)
from utils import Temporizador

## Paralelismo por hilos

El paralelismo por hilos está implementado por el módulo `threading` de la librería estándar. Veremos solamente los conceptos más básicos, ya que en general nos interesarán más las tareas limitadas por CPU.

In [2]:
import threading 
from time import sleep

def f(x):
    sleep(5)
    print(x)

t1 = threading.Thread(target=f, args=("Thread 1\n",))
t2 = threading.Thread(target=f, args=("Thread 2\n",))

with Temporizador() as temp:
    t1.start()
    t2.start()
    t1.join()
    t2.join()

print(f"Tiempo total: {temp.ver_tiempo():.4f} s")

Thread 1

Thread 2

Tiempo total: 5.0057 s


Cada thread se crea como un objeto `threading.Thread`, donde `target` indica la función que se va a ejecutar, y `args` es una tupla con sus argumentos. El método `start()` inicia la ejecución del thread, y `join()` espera a que acabe. En este caso, al no tratarse de una tarea que esté limitada por CPU, el tiempo de ejecución se ha visto reducido.

En cambio, veamos qué ocurre con un cálculo matemático, calculando el factorial de los números entre 1000 y 2000:

In [11]:
def factorial(x):
    f = 1
    i = x
    while i > 0:
        f *= i
        i -=1
    return f

def tarea(inicio, fin, lista):
    for x in range(inicio, fin):
        lista[x-1000] = factorial(x)

In [14]:
resultado1 = [0,]*1000

with Temporizador() as temp:
    tarea(1000, 2000, resultado1)

print(f"Tiempo en un único thread: {temp.ver_tiempo():.4f} s")

Tiempo en un único thread: 0.3721 s


In [15]:
resultado2 = [0,]*1000
t1 = threading.Thread(target=tarea, args=(1000, 1500, resultado2))
t2 = threading.Thread(target=tarea, args=(1500, 2000, resultado2))

with Temporizador() as temp:
    t1.start()
    t2.start()
    t1.join()
    t2.join()

print(f"Tiempo en dos threads: {temp.ver_tiempo():.4f} s")

Tiempo en dos threads: 0.4132 s


In [34]:
for i in range(1000):
    if resultado1[i] != resultado2[i]:
        print(f"Error en el elemento {i}")
        break

El tiempo de ejecución es ligeramente mayor usando dos threads que uno solo! Además, no es posible obtener el valor de un `return`, por eso hemos tenido que pasar la lista "por referencia" para almacenar los valores. En este caso sencillo, cada thread escribía en elementos distintos, y no había posibilidad de que intentarán competir entre ellos. Pero en general, esto es una posibilidad que se puede evitar si uno de los threads bloquea temporalmente (lock) la ejecución del resto cuando tiene que usar un recurso compartido.

Finalmente, veamos que ambos threads comparten el mismo proceso. Una forma de hacerlo es abriendo el administrador de tareas, pero también podemos hacerlo desde python: el comando `os.getpid()` devuelve un identificador que es único para cada proceso:

In [11]:
def pid(x):
    sleep(3)
    print(f"Thread {x}, PID: {os.getpid()}\n")

t1 = threading.Thread(target=pid, args=(1,))
t2 = threading.Thread(target=pid, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Proceso general, PID: {os.getpid()}")

Thread 1, PID: 42927

Thread 2, PID: 42927

Proceso general, PID: 42927


## Paralelismo por procesos

El paralelismo por procesos se realiza mediante el módulo `multiprocessing` de la librería estándar. El funcionamiento básico es muy similar a `threading`:

In [3]:
import multiprocessing
from time import sleep

def f(x):
    sleep(5)
    print(x)

p1 = multiprocessing.Process(target=f, args=("Process 1",))
p2 = multiprocessing.Process(target=f, args=("Process 2",))

with Temporizador() as temp:
    p1.start()
    p2.start()
    p1.join()
    p2.join()

print(f"Tiempo total: {temp.ver_tiempo():.4f} s")

Process 1
Process 2
Tiempo total: 5.0129 s


Pero en esta ocasión sí que obtenemos un (pequeño) beneficio al ejecutar en paralelo una tarea que requiera cálculos:

In [22]:
def factorial(x):
    f = 1
    i = x
    while i > 0:
        f *= i
        i -=1
    return f

def tarea(inicio, fin):
    lista = [factorial(x) for x in range(inicio, fin)]

Hay que notar que los procesos son programas completamente independientes entre sí, por lo que no es posible que compartan memoria, es decir, no podemos pasarles una lista para que la escriban. Más adelante veremos cómo devolver valores.

In [23]:
with Temporizador() as temp:
    tarea(1000, 2000)

print(f"Tiempo en un único thread: {temp.ver_tiempo():.4f} s")

Tiempo en un único thread: 0.4096 s


In [24]:
t1 = multiprocessing.Process(target=tarea, args=(1000, 1500))
t2 = multiprocessing.Process(target=tarea, args=(1500, 2000))

with Temporizador() as temp:
    t1.start()
    t2.start()
    t1.join()
    t2.join()

print(f"Tiempo en dos procesos: {temp.ver_tiempo():.4f} s")

Tiempo en dos procesos: 0.2653 s


Cada proceso tiene su identificador propio, y distinto del proceso general:

In [9]:
def pid(x):
    sleep(3)
    print(f"Proceso {x}, PID: {os.getpid()}\n")

t1 = multiprocessing.Process(target=pid, args=(1,))
t2 = multiprocessing.Process(target=pid, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Proceso general, PID: {os.getpid()}")

Proceso 1, PID: 50339

Proceso 2, PID: 50340

Proceso general, PID: 42927


### Pools

En vez de gestionar los procesos individualmente, `multiprocessing` permite crear un "fondo común" de procesos a los que ir asignando trabajos, mediante el objeto `Pool`. Los trabajos se pueden asignar mediante las versiones paralelizadas de `map` o `starmap`. Funcionan de un modo similar a sus versiones no-paralelizadas, excepto que aquí se devuelve la lista calculada en vez de un itaredor. Esta es la forma más cómoda de paralelizar un cálculo en el que los pasos sean independientes entre sí:

In [15]:
with Temporizador() as temp:
    with multiprocessing.Pool(processes=2) as pool:
        resultado3 = pool.map(factorial, range(1000, 2000))

print(f"Tiempo en dos procesos: {temp.ver_tiempo():.4f} s")

Tiempo en dos procesos: 0.2990 s


El objeto `Pool` debe ser cerrado cuando acabemos de usarlo para poder devolver el control al proceso general. Para asegurarnos de que se cierra aunque se produzca una excepción, usamos un gestor de contexto.

Esta forma suele ser más conveniente que `start()`-`join()`, ya que no hay que especificar cómo dividir los argumentos entre procesos, y porque la manera de guardar los resultados es más natural.

En el ejemplo anterior hemos creado un pool de 2 procesos, pero podemos usar tantos procesos como queramos. Lo óptimo es usar, como mucho, un número de procesos igual al número de núcleos del ordenador. Si se solicitan más procesos, algunos de ellos tendrán que esperar a que otros terminen, por lo que no se produce ninguna ganancia en el tiempo de ejecución. La función `cpu_count()` devuelve el número de núcleos:

In [17]:
procesos = multiprocessing.cpu_count()

with Temporizador() as temp:
    with multiprocessing.Pool(processes=procesos) as pool:
        resultado3 = pool.map(factorial, range(1000, 2000))

print(f"Tiempo en {procesos} procesos: {temp.ver_tiempo():.4f} s")

Tiempo en 12 procesos: 0.1521 s


In [18]:
procesos = 2*multiprocessing.cpu_count()

with Temporizador() as temp:
    with multiprocessing.Pool(processes=procesos) as pool:
        resultado3 = pool.map(factorial, range(1000, 2000))

print(f"Tiempo en {procesos} procesos: {temp.ver_tiempo():.4f} s")

Tiempo en 24 procesos: 0.2661 s


También podemos asignar tareas individuales para que sean realizadas por uno de los procesos, mediante `apply`. La ejecución del resto de procesos se bloquea temporalmente hasta que termine:

In [38]:
def f(x):
    sleep(1.5)
    print(x)

with Temporizador() as temp:
    with multiprocessing.Pool(processes=2) as pool:
        pool.apply(f, ("Tarea 1",))
        pool.apply(f, ("Tarea 2",))

print(f"Tiempo en dos procesos: {temp.ver_tiempo():.4f} s")

Tarea 1
Tarea 2
Tiempo en dos procesos: 3.0265 s


El método `apply_async()` aplica la función pero de modo asíncrono, como una tarea de fondo, sin bloquear el resto de procesos:

In [39]:
with Temporizador() as temp:
    with multiprocessing.Pool(processes=2) as pool:
        pool.apply_async(f, ("Tarea 1",))
        pool.apply(f, ("Tarea 2",))

print(f"Tiempo en dos procesos: {temp.ver_tiempo():.4f} s")

Tarea 2Tarea 1

Tiempo en dos procesos: 1.5279 s


El hecho de que `apply_async()` no bloquee también tiene un inconveniente: el objeto `Pool` se puede cerrar sin esperar a que el proceso termine, con lo cual no obtenemos el resultado de la función:

In [9]:
def f(x):
    sleep(0.2)
    print(f"{x}, {os.getpid()}\n")

with multiprocessing.Pool(2) as pool:
    pool.apply_async(f, (1,))
    pool.apply_async(f, (2,))

Una posibilidad es usar al final del bloque un trabajo que sí bloquee (`apply`, `map`, `starmap`), y otra posibilidad es simplemente esperar:

In [28]:
with multiprocessing.Pool(2) as pool:
    pool.apply_async(f, (1,))
    pool.apply_async(f, (2,))
    sleep(0.3)

2, 34882

1, 34881



Los procesos asíncronos además tienen un método que bloquea la ejecución hasta que terminen, convirtiéndolas en procesos síncronos

In [10]:
with multiprocessing.Pool(2) as pool:
    a1 = pool.apply_async(f, (1,))
    a2 = pool.apply_async(f, (2,))
    a1.wait()
    a2.wait()

2, 7204
1, 7203




Si una función devuelve resultados, el valor devuelto por `apply_async()` no es el resultado directamente, sino un objeto de tipo `ApplyResult`. para recuperar el resultado, hay que usar el método `.get()`:

In [6]:
def f(x):
    sleep(0.2)
    return x**2

with multiprocessing.Pool(2) as pool:
    v = pool.apply_async(f, (5,))
    sleep(0.4)

print(v)
print(v.get())


<multiprocessing.pool.ApplyResult object at 0x7f60b77421a0>
25


Usar `.get()` dentro del `Pool` es una acción que bloquea, por lo que el programa esperará a que termine el cálculo:

In [8]:
with multiprocessing.Pool(2) as pool:
    v = pool.apply_async(f, (5,))
    v_res = v.get()

print(v_res)

25


Sin embargo, si el `Pool` se ha cerrado antes de finalizar el proceso asíncrono, al hacer `.get()`intentará esperar hasta que acabe el proceso, generando un bucle infinito:

In [7]:
with multiprocessing.Pool(2) as pool:
    v = pool.apply_async(f, (5,))

print(v)
print(v.get())

<multiprocessing.pool.ApplyResult object at 0x7f60b7742ef0>


KeyboardInterrupt: 