# GSON : Programmation Haute Performance

## Partie 2 : les communications collectives

Les fonctions de communications collectives sont les fonctions qui permettent de réaliser un ensemble de communications (envoi et réception) impliquant tous les processus du communicateur (**COMM_WORLD**). Elles pourraient être réalisées par des communications point-à-point mais l'implémentation de ces fonctions par MPI sont optimisées et peuvent tenir compte de l'architecture de la machine (comment les processeurs sont organisées par rapport à la mémoire etc).

### 2.1 L'ensemble

Ces fonctions sont les suivantes et leur utilisatoin est détaillée vu dans les sections suivantes
- une barrière de synchronisation : définir un point dans le programme qui permet d'attendre que tous les processus soient arrivés à ce niveau des instructions.
- les communications diffusion - découpage - rassemblement : un seul appel de la fonction pour réaliser l'ensemblee des communications envoi/réception pour transmettre ou rassembler des données. 
- la réduction

**ATTENTION : Une fonction de communications collectives doit être appelée par tous les processus du communicateur impliqué.**

### 2.2 La barrière de synchronisation
Les processus exécutent leur programme de manière asynchrone. Cette fonction permet dans un programme d'indiquer un endroit où tous les processus vont s'attendre. L'instruction qui suit cette barrière ne sera exécutée que si tous les processus ont terminé les instructions qui précèdent la barrière.

Le coût d'une barrière est vraiment important. **Il faut limiter au minimum l'utilisation de cette barrière.**

**Exercice 1:** Testez le programme suivant en observant l'ordre des affichages. Puis rajoutez l'instruction *comm.Barrier()* entre les 2 instructions *print*. Quelle est la différence ?


In [26]:
%%writefile barrier.py
from mpi4py import MPI
import time

start = time.time()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print("Hello de ", rank, " sur ", size, " processus dans COMM_WORLD")
comm.Barrier()
print("Goodbye de ", rank, " sur ", size, " processus dans COMM_WORLD")
end = time.time()
print(end-start)

Overwriting barrier.py


In [27]:
!mpirun -np 4 python3 barrier.py

Hello de  3  sur  4  processus dans COMM_WORLD
Goodbye de  3  sur  4  processus dans COMM_WORLD
2.1219253540039062e-05
Hello de  0  sur  4  processus dans COMM_WORLD
Goodbye de  0  sur  4  processus dans COMM_WORLD
1.7642974853515625e-05
Hello de  1  sur  4  processus dans COMM_WORLD
Goodbye de  1  sur  4  processus dans COMM_WORLD
Hello de  2  sur  4  processus dans COMM_WORLD
Goodbye de  2  sur  4  processus dans COMM_WORLD
2.288818359375e-05
2.288818359375e-05


### 2.3 La diffusion

Une diffusion est une communication de **un vers tous**. Il s'agit de transmettre une donnée d'un processus à tous les autres processus du communicateur.

Là encore **mpi4py** fait la distinction des fonctions du type des données. 

- bcast pour "any object python"
- Bcast pour des "memory buffers"

Pour ces communications, un processus va jouer un rôle différent. Ce processus est appelé processus ***root***. Ce processus est souvent celui de $rank=0$ mais ce n'est pas une obligation. Un programme MPI doit fonctionner quelque soit le *rank* de ce processus et en général le *rank* de ***root*** est donné en argument du programme.

Soit l'exemple ci-dessous qui réalise une diffusion d'un scalaire. 

In [28]:
%%writefile exemple_bcast.py

from mpi4py import MPI
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

root = int(sys.argv[1])

data = rank

print("je suis", rank, " data avant =", data)

data = comm.bcast(data, root)

print("je suis", rank, " data après =", data)

Writing exemple_bcast.py


In [29]:
!mpirun -np 4 python3 exemple_bcast.py 0

je suis 0  data avant = 0
je suis 2  data avant = 2
je suis 2  data après = 0
je suis 3  data avant = 3
je suis 0  data après = 0
je suis 1  data avant = 1
je suis 1  data après = 0
je suis 3  data après = 0


