<img src="dask_horizontal.svg" style="height:200px" />

## *Because not all problems are dataframes...*

Fuente: https://examples.dask.org

In [None]:
import dask.dataframe as dd
import dask.array as da

In [None]:
from dask_session import DaskSession

In [None]:
import time
import random
import dask

# ¿Por qué Dask?

### Los analistas suelen utilizar herramientas como Pandas, Scikit-Learn, Numpy y el resto del ecosistema de Python para analizar datos en su computadora personal. Les gustan estas herramientas porque son eficientes, intuitivas y de gran confianza. Sin embargo, cuando eligen aplicar sus análisis a conjuntos de datos más grandes, descubren que estas herramientas no fueron diseñadas para escalar más allá de una sola máquina. Y así, el analista reescribe su cálculo utilizando una herramienta más escalable, a menudo en otro idioma. Este proceso de reescritura ralentiza el descubrimiento y causa frustración.

### Dask proporciona formas de escalar los flujos de trabajo de Pandas, Scikit-Learn y Numpy de forma más nativa, con una reescritura mínima. Se integra bien con estas herramientas, por lo que copia la mayor parte de su API y utiliza sus estructuras de datos internamente. Además, Dask se ha desarrollado conjuntamente con estas bibliotecas para garantizar que evolucionen de manera constante, minimizando la fricción al pasar de una computadora portátil local a una estación de trabajo de varios núcleos y luego a un clúster distribuido. Los analistas familiarizados con Pandas / Scikit-Learn / Numpy se familiarizarán inmediatamente con sus equivalentes de Dask, y gran parte de su intuición se trasladará a un contexto escalable.

Fuente: https://docs.dask.org/en/latest/why.html

In [None]:
init_session = DaskSession()
init_session.client

# Dask escala a clústeres
### Como los conjuntos de datos y los cálculos se escalan más rápido que las CPU y la RAM, necesitamos encontrar formas de escalar nuestros cálculos en varias máquinas. Esto introduce muchas preocupaciones nuevas:

- ¿Cómo hacer que las computadoras se comuniquen entre sí a través de la red?
- ¿Cómo y cuándo mover datos entre máquinas?
- ¿Cómo recuperarse de las fallas de la máquina?
- ¿Cómo implementar en un clúster interno?
- ¿Cómo implementar en la nube?
- ¿Cómo implementar en una supercomputadora HPC?
- ¿Cómo proporcionar una API a este sistema que los usuarios encuentren intuitiva?
- ...


### Dask resuelve los problemas anteriores. Descubre cómo dividir grandes cálculos y enrutar partes de ellos de manera eficiente en hardware distribuido. Dask se ejecuta habitualmente en clústeres de miles de máquinas para procesar cientos de terabytes de datos de manera eficiente dentro de entornos seguros.

### Dask tiene utilidades y documentación sobre cómo implementar internamente, en la nube o en supercomputadoras HPC. Admite cifrado y autenticación mediante certificados TLS / SSL. Es resistente y puede manejar el fallo de los nodos de trabajo con elegancia y es elástico, por lo que puede aprovechar los nuevos nodos agregados sobre la marcha. Dask incluye varias API de usuario que son utilizadas y perfeccionadas por miles de investigadores de todo el mundo que trabajan en diferentes dominios.

Fuente: https://docs.dask.org/en/latest/why.html

In [None]:
data_path = 'set_your_data_path'

dd.read_parquet(data_path).to_dask_array()

In [None]:
dd.read_parquet(data_path).compute()

In [None]:
dd.read_parquet(data_path).to_dask_array().compute()

# Dask admite aplicaciones complejas
### Algunos cálculos paralelos son simples y simplemente aplican la misma rutina en muchas entradas sin ningún tipo de coordinación. Son fáciles de paralelizar con cualquier sistema.

### Se pueden expresar cálculos algo más complejos con el patrón map-shuffle-reduce popularizado por Hadoop y Spark. Esto suele ser suficiente para realizar la mayoría de las tareas de limpieza de datos, consultas de estilo de base de datos y algunos algoritmos ligeros de aprendizaje automático.

### Sin embargo, existen cálculos paralelos más complejos que no encajan en estos paradigmas, por lo que son difíciles de realizar con las tecnologías tradicionales de big data. Estos incluyen algoritmos más avanzados para estadísticas o aprendizaje automático, series de tiempo u operaciones locales, o paralelismo personalizado que a menudo se encuentra dentro de los sistemas de las grandes empresas.

### Hoy en día, muchas empresas e instituciones tienen problemas que son claramente paralelizables, pero no claramente transformables en un gran cálculo de DataFrame. Hoy en día, estas empresas tienden a resolver sus problemas escribiendo código personalizado con sistemas de bajo nivel como MPI, ZeroMQ o sockets y sistemas de cola complejos, o introduciendo su problema en una tecnología estándar de big data como MapReduce o Spark, y esperando el mejor.

### Dask ayuda a resolver estas situaciones al exponer API de bajo nivel a su programador de tareas interno, que es capaz de ejecutar cálculos muy avanzados. Esto brinda a los ingenieros dentro de la institución la capacidad de construir su propio sistema de computación en paralelo utilizando el mismo motor que impulsa las matrices, los DataFrames y los algoritmos de aprendizaje automático de Dask, pero ahora con la propia lógica personalizada de la institución. Esto permite a los ingenieros mantener la lógica empresarial compleja internamente mientras siguen confiando en Dask para manejar la comunicación de red, el equilibrio de carga, la resistencia, los diagnósticos, etc.

Fuente: https://docs.dask.org/en/latest/why.html

In [None]:
def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

### Ahora vamos a usar el decorador @dask para aplicar el método delayed y generar un comportamiento lazy. Dask.delayed es una forma simple y poderosa de paralelizar el código existente. Permite a los usuarios retrasar las llamadas a funciones en un gráfico de tareas con dependencias. Dask.delayed no proporciona ningún algoritmo paralelo elegante como Dask.dataframe, pero le da al usuario un control completo sobre lo que quiere construir.

### Los sistemas como Dask.dataframe se crean con Dask.delayed. Si tiene un problema que es paralelizable, pero que no es tan simple como una gran matriz o un gran marco de datos, entonces dask.delayed puede ser la opción correcta para usted.

In [None]:
@dask.delayed
def inc(x):
    time.sleep(random.random())
    return x + 1

@dask.delayed
def dec(x):
    time.sleep(random.random())
    return x - 1

@dask.delayed
def add(x, y):
    time.sleep(random.random())
    return x + y

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

In [None]:
z.visualize(rankdir='LR')

In [None]:
z.compute()

In [None]:
%%time
zs = []

for i in range(128):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

zs = dask.persist(*zs)

In [None]:
# sumamos los vecinos de cada nivel del arbol

#         5
#     3       3
#   2   4   4   4
# 1   1   4   7   3

In [None]:
L = zs

while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        lazy = add(L[i], L[i + 1])
        new_L.append(lazy)
    L = new_L
    
dask.compute(L)

---------------------------------------------------------------------------------------------------------------------------------------------------------------