# Dask Distribuído

Além do processamento local utilizando *threads* ou processos, Dask possibilita que seu processamento seja realizado de maneira distribuída. Com isso, o processamento pode ser executado em máquinas remotas, seja em *cluster* de computadores ou máquinas alocadas utilizando recursos de computação em nuvem.

Uma das formas realizar o processamento distribuído é conectando o ambiente a um *scheduler*. Com isto, o ambiente que processa o código gera o DAG e repassa a esse *scheduler*, que será responsável por distribuir as tarefas às máquinas que realizarão o processamento.

Para executar o *scheduler*, execute no terminal da máquina que será responsável pelo serviço de escalonamento.

```bash
dask scheduler
```

Em seguida, podemos conectar ao scheduler, passando seu endereço IP e porta:

```python
from dask.distributed import Client
client = Client(f'scheduler:8786')
```

Neste caso, `scheduler` corresponde ao nome do Docker Container, que será resolvido para o respectivo endereço IP.

In [2]:
client = Client(f'scheduler:8786')

Até este ponto, não há nenhum *worker* ligado ao *scheduler*. 

```python
client.nthreads()
```

{'tcp://172.20.0.4:39337': 4,
 'tcp://172.20.0.5:40031': 4,
 'tcp://172.20.0.6:43959': 4,
 'tcp://172.20.0.7:41489': 4,
 'tcp://172.20.0.8:37745': 4}

Para tal, será necessário que outras máquinas conectectem ao *scheduler*, informando que são *workers*. Para tal, execute no terminal das máquinas *workers* o comando abaixo. O comando pode ser executado em diversas máquinas.

```bash
dask worker tcp://scheduler:8786
```

Após criar os novos *containers* e definí-los como *workers*, podemos observar novamente os recursos associados:

```python
client.nthreads()
```

{'tcp://172.20.0.4:39337': 4,
 'tcp://172.20.0.5:40031': 4,
 'tcp://172.20.0.6:43959': 4,
 'tcp://172.20.0.7:41489': 4,
 'tcp://172.20.0.8:37745': 4}

Pode-se também obter o *link* para o *dashboard* do *scheduler*.

```python
client.dashboard_link
```

'http://scheduler:8787/status'

Abrindo base localmente com Pandas

```python
from glob import glob
import pandas as pd

base_pd = pd.concat([pd.read_csv(base, sep=';') for base in glob('base/benef_sus/*.csv')])
```

Abrindo base de forma distribuída, utilizando Dask.

```python
import dask.dataframe as dd

base_dask = dd.read_csv('base/benef_sus/*.csv', sep=';', 
                        dtype={'COD_CID': str,
                              'COD_PROCEDIMENTO': str,
                              'VL_PAGO_RESSARC':str,
                              'ANO_ABI':str,
                              'ESPECIALIDADE':str})

```

**Atividade**
- Altere o tipo de dado da coluna `VL_ATENDIMENTO` para `float`
<!-- 
```python
base_dask['VL_ATENDIMENTO'] = base_dask['VL_ATENDIMENTO'].str.replace(',','.').astype(float)
```
 -->

**Exercícios**
- Quantos tipos diferentes de `NATUREZA_ESTAB` estão presentes na base?
<!-- 
```python
base_dask['NATUREZA_ESTAB'].unique().count().compute()
```
 -->

- Quantos atendimentos foram realizados por ano?
<!-- 
```python
base_dask['ANO_ABI'].value_counts().compute()
```
 -->

- Qual foi o total gasto com atendimento no ano de 2010?
<!-- 
```python
ano = '2001'
base_ano = base_dask[base_dask['ANO_ABI'] == ano]
total_ano = base_ano['VL_ATENDIMENTO'].sum()
print(total_ano.compute())
```
 -->
- Observe o DAG gerado
<!-- 
```python
total_ano.visualize()
```
 -->