In [1]:
from joblib import Parallel, delayed

In [2]:
with Parallel(n_jobs=-1) as parallel:
    def sqrt(x):
        return x**0.5
    
    accumulator = 0.
    n_iter = 0
    while accumulator < 1000:
        results = parallel(delayed(sqrt) (accumulator + i **2) for i in range(5))
        accumulator += sum(results)
        n_iter += 1
        
    print(f'Computate to the value {accumulator} in {n_iter} iterations')

Computate to the value 1136.5969161564717 in 14 iterations


In [3]:
accumulator = 0.
n_iter = 0
while accumulator < 1000: 
    results = 0.
    for i in range(5): 
        results += sqrt(accumulator + i**2)
    accumulator += (results)
    n_iter += 1
        
print(f'Computate to the value {accumulator} in {n_iter} iterations')
        

Computate to the value 1136.5969161564717 in 14 iterations


#### Entendiendo los diferentes tipos de backend de joblib con aleatoriedad

##### Primero vamos a ver que pasa al correr de manera secuencial un experimento con aleatoriedad

In [4]:
import numpy as np
from joblib import Parallel, delayed

def printvector(vector, backend):
    print(f'\nThe different generated vectors using the {backend} backend are:\n { np.array(vector)}')

def generarcoordenadas(max):
    return np.random.randint(max, size = 5)

nvectors = 5

randomvector = [generarcoordenadas(15) for i in range(nvectors)]

print(f'\nThe different generated vectors in a sequential manner are: \n{np.array(randomvector)}')


The different generated vectors in a sequential manner are: 
[[13 10  9 14  6]
 [ 0  1 11  4 13]
 [ 9 11 12  6  8]
 [11 11  8  1 11]
 [ 5  7  6  2  3]]


##### Luego vamos a ver que pasa con backend = 'loky' y 'threading'

In [10]:
backend = 'loky'
random_vector = Parallel(n_jobs=4, backend=backend)(delayed(
    generarcoordenadas)(15) for _ in range(nvectors))
printvector(random_vector, backend)

backend = 'threading'
random_vector = Parallel(n_jobs=4, backend=backend)(delayed(generarcoordenadas)(15) for _ in range(nvectors))
printvector(random_vector, backend)


The different generated vectors using the loky backend are:
 [[10  0  8  9  8]
 [ 2  7  5  5  5]
 [ 0  6  8  3  1]
 [14  7  0 13  2]
 [14  1  3  7 11]]

The different generated vectors using the threading backend are:
 [[ 1  1  9  6  3]
 [13 10 13  9  6]
 [ 1 14  0  8  9]
 [14  2 14 12  3]
 [10  3  7  3 14]]


#### Observemos que en estos casos la paralelización preserva aleatoriedad, pasemos ahora a multiprocessing

In [17]:
import multiprocessing as mp

backend = 'multiprocessing'
nvectors = 10
def generarcoordenadas(max):
    seed = np.random.randint(1000) # add a random seed value
    return np.random.RandomState(seed).randint(max, size=5)


random_vector = Parallel(n_jobs = 5, backend = backend)( delayed(generarcoordenadas)(15) for i in range(nvectors))
printvector(random_vector, backend)



The different generated vectors using the multiprocessing backend are:
 [[13 13  2  0 12]
 [13 13  2  0 12]
 [13 13  2  0 12]
 [13 13  2  0 12]
 [13 13  2  0 12]
 [ 5 14  1  8  5]
 [ 5 14  1  8  5]
 [ 5 14  1  8  5]
 [ 5 14  1  8  5]
 [11  4  0  1 11]]


#### YA NO SE CUMPLE la aleatoriedad, observemos que obtendremos nvectors // n_jobs grupos iguales lo cual muestra que la aleatoriedad ya no se está cumpliendo esto sucede porque el estado global del generador de números aleatorios de numpy se duplicará exactamente en todos los trabajadores. Es decir todos tienen la misma random seed

### Esto podemos arreglarlo cambiando el estado de generador de números aleatorios usando RandomState y seeds que lo cambien por cada proceso

In [20]:
import numpy as np
from joblib import Parallel, delayed

def printvector(vector, backend):
    print(f'\nThe different generated vectors using the {backend} backend are:\n { np.array(vector)}')

def generarcoordenadas(max, seed):
    rng = np.random.RandomState(seed)
    return rng.randint(max, size=5)

backend = 'multiprocessing'
nvectors = 10


random_vector = Parallel(n_jobs=5, backend=backend)(delayed(generarcoordenadas)(15, None) for i in range(nvectors))
printvector(random_vector, backend)



