<a href="https://colab.research.google.com/github/TomasRojass/TP-Progra-concurrente/blob/main/TP2/Parte%202/TP_2_Parte2_MPI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Programación Concurrente - TP2 - Parte 2

Para este ejercicio se ha optado por aplicar el tema teórico **MPI** (Message Passing Interface). La finalidad del ejercicio es ampliar el conocimiento sobras la manera que posee Python para implementar la comunicación entre distintos procesos con el uso de una librería denominada **MPI4py** [1]. Para más información puede consultar la última revisión en [2].

---
## 2.1. Ejercicio - Hola Mundo con MPI

### 2.1.1. Preguntas del ejercicio

Ejecute este ejemplo para $4$ o más instancias y responda:

a) ¿Qué función realiza la instancia maestra? ¿Qué función realizan las instancias esclavas?

La instacia maestra crea las tareas y las distribuye entre sus empleados o instancias esclavas.
Por otra parte instacias esclavas (id > 0) ejecutan estan tareas creadas por el maestro y devuelven su resultado a la instacia maestra.

b) ¿Cuántas de esas instancias ejecuta la función `main()`, `initWork()` y `doWork()`?

La funcion main() es ejecutada por todas las instacias, en base a esto en este main se define, si el id == 0 entonces se ejecuta la funcion initWork(), y si el id > 0 entonces se ejecuta la funcion doWork().

c) ¿Cómo se diferencian los mensajes de trabajo o de finalización?

Se diferencian mediante la implementacion de un tag el cual puede ser WORK_FLAG en caso de mensajes de trabajo y END_WORK_FLAG en caso de mensajes de finalizacion.

d) ¿Cómo implementaría la función BLAS `axpy()` con este patrón?¿Sería eficiente? *Tips*: Pide solo el planteo, no la implementación.

Cada esclavo podria ejecutar la función axpy() procesar algunos de sus datos locales y luego enviarlos al proceso raiz mediante un Gather, sin embargo por el tipo de problema planteado y debido a su simplicidad no encontrariamos una forma en la cual sea mas conveniente el uso de axpy().

e) ¿Qué sucede cuando solo ejecuta con una sola instancia?

Si se ejecuta con una sola instancia, el maestro no llega a enviar ningun trabajo ya que no existen procesos esclavos al que enviarle. Esto rompe el sentido del uso de MPI.

f) *Punto opcional*: El código que ejecutan las instancias esclavas, tienen un error en su lógica. ¿Cómo se podría solucionar?

Se podría administrar mejor el uso de los tag de trabajo y finalizacion.

###2.1.2. Armado del ambiente

Es en donde se instalar, por única vez, el módulo MPI4py de Python en el cuaderno.

In [None]:
! pip install mpi4py

### 2.1.3. Código del programa en Lenguaje Python

In [None]:
%%writefile Ejercicio2.py
from mpi4py import MPI
import numpy as np
import time

# --------------------------------------------
# Formulario
Max_tiempo_sleep =   1#@param {type: "slider", min: 1, max: 10}
Min_tiempo_sleep =   0#@param {type: "slider", min: 0, max: 10}
# --------------------------------------------

# --------------------------------------------
# Constantes de comunicacion
WORK_FLAG = 1
END_WORK_FLAG = 2
# --------------------------------------------

def main():
    comm = MPI.COMM_WORLD # Instanciamos el tipo de comunicador a utilizar.
    id = comm.Get_rank()  # Obtenemos el id como atributo del proceso que se ejecuta.

    # Utilizamos el 0 para definir al procesos Maestro, cualquier otro id sera un esclavo.
    if (id == 0) :
        init() # Llamamos funcion init para eventos que requeriremos inicialmente solo 1 vez.
        numProcesses = comm.Get_size()  # Obtenemos el numero de procesos totales ejecutados.
        numTasks = (numProcesses-1)*4 # Se setea el numero de tareas.
        workTimes = generateTasks(numTasks) # Se generan las tareas, en este caso seran
        print("El jefe crea {} horas de descanso de sus empleados:".format(workTimes.size), flush=True)
        print(workTimes, flush=True)
        initWork(comm, workTimes, numProcesses)
    else:
        doWork(comm)

def generateTasks(numTasks):
    #TODO: Cambiar la semilla del random para que se generen efectivamente diferentes numeros.
    np.random.seed(1000)
    return np.random.randint(low=Min_tiempo_sleep, high=Max_tiempo_sleep, size=numTasks)

def init():
  print()
  print("Version MPI4py utilizada: {}".format(MPI.Get_version()), flush=True)
  print()
  print( "------------------------------------", flush=True)
  print( "Sistema de trabajo Suizo:", flush=True)
  print( "------------------------------------", flush=True)
  print()

