## Reduction operation: the sum of the numbers in the range [0, value)

In [1]:
import numpy as np

def reduc_operation(A):
    """Compute the sum of the elements of Array A in the range [0, value)."""
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

# Secuencial

value = 5*10**4

X = np.random.rand(value)

# Para imprimir los pimeros valores del array

# print(X[0:12])

# Utilizando las operaciones mágicas de ipython

tiempo = %timeit -r 2 -o -q reduc_operation(X)

print("Time taken by reduction operation using a function:", tiempo)


print(f"And the result of the sum of numbers in the range [0, value) is: {reduc_operation(X)}\n")


# Utilizando numpy.sum()

tiempo = %timeit -r 2 -o -q np.sum(X)

print("Time taken by reduction operation using numpy.sum():", tiempo)

print("Now, the result using numpy.sum():", np.sum(X),"\n ")


# Utilizando numpy.ndarray.sum()

tiempo= %timeit -r 2 -o -q X.sum()

print("Time taken by reduction operation using numpy.ndarray.sum():", tiempo)

print("Now, the result using numpy.ndarray.sum():", X.sum())

Time taken by reduction operation using a function: 2.62 ms ± 8.27 µs per loop (mean ± std. dev. of 2 runs, 100 loops each)
And the result of the sum of numbers in the range [0, value) is: 24995.051343753086

Time taken by reduction operation using numpy.sum(): 7.88 µs ± 8.1 ns per loop (mean ± std. dev. of 2 runs, 100,000 loops each)
Now, the result using numpy.sum(): 24995.051343752984 
 
Time taken by reduction operation using numpy.ndarray.sum(): 7.14 µs ± 0.478 ns per loop (mean ± std. dev. of 2 runs, 100,000 loops each)
Now, the result using numpy.ndarray.sum(): 24995.051343752984


a) Librería multiprocessing: En la siguiente celda de código del notebook4 vas a utilizar el paquete multiprocessing para acelerar la operación de reducción. Para ello, importa Pool de la librería multiprocessing, y con la función map llama a la función reduc_operation creando solo un proceso con el tamaño del array original de [0, value], creando 2 procesos llamándolos con 2 arrays que tienen la copia de los valores del array original pero de tamaños [0, int(value/2)] y [int(value/2), value)], y creando 4 procesos que llaman a la función con 4 arrays que tienen la copia de los valores del array original pero de tamaños [0, int(value/4)], [int(value/4), int(value/2)], [int(value/2), int(3*value/4)] y [int(3*value/4), value]. Como verás, el tiempo para calcular la suma va disminuyendo cada vez
 que duplicas el número de procesos (cores) que usas.

In [4]:
import numpy as np
from multiprocessing import Pool

# Definimos la función para la operación de reducción
def reduc_operation(A):
    """Compute the sum of the elements of Array A in the range [0, value)."""
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

# Secuencial
value = 5 * 10**4  # Tamaño del array
X = np.random.rand(value)  # Array con valores aleatorios

# Tiempo secuencial
print("Tiempo para la operación de reducción secuencial:")
tiempo = %timeit -r 2 -o -q reduc_operation(X)
print("Time taken by reduction operation using a single process:", tiempo)
print(f"Resultado de la suma secuencial: {reduc_operation(X)}\n")


# Divisiones del array para multiprocessing
def dividir_array(value, num_procesos):
    """Divide el array en `num_procesos` subarrays según el esquema definido."""
    if num_procesos == 1:
        return [X[0:value]]  # Una sola partición [0, value]
    elif num_procesos == 2:
        mitad = int(value / 2)
        return [X[0:mitad], X[mitad:value]]  # Dos particiones [0, mitad] y [mitad, value]
    elif num_procesos == 4:
        cuarto = int(value / 4)
        mitad = int(value / 2)
        tres_cuartos = int(3 * value / 4)
        return [X[0:cuarto], X[cuarto:mitad], X[mitad:tres_cuartos], X[tres_cuartos:value]]  # Cuatro particiones
    else:
        raise ValueError("Número de procesos no soportado. Use 1, 2 o 4.")

