# TP2 Parte 2 - MPI

## 2.1. Ejercicio - Hola Mundo con MPI

### 2.1.1. Preguntas del ejercicio
Ejecute este ejemplo para  4  o más instancias y responda:

a) **¿Qué función realiza la instancia maestra? ¿Qué función realizan las instancias esclavas?**

🔷La función que realiza la instancia maestra es initWork con los siguientes parámetros:
* comm: instancia del comunicador MPI, en este caso MPI.COMM_WORLD, que incluye a todos los procesos involucrados en la ejecución paralela.
* workTimes: array de enteros, que representarian los trabajos que se les va a asignar a los esclavos.
* numProcesses: número de procesos totales ejecutados, contando igualmente al proceso maestro.

La función se compone de 4 partes. Para explicarlo mejor, me baso en esta situación: tengo 4 esclavos y 12 trabajos. Los trabajos son t0, t1, ... t11. Los esclavos son p1, p2, p3, p4 (ya que p0 sería el maestro).

Primero, "asigna" a cada esclavo un trabajo (en este contexto, sus horas de descanso del empleado), es decir, envía una posición del array a su respectivo proceso.
p1 -> t0 ("trabajará en"),
p2 -> t1,
p3 -> t2,
p4 -> t3

Segundo, los trabajos restantes se van asignando a medida que terminan los esclavos sus trabajos previos. El maestro se queda esperando a recibir los resultado del trabajo terminado de los esclavos para asignarles el siguiente.
Continuando con el ejemplo, t4 a t11 no habían sigo asignados. Si p3 (no importa que esclavo, MPI.ANY_SOURCE) termina, se le asigna t4. Así, hasta que todos los trabajos hayan sido asignados.

Tercero, una vez que todos los trabajos fueron asignados, el maestro también se encarga de recibir estos últimos trabajos a pesar de que ya no tiene que asignar más. En el ejemplo, se quedaría esperando los trabajos terminados t8, t9, t10 y t11.

Cuarto y último, notifica a los esclavos que se completaron todos los trabajos por medio de send.

🔷La función que realiza las instancias esclavas es doWork con el siguientes parámetro:
* comm: instancia del comunicador MPI.
El proceso esclavo se queda esperando lo que el proceso maestro le envió. Se mantiene descansando (sleep) según ese valor recibido y envia su resultado (el mismo valor). Si el tag corresponde al fin de trabajo, finaliza. Caso contrario, vuelve a quedar a la escucha.

b) **¿Cuántas de esas instancias ejecuta la función main(), initWork() y doWork()?**

Siendo N el número total de procesos que participan.
La función main() es ejecutado N veces, osea por todos los procesos.
La función initWork() una vez, solamente por el proceso maestro, cuyo id = 0.
La función doWork() es ejecutado por todos los procesos esclavos. Es decir será ejecutada N-1 veces.

c) **¿Cómo se diferencian los mensajes de trabajo o de finalización?**

Se diferencia por medio del tag.

Los esclavos envian su resultado al maestro sin indicar un tag particular, por lo que se manda un default, el cual el maestro recibe.
workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1

WORK_FLAG = 1. El maestro envia el trabajo con tag WORK_FLAG.

END_WORK_FLAG = 2. El maestro envia el trabajo "0" con tag END_WORK a todos los procesos esclavos para avisarles que ya no quedan trabajos que atender.

Los esclavos lo reciben cualquiera sea el tag (MPI.ANY_TAG) y trabajan (bajo este contexto, descansan según ese valor, pudiendo ser 0, min_tiempo_sleep a max_tiempo_sleep). A continuación, si el tag resultaba ser END_WORK, finalizan.

d) **¿Cómo implementaría la función BLAS axpy() con este patrón?¿Sería eficiente? Tips: Pide solo el planteo, no la implementación.**

La operación axpy() es una operación lineal a partir de un escalar y 2 vectores.

*Y <- aX + Y*

MPI_Scatter(&sendbuf, sendcnt, sendtype, &recvbuf, ...)

MPI_Gather(&sendbuf, sendcnt, sendtype, &recvbuf, ...)

Podemos tener 2 procesos X e Y. A partir de MPI_Scatter distribuimos a cada proceso un vector. Paralelamente, se haría el cálculo axpy localmente posición a posición.
A través de MPI_Gather, el proceso Y recibe los resultados "parciales" que sería nuestro vector Y resultado.

e) **¿Qué sucede cuando solo ejecuta con una sola instancia?**

