# 7. Paralelización en Python: Dask

Dask es un paquete que nos permite palelizar algunas operaciones de manera sencilla sin necesidad de alterar nuestro código de manera notable. Esto lo logra mediante la partición de los datos en `chunks` o bloques que entran en la memoria de nuestro sistema para su procesado independiente. Su función extiende aún mas las capacidades de numpy y pandas para trabajar con big data y, para nuestro interés, se integra de manera eficiente con xarray para el manejo de _big data_ en geociencias.

Antes de abordar el uso en geociencias, vamos a revisar un poco como es que dask funciona sobre los elementos base de numpy y pandas ya que esto nos ayudará a comprender su comportamiento con xarray.

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

## 7.1 Dask Arrays

Un arreglo de Dask es, en esencia, una combinación de arreglos de numpy los cuales son ordenados en bloques dentro de una grilla. La mayoría de las funciones que numpy ofrece tambien se encuentran disponibles en dask. Las formas disponibles para construir arreglos de dask son iguales a las de numpy con la diferencia que se tiene que especificar el parámetro `chunks` para indicarle a dask como debe de fraccionar los datos.

In [None]:
import dask.array as da

In [None]:
data = da.random.random((1000,1000), chunks=(100,100))
data

Mediante la información provista por dask podemos observar como este fraccionamiento sucede junto a cuanto espacio ocupa en la memoria cada bloque. Algo que debemos entender es que si bien parece que hemos creado un arreglo de datos, en realidad solo hemos creado las instrucciones que dask debe seguir para crear los datos. Esta forma de trabajar de dask se denomina _perezosamente_, asi podemos decir que hemos creado un arreglo de manera perezosa.

Toda operación realizada sobre este arreglo de dask formara una serie de instrucciones (gráficos computacionales) que se iran agregando en cadena hasta el momento que deseemos realizar los cálculos.

In [None]:
res = data.mean()
res

In [None]:
res.visualize()

Del gráfico computacional obtenido podemos observar que dask va operando individualmente sobre los bloques para luego ir agregando nuestros resultados. Si queremos obtener el resultado deberemos llamar al método `compute` para que se realicen los cálculos.

In [None]:
%time res.compute()

Podemos trabajar sobre los arreglos de dask tal y como se trabajarían con arreglos de numpy.

In [None]:
res = (data - data.mean(axis=1, keepdims=True))/data.std(axis=1, keepdims=True)
res

In [None]:
# El grafico computacional es tan grande que mejor sera visualizarlo como pdf
res.visualize('norm_dask.pdf')

Podemos llamar a `compute` sobre este arreglo para obtener el resultado como arreglo de numpy.

Para poder ver el progreso de los calculos realizados por dask haremos uso de `distributed`, una herramienta que consiste de un planificador o _scheduler_ el cual distribuye el trabajo entre varios nodos computacionales de manera eficiente ([más información](https://distributed.dask.org/en/latest/)) el cual puede escalar desde una laptop hasta un cluster. Al usar el módulo `Client` estaremos creando un cluster local en donde cada núcleo de nuestra pc será un trabajador.

In [None]:
from dask.distributed import Client
# Correr esta celda solo una vez
# en caso de ejecutarla mas de una vez
# reiniciar el kernel del notebook
client = Client()
client

Podemos acceder al link que nos muestra el widget para acceder al panel de información de nuestro cluster local. En caso de tener instalado la extensión de dask para Jupyter, puede acceder a todos estos paneles directamente en Jupyter.

Al usar `compute` veremos como dask ejecuta las instrucciones guardadas en el gráfico computacional para otorgarnos el resultado

In [None]:
res.compute()

## 7.2 Dataframes

## 7.3 Procesos personalizados

Si bien podemos hacer uso de dask sobre numpy y pandas, muchas veces tenemos funciones que operan de manera individual o bucles los cuales funcionarían mejor si estuvieran paralelizados. Dask permite paralelizar perezosamente bucles y funciones mendiante el decorador `delayed`. ([info](https://docs.dask.org/en/latest/delayed.html))

In [None]:
# Ejemplo tomado de la documentación de dask
# https://docs.dask.org/en/latest/delayed.html
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x + 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

In [None]:
total.visualize()

In [None]:
total.compute()

## 7.4 Integración con xarray

Frecuentemente nos encontramos con registros diarios, mensuales o anuales, guardados en archivos separados, los cuales cubren un periodo extenso de tiempo sobre el cual nos gustaria operar. Gracias a la integración de dask con xarray, esto es facilmente manejable usando la función `open_mfdataset` la cual leerá el metadato de todos nuestros archivos para finalmente otorgarnos un objeto de dask sobre el cual trabajar.

In [None]:
import xarray as xr

# En caso de tener archivos almacenados localmente
# reemplazar el string por la ubicación de los datos
# data = xr.open_mfdataset("PATH/TO/FILES/*.nc")

Tambien podemos leer datos desde la nube de manera perezosa gracias al [catálogo](https://pangeo-data.github.io/pangeo-datastore/index.html) que proporciona el proyecto [Pangeo](http://pangeo.io/)

In [None]:
import intake
cat = intake.Catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean.yaml")
copernicus = cat["sea_surface_height"].to_dask()
copernicus

Nuestra estructura de xarray se encuentra poblada de metadatos y etiquetas de la manera usual, la unica diferencia es que los datos son arreglos de dask los cuales no han cargado los datos realmente.

Este es un ejemplo de _big data_ considerando que nuestras laptops a lo mucho tienen 12GB de memoria. Para comprobar el tamaño de nuestros datos, hacemos uso de la propiedad `nbytes` que xarray hereda de numpy.

In [None]:
print(f"Los datos cargados tienen un tamaño de {copernicus.nbytes/1e9:.2f} GB")

Vamos a realizar algunos ejercicios básicos de selección y agregación para apreciar de primera mano como dask trabaja. Si han ejecutado la celda en donde se declara el `Client` para usar nuestra pc como cluster entonces todas las operaciones que realicemos con dask de ahora en adelante seran ancladas a este distribuidor de tareas.

In [None]:
sla = copernicus.sla
sla

In [None]:
(
    sla
    .sel(latitude=0, method='nearest')
    .sel(longitude=slice(140, 280), time=slice("2015-01-01","2015-05-01"))
    .plot()
)