# Utilizando multiprocessing con 1, 2 y 4 procesos
for num_procesos in [1, 2, 4]:
    print(f"\nTiempo para la operación de reducción con {num_procesos} procesos:")

    # Dividimos el array según el número de procesos
    subarrays = dividir_array(value, num_procesos)

    # Definimos una función para ejecutar multiprocessing
    def run_parallel_reduction():
        with Pool(processes=num_procesos) as pool:
            # Usamos map para aplicar reduc_operation a cada subarray
            resultados = pool.map(reduc_operation, subarrays)
        return sum(resultados)

    # Medimos el tiempo usando %timeit
    tiempo = %timeit -r 2 -o -q run_parallel_reduction()
    resultado = run_parallel_reduction()

    print(f"Time taken by reduction operation using {num_procesos} processes: {tiempo}")
    print(f"Resultado de la suma con {num_procesos} procesos: {resultado}")

Tiempo para la operación de reducción secuencial:
Time taken by reduction operation using a single process: 84.5 ms ± 831 µs per loop (mean ± std. dev. of 2 runs, 10 loops each)
Resultado de la suma secuencial: 24950.801135787468


Tiempo para la operación de reducción con 1 procesos:
Time taken by reduction operation using 1 processes: 119 ms ± 190 µs per loop (mean ± std. dev. of 2 runs, 10 loops each)
Resultado de la suma con 1 procesos: 24950.801135787468

Tiempo para la operación de reducción con 2 procesos:
Time taken by reduction operation using 2 processes: 91 ms ± 361 µs per loop (mean ± std. dev. of 2 runs, 10 loops each)
Resultado de la suma con 2 procesos: 24950.801135787482

Tiempo para la operación de reducción con 4 procesos:
Time taken by reduction operation using 4 processes: 111 ms ± 3.71 ms per loop (mean ± std. dev. of 2 runs, 10 loops each)
Resultado de la suma con 4 procesos: 24950.801135787307


b) Libreria Numba: En la siguiente celda de código vamos a utilizar el paquete numba. Importa de dicho paquete njit y prange, y haz 2 nuevas versiones de la función original reduc_operation(A) decorándolas con el decorador @njit y @njit(parallel=True)5 respectivamente. Como ya vimos en la práctica anterior, solo con utilizar @njit se mejora mucho el tiempo de ejecución, pero si además ejecutamos en paralelo aún se nota una mayor reducción de tiempo.

In [7]:
import numpy as np
from numba import njit, prange

# Función original (sin optimización)
def reduc_operation(A):
    """Compute the sum of the elements of Array A in the range [0, value)."""
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

# Optimización con @njit
@njit
def reduc_operation_njit(A):
    """Compute the sum of the elements of Array A using Numba's njit."""
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

# Optimización con @njit(parallel=True) y prange
@njit(parallel=True)
def reduc_operation_parallel(A):
    """Compute the sum of the elements of Array A using Numba's parallel njit."""
    s = 0.0
    # Paralelizamos el bucle con prange
    for i in prange(A.size):
        s += A[i]
    return s


# Tamaño del array
value = 5 * 10**6  # Un tamaño suficientemente grande para observar diferencias
X = np.random.rand(value)  # Array aleatorio de valores entre 0 y 1

# Comparación de tiempos
print("Tiempo para la función original:")
tiempo = %timeit -r 2 -o -q reduc_operation(X)
print("Tiempo:", tiempo)
print(f"Resultado:", reduc_operation(X), "\n")

print("Tiempo para la función con @njit:")
tiempo = %timeit -r 2 -o -q reduc_operation_njit(X)
print("Tiempo:", tiempo)
print(f"Resultado:", reduc_operation_njit(X), "\n")

print("Tiempo para la función con @njit(parallel=True):")
tiempo = %timeit -r 2 -o -q reduc_operation_parallel(X)
print("Tiempo:", tiempo)
print(f"Resultado:", reduc_operation_parallel(X), "\n")

Tiempo para la función original:
Tiempo: 8.24 s ± 22.5 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
Resultado: 2500385.989518582 

