In [1]:
!python -V

Python 3.10.4


In [2]:
import os

nucleos_totales = os.cpu_count()

# <center> TIPOS DE INSTANCIA</center>

In [3]:
# libreria para procesamiento en paralelo
from threading import Thread

# importa las librerias que permiten usar la computacion paralela
from multiprocessing import Process

tipo_instancia = {
                'hilos':Thread,
                'procesos':Process
}

# <center> FUNCIONES PARA LOS HILOS Y PROCESOS</center>

In [4]:
def generar_instancia(tipo_tarea=None, funcion_a_procesar=None, **kw):
    
    # convierte los argumentos a tupla ya que los argumentos deben ser de tipo tupla
    argumentos = tuple(kw.values())
    
    # crea la instancia
    tipo_ejecucion = tipo_instancia.get(tipo_tarea, Thread)
    
    # retorna la instancia con los argumentos a evaluar
    return tipo_ejecucion(name='test_ejecucion', target=funcion_a_procesar, args=argumentos)


def iniciar_y_finalizar_instancia(instancia):
    
    # muestra el nombre del hilo o del proceso
    print('nombre ', instancia.name)
    
    # inicializa la ejecucion de la instruccion invocada en el hilo o proceso
    instancia.start()
    
    # garantiza que se finaliza la funcion invocada, antes de que se continue con una nueva instruccion
    instancia.join()
    
    print('tarea Finalizada') 
    
    return instancia    

# <center> HILOS</center>

# <center> COLA PARA ADICIONAR RESULTADOS HILOS</center>

In [5]:
from queue import Queue as cola_classica

almacenar_resultados = cola_classica()

def almacenar_en_cola(f):
    def wrapper(*args):
        almacenar_resultados.put(f(*args))
    return wrapper

@almacenar_en_cola
def retornar_lista(numero=100):
    return [i for i in range(numero)]

def obtener_resultado_cola_tarea(cola):
    
    for indice_tarea in range(cola.qsize()):
        print(cola.get(indice_tarea))

In [6]:
listado = []
for i in range(5):
  
    instancia = generar_instancia(tipo_tarea='hilos', funcion_a_procesar=retornar_lista, numero=20)
    ejecutar_tarea = iniciar_y_finalizar_instancia(instancia)
    type(ejecutar_tarea)
   
    listado.append(ejecutar_tarea)

obtener_resultado_cola_tarea(almacenar_resultados)

nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


# <center> PROCESOS</center>
## <center> Process-based parallelism</center>

# <center> COLA PARA ADICIONAR RESULTADOS PROCESOS</center>

In [7]:
from multiprocessing import Queue as cola_multiproceso

almacenar_resultados_multiproceso = cola_multiproceso()

def almacenar_en_cola_multiproceso(f):
    def wrapper(*args):
        almacenar_resultados_multiproceso.put(f(*args))
    return wrapper

@almacenar_en_cola_multiproceso
def retornar_lista_proceso(numero=20):
    return [chr(i + 64)  for i in range(numero)]

def obtener_resultado_cola_tarea_while(cola):
    
    while not cola.empty():
        result = cola.get()
        print (result)

In [8]:
listado = []
for i in range(5):
    instancia = generar_instancia(tipo_tarea='procesos', funcion_a_procesar=retornar_lista_proceso, numero=10)
    ejecutar_tarea = iniciar_y_finalizar_instancia(instancia)
    type(ejecutar_tarea)
    
    listado.append(ejecutar_tarea)

obtener_resultado_cola_tarea_while(almacenar_resultados_multiproceso)

nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
nombre  test_ejecucion
tarea Finalizada
['@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
['@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
['@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
['@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
['@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']


# <center> POOL DE PROCESOS</center>

In [9]:
from multiprocessing import Pool

def calcular_cuadrado(x):
    return x*x

### <center> APPLY</center>

In [42]:
pool = Pool(processes=2)
pool_apply = pool.apply(calcular_cuadrado, [5])
pool.close() # Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
pool.join() # Wait for the worker processes to exit. One must call close() or terminate() before using join().
pool_apply

25

### <center> APPLY_ASYNC</center>

In [11]:
pool = Pool(processes=2)
pool_apply_async = pool.apply_async(calcular_cuadrado, [3])
pool.close()
pool.join()
pool_apply_async.get()

9

### <center> MAP_ASYNC</center>

In [12]:
pool = Pool(processes=2)
pool_map_async = pool.map_async(calcular_cuadrado, range(0,20,3))
pool.close()
pool.join()
pool_map_async.get()

[0, 9, 36, 81, 144, 225, 324]

### <center> MAP</center>

In [13]:
pool = Pool(processes=2)
pool_map = pool.map(calcular_cuadrado, range(0,20,5))
pool.close()
pool.join()
pool_map

[0, 25, 100, 225]

### <center> IMAP</center>

In [14]:
pool = Pool(processes=2)
pool_imap_async = pool.imap(calcular_cuadrado, range(0,20,4))
pool.close()
pool.join()
list(pool_imap_async)

[0, 16, 64, 144, 256]

### <center> IMAP_UNORDERED</center>

In [15]:
pool = Pool(processes=2)
pool_imap_unordered = pool.imap_unordered(calcular_cuadrado, range(0,20,2))
pool.close()
pool.join()
list(pool_imap_unordered)

[0, 4, 16, 36, 64, 100, 144, 256, 324, 196]

### <center> STARMAP</center>

In [16]:
pool = Pool(processes=2)
pool_starmap = pool.starmap(calcular_cuadrado, [[1],[2],[3],[4],[5]])
pool.close()
pool.join()
list(pool_starmap)

[1, 4, 9, 16, 25]

### <center> STARMAP_ASYNC</center>

In [17]:
pool = Pool(processes=2)
pool_starmap_async = pool.starmap_async(calcular_cuadrado, [[1],[2],[3],[4],[5]])
pool.close()
pool.join()
pool_starmap_async.get()

[1, 4, 9, 16, 25]

# <center> PROCESAMIENTO MASIVO</center>

In [18]:
# importa librerias para trabajar concurrencia
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# adiciona nuevas key/values al diccionario
tipo_instancia['multihilos'] = ThreadPoolExecutor
tipo_instancia['multiproceso'] = ProcessPoolExecutor

tipo_instancia

{'hilos': threading.Thread,
 'procesos': multiprocessing.context.Process,
 'multihilos': concurrent.futures.thread.ThreadPoolExecutor,
 'multiproceso': concurrent.futures.process.ProcessPoolExecutor}

In [19]:
import random

def generar_lista(numero = 22):
    
    return [random.randint(-100, i**2) for i in range(numero)]
    
def imprimir_multi_datos(lista):
    
    for valores in lista:
        print(valores)
        
def generar_pool_iniciar_y_finalizar_multi_instancia(tipo_tarea=None, funcion_a_procesar=None, numero = 22):
    
    # crea la instancia de ejecucion
    tipo_ejecucion = tipo_instancia.get(tipo_tarea, ThreadPoolExecutor)
    
    # asigna los workers basado en el total de CPUs de la maquina
    pool = tipo_ejecucion(max_workers=nucleos_totales)
    
    response = pool.submit(funcion_a_procesar, numero)
    
    # obtiene la respuesta de la tarea
    return response.result()

# <center> MULTI HILOS</center>

In [20]:
listado = []
for i in range(5):
    instancia = generar_pool_iniciar_y_finalizar_multi_instancia(tipo_tarea='multihilos', funcion_a_procesar=generar_lista, numero= i + 1)
    # almacena las tareas ejecutadas
    listado.append(instancia)

imprimir_multi_datos(listado)

[-42]
[-16, -11]
[-38, -36, -79]
[-8, -65, -63, -32]
[-100, -32, -74, -68, 15]


# <center> MULTI PROCESOS</center>

In [21]:
listado = []
for i in range(5):
    instancia = generar_pool_iniciar_y_finalizar_multi_instancia(tipo_tarea='multiprocesos', funcion_a_procesar=generar_lista, numero= i + 1)
    # almacena las tareas ejecutadas
    listado.append(instancia)

imprimir_multi_datos(listado)

[-41]
[-85, -94]
[-1, -38, -28]
[-15, -98, -86, -3]
[-4, -80, -58, 3, -92]


# <center> MULTI HILOS Y MULTIPROCESOS CON MULTIPLES ARGUMENTOS USANDO **MAP**</center>

In [22]:
def cuadrado(x=9):
    
    return x**2

def multi_argumentos(*parametros, tipo_tarea=None, funcion_a_procesar=None):
    
    # convierte los parametros tipo tupla a lista
    argumentos = list(parametros[0])
    
    # crea la instancia de ejecucion
    tipo_ejecucion = tipo_instancia.get(tipo_tarea, ThreadPoolExecutor)
    
    # asigna los workers basado en el total de CPUs de la maquina
    pool = ThreadPoolExecutor()
        
    response = pool.map(cuadrado, argumentos)
        
    # obtiene la respuesta de la tarea
    return [valor for valor in response]

print(multi_argumentos([1,2,3,4],tipo_tarea='multihilos', funcion_a_procesar=cuadrado))
print(multi_argumentos([5,6,7,8,9],tipo_tarea='multiprocesos', funcion_a_procesar=cuadrado))

[1, 4, 9, 16]
[25, 36, 49, 64, 81]


# <center> ASINCRONISMO - ASYNCIO</center>
<p align="center" width="100%">
    <img width="100%" src="sync.png">
</p>

# <center> EVENT LOOP</center>

<p align="center" width="100%">
    <img width="100%" src="event-loop.png">
</p>

In [23]:
import asyncio

# esta seria la forma de ejecutar en un IDE, dado que  jupyter ya esta corriendo un event loop
# siempre es necesario ejecutar una funcion asincrona de esta manera 
#asyncio.run(funcion_asincrona())

# <center> TODA FUNCION ASINCRONA DEBE SER PRECEDIDA POR LA PALABRA **ASYNC**</center>

In [24]:
async def listado(limite = 10):
    
    return [random.randrange(-100, 100) for x in range(limite)]

In [25]:
def ejecuta_funcion_listado(limite = 15):
    return listado(limite)

In [26]:
async def ejecuta_funcion_asincrona(limite = 20):
    return await listado(limite)

# <center> TODA VARIABLE O FUNCION QUE LLAME UNA FUNCION ASYNCRONA DEBE SER PRECEDIDA POR LA PALABRA **AWAIT**</center>

In [27]:
ejecuta_funcion_listado()

<coroutine object listado at 0x7f29fc70d230>

In [28]:
respuesta = listado()
respuesta

<coroutine object listado at 0x7f29fc70d4d0>

In [29]:
await respuesta

[64, 28, -47, 25, 25, -64, 85, 1, 34, -70]

In [30]:
await listado()

[12, -44, -58, -25, 94, -46, 1, 87, -18, -8]

In [31]:
await ejecuta_funcion_listado()

[-55, -73, -79, 61, -68, -74, -4, -39, -27, 96, 74, -39, 62, 47, 52]

In [32]:
print(await ejecuta_funcion_asincrona())

[35, 2, -1, -39, -32, 48, -39, -29, -61, -24, 87, 39, 80, 86, 7, -48, -59, -93, 44, -20]


In [33]:
lista = []

for i in range(10):
    
    lista.append(await listado())

lista

[[65, -78, 49, -54, -80, 97, -89, 71, -57, 90],
 [50, 44, -9, -6, 12, -29, 39, 88, -90, -48],
 [32, 38, 32, -27, 22, 25, -40, -56, 8, 73],
 [-16, 79, -96, 36, -36, 82, 35, 51, -45, 24],
 [-8, 35, 73, -49, -94, -14, 85, 73, -8, -15],
 [-87, 94, -4, 61, -53, -83, -90, -75, 95, 85],
 [0, -50, -14, 6, 67, 84, 41, -44, 11, -93],
 [-32, 74, -49, 83, 88, -25, 87, 88, 95, -5],
 [63, 28, -72, 29, 38, 55, 55, -98, -71, -72],
 [-37, 29, -63, -85, -18, -91, -82, 40, -35, 60]]

# <center> **FUNCIONES ASINCRONAS**</center>

In [34]:
async def Primero(limite=5):
    print("Inicio funcion Primero")
    await asyncio.sleep(2)
    print("Final funcion Primero")
    
    return [random.randrange(-100, 0) for x in range(limite)]

async def Ultimo(limite=5):
    print("Comenzar funcion ultimo")
    await asyncio.sleep(2)
    print("Terminar funcion ultimo")
    
    return [random.randrange(0, 100) for x in range(limite)]

# <center> **GATHER**</center>
## <center> **RECIBE UNA LISTA DE TAREAS Y RETORNA UNA LISTA DE RESULTADOS**</center>
## <center> **TIENE OPCIONES ESPECÍFICAS PARA EL MANEJO DE ERRORES Y CANCELACIONES.**</center>

In [35]:
# El siguiente ejemplo muestra cómo esperar a que se completen varias tareas asincrónicas.

from asyncio import gather

async def iniciar_gather():
    
    tareas = [Primero(), Ultimo(), Primero(), Ultimo(), Primero(), Ultimo(), Primero()]
    
    return await gather(*tareas)
    
await iniciar_gather()

Inicio funcion Primero
Comenzar funcion ultimo
Inicio funcion Primero
Comenzar funcion ultimo
Inicio funcion Primero
Comenzar funcion ultimo
Inicio funcion Primero
Final funcion Primero
Terminar funcion ultimo
Final funcion Primero
Terminar funcion ultimo
Final funcion Primero
Terminar funcion ultimo
Final funcion Primero


[[-47, -48, -10, -9, -62],
 [59, 22, 34, 98, 18],
 [-8, -19, -95, -58, -39],
 [91, 65, 39, 39, 58],
 [-53, -18, -32, -76, -100],
 [23, 41, 79, 97, 63],
 [-79, -95, -83, -77, -81]]

# <center> **WAIT_FOR**</center>
## <center> **PERMITE DEFINIR EL MAXIMO TIEMPO DE ESPERA DE UNA TAREA**</center>

In [36]:
# El siguiente ejemplo demuestra cómo podemos utilizar un tiempo de espera para evitar esperar indefinida a que finalice una tarea asincrónica.

from asyncio import wait_for

async def iniciar_wait_for():
    try:
        return await wait_for(Primero(), timeout=1)
    except asyncio.TimeoutError:
        print("la ejecucion supero el tiempo de espera!")
        

await iniciar_wait_for()

Inicio funcion Primero
la ejecucion supero el tiempo de espera!


# <center> **AS_COMPLETED**</center>
## <center> **ES SIMILIAR A GATHER, PERO RETORNA FUTUROS, LOS RESULTADOS SON RETORNADOS EN EL ORDEN QUE ESTAN LISTOS**</center>

In [37]:
# El siguiente ejemplo demuestra cómo as_complete, completará la primera tarea, seguida de la siguiente más rápida y la siguiente hasta que se completen todas las tareas.

from asyncio import as_completed

async def iniciar_as_completed():
    
    tareas = [Primero(), Ultimo(), Primero(), Ultimo(), Primero(), Ultimo(), Primero()]
    counter = 0
    
    for future in as_completed(tareas):
        n = "la tarea mas rapida" if counter == 0 else "la siguiente tarea mas rapida"
        counter += 1
        result = await future
        print(f"{n} obtuvo el resultado: {result}")

await iniciar_as_completed()

Inicio funcion Primero
Comenzar funcion ultimo
Inicio funcion Primero
Inicio funcion Primero
Inicio funcion Primero
Comenzar funcion ultimo
Comenzar funcion ultimo
Final funcion Primero
Terminar funcion ultimo
Final funcion Primero
Final funcion Primero
Final funcion Primero
Terminar funcion ultimo
Terminar funcion ultimo
la tarea mas rapida obtuvo el resultado: [-78, -98, -12, -56, -23]
la siguiente tarea mas rapida obtuvo el resultado: [96, 77, 18, 85, 1]
la siguiente tarea mas rapida obtuvo el resultado: [-14, -14, -97, -34, -77]
la siguiente tarea mas rapida obtuvo el resultado: [-50, -5, -34, -83, -67]
la siguiente tarea mas rapida obtuvo el resultado: [-31, -57, -91, -39, -46]
la siguiente tarea mas rapida obtuvo el resultado: [5, 51, 2, 69, 55]
la siguiente tarea mas rapida obtuvo el resultado: [13, 26, 61, 95, 91]


# <center> **CREATE_TASK**</center>

In [38]:
# El siguiente ejemplo demuestra cómo convertir una rutina en una tarea y programarla en el bucle de eventos.

from asyncio import create_task

async def iniciar_create_task():
    
    tarea = create_task(Primero())
    print(tarea)
    
    await asyncio.sleep(2)
    print("MAS PROCESOS!")
    
    await asyncio.sleep(3)
    print(sum(tarea))
    
    return tarea

await iniciar_create_task()

<Task pending name='Task-28' coro=<Primero() running at /tmp/ipykernel_2533/4294725927.py:1>>
Inicio funcion Primero
MAS PROCESOS!
Final funcion Primero
0


<Task finished name='Task-28' coro=<Primero() done, defined at /tmp/ipykernel_2533/4294725927.py:1> result=[-72, -57, -84, -30, -69]>

# <center> **POOLS**</center>
# <center> **GET_RUNNING_LOOP**</center>

## <center> **ASYNCIO - MULTI HILO**</center>

In [39]:
# importa la libtreria
from asyncio import get_running_loop
# importa librerias para trabajar concurrencia
from concurrent.futures import ThreadPoolExecutor

def blocking_io(value = 23, otros = 10):
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open("/dev/urandom", "rb") as f:
        return f.read(value + otros)
    
async def asyncio_multi_hilo():
    
    loop = get_running_loop()
    pool = ThreadPoolExecutor()
    
    # tipo de pool, funcion, parametros
    return await loop.run_in_executor(pool, blocking_io, 50, 70)

await asyncio_multi_hilo()

b'\xbbI\xcf\x04I#\xc04\xc5\xa0\xc2\xc9U\x0b\xfa\xd1\xab\x83\xfc\x1a\x02\x81\xf6}+\x921G\x15\xa4^\xfe\xd4.F\xd6\xe8\x172+oE>T\x1bw\x87\x90m3\xdeW\x9e\x81\xe9\xae"\x96\xc6\x17H\x16e)#\x19\xff\xd7w\x12\x12\xc7\x9d\xa5\xd6\x1dqm\xca\x7f\xd5LS_\x1c\x94\\q\xcdq\x95*3\\P4\xb8\xdf\x82b\x19\xc7T\xe1\x0b\xa3K\xec\x8a\xa4\xcb\x14~(\x93u\xa7\xe7B\x06'

## <center> **ASYNCIO - MULTI PROCESO**</center>

In [40]:
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(value = 23, otros = 10):
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return  sum(i * i for i in range(10 ** 7)) // (value + otros)

    
async def asyncio_multi_proceso():
    
    loop = get_running_loop()
    pool = ProcessPoolExecutor()
    
    # tipo de pool, funcion, parametros
    return await loop.run_in_executor(pool, cpu_bound, 123, 100)

await asyncio_multi_proceso()

1494768086696569506

 # TABLA DE DEFINICIONES
 https://pharos.sh/concurrencia-en-python/#:~:text=al%20mismo%20tiempo.-,Simultaneidad%20vs%20paralelismo,acelerar%20el%20proceso%20de%20c%C3%A1lculo.
 

| CONCURRENCIA O SIMULTANEIDAD  | PARALELISMO |
| --- | --- |
| Es la ejecución de tareas al mismo tiempo, Cuando dos o más eventos son concurrentes, significa que están sucediendo al mismo tiempo. | Se logra cuando se realizan múltiples cálculos u operaciones al mismo tiempo o en paralelo con el objetivo de acelerar el proceso de cálculo. | 
| Solo tiene lugar en un procesador. | Utiliza múltiples procesadors para realizar tareas en paralelo. | 
| es la tarea de ejecutar y administrar múltiples cálculos al mismo tiempo. | es la tarea de ejecutar múltiples cálculos simultáneamente. |
| se logra a través de la operación de entrelazado de procesos en la unidad central de procesamiento (CPU) o, en otras palabras, mediante el cambio de contexto. | se logra a través de múltiples unidades centrales de procesamiento (CPU) |
| se puede realizar utilizando una sola unidad de procesamiento. | necesita múltiples unidades de procesamiento. |
| aumenta la cantidad de trabajo terminado a la vez. |  mejora el rendimiento y la velocidad computacional del sistema.|
| trata muchas cosas simultáneamente. | hace muchas cosas simultáneamente. |
|  es el enfoque de flujo de control no determinista.| es un enfoque de flujo de control determinista. |
|  la depuración es muy difícil. | la depuración también es difícil pero mas simple que la concurrencia. |



| HILO O SUBPROCESO | PROCESO | TAREA |
| --- | --- | --- |
| cada tarea ejecutada en un proceso es un subproceso. | Es un trabajo o una instancia de un programa calculado que se puede ejecutar. | Es un conjunto de instrucciones de programa que se cargan en la memoria.|
| Es la unidad de ejecución más pequeña que se puede realizar en una computadora. | Un hilo solo puede pertenecer a un proceso, pero un proceso puede tener múltiples hilos. |
| pueden acceder a los datos de otros subprocesos  | funcionan de forma aislada | |
| comparten memoria con otros subprocesos | cada proceso tiene su propia asignación de memoria. | |
||es un proceso liviano: en comparación con un proceso|||
| un subproceso genera menos carga en el sistema operativo para crear, mantener y administrar, lo que significa que el costo o la sobrecarga del subproceso es relativamente pequeño.|||
| El hilo no tiene espacio de direcciones, y el hilo está contenido en el espacio de direcciones del proceso.|||
| Todos los hilos comparten la memoria y los recursos del proceso. |||


| HILOS | PROCESOS | ASINCRONISMO |
| --- | --- | --- |
| Implementa concurrencia a través de hilos de aplicación | Implementa la concurrencia usando procesos del sistema | Construye aplicaciones concurrentes que utilizan co-rutinas. Utiliza un enfoque de un solo hilo y un solo proceso en el que partes de una aplicación cooperan para cambiar tareas explícitamente en momentos óptimos.|
|Multihilo es el proceso en el que multiples hilos se ejecutan al mismo tiempo en un proceso.|El multiproceso es permitir que se realicen múltiples procesos al mismo tiempo,  utilizando  uno o mas procesadores.|