In [1]:
import time
import math
from concurrent.futures import ProcessPoolExecutor

# Contar números primos en un rango

## Definir una función que nos diga si un número es primo

Toma como entrada un número

Trata de dividir ese número entre todos los que haya entre 2 y su raiz cuadrada

Si es divisible entre alguno de ellos, no es primo

Si no es divisible por ningún número, entonces es primo



In [2]:
def es_primo(num: int) -> int:
    if num < 2:
        return False
    for i in range(2, int(math.sqrt(num))+1):
        if num % i == 0:
            return False
    return True


In [3]:
es_primo(12)

False

In [4]:
es_primo(11)

True

## Solución iterativa (un solo núcleo)

Creamos una función contar primos que cuenta el número de números primos dentro de un rango

Toma como entrada "inicio" y "fin"

Devuelve el número de primos en ese rango

In [5]:
def contar_primos(inicio: int, fin: int) -> int:
    res = 0
    for i in range(inicio, fin+1):
        if es_primo(i):
            res += 1
    return res

In [6]:
contar_primos(1, 10)

4

In [7]:
contar_primos(2,11)

5

## Medir los tiempos

In [8]:
def test(num_inicio, num_fin):
    t_inicio = time.time()
    total_primos = contar_primos(num_inicio, num_fin)
    t_fin = time.time()
    print(f"Hay {total_primos} numeros primos entre {num_inicio} y {num_fin}")
    print(f"He tardado {t_fin-t_inicio} segundos en calcularlo")

In [9]:
test(2,1000)

Hay 168 numeros primos entre 2 y 1000
He tardado 0.000652313232421875 segundos en calcularlo


In [12]:
test(2,4000000)

Hay 283146 numeros primos entre 2 y 4000000
He tardado 14.761511087417603 segundos en calcularlo


# Mismo ejemplo, pero de manera concurrente

Librería a utilizar: _concurrent_

Más concretamente la clase _ProcessPoolExecutor_ dentro de _concurrent.futures_

ProcessPoolExecutor crea un conjunto de procesos que ejecuta el mismo método en cada uno de ellos tomando como entrada el método a ejecutar y los argumentos.

### Ejemplo de uso del pool process executor

In [13]:
with ProcessPoolExecutor(max_workers=4) as pool_executor:
    temp = pool_executor.map(sum, [(1, 2),(3, 4),(5, 6),(7, 8)])

for t in temp:
    print(t)

3
7
11
15


## Transformando la función para hacerla concurrente

Necesito separar mi intervalo inicial en rangos

In [14]:
def separar_en_rangos(inicio: int, fin: int, rangos: int) -> [int]:
    """
    Distribuir más o menos equitativamente los números en rangos
    Ejemplo: inicio=10, fin=30, rangos=2
        resultado: [(10,19), (20, 30)]
    """
    rango_total = fin-inicio
    rango_size = rango_total // rangos
    res = []
    # Voy a ir metiendo en res cada uno de los rangos
    for i in range(rangos):
        res.append((
            inicio+i*rango_size,
            inicio+(i+1)*rango_size-1
        ))
        
    # Me falta incluir el último valor en el último rango
    res[-1] = (res[-1][0], res[-1][1]+1)
    return res

In [15]:
# Una prueba
separar_en_rangos(10,30,2)

[(10, 19), (20, 30)]

### Contar primos que admita una tupla como entrada

Ahora tengo que tener en cuenta que process executor requiere una tupla como entrada.

In [16]:
def contar_primos_rango(rango: (int,int)) -> int:
    """
    Devuelve el número de primos entre un rango dado como (inicio, fin)
    """
    inicio = rango[0]
    fin = rango[1]
    return contar_primos(inicio, fin)

In [17]:
# Probamos esta funcion
rango_temp = (1, 12)
contar_primos_rango(rango_temp)

5

### Creamos la función para contar números primos concurrentemente

Para ello hay que utilizar el ProcessPoolExecutor para ejecutar la función "contar_primos_rango" en cada uno de los rangos