L'instruction *data = comm.bcast(data, root)* est exécutée par tous les processus. Le sens de *data* est différent en fonction du *rank* des processus. *data* comme paramètre de la fonction est la donnée que le processus *root* souhaite diffuser et *data* comme valeur de retour est la donnée reçue par tous. 

**Exercice 2:** Modifiez le code ci-dessus pour que la valeur reçue ne soit pas reçue dans data.


In [42]:
%%writefile exemple_bcast_bis.py
#Nouvelle version

from mpi4py import MPI
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

root = int(sys.argv[1])

data = rank

print("je suis", rank, " data avant =", data)

datarecue = comm.bcast(data, root)

print("je suis", rank, " data après =", datarecue, flush = True)

Overwriting exemple_bcast_bis.py


In [43]:
!mpirun -np 4 python3 exemple_bcast_bis.py 0

je suis 3  data avant = 3
je suis 1  data avant = 1
je suis 0  data avant = 0
je suis 2  data avant = 2
je suis 1  data après = 0
je suis 3  data après = 0
je suis 0  data après = 0
je suis 2  data après = 0


Exemple 2 : 

In [1]:
%%writefile exemple2_bcast.py
from mpi4py import MPI
import random
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

root = int(sys.argv[1])

if rank == root:
    tab = [random.randint(1,50) for _ in range(10)]
    print("tab sur ",rank, " : ", tab)
else:
    tab = None

tab = comm.bcast(tab, root)

print("je suis", rank, " tab après =", tab, flush = True)

Overwriting exemple2_bcast.py


In [2]:
!mpirun -np 4 python3 exemple2_bcast.py 1

tab sur  1  :  [2, 15, 19, 28, 35, 1, 23, 3, 2, 2]
je suis 1  tab après = [2, 15, 19, 28, 35, 1, 23, 3, 2, 2]
je suis 2  tab après = [2, 15, 19, 28, 35, 1, 23, 3, 2, 2]
je suis 3  tab après = [2, 15, 19, 28, 35, 1, 23, 3, 2, 2]
je suis 0  tab après = [2, 15, 19, 28, 35, 1, 23, 3, 2, 2]


Exemple 3 lorsque les données sont des memory buffers de la bibliothèque numpy : 

In [11]:
%%writefile exemple3_Bcast.py

from mpi4py import MPI
import numpy as np
import sys

rank = comm.Get_rank()
size = comm.Get_size()

root = int(sys.argv[1])

if rank == root:
    data = np.random.randint(1, 50, 10, dtype='i')
    print("data sur ", rank, " : ", data)
else:
    data = np.empty(10, dtype='i')

comm.Bcast([data, 10, MPI.INT], root)

print("je suis", rank, " data après =", data)

Overwriting exemple3_Bcast.py


In [10]:
!mpirun -np 4 python3 exemple3_Bcast.py 0

<class 'numpy.int32'>
data sur  0  :  [43  2  3  9  6 33 31 34 20 17]
je suis 0  data après = [43  2  3  9  6 33 31 34 20 17]
je suis 2  data après = [43  2  3  9  6 33 31 34 20 17]
je suis 1  data après = [43  2  3  9  6 33 31 34 20 17]
je suis 3  data après = [43  2  3  9  6 33 31 34 20 17]


### 2.4 La distribution et le rassemblement

Lorsqu'on manipule des ensembles de données, ces données sont stockées dans des *tableaux*. En programmation ces tableaux peuvent prendre la forme de différentes structures de données (liste, tableau à 2 dimensions, ...) mais ici on travaillera essentiellement sur des tableaux à une dimension.

On a vu que la parallélisation sur des architectures à mémoire distribuée s'appuiait sur la distribution des données afin de répartir les calculs à effectuer. De plus initialement les données sont stockées sur un processeur particulier. Il est donc nécessaire de pouvoir découper et transmettre ces données.

