<a href="https://colab.research.google.com/github/miguelamda/cuda-numba/blob/master/4_Map_Reduce_en_CUDA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IV. Map-Reduce En CUDA

En programación funcional, las funciones de orden superior `map` y `reduce` son muy conocidas. Dada su potencia, se emplean de forma común en operaciones de tratamiento de datos en Big Data. Otra ventaja de estas operaciones es que son fácilmente paralelizables. Esto no es una excepción en las GPUs. Recuerda que estos dispositivos permiten escalar mejor la capacidad computacional en entornos distribuídos, ya que las tarjetas gráficas proveen a cada nodo con otro nivel más de paralelismo masivo.

En concreto, `map` y `reduce` son conocidas como primitivas paralelas de la GPU. Hay otras primitivas paralelas para la GPU, pero no las cubriremos en este curso: `prefix sum`, `histogram`, `stencil`, `binning`, etc.

Adentrémonos un poco en el mundo de la **algorítmica paralela**.



## 1.1. Map

Map es una función de orden superior. Esto se puede entender como que es una operación que recibe por parámetro otra función *unaria*, $f$. Después, aplica esa función a cada elemento de una colección. 

![seqmap](https://upload.wikimedia.org/wikipedia/commons/0/06/Mapping-steps-loillibe-new.gif)

Hablemos un poco del *orden de complejidad* de la operación `map`. Usaremos para ello la notacion $O$, que nos indica cómo escala el tiempo de ejecución con respecto al tamaño de la entrada. En concreto, la operación `map` es $O(n)$. Esto significa que el tiempo de ejecutar la operación es proporcional al tamaño de la entrada, $n$. Piénsalo bien, tenemos que hacer un bucle para recorrer todos los elementos, y si tardaramos $m$ mili-segundos en aplicar la función $f$ a un elemento, entonces tardaríamos $m\cdot n$ mili-segundos. 

Sin embargo, si somos capaces de aplicar cada evaluación de $f$ totalmente en paralelo, el orden de complejidad sería $O(1)$, es decir, constante. En otras palabras, da igual la entrada que tengamos, siempre va a tardar $m$ mili-segundos (independientemente de $n$, da igual $n$ sea 100 o 1.000.000 elementos). Sin embargo, en computación paralela también hay que lidiar con la limitación de recursos, por lo que es difícil tener todos los procesadores necesarios para alcanzar ese nivel de paralelismo. Si una GPU contiene $p$ procesadores, está claro que la carga se distribuye pero aún así tardaríamos $\frac{n}{p}\cdot m$ mili-segundos (asumiendo $n \geq p$). Es decir, tendríamos un orden de complejidad $O(\frac{n}{p})$. Aún así, ya es un resultado mucho mejor comparado con tal solo tener un procesador, y es que recuerda, una GPU puede contener hasta 3000 núcleos (a día de hoy).

![parallelmap](https://github.com/miguelamda/cuda-numba/raw/master/img/parallelmap.png)

En cierto modo, ya hemos visto como se puede implementar la operación `map` con Numba, y es a través de **ufunc**. Recuerda, estas operaciones se definen a nivel de elemento, y Python automáticamente escala la operación a todos los elementos de un array de Numpy.

Veamos un ejemplo como el de la figura anterior, donde se suma un escalar a todos los elementos de un array:



In [1]:
from numba import vectorize
import numpy as np

# El vector de entrada como en el ejemplo anterior
x = np.array([0, 5, 8, 3, 2, 1]).astype(np.int32)

# Definamos la función f unaria como el ejemplo anterior 
@vectorize(['int32(int32)'], target='cuda')
def f(x):
    return x + 1

# Automáticamente, Numpy aplica la función mediante broadcast, y Numba lo
# paraleliza en la GPU
xp = f(x)

xp

array([1, 6, 9, 4, 3, 2], dtype=int32)

Aprovechando el sistema de broadcasting de Python, podemos aplicar parametrizar la función suma con una constante *n*:

In [2]:
# Constante a ser configurada para la función.

# Definamos una función f binaria, pero cuyo segundo parámetro será usado para n
@vectorize(['int32(int32,int32)'], target='cuda')
def fp(x,n):
    return x + n

xp2 = fp(xp,10)

xp2

array([11, 16, 19, 14, 13, 12], dtype=int32)

Esta solución que hemos presentado sería ineficiente. Habría sido mejor hacer uso de arrays en memoria de GPU de forma explícita, para evitar copiar el vector xp al host, y de nuevo volverlo a enviar a device. Vamos a hacer una prueba con un array mucho más grande.

In [0]:
# vamos a crear unos datos más grandes
n = 500000000
x = np.arange(n).astype(np.int32)

El código siguiente demuestra la típica estructura de un código CUDA eficiente en el tratamiento de los datos. Esto ya lo vimos en el notebook anterior, pero conviene recordarlo:
1. Copiar los datos a la GPU (device): `cuda.to_device()`
2. Reservar toda la memoria auxiliar en la GPU: `cuda.device_array`
3. Lanzar código GPU (ufunc o kernel): `@vectorize` o `@cuda.jit`
4. Recuperar los resultados hacia la CPU (host): `cuda.copy_to_host()`

In [4]:
%%time

from numba import cuda

# 1. Copiar entrada a la GPU
x_device = cuda.to_device(x)

# 2. Inicializar los arrays auxiliares
xp_device = cuda.device_array(shape=x_device.shape, dtype=x_device.dtype)
xp2_device = cuda.device_array(shape=x_device.shape, dtype=x_device.dtype)

# 3. Lanzamos las funciones map
f(x_device, out=xp_device)
fp(xp_device, -1, out=xp2_device)

# 4. Copiamos el resultado a la CPU
xp2 = xp2_device.copy_to_host()

CPU times: user 512 ms, sys: 62.7 ms, total: 575 ms
Wall time: 578 ms


In [5]:
xp2[1:10]

array([1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

Veamos, de todo el tiempo total que ha tardado todo el workflow, cuánto ha tardado las funciones en sí.

In [6]:
%timeit f(x_device, out=xp_device)

1000 loops, best of 3: 27.3 ms per loop


In [7]:
%timeit fp(xp_device, -1, out=xp2_device)

The slowest run took 945.75 times longer than the fastest. This could mean that an intermediate result is being cached.
1 loop, best of 3: 28.9 ms per loop


Ya puedes intuir que la mayoría del tiempo se ha dedicado al traspaso de datos. Recuerda, esas copias solo son necesarias al comienzo y al final. Podemos concluir que la GPU compensa, generalmente, en las siguientes situaciones:
* La proporción y/o la complejidad de las operaciones a realizar es muy grande, hasta el punto que compensa esa copia de memoria.
* El tamaño de los arrays es muy grande.
* Existe un paralelismo de datos masivo: se puede definir como una función `map`. 

## 1.2 Reduce

La operación de reducción (`reduce`) es aquella en que dada una colección de elementos (un array), y una función $f$ binaria (de dos argumentos), aplica dicha función para obtener un valor escalar sobre todos los datos. Por ejemplo, si $f(a,b)=a+b$, entonces lo que estaríamos haciendo es simplemente sumar todos los elementos del array. En este caso, como en la operación `map`, estaríamos teniendo un orden de complejidad $O(n)$, con $n$ el número de elementos de entrada.

![reduce](https://cdn.kodigoswift.com/wp-content/uploads/2017/07/funcion-reduce.gif)

Pero, ¿esta operación se puede paralelizar? A diferencia del `map`, el resultado en una iteración del bucle (el que recorre los elementos) depende de los resultados anteriores: hay que calcular el valor acumulado. En el ejemplo de la suma, el valor resultante de sumar los dos primeros elementos se debe pasar para sumárselo al tercer elemento. Esto tiene pinta que no se puede paralelizar para reducir la complejidad a $O(1)$.

Este es un claro ejemplo de que no siempre podemos utilizar el paralelismo para lograr reducir el orden de complejidad de un orden lineal $O(n)$ a uno constante $O(1)$, pero esto no quiere decir que podamos aún conseguir una mejora. El paralelismo se puede aprovechar de otras muchas formas. Pensemos un momento, ¿cómo podríamos paralelizar de alguna forma el cálculo de la suma de elementos de un vector? El siguiente dibujo lo ilustra:

![parallelsum](https://github.com/miguelamda/cuda-numba/raw/master/img/parallelsum.png)

Curioso, no conseguimos hacer todo el resultado en un solo paso, pero sí que lo podemos conseguir en menos iteraciones que recorriendo elemento a elemento. En concreto, necesitamos un número logarítmico de pasos, por lo que la complejidad se ha reducido de $O(n)$ a $O(log_2 n)$. Aunque no parezca demasiado, piénsalo: si el array de entrada contiene 8 elementos, necesitamos 3 pasos (tampoco mucho), pero si contiene 65536 elementos, tan solo necesitamos 16 pasos! Cuantos más elementos tenga el array de entrada, más notaremos la mejora de hacer el `reduce` en paralelo.

Esto ha funcionado porque la suma es una operación binaria conmutativa y asociativa. Otras operaciones que podemos aplicar son: *suma, máximo, mínimo, producto, and lógico, or lógico, etc.*. Operaciones que no podríamos usar serían la resta y la división. En general, el esquema de la reducción en paralelo queda como sigue:

![parallelreduce](https://github.com/miguelamda/cuda-numba/raw/master/img/parallelreduce.png)

Veamos cómo podemos aplicar un `reduce` en CUDA. Para ello, tenemos que definir nuestra operación binaria asociativa con el decorador `@cuda.reduce`:

In [8]:
import numpy
from numba import cuda

# Definamos la función binaria a usar con la reducción, y se lo indicamos a CUDA
@cuda.reduce
def sum_reduce(a, b):
    return a + b

A = (numpy.arange(1234, dtype=numpy.float32)) + 1
expect = A.sum()      # Reducción con numpy sum
got = sum_reduce(A)   # Reducción con sum en CUDA
assert expect == got  # Comprobemos que sale igual
print(got)            # Efectivamente, devuelve un escalar

761995.0


Esto es tan solo un pequeño ejemplo, donde quizás incluso la GPU no valga la pena por el tamaño del array que estamos considerando. Veamos otro ejemplo, el reescalado de datos. Aquí debemos recalcular el valor con $x_{new}=\frac{x-X_{min}}{X_{max}-X_{min}}$. Aquí tenemos que calcular tanto el máximo como el mínimo con reduce, y después con map actualizar los valores.

In [0]:
# Es posible que ahora nos quedemos sin memoria en GPU CUDA_ERROR_OUT_OF_MEMORY,
# o incluso sin memoria RAM.
# Si es así, reinicia el entorno de ejecución, descomenta las siguientes líneas,
# y vuelve a evaluar esta celda ya las siguientes
# import numpy as np
# n = 500000000
# x = np.arange(n).astype(np.int32)

In [0]:
# Reescalado de datos
from numba import vectorize 
from numba import cuda 

# Definimos max y min para ser usados con reduce
@cuda.reduce
def max_reduce(a, b):
    return max(a,b)

@cuda.reduce
def min_reduce(a, b):
    return min(a,b)

# Definimos la función para normalizar
@vectorize(['float32(int32,int32,int32)'], target='cuda')
def reescale(x,maxim,minim):
    return (x - minim)/(maxim-minim)

Apliquemos las funciones. Con `reduce`, al menos en Numba hasta hoy, siempre recogemos el valor resultante en la CPU.

In [5]:
%%time 
# 1. Copiar entrada a la GPU (usamos la misma x de la sección 1.1)
d_x = cuda.to_device(x)
d_xr = cuda.device_array(shape=d_x.shape, dtype=np.float32)

# 2. Lanzamos las funciones reduce para calcular el min y el max
xmax = max_reduce(d_x)
xmin = min_reduce(d_x)

# 4. Aplicamos el map para reescalar los valores, y copiamos el resultado a la vez
reescale(d_x,xmax,xmin, out=d_xr)

xr = d_xr.copy_to_host()

CPU times: user 619 ms, sys: 787 ms, total: 1.41 s
Wall time: 1.41 s


In [6]:
# Veamos una muestra del resultado
xr[4000:4010]

array([8.000e-06, 8.002e-06, 8.004e-06, 8.006e-06, 8.008e-06, 8.010e-06,
       8.012e-06, 8.014e-06, 8.016e-06, 8.018e-06], dtype=float32)

Hagamos una comparativa con Numba sobre CPU.

In [0]:
from numba import jit

@jit(nopython=True)
def reescalado(x):
    xmin = np.amin(x)
    xmax = np.amax(x)
    
    return (x-xmin)/(xmax-xmin)

In [9]:
%%time

xr = reescalado(x)

CPU times: user 1.47 s, sys: 2.33 s, total: 3.8 s
Wall time: 3.82 s


In [4]:
%%time 
xmax = x.max()
xmin = x.min()

xr = (x - xmin)/(xmax-xmin)

CPU times: user 2.36 s, sys: 2.26 s, total: 4.62 s
Wall time: 4.63 s


# Ejercicio propuesto

Supongamos que recibimos una serie de datos en un array y necesitamos normalizarlos; es decir: $x_{new}=\frac{x-x_{mean}}{x_{stdev}}$. 
Normalizar dichos datos mediante operaciones map/reduce en CUDA.

In [0]:
# Datos de entrada
n=100000000
x = np.random.uniform(-3, 3, size=n).astype(np.float32)

In [0]:
# Define aquí las funciones de GPU

In [0]:
%%time

# Define aquí el workflow de GPU para un mejor manejo de la memoria

In [0]:
# Visualiza aquí los primeros 10 valores para ver si ha ido bien

In [0]:
# Escribe aquí una función parecida con Numba para CPU

In [0]:
%%time

# Evalúa aquí el rendimiento

In [0]:
# Imprime los diez primeros elementos para comprobar si ha ido bien