In [18]:
def contar_primos_concurrente(inicio: int, fin: int, hilos: int=4) -> int:
    """
    """
    # Dividir la tarea en subtareas
    # Es decir, en distintos rangos
    rangos = separar_en_rangos(inicio, fin, hilos)

    # Crear el pool de procesos y ejecutarlos
    with ProcessPoolExecutor(max_workers=hilos) as pool_executor:
        res_procesos = pool_executor.map(contar_primos_rango, rangos)
    return sum(res_procesos)

In [19]:
separar_en_rangos(2, 12, 2)

[(2, 6), (7, 12)]

In [20]:
contar_primos_concurrente(2,12,hilos=2)

5

### Medir tiempos y comparar con el approach iterativo

In [21]:
def test_concurrente(inicio, fin, hilos):
    t_inicio = time.time()
    total_primos = contar_primos_concurrente(inicio, fin, hilos)
    t_fin = time.time()

    print(f"Hay {total_primos} entre {inicio} y {fin}")
    print(f"He tardado {t_fin-t_inicio} segundos en calcularlo con {hilos} hilos.")

In [22]:
test_concurrente(2,12, 4)

Hay 4 entre 2 y 12
He tardado 0.016586780548095703 segundos en calcularlo con 4 hilos.


In [27]:
test(2, 2000000)
test_concurrente(2,2000000, hilos=4)

Hay 148933 numeros primos entre 2 y 2000000
He tardado 5.497720718383789 segundos en calcularlo
Hay 148933 entre 2 y 2000000
He tardado 2.6217010021209717 segundos en calcularlo con 4 hilos.


In [29]:
inicio = 2
fin = 4000000
test_concurrente(inicio, fin, hilos=1)
test_concurrente(inicio, fin, hilos=2)
test_concurrente(inicio, fin, hilos=4)
test_concurrente(inicio, fin, hilos=8)
test_concurrente(inicio, fin, hilos=16)
test_concurrente(inicio, fin, hilos=32)
test_concurrente(inicio, fin, hilos=256)

Hay 283146 entre 2 y 4000000
He tardado 16.6339910030365 segundos en calcularlo con 1 hilos.
Hay 283146 entre 2 y 4000000
He tardado 11.133265256881714 segundos en calcularlo con 2 hilos.
Hay 283146 entre 2 y 4000000
He tardado 7.057806968688965 segundos en calcularlo con 4 hilos.
Hay 283146 entre 2 y 4000000
He tardado 5.590848207473755 segundos en calcularlo con 8 hilos.
Hay 283146 entre 2 y 4000000
He tardado 6.49009895324707 segundos en calcularlo con 16 hilos.
Hay 283145 entre 2 y 4000000
He tardado 6.352636337280273 segundos en calcularlo con 32 hilos.
Hay 283131 entre 2 y 4000000
He tardado 6.894747734069824 segundos en calcularlo con 256 hilos.


## El problema de las Race Conditions

¿Qué pasas cuando varios procesos deben acceder al mismo recurso?

Utilizando un 



In [31]:
import os
import random

In [32]:
COUNTER_FILE = 'mi_contador.txt'

Definimos funciones para inicializar, leer y escribir el valor en el archivo.

Definimos una función para incrementar el contador que haga uso de las 3 anteriores, con un pequeño retardo aleatorio para simular una operación computacionalmente costosa.

In [33]:
def init_counter():
    """
    Inicializa el valor del archivo a cero
    """
    with open(COUNTER_FILE, "w") as f:
        f.write("0")

def read_counter():
    """
    Lee el valor del contador desde el archivo.
    """
    with open(COUNTER_FILE, "r") as f:
        return int(f.read().strip())

def write_counter(value):
    """
    Escribe un nuevo valor en el archivo del contador.
    """
    with open(COUNTER_FILE, "w") as f:
        f.write(str(value))

def increment_counter():
    """
    Incrementa el contador (función computacionalmente costosa).
    """
    current_value = read_counter()
    time.sleep(random.uniform(0.01, 0.1))  # Simula una operación lenta
    new_value = current_value + 1
    write_counter(new_value)
    print(f"Proceso {os.getpid()} incrementó el contador a {new_value}")

### ¿Qué pasa si ejecutamos el increment_counter en varios hilos simultáneamente?

In [36]:
init_counter()
hilos = 10  # Número de procesos en paralelo