Les fonctions disponibles sont les suivantes

Lorsque la taille est divisible par le nombre de processus
- scatter/Scatter : pour le découpage d'un tableau (un vers tous)
- gather/Gather (tous vers un)
  
Lorsque la taille n'est pas divisible par le nombre de processus
- Scatterv
- Gatherv

Exemple 1 à exécuter sur 4 processus :

In [19]:
%%writefile exemple_scatter.py

from mpi4py import MPI
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

root = int(sys.argv[1])

data = None
if rank == root:
    data = [[0,1,2,3], [4,5,6,7], [8,9,10,11], [12,13,14,15]]
    print("data pour répartition : ", data)

data_local = comm.scatter(data, root)

print("je suis", rank, " data_local =", data_local, flush = True)

data_check = None

data_check = comm.gather(data_local, root)

if rank == root:
    print("data rassemblement dans",rank," : ", data_check)

Overwriting exemple_scatter.py


In [20]:
!mpirun -np 4 python3 exemple_scatter.py 0

data pour répartition :  [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
je suis 2  data_local = [8, 9, 10, 11]
je suis 0  data_local = [0, 1, 2, 3]
je suis 1  data_local = [4, 5, 6, 7]
je suis 3  data_local = [12, 13, 14, 15]
data rassemblement dans 0  :  [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]


Dans ce premier exemple, observez la forme des données et comment elles sont réparties. *data* contient 4 tableaux qui sont répartis sur les 4 processus.

Exemple 2 : 

In [21]:
%%writefile exemple2_scatter.py 
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

if length % size != 0:
    if rank == root:
        print(" le programme prend 2 arguments")
        print(" la taille du tableau à découper qui doit être divisible par le nombre de processus")
        print(" le processus root")
    exit()

data = None
if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)

length_local = length // size

data_local = np.empty(length_local, dtype='i')

comm.Scatter(data, data_local, root)
print("je suis", rank, " data_local =", data_local)

data_check = np.empty(length, dtype='i')

comm.Gather(data_local, data_check, root)

if rank == root:
    print("data rassemblement:", data_check)


Writing exemple2_scatter.py


In [22]:
!mpirun -np 4 python3 exemple2_scatter.py 32 0

data pour répartition :  [34 44 34 21 32 13  5 41 35  9 45 49 13 21  1 39 13 37 47  6 39 40 32 29
 23  5 40 22 34 18  4 25]
je suis 0  data_local = [34 44 34 21 32 13  5 41]
je suis 2  data_local = [13 37 47  6 39 40 32 29]
je suis 3  data_local = [23  5 40 22 34 18  4 25]
je suis 1  data_local = [35  9 45 49 13 21  1 39]
data rassemblement: [34 44 34 21 32 13  5 41 35  9 45 49 13 21  1 39 13 37 47  6 39 40 32 29
 23  5 40 22 34 18  4 25]


La fonction Scatter travaille sur des buffers numpy et permet de découper ce tableau pour distribuer les morceaux sur chaque processus. Par contre elle suppose que la taille du tableau est divisible par le nombre de processus. Cette hypothèse est aussi faite par la fonction Gather qui rassemble les morceaux.

L'exemple suivant lève l'hypothèse de la taille divisible par le nombre de processus. On ne peut travailler alors qu'avec les fonction *Scatterv/Gatherv* et donc des **memory buffers**.


In [31]:
%%writefile exemple_scatter_nondivisible.py
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

data = None

length_local = length // size
reste = length % size


if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)
    count = np.empty(size, dtype='i')
    for i in range(0, size):
        if i < reste:
            count[i] = length_local + 1
        else:
            count[i] = length_local
else:
    count = None

if rank < reste:
    length_local += 1

data_local = np.empty(length_local, dtype='i')

comm.Scatterv([data, count, MPI.INT], [data_local, length_local, MPI.INT], root)

print("je suis", rank, " data_local =", data_local)

data_check = np.empty(length, dtype='i')