Al definir NRO = 1, indicamos que la cantidad de procesos que se ejecutarán en paralelo sea uno. De esta forma, comm.Get_size() devuelve 1 efectivamente. El proceso maestro es parte, por lo que no hay "empleados disponibles". En el código, tras querer repartir el trabajo a 0 empleados, el numero de tareas es 0.

numTasks = (numProcesses-1)*4

numTasks = (1-1)*4 = 0

f) Punto opcional: El código que ejecutan las instancias esclavas, tienen un error en su lógica. ¿Cómo se podría solucionar?

### 2.1.2 Armado del ambiente

In [1]:
! pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.5.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m23.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.5-cp310-cp310-linux_x86_64.whl size=2746527 sha256=890d6c838ec8a722078a70fad58703a5dcc61b4ecdb05bae3e538b8855aa7b80
  Stored in directory: /root/.cache/pip/wheels/18/2b/7f/c852523089e9182b45fca50ff56f49a51eeb6284fd25a66713
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.5


### 2.1.3. Código del programa en Lenguaje Python

In [None]:
%%writefile Ejercicio2.py
from mpi4py import MPI
import numpy as np
import time

# --------------------------------------------
# Formulario
Max_tiempo_sleep =   1#@param {type: "slider", min: 1, max: 10}
Min_tiempo_sleep =   0#@param {type: "slider", min: 0, max: 10}
# --------------------------------------------

# --------------------------------------------
# Constantes de comunicacion
WORK_FLAG = 1
END_WORK_FLAG = 2
# --------------------------------------------

def main():
    comm = MPI.COMM_WORLD # Instanciamos el tipo de comunicador a utilizar.
    id = comm.Get_rank()  # Obtenemos el id como atributo del proceso que se ejecuta.

    # Utilizamos el 0 para definir al procesos Maestro, cualquier otro id sera un esclavo.
    if (id == 0) :
        init() # Llamamos funcion init para eventos que requeriremos inicialmente solo 1 vez.
        numProcesses = comm.Get_size()  # Obtenemos el numero de procesos totales ejecutados.
        numTasks = (numProcesses-1)*4 # Se setea el numero de tareas.
        workTimes = generateTasks(numTasks) # Se generan las tareas, en este caso seran
        print("El jefe crea {} horas de descanso de sus empleados:".format(workTimes.size), flush=True)
        print(workTimes, flush=True)
        initWork(comm, workTimes, numProcesses)
    else:
        doWork(comm)

def generateTasks(numTasks):
    #TODO: Cambiar la semilla del random para que se generen efectivamente diferentes numeros.
    np.random.seed(1000)
    return np.random.randint(low=Min_tiempo_sleep, high=Max_tiempo_sleep, size=numTasks)

def init():
  print()
  print("Version MPI4py utilizada: {}".format(MPI.Get_version()), flush=True)
  print()
  print( "------------------------------------", flush=True)
  print( "Sistema de trabajo Suizo:", flush=True)
  print( "------------------------------------", flush=True)
  print()

def initWork(comm, workTimes, numProcesses):
    totalWork = workTimes.size
    workcount = 0
    recvcount = 0

    print("Jefe enviando las tareas iniciales:", flush=True)
    for id in range(1, numProcesses):
        if workcount < totalWork:
            work=workTimes[workcount]
            comm.send(work, dest=id, tag=WORK_FLAG) # Envia mensaje de iniciar trabajo con el dato correspondiente del array.
            workcount += 1
            print("Jefe envia trabajo y {} hs de descanso al empleado {}.".format(work, id), flush=True)
    print( "------------------------------------", flush=True)

    # Mientras haya trabajo, se recibe el resultado de los empleados y se sigue enviando MAS trabajo.
    while (workcount < totalWork) :
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat) # Recivimos resultados de los empleados.
        recvcount += 1
        workerId = stat.Get_source() # Obtenemos el identificador del empleado.
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)
        #send next work
        work=workTimes[workcount]
        comm.send(work, dest=workerId, tag=WORK_FLAG)
        workcount += 1
        print("Jefe envia nuevo trabajo y {} hs de descanso al empleado {}.".format(work, workerId), flush=True)

    while (recvcount < totalWork):
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)

    for id in range(1, numProcesses):
        comm.send(0, dest=id, tag=END_WORK_FLAG)