with ProcessPoolExecutor(max_workers=hilos) as executor:
    futures = [executor.submit(increment_counter) for _ in range(hilos)]

# Esperamos que todos los hilos terminen
for future in futures:
    future.result()

# Mostramos el valor final del contador
final_value = read_counter()
print(f"\nValor final esperado: {hilos}")
print(f"Valor final real: {final_value}")

Proceso 3038 incrementó el contador a 1Proceso 3039 incrementó el contador a 1

Proceso 3033 incrementó el contador a 1Proceso 3035 incrementó el contador a 1

Proceso 3034 incrementó el contador a 1
Proceso 3037 incrementó el contador a 1
Proceso 3041 incrementó el contador a 1
Proceso 3032 incrementó el contador a 1
Proceso 3036 incrementó el contador a 1
Proceso 3040 incrementó el contador a 1

Valor final esperado: 10
Valor final real: 1


### ¿Cómo soluciono esto?

La solución es bloquear el acceso al archivo mientras estoy haciendo la operación de incrementar

Para ello, un hilo puede crear un lock y mientras el lock exista, nadie más debe tocar el archivo de "mi_contador". Cuando termine de hacer la operación crítica con mi recurso compartido (lectura/escritura) elimino el "lock" para que otro proceso pueda acceder a los recursos si quiere.

Esto se puede hacer con un archivo. Cuando tenga que incrementar el archivo:

* Si hay un archivo "lock.txt", me espero a que no lo haya.
* Si no hay un archivo "lock.txt", lo creo.
    * Hago mis opreciones con el recurso compartido.
* Elimino el archivo "lock.txt" para que otro proceso pueda acceder al recurso compartido.

De esta forma solo el proceso que haya creado el lock podrá editar el archivo de "mi_contador".

In [37]:
LOCK_FILE = "lock.txt"

Hay que crear tres nuevos métodos:

* Adquirir el lock
* Liberar el lock
* Incrementar (utilizando el lock para esperar si no puedo)

In [38]:
def acquire_lock():
    """
    Crea un archivo de Lock si no existe. Si existe, espera hasta que se libere.
    El proceso que consiguió crear el archivo, seguirá
    Los demás se quedarán esperando
    """
    while True:
        try:
            # Intentamos crear el archivo en modo exclusivo
            with open(LOCK_FILE, "x"):
                return  # Lock adquirido, salir del bucle
        except FileExistsError:
            time.sleep(0.01)  # Si ya existe, esperar un poco y reintentar


In [39]:
def release_lock():
    """Elimina el archivo de Lock, permitiendo que otros procesos accedan."""
    try:
        os.remove(LOCK_FILE)
    except FileNotFoundError:
        pass  # Si el archivo ya fue eliminado, ignorar

In [40]:
def increment_counter_with_lock():
    """Incrementa el contador solo si es el dueño del lock."""
    print(f"Inicio del hilo {os.getpid()}")
    acquire_lock()  # Si puedo adquirir el lock, sigo. Si no, me quedo esperando a que me toque.
    try:
        current_value = read_counter()
        time.sleep(random.uniform(0.01, 0.1))  # Simula una operación lenta
        new_value = current_value + 1
        write_counter(new_value)
        print(f"Proceso {os.getpid()} incrementó el contador a {new_value}")
    finally:
        release_lock()  # 🔓 Libera el Lock

In [41]:
# Mismo proceso de antes
init_counter()
hilos = 10  # Número de procesos en paralelo

with ProcessPoolExecutor(max_workers=hilos) as executor:
    futures = [executor.submit(increment_counter_with_lock) for _ in range(hilos)]

# Esperamos que todos los hilos terminen
for future in futures:
    future.result()

# Mostramos el valor final del contador
final_value = read_counter()
print(f"\nValor final esperado: {hilos}")
print(f"Valor final real: {final_value}")

Inicio del hilo 3136Inicio del hilo 3140Inicio del hilo 3142Inicio del hilo 3141Inicio del hilo 3139Inicio del hilo 3137Inicio del hilo 3138

Inicio del hilo 3145
Inicio del hilo 3144Inicio del hilo 3143