comm.Gatherv([data_local, length_local, MPI.INT], [data_check, count, MPI.INT], root)

if rank == root:
    print("data rassemblement:", data_check, flush = True)

Overwriting exemple_scatter_nondivisible.py


In [32]:
!mpirun -np 4 python3 exemple_scatter_nondivisible.py 34 1

data pour répartition :  [18  6 47 10 14  1 12 32 36 37 28 46  7 39 27 38  1 21 47 42 15 27  6 28
 47 11 30  6  7 24 33 24 37 16]
je suis 1  data_local = [37 28 46  7 39 27 38  1 21]
je suis 2  data_local = [47 42 15 27  6 28 47 11]
je suis 0  data_local = [18  6 47 10 14  1 12 32 36]
je suis 3  data_local = [30  6  7 24 33 24 37 16]
data rassemblement: [18  6 47 10 14  1 12 32 36 37 28 46  7 39 27 38  1 21 47 42 15 27  6 28
 47 11 30  6  7 24 33 24 37 16]


**Exercice 3:** Ecrivez le programme qui permet à un processus *root* de créer un tableau d'entiers et qui calcule le minimum de ce tableau. Le résultat doit être disponible uniquement sur le processus *root*. La parallélisation consiste à partager le tableau afin que chacun calcul un minimum en local et ensuite le calcul est finalisé sur le processus *root* qui doit donc recevoir de la part de chacun le minimum qu'il a calculé. 
1. faîtes l'hypothèse que le tableau a une taille divisible par le nombre de processus.
2. Levez cette hypothèse.

In [11]:
%%writefile exercice_minimum.py 
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

if length % size != 0:
    if rank == root:
        print(" le programme prend 2 arguments")
        print(" la taille du tableau à découper qui doit être divisible par le nombre de processus")
        print(" le processus root")
    exit()

data = None
if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)

length_local = length // size

data_local = np.empty(length_local, dtype='i')

comm.Scatter(data, data_local, root)

#print("je suis", rank, " data_local =", data_local)

data_check = np.empty(size, dtype='i')

for i in range(0, size):
    minlocal = np.min(data_local)   
    
    comm.Gather(np.array([minlocal]), data_check, root)
    

if rank == root:
    minglobal = np.min(data_check)
    print("data rassemblement:", minglobal)

Overwriting exercice_minimum.py


In [10]:
!mpirun -np 4 python3 exercice_minimum.py 20 1

data pour répartition :  [24 10 46 30 29 37 25 37 18 41 27 34 31  6 11  8 37  6 24 38]
data rassemblement: 6


In [31]:
%%writefile exemple_scatter_nondivisible.py 
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

data = None

length_local = length // size
reste = length % size


if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)
    count = np.empty(size, dtype='i')
    for i in range(0, size):
        if i < reste:
            count[i] = length_local + 1
        else:
            count[i] = length_local
else:
    count = None

if rank < reste:
    length_local += 1

data_local = np.empty(length_local, dtype='i')

comm.Scatterv([data, count, MPI.INT], [data_local, length_local, MPI.INT], root)

#print("je suis", rank, " data_local =", data_local)

data_check = np.empty(size, dtype='i')

minlocal = np.min(data_local)   
comm.Gather(np.array([minlocal]), data_check, root)
    
if rank == root:
    minglobal = np.min(data_check)
    print("le minimumm du tableau est:", minglobal)


Overwriting exemple_scatter_nondivisible.py


In [32]:
!mpirun -np 4 python3 exemple_scatter_nondivisible.py 34 1

data pour répartition :  [25 32 14  6 41 28 46 28  4 49 30 49 18 45 21 28  7 29 46 24  4 18  4 13
  1 24 21 32 41 16  7 24 12 32]
le minimumm du tableau est: 1


**Exercice 4:** Reprendre le programme précédent mais on souhaite maintenant connaître le minimum et la position du minimum dans le tableau. Considérez le cas général où la taille n'est pas divisible par le nombre de processus.