Tiempo para la función con @njit:
Tiempo: 20.4 ms ± 15.7 µs per loop (mean ± std. dev. of 2 runs, 1 loop each)
Resultado: 2500385.989518582 

Tiempo para la función con @njit(parallel=True):
Tiempo: 7.8 ms ± 911 µs per loop (mean ± std. dev. of 2 runs, 1 loop each)
Resultado: 2500385.9895190033 



c) Finalmente, para que podamos variar el número de elementos con mayor comodidad, haz una modificación en el notebook original que permita darle el valor del número de elementos (variable value) por la línea de comandos al lanzar a ejecutar el notebook con el gestor de colas sbatch. Una vez hecho esto, vamos a lanzar a ejecutar dicho notebook a otra cola de tu elección por medio del intérprete ipython. Recuerda que dicho notebook debe estar en tu directorio de trabajo de ibsen (subdirectorio lab-gpu) y en el directorio de trabajo de la correspondiente cola. Una vez hecho esto, crea el shell script que va a usar el comando sbatch para lanzar con SLURM a dicha cola y medir el tiempo de ejecución. Llama a este fichero submit_reduc_oper-Par_machine-name-login.sh6. Lánzalo con el valor de 5 ∗107 elementos.

In [1]:
import sys
import numpy as np
from numba import njit, prange

# Detectar si estamos en un entorno interactivo (como IPython) o no
if 'ipykernel' in sys.modules:
    # Estamos en un entorno interactivo (IPython), no usamos los argumentos de la línea de comandos
    value = 5 * 10**4
    print(f"Valor de 'value' (modo interactivo) establecido en: {value}")
else:
    # Estamos en un entorno no interactivo (ejecución desde sbatch o línea de comandos)
    if len(sys.argv) > 1:
        try:
            value = int(sys.argv[1])  # Toma el valor del argumento como el tamaño del array
        except ValueError:
            print("Error: El valor proporcionado para 'value' no es un número entero válido.")
            sys.exit(1)  # Salir con error
    else:
        value = 5 * 10**4  # Valor por defecto si no se pasa ningún argumento
    print(f"Valor de 'value' establecido en: {value}")

# Crear un array de tamaño 'value'
X = np.random.rand(value)

# Función original y versiones optimizadas con @njit
def reduc_operation(A):
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

@njit
def reduc_operation_njit(A):
    s = 0
    for i in range(A.size):
        s += A[i]
    return s

@njit(parallel=True)
def reduc_operation_parallel(A):
    s = 0.0
    for i in prange(A.size):
        s += A[i]
    return s

# Pruebas de tiempos
print("Tiempo para la función original:")
print(f"Resultado:", reduc_operation(X))

print("Tiempo para la función con @njit:")
print(f"Resultado:", reduc_operation_njit(X))

print("Tiempo para la función con @njit(parallel=True):")
print(f"Resultado:", reduc_operation_parallel(X))

Valor de 'value' (modo interactivo) establecido en: 50000
Tiempo para la función original:
Resultado: 25021.667500700056
Tiempo para la función con @njit:
Resultado: 25021.667500700056
Tiempo para la función con @njit(parallel=True):
Resultado: 25021.667500700027


d) Crea una nueva celda de texto debajo de la última celda de código para mostrar y explicar los resultados obtenidos por los 2 paquetes utilizados (multiprocessing y numba).

En este ejercicio, se compararon dos enfoques para optimizar una operación de reducción: multiprocessing y numba. El uso de multiprocessing mostró una mejora en el tiempo de ejecución al aumentar el número de procesos, pero después de cierto punto, la sobrecarga de gestión de los procesos hizo que el rendimiento no mejorara significativamente. Por otro lado, numba demostró ser mucho más eficiente, especialmente al utilizar la compilación Just-in-Time (JIT) con @njit y la paralelización de hilos con @njit(parallel=True). La optimización con numba redujo considerablemente los tiempos de ejecución, destacándose especialmente cuando se utilizó la paralelización. En resumen, para este tipo de operaciones de reducción en arrays, numba resultó ser la opción más eficaz y rápida, superando a multiprocessing en términos de rendimiento.