Proceso 3136 incrementó el contador a 1
Proceso 3138 incrementó el contador a 2
Proceso 3145 incrementó el contador a 3
Proceso 3137 incrementó el contador a 4
Proceso 3140 incrementó el contador a 5
Proceso 3139 incrementó el contador a 6
Proceso 3141 incrementó el contador a 7
Proceso 3142 incrementó el contador a 8
Proceso 3144 incrementó el contador a 9
Proceso 3143 incrementó el contador a 10

Valor final esperado: 10
Valor final real: 10


### Implementación paralela del MergeSort

Implementar la ordenación por mergesort de una lista aprovechando el paralelismo.

¿Cómo dividiríamos el problema para poder atajarlo de manera paralela?

Para ello debemos:
* Separar en varias sub-listas la lista inicial.
* Lanzar la ordenación de cada una de estas sub-listas de manera palalela.
* Una vez tenemos las sub-listas ordenadas, juntarlas en una sola.

In [42]:
import random

def n_aleatorios(n: int) -> [int]:
    """
    Devuelve una lista de n enteros aleatorios
    """
    random_list = [random.randint(0, 10**6) for _ in range(n)]
    return random_list

In [43]:
n_aleatorios(5)

[556699, 679124, 977373, 773182, 810801]

In [44]:
def merge(left, right):
    """Función para fusionar dos listas ordenadas."""
    result = []
    i = j = 0

    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1

    result.extend(left[i:])
    result.extend(right[j:])
    return result

def merge_sort(arr):
    """Implementación normal de Merge Sort."""
    if len(arr) <= 1:
        return arr

    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])

    return merge(left, right)

In [45]:
listarandom = n_aleatorios(12)
print(listarandom)
print(merge_sort(listarandom))

[637745, 227603, 142316, 147441, 311363, 985737, 572384, 533266, 333703, 79339, 518169, 319170]
[79339, 142316, 147441, 227603, 311363, 319170, 333703, 518169, 533266, 572384, 637745, 985737]


### Mergesort en paralelo

In [46]:
def mergesort_paralelo(milista, hilos=8):
    """
    Lanzar varios mergesort con las sublistas
    y luego juntarlo todo en una sola
    """
    if len(milista) <= 1:
        return milista

    # Dividir la lista en partes iguales
    chunk_size = len(milista) // hilos
    sublists = [milista[i * chunk_size:(i + 1) * chunk_size] for i in range(hilos)]

    # El último trozo puede ser más grande si no es exactamente divisible
    sublists[-1].extend(milista[hilos * chunk_size:])

    # Lanzar n mergesorts
    with ProcessPoolExecutor(max_workers=hilos) as executor:
        sorted_sublists = list(executor.map(merge_sort, sublists))

    # Mezclar los resultados en serie
    while len(sorted_sublists) > 1:
        new_sorted_sublists = []
        for i in range(0, len(sorted_sublists), 2):
            if i + 1 < len(sorted_sublists):
                new_sorted_sublists.append(merge(sorted_sublists[i], sorted_sublists[i + 1]))
            else:
                new_sorted_sublists.append(sorted_sublists[i])
        sorted_sublists = new_sorted_sublists

    return sorted_sublists[0]

In [47]:
print(mergesort_paralelo(listarandom, hilos=4))

[79339, 142316, 147441, 227603, 311363, 319170, 333703, 518169, 533266, 572384, 637745, 985737]


In [48]:
def test_merge(listarandom):
    t_inicio = time.time()
    ordenada = merge_sort(listarandom)
    t_fin = time.time()
    
    print(f"Ordenado {len(ordenada)} elementos en {t_fin-t_inicio} segundos.")

In [50]:
def test_merge_paralelo(listarandom, hilos=8):
    t_inicio = time.time()
    ordenada = mergesort_paralelo(listarandom, hilos)
    t_fin = time.time()
    
    print(f"Ordenado {len(ordenada)} elementos en {t_fin-t_inicio} segundos con {hilos} hilos.")

In [51]:
listarandom2 = n_aleatorios(10000000)

In [52]:
len(listarandom2)

10000000

In [53]:
test_merge(listarandom)
test_merge_paralelo(listarandom)

Ordenado 12 elementos en 1.52587890625e-05 segundos.
Ordenado 12 elementos en 0.05152297019958496 segundos con 8 hilos.


In [54]:
1.52/0.05

30.4