In [33]:
%%writefile exemple_scatter_nondivisible_pos.py 

from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

data = None

length_local = length // size
reste = length % size


if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)
    count = np.empty(size, dtype='i')
    for i in range(0, size):
        if i < reste:
            count[i] = length_local + 1
        else:
            count[i] = length_local
else:
    count = None

if rank < reste:
    length_local += 1

data_local = np.empty(length_local, dtype='i')

comm.Scatterv([data, count, MPI.INT], [data_local, length_local, MPI.INT], root)

#print("je suis", rank, " data_local =", data_local)

data_check = np.empty(size, dtype='i')

minlocal = np.min(data_local)   
comm.Gather(np.array([minlocal]), data_check, root)
    
if rank == root:
    minglobal = np.min(data_check)
    pos = np.argmin(data)
    print("le minimumm du tableau est:", minglobal,' et il se situe à la position', pos)


Overwriting exemple_scatter_nondivisible_pos.py


In [30]:
!mpirun -np 4 python3 exemple_scatter_nondivisible_pos.py 34 1

data pour répartition :  [13  3 30  1 46 45 19 17 14 12  1 17 25  9 43 12 37 12  2 34 46 11 24 49
 41  4 22 11 38 20 24 11 12 47]
le minimumm du tableau est: 1  et il se situe à la position 3


### 2.5 La réduction

Lorsqu'à partir d'un ensemble de données on souhaite appliquer une opération **OP** commutative et associative (la somme cumulées de nombre, le minimum d'un ensemble de valeurs, ...) on effectue une réduction à partir de **OP**. Pour la parallélisation, il s'agit de faire un calcul en local et ensuite de propager le résultat tout en appliquant l'opération. 

MPI implémente la réduction à partir d'un ensemble d'opérateurs disponibles (***MPI.SUM***, ***MPI.MIN***, ...).

Exemple 1 avec *reduce* pour travailler sur des *any python objects* : 


In [25]:
%%writefile exemple_reduce.py
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

data = None

length_local = length // size
reste = length % size


if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("données initiales : ", data)
    count = np.empty(size, dtype='i')
    for i in range(0, size):
        if i < reste:
            count[i] = length_local + 1
        else:
            count[i] = length_local
else:
    count = None

if rank < reste:
    length_local += 1

data_local = np.empty(length_local, dtype='i')

comm.Scatterv([data, count, MPI.INT], [data_local, length_local, MPI.INT], root)

s = 0
for i in range(0, length_local):
    s += data_local[i]

res = comm.reduce(s, MPI.SUM, root)

if (rank==root):
    print("je suis ", rank, " et res=", res)


Overwriting exemple_reduce.py


In [27]:
!mpirun -np 4 python3 exemple_reduce.py 34 0

hwloc/linux: Ignoring PCI device with non-16bit domain.
Pass --enable-32bits-pci-domain to configure to support such devices
données initiales :  [ 7 20 26 26  3 28 38 25 21 19  1 31 25 46 32 29 14  9 44  7 22 41  3 19
 42  5 41 44 44 49 40 20 25 26]
je suis  0  et res= 872


**Exercice 5 :** Explicitez les paramètres de *Scatterv* dans cet exemple pour un tableau de taille 22 et une exécution sur 4 processus.  

Exemple 2 : Attention on utilise un Scatter donc la taille de données doit être divible par le nombre de processus.

In [28]:
%%writefile exemple2_reduce.py
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

if length % size != 0:
    if rank == root:
        print(" le programme prend 2 arguments")
        print(" la taille du tableau à découper qui doit être divisible par le nombre de processus")
        print(" le processus root")
    exit()

data = None
if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)

length_local = length // size

data_local = np.empty(length_local, dtype='i')

comm.Scatter(data, data_local, root)
print("je suis", rank, " data_local =", data_local)

data_reduce = np.zeros(length_local, dtype='i')
comm.Reduce([data_local, length_local, MPI.INT], [data_reduce, length_local, MPI.INT], MPI.SUM, root)

