[![img/pythonista.png](img/pythonista.png)](https://www.pythonista.io)

# Introducción a *Dask*.

Las bibliotecas de *Scipy* tienen limitaciones en cuanto a su capacidad de escalar de forma horizontal y aún cuando son capaces de realizar *multithreading* para procesamiento en paralelo, están restringidas a la cantidad de recursos disponibles de la máquina dede las que són ejecutadas.

[*Dask*](https://dask.org/) es una biblioteca general para cómputo paralelo que permite escalar sus operaciones por medio de clústers (grupos de equipos de cómputo que trabajan de forma coordinada).

*Dask* consta de:

* Un calendarizador de tareas dinámico (*dynamic task scheduler*).
* Una colección de bibliotecas optimizadas para *Big Data*, con interfaces que extienden a*Numpy* y *Pandas*.

https://docs.dask.org/en/stable/

https://tutorial.dask.org/

In [1]:
pip install dask

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


##  Principales paquetes de *Dask*.

<img src="img/arquitectura_dask.png" width=75%>

### Paquetes de colecciones de datos de *Dask*.

* ```dask.array```, el cual contiene una biblioteca para manejo de arreglos similar a la de *Numpy*. Por convención, este módulo se importa como ```da```. La documentación de este paquete puede consultarse en:
    * https://docs.dask.org/en/stable/array.html
* ```dask.dataframe```, el cual contiene una biblioteca para manejo de *datafames* similar a la de *Pandas*. Por convención, este módulo se importa como ```dd```. La documentación de este paquete puede consultarse en:
    * https://docs.dask.org/en/stable/dataframe.html
* ```dask.bags```, el cual contiene una biblioteca para manejo de *bags*, las cuales son estructuras de datos que pueden contener datos semi-estructurados y estructurados. Por convención este módulo se importa como ```db```. La documentación de este paquete puede consultarse en:
https://docs.dask.org/en/stable/bag.html

### Evaluación perezosa (*lazy*) con el método ```compute()```.

In [2]:
import dask.dataframe as dd

In [4]:
df = dd.read_csv('data/data_covid.csv')

In [5]:
df

Unnamed: 0_level_0,index,AGUASCALIENTES,BAJA CALIFORNIA,BAJA CALIFORNIA SUR,CAMPECHE,CHIAPAS,CHIHUAHUA,DISTRITO FEDERAL,COAHUILA,COLIMA,DURANGO,GUANAJUATO,GUERRERO,HIDALGO,JALISCO,MEXICO,MICHOACAN,MORELOS,NAYARIT,NUEVO LEON,OAXACA,PUEBLA,QUERETARO,QUINTANA ROO,SAN LUIS POTOSI,SINALOA,SONORA,TABASCO,TAMAULIPAS,TLAXCALA,VERACRUZ,YUCATAN,ZACATECAS,Nacional
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1
,object,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [8]:
df.compute()

Unnamed: 0,index,AGUASCALIENTES,BAJA CALIFORNIA,BAJA CALIFORNIA SUR,CAMPECHE,CHIAPAS,CHIHUAHUA,DISTRITO FEDERAL,COAHUILA,COLIMA,...,SAN LUIS POTOSI,SINALOA,SONORA,TABASCO,TAMAULIPAS,TLAXCALA,VERACRUZ,YUCATAN,ZACATECAS,Nacional
0,2020-02-26,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2020-02-27,0,0,0,0,0,0,2,0,0,...,0,0,0,0,0,0,0,0,0,4
2,2020-02-28,0,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,2
3,2020-02-29,0,0,0,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,2
4,2020-03-01,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
790,2022-04-26,25,23,24,1,1,8,163,4,9,...,16,6,12,4,5,4,79,38,3,583
791,2022-04-27,21,25,18,3,1,9,200,3,6,...,21,10,7,5,8,2,51,25,5,556
792,2022-04-28,33,17,12,4,3,9,134,2,2,...,13,11,4,10,8,0,34,24,1,456
793,2022-04-29,19,15,6,1,1,2,63,2,2,...,7,1,2,3,3,0,39,27,3,255


In [9]:
type(df["Nacional"])

dask.dataframe.core.Series

In [10]:
df["Nacional"].compute()

0        0
1        4
2        2
3        2
4        2
      ... 
790    583
791    556
792    456
793    255
794      1
Name: Nacional, Length: 795, dtype: int64

In [12]:
df.loc[df["Nacional"] > 50000].loc[:, ['index', 'Nacional']]

Unnamed: 0_level_0,index,Nacional
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,object,int64
,...,...


In [11]:
df.loc[df["Nacional"] > 50000].loc[:, ['index', 'Nacional']].compute()

Unnamed: 0,index,Nacional
684,2022-01-10,68043
685,2022-01-11,70936
686,2022-01-12,74420
687,2022-01-13,73216
688,2022-01-14,72308
691,2022-01-17,79651
692,2022-01-18,74129
693,2022-01-19,69731
694,2022-01-20,64525
695,2022-01-21,57000


### Bibliotecas de *Dask*.

* ```dask.delayed```. Esta biblioteca permite procesar colecciones basadas en *Python* de forma paralela.
    * https://docs.dask.org/en/stable/delayed.html
* ```dask.futures```. Es una implementación  de [```concurrent.futures```](https://docs.python.org/3/library/concurrent.futures.html) de *Python* optimizado para correr en un cluster.  La documentación de este paquete puede consultarse en:
    * https://docs.dask.org/en/stable/futures.html

## Depliegue de un cluster con ```Dask.Distributed```.

*Dask* puede ser desplegado en clusters mediante el uso de varios equipos *workers* gestionados por un *scheduler*.


https://distributed.dask.org/en/stable/

<img src="img/dask_cluster.png" width=45%>

In [14]:
!pip install "bokeh>=2.4.2, <3"
!pip install dask distributed --upgrade

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [None]:
!dask scheduler

2022-12-22 20:35:19,891 - distributed.scheduler - INFO - -----------------------------------------------
2022-12-22 20:35:20,235 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-12-22 20:35:20,264 - distributed.scheduler - INFO - State start
2022-12-22 20:35:20,267 - distributed.scheduler - INFO - -----------------------------------------------
2022-12-22 20:35:20,267 - distributed.scheduler - INFO -   Scheduler at: tcp://192.168.68.111:8786
2022-12-22 20:35:20,267 - distributed.scheduler - INFO -   dashboard at:                     :8787


<p style="text-align: center"><a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Licencia Creative Commons" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/80x15.png" /></a><br />Esta obra está bajo una <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Licencia Creative Commons Atribución 4.0 Internacional</a>.</p>
<p style="text-align: center">&copy; José Luis Chiquete Valdivieso. 2022.</p>