# Introduction à Dask

Dask est une bibliothèque de calcul parallèle flexible pour le calcul analytique. <br>
Dask fournit un ordonnancement dynamique des tâches parallèles et des
collections de données volumineuses de haut niveau comme dask.array et
dask.dataframe. Pour en savoir plus sur Dask, voir :
- La documentation : https://docs.dask.org/en/latest/ (https://docs.dask.org/en/latest/) <br>
- [La présentation de Dask](https://docs.google.com/presentation/d/e/2PACX-1vSTH2kAR0DCR0nw8pFBe5kuYbOk3inZ9cQfZbzOIRjyzQoVaOoMfI2JONGBz-qsvG_P6g050ddHxSXT/pub?start=false&loop=false&delayms=60000#slide=id.p) <br>

Note: Des parties de ce notebook proviennent des sources suivantes : <br>
- https://github.com/rabernat/research_computing (https://github.com/rabernat/research_computing)
- https://github.com/dask/dask-examples (https://github.com/dask/dask-examples)

## Démarrer un cluster distribué Dask et un client pour le tableau de bord
Le démarrage du Cluster/Client Dask est généralement facultatif.
Il fournit un tableau de bord qui est utile pour avoir un aperçu du calcul.
L'utilisation de dask_jobqueue permet également d'obtenir une plus grande puissance de calcul en mettant à
l'échelle Dask sur plusieurs noeuds du HPC.
Le lien vers le tableau de bord sera visible lorsque vous créerez le cluster ou le client ci-dessous. Comme dasklabextension (https://github.com/dask/dask-labextension) est intégré dans l'environnement actuel, il peut être
suffisant pour surveiller les tâches Dask (voir les fenêtres Task Stream et Progress sur la droite). Sinon, nous
vous recommandons d'avoir le tableau de bord ouvert d'un côté de votre écran tout en utilisant votre ordinateur
portable de l'autre côté. Cela peut demander un certain effort pour arranger vos fenêtres, mais les voir toutes
les deux en même temps est très utile pour apprendre.

## Création du cluster local

In [None]:
!pip install dask[distributed]
!pip install graphviz

In [None]:
import dask.distributed

cluster = dask.distributed.LocalCluster()

Connexion d'un client au cluster:

In [None]:
import dask.distributed

client = dask.distributed.Client(cluster)
client

## Dask Arrays
Un tableau dask ressemble beaucoup à un tableau numpy. Cependant, un tableau dask ne contient pas
directement de données. Il représente plutôt symboliquement les calculs nécessaires pour générer les
données. Rien n'est réellement calculé tant que les valeurs numériques réelles ne sont pas nécessaires. Ce
mode de fonctionnement est appelé "paresseux" ; il permet d'effectuer des calculs complexes et volumineux de
manière symbolique avant de les transférer au programmateur pour exécution.
Si nous voulons créer un tableau numérique de tous les calculs, nous le faisons comme ceci :

In [None]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

Ce tableau contient exactement 32 Mo de données :

In [None]:
print("%.1f Mo" % (ones_np.nbytes / 1e6))

Maintenant, créons le même tableau en utilisant l'interface de tableau de dask

In [None]:
import dask.array as da

ones = da.ones(shape)
ones

Cela fonctionne, mais nous n'avons pas dit à Dask comment diviser le tableau, il n'est donc pas optimisé pour
le calcul distribué.
Une différence essentielle avec dask est que nous devons spécifier l'argument chunks. Chunks décrit
comment le tableau est réparti sur plusieurs sous-tableaux.

In [None]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Remarquez que nous ne voyons qu'une représentation symbolique de l'ensemble, y compris sa forme, son type
et sa taille. Aucune donnée n'a encore été générée. Lorsque nous appelons .compute() sur un tableau Dask,
le calcul est déclenché et le tableau Dask devient un tableau numpy.

In [None]:
ones.compute()

Afin de comprendre ce qui s'est passé lorsque nous avons appelé .compute() , nous pouvons visualiser le
graphe Dask, c'est-à-dire les opérations symboliques qui composent le tableau.

In [None]:
ones.visualize()

Notre tableau comporte 4 morceaux. Pour le générer, dask appelle 4 fois np.ones et les concatène ensuite
en un tableau. Plutôt que de charger immédiatement un tableau Dask (qui met toutes les données dans la RAM), il est plus
courant de réduire les données d'une manière ou d'une autre. Par exemple :

In [None]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()

Nous voyons ici la stratégie de Dask pour trouver la somme. Cet exemple simple illustre la beauté de dask : il
conçoit automatiquement un algorithme approprié pour les opérations personnalisées avec de grosses
données.

## Un calcul plus grand
Les exemples ci-dessus étaient de petits exemples ; les données (32 Mo) sont loin d'être assez volumineuses
pour justifier l'utilisation de dask.
Changeons d’échelle !

In [None]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

Cet ensemble de données est de 6,4 Go, au lieu de 32 Mo ! C'est probablement proche ou supérieur à la
quantité de mémoire vive disponible que vous avez dans votre ordinateur. Néanmoins, Dask n'a aucun
problème pour travailler dessus. N'essayez pas d'appeler la méthode .visualize() sur ce tableau !
Lorsqu'il fait un gros calcul, dask a aussi quelques outils pour nous aider à comprendre ce qui se passe sous le
capot. Regardons à nouveau le tableau de bord pendant que nous faisons un gros calcul.

In [None]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()
result = big_calc.compute()
result

## Réduction
Toutes les méthodes habituelles numpy fonctionnent sur des tableaux de dask. Vous pouvez également
appliquer la fonction numpy directement à un dask array, et il restera paresseux.

In [None]:
big_ones_reduce = (np.cos(big_ones) ** 2).mean(axis=1)
big_ones_reduce

Le tracé déclenche également le calcul, car nous avons besoin des valeurs réelles

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline
plt.rcParams["figure.figsize"] = (12, 8)
plt.plot(big_ones_reduce)

## Dask.delayed
dask.delayed est un moyen simple et puissant de paralléliser le code existant. Il permet aux utilisateurs de
retarder les appels de fonctions dans un graphe de tâches avec des dépendances. Dask.delayed ne fournit
pas d'algorithmes parallèles sophistiqués comme dask.dataframe , mais il donne à l'utilisateur un contrôle
complet sur ce qu'il veut construire.
Les systèmes comme dask.dataframe sont construits avec des objets dask.delayed. Si vous avez un
problème qui est parallélisable, mais qui n'est pas aussi simple qu'un grand tableau ou une grande trame de
données, alors dask.delayed peut être le bon choix.

## Créer des fonctions simples

Ces fonctions effectuent des opérations simples comme l'addition de deux nombres, mais elles dorment
pendant un temps aléatoire pour simuler un travail réel.

In [None]:
import time


def inc(x):
    time.sleep(0.1)
    return x + 1


def dec(x):
    time.sleep(0.1)
    return x - 1


def add(x, y):
    time.sleep(0.2)
    return x + y

Réalisons un petit algorithme à l’aide des fonctions ci-dessus.

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

Les appels se sont succédés les uns après les autres, dans l'ordre. Notez cependant que les deux premières
lignes inc(1) et dec(2) ne dépendent pas l'une de l'autre, nous aurions pu les appeler en parallèle si nous
avions été intelligents.

## Annoter les fonctions avec dask.delayed pour les rendre paresseuses
Nous pouvons appeler dask.delayed pour rendr nos fonctions paresseuses. Plutôt que de calculer leurs
résultats immédiatement, ils enregistrent ce que nous voulons calculer dans un graphe que nous exécuterons
plus tard en parallèle.

In [None]:
import dask

inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

L'appel de ces fonctions paresseuses est désormais presque gratuit. Nous construisons juste un graphe

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

## Graphe de calcul

In [None]:
z.visualize(rankdir="LR")

## Calcul en parallèle
Appelez .compute() quand vous voulez que votre résultat soit un objet Python normal
Si vous avez démarré Client() ci-dessus, vous pouvez regarder la progression du calcul dans le tableau de
bord.

In [None]:
%%time
z.compute()

## Pour en savoir plus

https://tutorial.dask.org/