if rank == root:
    print("je suis ", rank, "data_reduce=", data_reduce)

Writing exemple2_reduce.py


In [29]:
!mpirun -np 4 python3 exemple2_reduce.py 32 2

hwloc/linux: Ignoring PCI device with non-16bit domain.
Pass --enable-32bits-pci-domain to configure to support such devices
data pour répartition :  [21 12 16  6 30 10 38 35 39 18 27 38 27 49  7 48 47 10 10 44  1 47  7 15
 21 40 26  5 43 22  9 20]
je suis 2  data_local = [47 10 10 44  1 47  7 15]
je suis 0  data_local = [21 12 16  6 30 10 38 35]
je suis 1  data_local = [39 18 27 38 27 49  7 48]
je suis 3  data_local = [21 40 26  5 43 22  9 20]
je suis  2 data_reduce= [128  80  79  93 101 128  61 118]


**Exercice 6:** Explicitez le rôle des paramètres de *Reduce* et donnez un exemple d'un tableau de taille 16 sur 4 processus. Quel est le calcul réellement effectué ? 

**Exercice 7:** Ecrivez à nouveau un programme pour calculer le minimun d'un tableau d'entiers en n'utilisant que des communications collectives. 

In [3]:
%%writefile exemple3_reduce.py

from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])[47 10 10 44  1 47  7 15]

if length % size != 0:
    if rank == root:
        print(" le programme prend 2 arguments")
        print(" la taille du tableau à découper qui doit être divisible par le nombre de processus")
        print(" le processus root")
    exit()

data = None
if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)

length_local = length // size

data_local = np.empty(length_local, dtype='i')

comm.Scatter(data, data_local, root)
print("je suis", rank, " data_local =", data_local)

min_local = np.min(data_local)

data_reduce = np.zeros(length_local, dtype='i')
comm.Reduce([data_local, length_local, MPI.INT], [data_reduce, length_local, MPI.INT], MPI.MIN, root)

if rank == root:
    minglobal = np.min(data_reduce)
    print("je suis ", rank, "minTab= ", minglobal)

Overwriting exemple3_reduce.py


In [4]:
!mpirun -np 4 python3 exemple3_reduce.py 32 2

data pour répartition :  [36 42  8  6 32 35 11 31 25 27 33 12 27 33  2 10 24 16  9  2 34 20 10 32
 23 41  2 44 19 48 10 46]
je suis 2  data_local = [24 16  9  2 34 20 10 32]
je suis 0  data_local = [36 42  8  6 32 35 11 31]
je suis 1  data_local = [25 27 33 12 27 33  2 10]
je suis 3  data_local = [23 41  2 44 19 48 10 46]
je suis  2 minTab=  2


**Exercice 8:** Reprenez le programme précédent pour également calculer la position du minimum. Vous pourrez utiliser l'opérateur ***MPI.MINLOC***

In [68]:

%%writefile exemple4_reduce.py

from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

length = int(sys.argv[1])
root = int(sys.argv[2])

if length % size != 0:
    if rank == root:
        print(" le programme prend 2 arguments")
        print(" la taille du tableau à découper qui doit être divisible par le nombre de processus")
        print(" le processus root")
    exit()

data = None
if rank == root:
    data = np.random.randint(1, 50, length, dtype='i')
    print("data pour répartition : ", data)

length_local = length // size

data_local = np.empty(length_local, dtype='i')

comm.Scatter(data, data_local, root)
print("je sui", rank, " data_local =", data_local)

min_local = np.min(data_local)
pos = np.argmin(data_local)

mini = comm.reduce([min_local,pos + rank*length_local], MPI.MINLOC, root)

if rank == root:
    print("Le minimum global se trouve dans le processus ", rank,", sa valeur est ", mini[0]," et se trouve à la position ", mini[1]," du tableau.", flush = True)

Overwriting exemple4_reduce.py