def doWork(comm):
    while(True):
        stat = MPI.Status() # Obtiene el estado actual del empleado.
        waitTime = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) # Obtiene lo enviado por el Jefe.
        print("Soy el empleado con id {}, toca descanzo por {} hs.".format(comm.Get_rank(), waitTime), flush=True)

        if (stat.Get_tag() == END_WORK_FLAG):
            print("Marca tarjeta el empleado {}.".format(comm.Get_rank()), flush=True)
            return
        time.sleep(waitTime)
        comm.send(waitTime, dest=0)

main()

Writing Ejercicio2.py


### 2.1.4 Ejecución del programa

Parámetros de ejecución

In [None]:
# --------------------------------------------
#@title Parámetros de ejecución { vertical-output: true }
NRO =   6#@param {type: "number"}
# --------------------------------------------

! mpirun --oversubscribe --allow-run-as-root -np $NRO python Ejercicio2.py

## 3.1 Ejercicio Contar palabras

Desarrollar un programa que permita obtener el valor máximo de un conjunto dado de forma distribuida.

Las condiciones a tener en cuenta son:

Debe trabajar con al menos, 4 procesos.
El resultado final debe ser informado en cada proceso.
Implementar comunicación por Buffer

In [9]:
%%writefile mpi_tp4.py

from mpi4py import MPI
import numpy as np
from IPython.display import display
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

count = 12
sendbuf = None
recvbuf_ = None # resultado reduce

if rank == 0:
  sendbuf = np.empty([size, count], dtype='i') # creo matriz
  sendbuf[:,:] = np.random.randint(0, 100, size=(size, count)) # asigno valores

  print("Matriz generada\n", sendbuf)
  print("Maximo check = ", sendbuf.max()) # este debería ser el resultado final.

## ---------------------- SCATTER ----------------------------------------------

recvbuf = np.empty(count, dtype='i') # array vacio, fila

comm.Scatter(sendbuf, recvbuf, root=0) # scatter(matriz, vector, root)
print(f"Scatter | Proceso: {rank} - Data: {recvbuf}")

# todos los procesos recibieron una parte de la matriz "sendbuf",
# y lo alojan en un array previamente vacio y secuencial "recvbuf"

## ------------------------ REDUCE ---------------------------------------------

if rank == 0:
  recvbuf_ = np.empty([1, count], dtype='i') #matriz de fila 1

comm.Reduce(recvbuf, recvbuf_, op=MPI.MAX, root=0) # reduce(vector, matriz, op, root)

if rank == 0:
  print("Reduce | Soy proceso root. Maximos de cada columna: ", recvbuf_) #resultado matriz

# el proceso root recibe el array "recvbuf" de cada proceso y
# junto con la operación para obtener los máximos de cada COLUMNA de la matriz
# se aloja el resultado en la matriz de una fila "recvbuf_".

## ----------------------- BROADCAST -------------------------------------------

if rank == 0:
  max_final = recvbuf_.max() # variable con el maximo de maximos
  data = np.array([max_final], dtype='i')
else:
  data = np.empty(1, dtype='i') # array de una sola posición

comm.Bcast(data, root=0)

print(f"Broadcast | Proceso: {rank} - Resultado final: {data[0]}")

# todos los procesos reciben el resultado final y lo informan


Writing mpi_tp4.py


In [10]:
NRO = 4
! mpirun --oversubscribe --allow-run-as-root -np $NRO python mpi_tp4.py

Matriz generada
 [[69 88 62 54 66 43 31 52 17 20 22 69]
 [20 11  6 84 14 43 83 54 29 34 50 31]
 [94 23 19 29 27 17  0  3 89 75 13 83]
 [ 1 53 75 49 82 89 49 53 26 79 68 73]]
Maximo check =  94
Scatter | Proceso: 0 - Data: [69 88 62 54 66 43 31 52 17 20 22 69]
Scatter | Proceso: 1 - Data: [20 11  6 84 14 43 83 54 29 34 50 31]
Scatter | Proceso: 2 - Data: [94 23 19 29 27 17  0  3 89 75 13 83]
Scatter | Proceso: 3 - Data: [ 1 53 75 49 82 89 49 53 26 79 68 73]
Reduce | Soy proceso root. Maximos de cada columna:  [[94 88 75 84 82 89 83 54 89 79 68 83]]
Broadcast | Proceso: 0 - Resultado final: 94
Broadcast | Proceso: 1 - Resultado final: 94
Broadcast | Proceso: 3 - Resultado final: 94
Broadcast | Proceso: 2 - Resultado final: 94
