# Paralelismo y Concurrencia

### Cores vs Threads
---

In [1]:
import multiprocessing as mp

cores = mp.cpu_count()
print(f'Cores disponibles: {cores}')

Cores disponibles: 10


In [2]:
# Instalar librería ('!' hace que la celda ejecute bash)
# !pip install psutil

In [3]:
!pip freeze | grep psutil

psutil==7.0.0


In [4]:
import psutil

physical_cores = psutil.cpu_count(logical=False)
logical_cores = psutil.cpu_count(logical=True)

print(f'Cores físicos: {physical_cores}')
print(f'Cores lógicos: {logical_cores}')

Cores físicos: 10
Cores lógicos: 10


In [5]:
if logical_cores > physical_cores:
    print('Hyperthreading disponible...')
else:
    print('Hyperthreading NO disponible :(')

Hyperthreading NO disponible :(


In [6]:
print(f'Recomendación:')
print(f'- CPU-bound (multiprocessing): {physical_cores} workers')
print(f'- I/O-bound (threading): {logical_cores * 4}-{logical_cores * 8} workers')

Recomendación:
- CPU-bound (multiprocessing): 10 workers
- I/O-bound (threading): 40-80 workers


### Ley de Amdahl
---

Fórmula:

Speedup = 1 / [(1 - P) + (P / N)]

Donde:
- P = proporción del programa que puede paralelizarse (0 a 1)
- N = número de procesadores
- (1 - P) = proporción serial (no paralelizable)

In [7]:
P = 0.9
N = 4

speed_up = 1 / ((1 - P) + (P / N))
print(f'Speed up teórico: {round(speed_up, 2)}')

# Estimación de tiempos
seconds = 40
print(f'\nTiempo de ejecución serial: {seconds}s')
print(f'Tiempo de ejecución con paralelismo: {round(seconds/speed_up, 2)}s')

Speed up teórico: 3.08

Tiempo de ejecución serial: 40s
Tiempo de ejecución con paralelismo: 13.0s


### Multithreading (concurrencia)
---

In [8]:
import threading
import time

In [9]:
# 1 - Definir función que ejecutará cada thread

def worker_task(id_worker, seconds):

    print(f'[Thread {id_worker}] Iniciando tarea...')
    time.sleep(seconds)  # Simula operación I/O (espera)
    print(f'[Thread {id_worker}] Tarea completada después de {seconds}s')
    return f'Resultado del thread {id_worker}'

In [10]:
# 2 - Crear y ejecutar threads

def threads_manual():
    
    # Lista para almacenar los threads
    threads = []
    
    # Datos de entrada
    tareas = [
        (1, 2),  # (id_worker, seconds)
        (2, 1),
        (3, 3),
        (4, 1),
    ]
    
    inicio = time.time()
    
    # Crear y ejecutar threads
    for id_worker, seconds in tareas:
        # Crear thread
        t = threading.Thread(
            target=worker_task,       # Función a ejecutar
            args=(id_worker, seconds) # Argumentos de la función
        )
        threads.append(t)
        t.start()  # Iniciar el thread
    
    print(f'\n{len(threads)} threads iniciados\n')
    
    # Esperar a que todos terminen (JOIN)
    for t in threads:
        t.join()  # Bloquea hasta que el thread termine
    
    fin = time.time()
    
    print(f'\nTodos los threads completados')
    print(f'Tiempo total: {fin - inicio:.2f}s')
    print(f'Tiempo si fuera serial: {sum(s for _, s in tareas)}s')

In [11]:
threads_manual()

[Thread 1] Iniciando tarea...
[Thread 2] Iniciando tarea...
[Thread 3] Iniciando tarea...
[Thread 4] Iniciando tarea...

4 threads iniciados

[Thread 2] Tarea completada después de 1s
[Thread 4] Tarea completada después de 1s
[Thread 1] Tarea completada después de 2s
[Thread 3] Tarea completada después de 3s

Todos los threads completados
Tiempo total: 3.01s
Tiempo si fuera serial: 7s


Versión moderna con pool (no manual):

In [12]:
from concurrent.futures import ThreadPoolExecutor, as_completed

In [13]:
def worker_task(id_worker, seconds):

    print(f'[Thread {id_worker}] Procesando...')
    time.sleep(seconds)
    return f'Resultado {id_worker}'


def example_threadpool():

    tareas = [
        (1, 2),  # (id_worker, seconds)
        (2, 1),
        (3, 3),
        (4, 1),
        (5, 1)
    ]
    max_workers = 5  # Número de threads simultáneos
    
    inicio = time.time()
    
    # Context manager - se encarga del join automáticamente
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for id_worker, seconds in tareas:
            future = executor.submit(worker_task, id_worker, seconds)
            futures.append(future)
        
        # Recolectar resultados conforme terminan
        print('\nResultados (en orden de finalización):')
        for future in as_completed(futures):
            resultado = future.result()  # Obtener resultado
            print(f'{resultado}')
    
    fin = time.time()
    print(f'\nTiempo total: {fin - inicio:.2f}s')

In [14]:
example_threadpool()

[Thread 1] Procesando...
[Thread 2] Procesando...
[Thread 3] Procesando...
[Thread 4] Procesando...
[Thread 5] Procesando...

Resultados (en orden de finalización):
Resultado 2
Resultado 5
Resultado 4
Resultado 1
Resultado 3

Tiempo total: 3.00s


### Multiprocessing (paralelismo)
--- 

In [15]:
import multiprocessing as mp
import random
import time
import os

In [16]:
# 1 - Definir función que ejecutará cada proceso

def search_in_chunk(chunk, target, chunk_id):
    
    for value in chunk:
        if value == target:
            return (True, chunk_id)
        
    return (False, chunk_id)

In [17]:
# 2 - Crear y ejecutar procesos


def worker(chunk_tuple, resultado_queue):
    chunk, target, chunk_id = chunk_tuple
    resultado = search_in_chunk(chunk, target, chunk_id)
    resultado_queue.put(resultado)


def parallel_search(data, target, n_cores):
   
    # Dividir datos en chunks
    chunk_size = len(data) // n_cores
    chunks = []
    
    for i in range(n_cores):
        start = i * chunk_size
        end = start + chunk_size if i < n_cores - 1 else len(data)
        # each chunk is (chunk_data, target, chunk_id)
        chunks.append((data[start:end], target, i))
    
    # Crear procesos manualmente
    processes = []
    queue = mp.Queue()
    
    # [worker]
    # def worker(chunk_tuple, resultado_queue):
    #     chunk, target, chunk_id = chunk_tuple
    #     resultado = search_in_chunk(chunk, target, chunk_id)
    #     resultado_queue.put(resultado)

    start_time = time.time()
    
    # Iniciar procesos
    for chunk_data in chunks:
        p = mp.Process(target=worker, args=(chunk_data, queue))
        p.start()
        processes.append(p)
    
    # Esperar a que terminen
    for p in processes:
        p.join()
    
    # Recolectar resultados
    results = []
    while not queue.empty():
        results.append(queue.get())
    
    total_time = time.time() - start_time
    
    # Mostrar resultados
    for found, chunk_id in results:
        if found:
            print(f'Valor {target} encontrado en chunk {chunk_id}')
            break
    else:
        print(f'Valor {target} no encontrado')
    
    print(f'Tiempo: {total_time:.4f}s')
    print(f'Cores utilizados: {n_cores}')


In [18]:
# Testing

SIZE = 10000000 # 10M
TARGET = 8888888
CORES = mp.cpu_count() # Usar todos los cores disponibles

print(f'\nTamaño de la lista: {SIZE:,}')
print(f'Valor a buscar: {TARGET:,}')
print(f'Cores disponibles: {CORES}')

# Generar datos
datos = list(range(SIZE))

# Ejecutar función general
parallel_search(datos, TARGET, CORES)



Tamaño de la lista: 10,000,000
Valor a buscar: 8,888,888
Cores disponibles: 10


Traceback (most recent call last):
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=85, pipe_handle=89)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode = _main(fd, parent_sentinel)
  File [35m"/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m132[0m, in [35m_main[0m
    self = reduction.pickle.load(from_parent)
[1;35mAttributeError[0m: [35mCan't get attribute 'worker' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>[0m


KeyboardInterrupt: 

---