In [72]:
!mpirun -np 4 python3 exemple4_reduce.py 20 1

data pour répartition :  [29 15 36 46 17 18 11 17 42 21 36 45 30 11 13 11 33 32 25  5]
je sui 1  data_local = [18 11 17 42 21]
je sui 3  data_local = [11 33 32 25  5]
je sui 0  data_local = [29 15 36 46 17]
je sui 2  data_local = [36 45 30 11 13]
Le minimum global se trouve dans le processus  1 , sa valeur est  5  et se trouve à la position  19  du tableau.


**Précision sur le type des données entre numpy et mpi4py**
Dans les exemples, le type des données est précisé à la création des tableaux *numpy* et dans les fonctions de communication. Cette précision n'est pas nécessaire si on travaille sur des données de type simple et si les données sont contiguës en mémoire. Dans ce cas **mpi4py** est capable de déduire le type de données. Sinon voici quelques correspondances entre les types *MPI* et les types *numpy*

| `numpy` dtype        | `mpi4py.MPI` type |
|----------------------|------------------|
| `np.int32`           | `MPI.INT`        |
| `np.int64`           | `MPI.LONG`       |
| `np.float32`         | `MPI.FLOAT`      |
| `np.float64`         | `MPI.DOUBLE`     |


**Exercice 9: Norme de Frobenius d'une matrice**
On souhaite calculer la norme de Frobenius d'une matrice $M$ de taille $n \times m$ initialement sur le processus $rank=root$. 
$$\left\| M \right\|_F = \sum_{i,j} m_{i,j}^2$$

Soit le code ci-dessous qui permet de générer une matrice sur le processus $rank=root$. Complétez le code afin de
1. répartir la matrice par ligne sur les processus
2. effectuer le calcul de la somme partielle des éléments reçus
3. calculer le résultat final de la norme de Frobenius par une réduction. Ce résultat doit être disponible sur *root*.

In [105]:
%%writefile frobenius.py

from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n = int(sys.argv[1])
m = int(sys.argv[2])
root = int(sys.argv[3])

M = None
n_local = n // size

if rank == root:
    M = np.random.uniform(low=1.0, high=5.0, size=(n, m))
    #M = np.full((n,m),1.0) # autre version pour les tests

M_local = np.empty((n_local,m), dtype='float64')

# à compléter ici 
comm.Scatter(M , M_local ,root = root)


res = np.sum(M_local**2)
print(res)

liste_carree = []
for i in range(n_local):
    carree =0
    for j in range(len(M_local[i])):
        carree += M_local[i][j]**2
    liste_carree.append(carree)

sommepartielle = np.sum(np.array(liste_carree))
        
res = comm.reduce(sommepartielle, MPI.SUM, root)

if rank == root:
    print("je suis le processus ",rank," et la norme de Frobenus est ", res)



Overwriting frobenius.py


In [106]:
!mpirun -np 4 python3 frobenius.py 16 16 0

593.9601907148932
660.2096555712837
606.8791540621814
664.3903562781653
je suis le processus  0  et la norme de Frobenus est  2525.4393566265235


**Exercice 10: Suite croissante**
On souhaite définir si une suite d'entiers est une suite croissante. Complétez le programme ci-dessous pour que les calculs nécessaires à cette vérification soient réparties sur les processus d'une exécution parallèle.

In [None]:
%%writefile suite.py
from mpi4py import MPI
import numpy as np
import sys

## tester si une suite est croissante avec les 2 fonctions numpy diff et all.
def est_croissante(s):
    return np.all(np.diff(s) > 0)
    
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n = int(sys.argv[1])
root = int(sys.argv[2])

S = None

if (rank==root):
    S = np.linspace(0.0,10.0, n) # construction d'une suite croissante.
    # pour les tests modifier des valeurs pour la rendre non croissante
S_local = np.empty(n//size)

comm.Scatter(S, S_local, root)

# à compléter ici.

  

In [None]:
!mpirun -np 4 python3 ./suite.py 16 0