The different generated vectors using the multiprocessing backend are:
 [[10 10  2 14  6]
 [ 6 12  8  8 10]
 [ 5  4  4 10  5]
 [ 9  2  1 11  7]
 [14  4 12 10  5]
 [ 2 14  1 11 14]
 [ 3 14 11 12 13]
 [ 3  3  8  7  4]
 [ 8 13  3 13  5]
 [14  3  6  0  8]]


# Diferencias entre joblib y multiprocessing

Como hemos notado, tanto joblib como multiprocessing sirven para realizar procesos en paralelo empleando diferentes cores de nuestro CPU, sin embargo, entre ellas existen diferencias entre ambas librerías.
### Nivel:
- Multiprocessing: Librería de bajo nivel ==> Menos amigable para el usuario; mayor control y flexibilidad para con el proceso.
- Joblib: Librería de alto nivel ==> API más fácil de utilizar, menor control e ingerencia del usuario en el proceder

### Empleo:
- Multiprocessing: Aplicable a cualquier tipo de proceso de Python.
- Joblib: Aplicable principalmente a procesos que involucren computación científica o numérica, o que, en general, solo utilicen el CPU. 

### Alcance:
- Multiprocessing: Procesos más complejos que requieran de mayor control y flexibilidad. Ejemplo: Tasks que involucren I/O como lectura o escritura de archivos.
- Joblib: Procesos más simples (solo requieren del CPU) que priorizan cachear los resultados. Ejemplo: Tasks con datasets muy extensos.

### Codigo que ejemplicifica la diferencia de eficiencia entre joblib y multiprocessing

In [None]:
import time
import numpy as np
from joblib import Parallel, delayed
import multiprocessing as mp

# Define a function to be parallelized
def square(x):
    return x**2

# Define the input data
data = np.arange(100000)

# Run the function using joblib
start_time = time.time()
result1 = Parallel(n_jobs=4)(delayed(square)(i) for i in data)
joblib_time = time.time() - start_time

# Run the function using multiprocessing
start_time = time.time()
pool = mp.Pool(processes=4)
result2 = pool.map(square, data)
multiprocessing_time = time.time() - start_time

# Compare the results and time response
print("Joblib result:", result1[:10])
print("Multiprocessing result:", result2[:10])
print("Joblib time:", joblib_time)
print("Multiprocessing time:", multiprocessing_time)


## Busquemos la forma de aplicar Paralelización en dataframes, de manera que podamos iterar sobre las columnas para hacer operaciones

In [16]:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

df = pd.DataFrame({
    'col1': [1, 2, 3, 4, 5],
    'col2': [6, 7, 8, 9, 10]
})

columna1 = df['col1'].tolist()
print(df)

def sumar5(cell): 
    return cell +5

resultados = Parallel(n_jobs=4)(delayed(sumar5)(cell) for cell in columna1)



df['col1'] = resultados
print(df)

df.loc[df['col1']>= 8, 'col2'] = 10

print(df)



   col1  col2
0     1     6
1     2     7
2     3     8
3     4     9
4     5    10
   col1  col2
0     6     6
1     7     7
2     8     8
3     9     9
4    10    10
   col1  col2
0     6     6
1     7     7
2     8    10
3     9    10
4    10    10


### Corriendo cosas de joblib con memoria compartida, es decir que varios procesos afecten a un mismo objeto

In [29]:
## Nuestro ejemplo es con un grafo
from joblib import Parallel, delayed
graph =  {1: [2,3, 4], 
          2: [1, 3],
          3: [1, 2], 
          4: [1],
          5: []}

# Podemos hacer que todos los nodos menores o iguales a 3 estén conectados con el nodo 5, usando paralelización y memoria compartida

def connect(node1, node2):
    graph[node1].append(node2)
    graph[node2].append(node1)
    
Parallel(n_jobs = 4, require = 'sharedmem')(
        delayed(connect)(node, 5) for node in range(1,4))

print(graph)

## Volvamos a correr todo lo mismo pero sin memoria compartida y vemos que obtenemos un resultado distinto

graph =  {1: [2,3, 4], 
          2: [1, 3],
          3: [1, 2], 
          4: [1],
          5: []}

def connect(node1, node2):
    graph[node1].append(node2)
    graph[node2].append(node1)

Parallel(n_jobs = 4)(
    delayed(connect)(node, 5) for node in range(1, 4))

print(graph)

{1: [2, 3, 4, 5], 2: [1, 3, 5], 3: [1, 2, 5], 4: [1], 5: [1, 2, 3]}
{1: [2, 3, 4], 2: [1, 3], 3: [1, 2], 4: [1], 5: []}


1