def initWork(comm, workTimes, numProcesses):
    totalWork = workTimes.size
    workcount = 0
    recvcount = 0

    print("Jefe enviando las tareas iniciales:", flush=True)
    for id in range(1, numProcesses):
        if workcount < totalWork:
            work=workTimes[workcount]
            comm.send(work, dest=id, tag=WORK_FLAG) # Envia mensaje de iniciar trabajo con el dato correspondiente del array.
            workcount += 1
            print("Jefe envia trabajo y {} hs de descanso al empleado {}.".format(work, id), flush=True)
    print( "------------------------------------", flush=True)

    # Mientras haya trabajo, se recibe el resultado de los empleados y se sigue enviando MAS trabajo.
    while (workcount < totalWork) :
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat) # Recibimos resultados de los empleados.
        recvcount += 1
        workerId = stat.Get_source() # Obtenemos el identificador del empleado.
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)
        #send next work
        work=workTimes[workcount]
        comm.send(work, dest=workerId, tag=WORK_FLAG)
        workcount += 1
        print("Jefe envia nuevo trabajo y {} hs de descanso al empleado {}.".format(work, workerId), flush=True)

    while (recvcount < totalWork):
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("Jefe recibe trabajo completado {} del empleado {}.".format(workTime, workerId), flush=True)

    for id in range(1, numProcesses):
        comm.send(0, dest=id, tag=END_WORK_FLAG)


def doWork(comm):
    while(True):
        stat = MPI.Status() # Obtiene el estado actual del empleado.
        waitTime = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) # Obtiene lo enviado por el Jefe.
        print("Soy el empleado con id {}, toca descanzo por {} hs.".format(comm.Get_rank(), waitTime), flush=True)

        if (stat.Get_tag() == END_WORK_FLAG):
            print("Marca tarjeta el empleado {}.".format(comm.Get_rank()), flush=True)
            return
        time.sleep(waitTime)
        comm.send(waitTime, dest=0)

main()


Writing Ejercicio2.py


### 2.1.4 Ejecución del programa

In [None]:
# --------------------------------------------
#@title Parámetros de ejecución { vertical-output: true }
NRO =   2#@param {type: "number"}
# --------------------------------------------

! mpirun --oversubscribe --allow-run-as-root -np $NRO python Ejercicio2.py


Version MPI4py utilizada: (3, 1)

------------------------------------
Sistema de trabajo Suizo:
------------------------------------

El jefe crea 4 horas de descanso de sus empleados:
[0 0 0 0]
Jefe enviando las tareas iniciales:
Jefe envia trabajo y 0 hs de descanso al empleado 1.
------------------------------------
Soy el empleado con id 1, toca descanzo por 0 hs.
Jefe recibe trabajo completado 0 del empleado 1.
Jefe envia nuevo trabajo y 0 hs de descanso al empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Jefe recibe trabajo completado 0 del empleado 1.
Jefe envia nuevo trabajo y 0 hs de descanso al empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Jefe recibe trabajo completado 0 del empleado 1.
Jefe envia nuevo trabajo y 0 hs de descanso al empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Jefe recibe trabajo completado 0 del empleado 1.
Soy el empleado con id 1, toca descanzo por 0 hs.
Marca tarjeta el empleado 1.


---
## 3.1 Ejercicio Contar palabras

Desarrollar un programa que permita obtener el valor máximo de un conjunto dado de forma distribuida.

Las condiciones a tener en cuenta son:


*   Debe trabajar con al menos, 4 procesos.
*   El resultado final debe ser informado en cada proceso.
*   Implementar comunicación por Buffer



###3.1.1 Preparación

In [None]:
%%writefile mpi_tp4.py

from mpi4py import MPI
import numpy as np
from IPython.display import display
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

count = 5000
sendbuf = None

if rank == 0:
  sendbuf = np.random.randn(size, count)
  display("Máximo de matriz: ", sendbuf.max())

recvbuf = np.empty(count, dtype='d')
comm.Scatter(sendbuf, recvbuf, root=0)

local_max = np.max(recvbuf)
print(f"Proceso {rank}: Máximo local = {local_max}")

global_max = comm.reduce(local_max, op=MPI.MAX, root=0)

if rank == 0:
    print(f"Máximo global calculado con MPI = {global_max}")


Overwriting mpi_tp4.py


In [None]:
# --------------------------------------------
# Formulario
NRO =   6#@param {type: "slider", min: 4, max: 10}
# --------------------------------------------

! mpirun --oversubscribe --allow-run-as-root -np $NRO python mpi_tp4.py

Máximo de matriz:  3.876604191527599
Proceso 3: Máximo local = 3.876604191527599
Proceso 1: Máximo local = 3.535322014222286
Proceso 2: Máximo local = 3.440875230735511
Proceso 5: Máximo local = 3.6800149079242748
Proceso 4: Máximo local = 3.753674008810323
Proceso 0: Máximo local = 3.5265744071868683
Máximo global calculado con MPI = 3.876604191527599


---
# Bibliografía

[1] MPI4py: https://mpi4py.readthedocs.io/en/stable/

[2] La versión oficial de MPI (Versión 4): https://www.mpi-forum.org/docs/mpi-4.0/mpi40-report.pdf

