<img src="http://xarray.pydata.org/en/stable/_static/dataset-diagram-logo.png" align="right" width="30%">

# Introdução ao Dask

Nesta lição, discutiremos os fundamentos do Dask. Nossos objetivos de aprendizagem são conforme segue. Ao final da lição, seremos capazes de:

- Identificar e descrever coleções Dask (Array, DataFrame) e Schedulers;
- Trabalhar com o Dask Array da mesma maneira que trabalharia com um NumPy;
- Compreender algumas das compensações em torno do tamanho do *chunk* (pedaço, fração, fatia), forma do *chunk* e sobrecarga computacional;
- Implantar um *Dask Distributed Cluster* local e acessar o painel de diagnóstico.

## Conteúdo

1. [**O que é Dask?**](#O-que-é-Dask?)
1. [**Estruturas de dados em Dask**](#Estruturas-de-dados-em-Dask)
1. [**Paralelismo usando o agendador dask.distributed**](#Paralelismo-usando-o-agendador-dask.distributed)
1. [**Análise e diagnóstico usando o painel Dask**](#Análise-e-diagnóstico-usando-o-painel-Dask)
1. [**Clusters Dask distribuídos para ambientes HPC e nuvem**](#Clusters-Dask-distribuídos-para-ambientes-HPC-e-nuvem)

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

## O que é Dask?

Dask é uma biblioteca de computação paralela flexível para computação analítica. Dask
fornece agendamento dinâmico de tarefas paralelas e estruturas de dados (*big-data*) de alto nível como `dask.array` e` dask.dataframe`, e um amplo pacote com opções para desenvolvimento. A documentação do Dask pode ser encontrada aqui: https://docs.dask.org/en/latest/

<img src="https://docs.dask.org/en/latest/_images/dask-overview.svg" 
     width="75%" 
     align=center
     alt="Dask overview">

## Configuração rápida

A fim de exemplificar sua utilização neste notebook, usaremos um cluster Dask para gerenciar nosso cálculos. A célula a seguir configura um Cluster local simples. Vamos cobrir os agendadores e clusters em Dask posteriormente neste notebook.


In [None]:
from dask.distributed import Client

client = Client()
client

<p>&#128070</p> Clique no link Dashboard acima.


## Estruturas de dados em Dask

Dask inclui três estruturas especiais para cálculo paralelo:

- [Dask Array](https://docs.dask.org/en/latest/array.html): Versão paralela de arranjos NumPy;
- [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html): Versão paralela de tabelas (*DataFrames*) Pandas;
- [Dask Bag](https://docs.dask.org/en/latest/bag.html): Versão paralela de listas em Python.

Xarray contém uma interface primária com os objetos *Dask Array*, por hora focaremos apenas nesse ponto. Você pode descobrir mais sobre as demais interfaces de usuário do Dask
[aqui](https://docs.dask.org/en/latest/user-interfaces.html).


## Arranjos Dask

*Dask Array* implementa um subconjunto da interface NumPy `ndarray` que rastrei as tarefas e divide o grande arranjo em muitos pequenos arranjos.
**Isso nos permite calcular em arranjos maiores que a memória usando múltiplos núcleos.**
as tarefas são coordenadas usando gráficos Dask.
As matrizes de Dask também são **preguiçosos / inertes (_lazy_)**, o que significa que
elas não são avaliadas até que você peça explicitamente um resultado usando o método `compute`.

Se quisermos criar um array NumPy com todas as entradas unitárias, fazemos assim:


In [None]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

Essa matriz contém exatamente 32 MB de dados:

In [None]:
print("%.1f MB" % (ones_np.nbytes / 1e6))

Agora vamos criar o mesmo array usando a interface de array do Dask:


In [None]:
import dask.array as da

ones = da.ones(shape)
ones

Funcionou! Mas ainda não dissemos ao Dask como dividir (ou fragmentar) a matriz, então
não é otimizado para computação paralela.

Uma diferença crucial com Dask é que devemos especificar o argumento `chunks`.
"*Chunks*" (ou pedaços) descreve como a matriz é dividida em muitas submatrizes.

![Dask Arrays](http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg)
_Fonte:
[Dask Array Documentation](http://dask.pydata.org/en/latest/array-overview.html)_

Existem [várias maneiras de especificar essa fragmentação] (http://dask.pydata.org/en/latest/array-creation.html#chunks). Nesta aula, usaremos a formulação em blocos.


In [None]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Observe que vemos apenas uma representação simbólica da matriz, incluindo sua
forma, tipo de dados e tamanho do bloco. Nenhum dado foi gerado ainda. Quando invocamos
`.compute ()` em uma matriz Dask, a computação é acionada e a matriz dask
torna-se uma matriz numpy:


In [None]:
ones.compute()

Para entender o que aconteceu quando chamamos `.compute()`, podemos
visualize o *gráfico* Dask, as operações simbólicas que compõem o arranjo:


In [None]:
ones.visualize()

Nosso array possui quatro blocos. Para gerá-lo, Dask chama `np.ones` quatro vezes e
em seguida, concatena isso em um array.

Em vez de carregar imediatamente uma matriz Dask (que coloca todos os dados na RAM),
é mais comum reduzir os dados de alguma forma. Por exemplo:


In [None]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()

### Exercício

Modifique o tamanho do *chunk* (ou sua forma) no array `ones` e visualize as mudanças no gráfico Dask.


In [None]:
# Seu código aqui

Aqui vemos a estratégia de Dask para encontrar a soma. Este exemplo simples ilustra
a beleza do Dask: ele projeta automaticamente um algoritmo apropriado para customização
operações com big data.

Se tornarmos nossa operação mais complexa, o gráfico ficará mais complexo:

In [None]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()

### Um cálculo maior

Os exemplos acima foram exemplos didáticos; os dados (32 MB) provavelmente não são grandes
o suficiente para justificar o uso de Dask.

Podemos torná-lo muito maior!


In [None]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

In [None]:
print("%.1f MB" % (big_ones.nbytes / 1e6))

Este conjunto de dados tem **6,4 GB**, em vez de 32 MB! Isso é provavelmente próximo ou maior do que a quantidade de RAM disponível que você tem em seu computador. Mesmo assim,
Dask não tem problema em trabalhar nisso.

_Não tente `.visualize()` nesse array!_

Ao fazer um grande cálculo, o dask também tem algumas ferramentas para nos ajudar a entender o que está acontecendo sob o capô. Vamos observar o painel novamente enquanto fazemos uma computação maior.


In [None]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result

### Redução

Todos os métodos NumPy usuais funcionam em arrays Dask. Você também pode aplicar funções NumPy diretamente para um array dask e permanecerá preguiçoso (*lazy*).


In [None]:
big_ones_reduce = (np.cos(big_ones) ** 2).mean(axis=1)
big_ones_reduce

A plotagem também ativa a computação, uma vez que precisamos dos valores reais:


In [None]:
%matplotlib inline
from matplotlib import pyplot as plt

In [None]:
plt.plot(big_ones_reduce)

## Paralelismo usando o agendador dask.distributed

Na [primeira célula](#Configuracao-rapida) desse notebook, iniciamos um cliente Dask Cluster local. Nós pulamos alguns detalhes importantes lá que iremos
detalhar agora.

### Agendadores (*Schedulers*) Dask

O Dask _Schedulers_ orquestrar as atividades nos gráficos de tarefas para que elas possam
ser executadas em paralelo. _Como_ eles funcionam em paralelo, porém, é determinado por qual _Scheduler_ você escolhe.

Existem 3 agendadores _locais_:

- **Single-Thread Local:** Para depuração, análise e diagnóstico de problemas;
- **Multi-threaded:** Usando o pacote integrado do Python `threading` (o padrão
   para todas as operações Dask, exceto `Bags`);
- **Multi-processo:** Usando o pacote integrado do Python `multiprocessing` (o padrão para Dask `Bags`).

e 1 agendador _distribuído_, sobre o qual falaremos mais tarde:

- **Distributed:** Usando o módulo `dask.distributed` (que usa [`tornado`](https://www.tornadoweb.org/en/stable/) para comunicação sobre TCP). O agendador distribuído usa um `Cluster` para gerenciar a comunicação entre o agendador e os "trabalhadores". Isso é descrito na próxima seção.

### Distributed Clusters (http://distributed.dask.org/)

- `LocalCluster` - Cria um `Cluster` que pode ser executado localmente. Cada `Cluster` inclui um `Scheduler` e `Worker`s.
- `Client` - Conecta-se e direciona a computação em um `Cluster` distribuído.


## Análise e diagnóstico usando o painel Dask

Você deve se lembrar de que abrimos um url para o painel do Dask:


In [None]:
client

O painel de controle do agendador distribuído Dask fornece uma ferramenta incrivelmente valiosa para obter insights sobre o desempenho de seu cálculo e o cluster como um todo. No painel, você verá uma série de informações:

- _Status_: Visão geral do estado atual do planejador, incluindo o fluxo de tarefas ativas, progresso, memória por trabalhador e o número de tarefas por trabalhador;
- _Workers_: A guia workers (*trabalhadores*) permite que você rastreie o uso da CPU e da memória por trabalhador;
- _Sistema_: Rastreamento ao vivo de recursos do sistema, como CPU, memória, largura de banda e descritores de arquivos;
- _Perfil_: Perfil estatístico refinado
- _Info_: Status e registros do trabalhador.

Outra ferramenta de diagnóstico útil é o relatório de desempenho estático do Dask. Isso lhe permite salvar um relatório, incluindo o fluxo de tarefas, perfis de trabalho, etc. para todos ou uma parte específica de um fluxo de trabalho. Abaixo está um exemplo de como você criaria tal relatório:

In [None]:
from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    big_calc.compute()

### Exercício

Novamente, vamos modificar o tamanho do bloco em `big_ones` (visando ~100MB). Como é que
_Relatório de desempenho_ mudou com um tamanho de bloco maior?

In [None]:
# your code here

with performance_report(filename="dask-report-large-chunk.html"):
    big_calc.compute()

## Clusters Dask distribuídos para ambientes HPC e nuvem

Dask pode ser implantado em infraestrutura distribuída, como um sistema HPC (computação de alto desempenho) ou um sistema de computação na nuvem. Há um ecossistema crescente de projetos de implantação de Dask que facilitam a implantação e o dimensionamento de clusters Dask em uma ampla variedade de sistemas de computação.

### HPC

#### Dask Jobqueue (https://jobqueue.dask.org/)

- `dask_jobqueue.PBSCluster`
- `dask_jobqueue.SlurmCluster`
- `dask_jobqueue.LSFCluster`
- etc.

#### Dask MPI (https://mpi.dask.org/)

- `dask_mpi.initialize`

### Cloud

#### Dask Kubernetes (https://kubernetes.dask.org/)

- `dask_kubernetes.KubeCluster`

#### Dask Cloud Provider (https://cloudprovider.dask.org)

- `dask_cloudprovider.FargateCluster`
- `dask_cloudprovider.ECSCluster`
- `dask_cloudprovider.ECSCluster`

#### Dask Gateway (https://gateway.dask.org/)

- `dask_gateway.GatewayCluster`


---

_Nota: Partes desse notebook vêm das seguintes fontes:_

- https://github.com/pangeo-data/pangeo-tutorial
- https://github.com/rabernat/research_computing
- https://github.com/dask/dask-examples
