# Concurrencia y Paralelismo

## **Concurrencia**  

Ejecutar múltiples tareas en períodos solapados (no necesariamente simultáneo)

### 1. Descomposición de Datos

**Propósito**: Dividir un conjunto de datos en partes independientes para procesamiento concurrente.

**Ventajas:**

* Aprovecha múltiples núcleos de CPU
* Escala bien con grandes volúmenes de datos

**Desventajas:**

* Requiere que los datos sean divisibles
* Puede generar overhead por sincronización

**Caso de uso:** Procesamiento de imágenes por bloques o análisis de datasets grandes.

In [26]:
import threading

def process_chunk(data_chunk):
    print(f"Procesando chunk: {data_chunk}")

data = list(range(10))
chunks = [data[i::3] for i in range(3)]  # Divide en 3 partes

threads = []
for chunk in chunks:
    t = threading.Thread(target=process_chunk, args=(chunk,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Procesando chunk: [0, 3, 6, 9]
Procesando chunk: [1, 4, 7]
Procesando chunk: [2, 5, 8]


### 2. Objeto Activo (Active Object)

**Propósito:** Encapsular tareas concurrentes en objetos que manejan su propio hilo de ejecución.

**Ventajas:**

* Oculta la complejidad de la concurrencia
* Permite colas de tareas ordenadas

**Desventajas:**

* Overhead por gestión de hilos
* Complejidad en el manejo de errores

**Caso de uso:** Sistemas de mensajería asíncrona o servidores de tareas en segundo plano.

In [None]:
from queue import Queue
from threading import Thread

class ActiveObject:
    def __init__(self):
        self._queue = Queue()
        self._thread = Thread(target=self._run)
        self._thread.start()
    
    def _run(self):
        while True:
            task = self._queue.get()
            if task == 'STOP': break
            task()
    
    def submit(self, task):
        self._queue.put(task)
    
    def shutdown(self):
        self._queue.put('STOP')

obj = ActiveObject()
obj.submit(lambda: print("Tarea ejecutada concurrentemente"))
obj.shutdown()

Tarea ejecutada concurrentemente


### 3. Half-Sync/Half-Async
 
**Propósito:** Separar la E/S (entrada/salida, I/O) asíncrona del procesamiento síncrono.

**Ventajas:**

* Combina eficiencia de E/S no bloqueante con simplicidad de código síncrono
* Ideal para servidores de red

**Desventajas:**

* Complejidad en la coordinación entre capas
* Overhead por cambio de contexto

**Caso de uso:** Servidores web que manejan múltiples conexiones simultáneas.

In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor

def fetch_sync(url):
    # Simula operación bloqueante
    import time
    time.sleep(1)
    return f"Datos de {url}"

async def async_layer(url):
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        response = await loop.run_in_executor(pool, fetch_sync, url)
        return response

result = await async_layer("https://api.example.com")
print(result)

Datos de https://api.example.com


## **Paralelismo**  

Ejecución simultánea en múltiples núcleos

### 1. Divide y Vencerás

**Propósito:** Paralelizar algoritmos recursivos dividiendo problemas en subproblemas independientes.

**Ventajas:**

* Escalabilidad natural
* Aprovecha múltiples núcleos eficientemente

**Desventajas:**

* Overhead por división/unión de resultados
* No apto para problemas no divisibles

**Caso de uso:** Algoritmos de ordenación (MergeSort) o búsqueda en árboles.

In [29]:
from multiprocessing import Pool
import random

def merge(left, right):
    """Merge eficiente con iteradores (optimizado)"""
    result = []
    i = j = 0
    len_left, len_right = len(left), len(right)
    while i < len_left and j < len_right:
        if left[i] <= right[j]:  # <= para estabilidad
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result

def sequential_merge_sort(data):
    """Versión secuencial para tamaños pequeños"""
    if len(data) <= 1:
        return data
    mid = len(data) // 2
    left = sequential_merge_sort(data[:mid])
    right = sequential_merge_sort(data[mid:])
    return merge(left, right)

def parallel_merge_sort(data):
    """Versión paralela con optimizaciones clave"""
    if len(data) <= 1_000_000:  # Umbral ajustado
        return sequential_merge_sort(data)
    
    # Divide en 4 partes para mejor balanceo
    chunk_size = len(data) // 4
    chunks = [data[i*chunk_size:(i+1)*chunk_size] for i in range(4)]
    
    with Pool(4) as pool:  # Usa 4 procesos (ideal para 4 núcleos)
        sorted_chunks = pool.map(sequential_merge_sort, chunks)
    
    # Merge jerárquico (reduce llamadas a merge)
    return merge(
        merge(sorted_chunks[0], sorted_chunks[1]),
        merge(sorted_chunks[2], sorted_chunks[3])
    )

# Generación de datos optimizada
big_data = random.choices(range(1_000_000), k=1_000_000)  # Más rápido que randint

# Benchmark
import time
print("Ordenando 1,000,000 elementos...")
start = time.time()
sorted_data = parallel_merge_sort(big_data)
end = time.time()

print(f"Tiempo total: {end - start:.2f} segundos")
print("Primeros 10:", sorted_data[:10])

Ordenando 1,000,000 elementos...
Tiempo total: 1.71 segundos
Primeros 10: [0, 0, 2, 3, 4, 5, 5, 7, 7, 8]


### 2. Descomposición Geométrica

**Propósito:** Paralelizar operaciones sobre estructuras espaciales (matrices, grids).

**Ventajas:**

* Localidad de datos mejorada
* Balance de carga predecible

**Desventajas:**

* Depende de la estructura del problema
* Dificultad con datos irregulares

**Caso de uso:** Simulaciones físicas o procesamiento de imágenes/matrices.

In [30]:
import numpy as np
import concurrent.futures

def process_region(region):
    # Simula un procesamiento en una región de la imagen (por ejemplo, filtrado)
    return region * 2

image = np.random.randint(0, 255, (4, 4))  # Imagen de 4x4
regions = np.vsplit(image, 2)  # Dividimos en 2 regiones horizontales

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(process_region, regions)

new_image = np.vstack(list(results))
print(new_image)


[[286 250  26 242]
 [408 424  28 322]
 [464 458 240 406]
 [118 170 466  28]]


### 3. Coordinación por Eventos

**Propósito:** Gestionar operaciones concurrentes mediante notificaciones de eventos.

**Ventajas:**

* Bajo consumo de recursos (1 hilo maneja múltiples tareas)
* Escalabilidad para E/S

**Desventajas:**

* Complejidad en código CPU-bound

**Caso de uso:** Servidores web, interfaces gráficas.

In [31]:
class EventManager:
    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def emit(self, data):
        for callback in self.subscribers:
            callback(data)

# Subsistemas que reaccionan al evento
def log_event(data):
    print(f"Log: se recibió evento '{data}'")

def alert_event(data):
    print(f"Alerta: '{data}' activado")

# Configuración
event_manager = EventManager()
event_manager.subscribe(log_event)
event_manager.subscribe(alert_event)

# Emisión del evento
event_manager.emit("Robot detectó obstáculo")


Log: se recibió evento 'Robot detectó obstáculo'
Alerta: 'Robot detectó obstáculo' activado


## **Antipatrones Comunes**

Ejemplo de race condition (Pueden notar que cada iteracion da un resultado distinto)

In [32]:
import threading
import time
import random

counter = 0

def increment():
    global counter
    value = counter
    time.sleep(random.uniform(0, 0.001))  # Pausa intencional para forzar error
    counter = value + 1

threads = [threading.Thread(target=increment) for _ in range(20)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print("Valor final del contador:", counter)


Valor final del contador: 3
