# PPD: MPI e programação com passagem de mensagem

Hélio - DC/UFSCar - 2023

# Aspectos gerais das transmissões

Usando o mecanismo de **identificação lógica** (***rank***) dos processos emissores (***senders***) e receptores (***receivers***), MPI oferece suporte tanto para comunicações **diretas** (ponto-a-ponto) entre pares de processos, quanto para operações de comunicação em **grupo** (coletivas).

Relembrando, as identificações lógicas levam em consideração a participação de todos os processos no grupo MPI_COMM_WORLD, variando de 0 a N-1.

    Também é possível criar outros sub-grupos numa aplicação. Dentro de um sub-grupo,
    cada processo também vai ter um identificador lógico, que pode ser usado em comunicações
    entre membros deste sub-grupo.

Nas transmissões ponto-a-ponto, especifica-se um grupo de processos, que geralmente é o grupo MPI_COMM_WORLD, formado por todos os processos dessa aplicação. Os identificadores lógicos (*ranks*) dos processos **neste grupo** são usados então para identificar o emissor e o receptor de cada operação.

Ex:

* int MPI_**Send** (void \*buf, int count, MPI_Datatype dtype,
  int **dest**, int tag, MPI_Comm **comm**)

* int MPI_**Recv** (void \*buf, int count, MPI_Datatype dtype,
  int **src**, int tag, MPI_Comm **comm**, MPI_Status *stat)

<br>

Cada mensagem, por sua vez, possui um atributo de identificação (***Tag***), que pode ser usado na seleção de mensagens a receber. Nas operações de recebimento de mensagens, é possível especificar um processo de origem e um identificador de mensagem esperada (*Tag*). Há, contudo, alguns códigos especiais que podem ser usados para identificar **qualquer processo** origem (**MPI_ANY_SOURCE**) e **qualquer mensagem** (**MPI_ANY_TAG**).

Ex:

* int MPI_**Send** (void \*buf, int count, MPI_Datatype dtype,
  int **dest**, int **tag**, MPI_Comm **comm**)

* int MPI_**Recv** (void \*buf, int count, MPI_Datatype dtype,
  int **src**, int **tag**, MPI_Comm **comm**, MPI_Status *stat)

<br>

Já nas transmissões **coletivas**, identifica-se o **grupo**, ou sub-grupo, envolvido, e o processo que fará o papel de **divulgador** ou **agregador** dos dados, dependendo do tipo de transmissão.



# Tipos dos dados transmitidos

A transmissão de dados usando *sockets* é feita passando-se um ponteiro para uma sequência de bytes. Cabe à aplicação tratar dos detalhes do armazenamento e da leitura dos dados, de acordo com seus tipos.

MPI simplifica o envio de sequências de dados de um mesmo tipo. Considerando os dados transmitidos nas mensagens, MPI permite que a aplicação identifique seus **tipos** e **quantidades**, cuidando automaticamente dos empacotamentos apropriados em cada caso. Também é possível fazer empacotamentos e desempacotamentos de conteúdos específicos.


Os tipos dos dados transmitidos podem ser pré-definidos ou definidos pelo usuário.

* MPI_CHAR: signed char
* MPI_SHORT: signed short int
* MPI_INT: signed int
* MPI_LONG: signed long int
* MPI_UNSIGNED_CHAR: unsigned char
* MPI_UNSIGNED_SHORT: unsigned short int
* MPI_UNSIGNED: unsigned int
* MPI_UNSIGNED_LONG: unsigned long int
* MPI_FLOAT: float
* MPI_DOUBLE: double
* MPI_LONG_DOUBLE: long double
* MPI_BYTE: 8 binary digits
* MPI_PACKED: data packed or unpacked with MPI_Pack()/ MPI_Unpack

Ex:

* int MPI_**Send** (void \*buf, int **count**, MPI_Datatype **dtype**,
  int dest, int tag, MPI_Comm comm)

* int MPI_**Recv** (void \*buf, int **count**, MPI_Datatype **dtype**,
  int src, int tag, MPI_Comm comm, MPI_Status *stat)

Nas operações acima, a indicação do tipo dos dados transmitidos e o número de dados serve para que MPI faça o empacotamento e desempacotamento de maneira apropriada. Assim, o conteúdo dos dados transmitidos é preservado nas transmisões, mesmo entre sistemas emissor e receptor executando em SOs e arquiteturas diferentes.

<br>

Também é possível realizar o envio de dados de tipos variados. Para isso, MPI oferece mecanismos para o empacotamento de dados em um *buffer* para envio e o desempacotamento de dados de uma mensagem recebida.

É claro que cabe ao código da aplicação fazer o empacotamento e o desempacotamento dos dados no buffer na ordem apropriada.

<br>

## Emapacotamento e envio de dados variados

https://www.open-mpi.org/doc/v3.1/man3/MPI_Pack.3.php
<br>
https://www.mpi-forum.org/docs/mpi-3.1/mpi31-report/node92.htm#Node92


```c
int MPI_Pack (const void *inbuf, int incount, MPI_Datatype datatype,
              void *outbuf, int outsize, int *position, MPI_Comm comm)
```

```c
 Example: An example using MPI_Pack:

    int position, i, j, a[2];
    char buff[1000];
    ....
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    
    if (myrank == 0) { /* SENDER CODE */
      position = 0;
      MPI_Pack(&i, 1, MPI_INT, buff, 1000, &position, MPI_COMM_WORLD);
      MPI_Pack(&j, 1, MPI_INT, buff, 1000, &position, MPI_COMM_WORLD);
      MPI_Send( buff, position, MPI_PACKED, 1, 0, MPI_COMM_WORLD);
    }
    else { /* RECEIVER CODE */
      MPI_Recv( a, 2, MPI_INT, 0, 0, MPI_COMM_WORLD)
    }
```



## Tipos de transmissão ponto-a-ponto

Há diferentes tipos de primitivas de envio ponto-a-ponto, que variam em função do **sincronismo** entre emissor e receptor, do **bloqueio** ou não da operação, e do uso de ***buffers*** no transmissor e no receptor.

* ***Standard***: nas transmissões padrão, não há sincronismo entre emissor e receptor. Se forem providos buffers, envio pode ser concluído antes do recebimento. No caso de transmissões não-bloqueantes, as operações MPI_Wait( ) e MPI_Test( ) podem ser usadas para saber se foram concluídas.
* ***Buffered***: transmissões podem ser *bufferizadas*. Para tanto, as chamadas MPI_Buffer_attach( ) e MPI_Buffer_detach( ) tratam da definição de espaços de *buffer*. Nesse tipo de transmissão bufferizada, o envio pode ser concluído antes do recebimento ser selecionado.
* ***Synchronous***: nas transmissões síncronas, as operações envio e recebimento podem ser realizadas em qualquer ordem, mas a transmissão só ocorre quando ambas as operações forem emitidas. Deste modo, além de prover a transmissão, essas chamadas servem para a sincronziação entre as partes envolvidas.
* ***Ready***: nesse modo de transmissão, o envio pode ser iniciado apenas quando o recebimento já foi solicitado, ou um erro é resultado.

<br>

    Nas chamadas providas pela API MPI, mnemônicos nos nomes das funções especificam cada um desses modos de transmissão:
    -b: buffered, -s: synchronous, -r: ready

Qualquer tipo de envio (padrão, buferizado, síncrono ou *ready*) pode ser associado a qualquer tipo de recepção (padrão, ...).

Nas operações não bloqueantes, há formas de verificar posteriormente se mensagens esperadas já foram recebidas.

Já as operações que usam *buffers* têm o efeito de permitir que as transmissões ocorram mesmo que emissor e receptor não estejam sincronizados numa operação de transmissão.

Uma vez que as chamadas de envio e recebimento comumente podem ser emitidas de forma não sincronizada, **cabe à implementação MPI** tratar do posicionamento dos dados até que eles possam ser efetivamente repassados ao processo receptor.

Por exemplo, caso a operação de envio seja realizada antes de o receptor emitir uma chamada de recepção, cabe à implementação MPI decidir onde os dados transmitidos serão armazenados até que possam ser entregues. Para tanto, uma área de *buffer* de recebimento deve ser alocada, seja no nó emissor e/ou no receptor. A decisão de como tratar isso, contudo, não é padronizada, sendo dependente da implementação MPI.

Vale ressaltar que as transmissões providas pela biblioteca MPI **são confiáveis**. Ou seja, salvo se ocorra falha nos meios de transmissão, as mensagens enviadas serão sempre recebidas corretamente e a aplicação não precisa preocupar-se com a verificação de erros nos dados recebidos, com limites de tempos para transmissão (*time-outs*), ou com outras condições de erro.

MPI garante ainda que a ordem de recebimento das mensagens equivalentes é respeitada nas entregas aos receptores, mas não cabe à implementação MPI garantir que não ocorrerá *starvation* no recebimento de mensagens por processos concorrentes. Ou seja, se vários processos competem pelo recebimento de algum tipo de mensagem, MPI não garante que todos receberão mensagens.



# Comunicação com MPI_Send e MPI_Receive

O exemplo a seguir ilustra transmissões usando [MPI_Send](https://www.open-mpi.org/doc/v3.1/man3/MPI_Send.3.php) e [MPI_Recv](https://www.open-mpi.org/doc/v3.1/man3/MPI_Recv.3.php).

Uma vez compilado um programa MPI, ele pode ser ativado com diferentes números de processos, alocados sobre diferentes conjuntos de computadores. Além disso, comumente, o mesmo código (arquivo executável) é iniciado em todos os nós, no modelo SPMD.

Deste modo, um aspecto comum na maior parte dos programas é determinar quantos processos foram usados na execução corrente e qual é o número lógico (*rank*) de um processo dentro deste conjunto.

Isso é feito com as chamadas MPI_Comm_size(MPI_COMM_WORLD, ...) e MPI_Comm_rank(MPI_COMM_WORLD, ...);

A diferenciação do papel que cada processo exeutará dentro da aplicação é comumente feita em função de seus *ranks*. Em geral, o processo de *rank* **0** é usado para fazer as atividades de coordenação, mas isso é critério da aplicação.

```c
// Determina o número de processos e o rank do processo atual no grupo geral
int num_procs, rank;

MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

int i;
int val;    // vai ser usado para transmissão pelo rank 0 e para recepção pelos demais processos
...
if (rank == 0) {
    val = set_val();   // função hipotética que produz o valor de interesse
    // envia mensagem para todos os demais processos, 1 a rank-1
    // int MPI_Send(const void *buf, int count, MPI_Datatype datatype,
    //              int dest, int tag, MPI_Comm comm)
    for(i=1; i < num_procs; i++)
      MPI_Send(&val, 1, MPI_INT, i, 0, MPI_COMM_WORLD);

} else if (rank != 0) { // processos rank > 0 recebem mensagem de rank 0
    // int MPI_Recv(void *buf, int count, MPI_Datatype datatype,
    //              int source, int tag, MPI_Comm comm, MPI_Status *status)
    MPI_Recv(&val, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("Processo %d recebeu valor %d do processo rank 0\n", rank, val);
}
```

Alguns aspectos a notar neste trecho de código:

* o **mesmo programa** está sendo executado nos processos de todos os *ranks*
* no envio, veja que o buffer de transmissão é o endereço de onde se iniciam os dados. Neste caso, é o endereço da variável;
* no envio ainda, o contador é o número de elementos, do tipo definido, que serão copiados para transmissão a partir do endereço de início do *buffer*;
* no envio, o valor de ***i*** está sendo usado para indicar o *ranK* de cada receptor a que se destina a mensagem;
* veja que, como o nó de *rank* ***0*** está transmitindo, o ***for*** varia de ***i=1 a (num_procs-1)***. Ou seja, 0 envia para os demais;
* nessas transmissões, todas as mensagens têm o ***tag*** 0, tanto no envio quanto no recebimento; assim, não há seleção de mensagens pelos *tags* neste caso;
* no recebimento, passa-se o endereço de onde os dados serão colocados, o tipo dos dados e o número de ocorrências;
* no recebimento, todos os nós recebem do nó de *rank* ***0***;
* ainda no recebimento, veja que o valor inteiro sendo recebido é copiado para a variável ***val***. É claro que isso não vai sobrepor o valor da variável val do emissor, já que essa operação de recebimento vai estar sendo executada só nos nós receptores (*rank* > 0)!

Um outro aspecto a ressaltar aqui e que talvez esteja confuso, é o fato que embora estejamos examinando um código único, vai haver várias instâncias de processos executando esse mesmo código, provavelmente em nós (computadores) distintos!

Em uma das cópias, no nó de *rank* ***0***, aquele de iniciou a execução, vai ser executada a primeira parte do ***if***. Nos demais, o código do ***else***.

Faz sentido?



# Transmissões não bloqueantes

Em MPI, e na programação com passagem de mensagem de maneira geral,  as transmissões de mensagens podem também servir para algum tipo de **sincronização** entre os processos. O uso de *buffers*, explicitamente pela aplicação, ou pela biblioteca MPI, introduz ainda um outro aspecto às transmissões.

Comumente, a bibliotevca MPI utiliza *buffers* para transmissão, alocados no espaço de endereçamento do processo, mas de forma **não visível pela aplicação**. Assim, quando há uma chamada ***MPI_Send***, os dados a serem transmitidos são copiados para um *buffer* da biblioteca MPI e a chamada retorna imediatamente. Caso não haja espaço nesse *buffer*, devido a transmissões anteriores ainda pendentes, a tarefa que emitiu a chamada é bloqueada.

Já quando a primitiva ***MPI_Isend*** for usada, a chamada retorna imediatamente, mesmo que os dados a transmitir não tenham sido copiados para o *buffer* da biblioteca. Esse é um comportamento **não bloqueante**. Um parâmetro extra presente nesta chamada, ***request***, pode ser usado posteriormente para verficiar o estado desta operação.

O recebimento de mensagens também pode ser não bloqueante, usando-se ***MPI_Irecv***.

* **MPI_Send** (buffer,count,type,dest,tag,comm): envio bloqueante
* **MPI_Isend** (buffer,count,type,dest,tag,comm,request): envio não-bloqueante
* **MPI_Recv** (buffer,count,type,source,tag,comm,status): recebimento bloqueante
* **MPI_Irecv** (buffer,count,type,source,tag,comm,request): recebimento não bloqueante


Parâmetros:

* ***Buffer***: endereço de memória da localização dos dados; geralmente é o endereço de uma variável.
* ***Data Count***: número de elementos de dados do tipo especificado a serem enviados.
* ***Data Type***: tipos pré-definidos ou definidos pelo usuário.
* ***Destination***: indica o rank do processo a quem se destina a msg.
* ***Source***: especifica o rank do processo emissor. MPI_ANY_SOURCE permite receber de qualquer tarefa.
* ***Tag***: identificador atribuído (0..32767) pelo programador para identificar uma mensagem. Permite especificar a mensagem a receber. MPI_ANY_TAG permite receber qualquer mensagem.
* ***Communicator***: indica o conjunto de processos a quem se destina a mensagem. Normalmente usa-se MPI_COMM_WORLD.
* ***Status***: em C, é um ponteiro para uma estrutura MPI_Status. Ex. stat.MPI_SOURCE, stat.MPI_TAG, MPI_Get_count routine (núm. Bytes recebidos)
* ***Request***: usado em operações não-bloqueantes, retorna um "request number", que pode ser usado posteriormente (em operações do tipo WAIT) para determinar o estado da operação.


O exemplo de código a seguir ilustra o uso de primitivas para envio e recebimento de mensagens. Neste exemplo, o processo de rank 0 envia mensagens individuais para cada um dos demais processos. Cada um deles recebe a mensagem do rank0 e envia uma resposta.

Como há apenas 1 mensagem de cada origem para cada destino, o campo ***tag*** pode ser o mesmo em todas as trasmissões.

In [None]:
%%writefile sr.c

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mpi.h>

// #define LEN 256
#define LEN (MPI_MAX_PROCESSOR_NAME + 128)

int
main( int argc, char *argv[])
{
	int i, rank, result, numtasks, namelen, msgtag, pid, pid_0;
	char processor_name[MPI_MAX_PROCESSOR_NAME];
	char tx_buf[LEN], rx_buf[LEN];
	MPI_Status status;

	result = MPI_Init(&argc,&argv);

	if (result != MPI_SUCCESS) {
		printf ("Erro iniciando programa MPI.\n");
		MPI_Abort(MPI_COMM_WORLD, result);
	}

	// Determina número de processos em execução na aplicação
	MPI_Comm_size(MPI_COMM_WORLD,&numtasks);

	// Determina ranking desse processo no grupo
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);

	// Determina nome do host local
	MPI_Get_processor_name(processor_name,&namelen);

	pid=getpid();

	msgtag=1;

	if(rank==0) { // master node envia msg para todos os demais: 1..N-1

		for(i=1; i < numtasks; i++) {

			// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest, int tag, MPI_Comm comm)
			MPI_Send (&pid, 1, MPI_INT,i,msgtag,MPI_COMM_WORLD);
			// printf("%s enviou: %d para processo %d\n",processor_name,pid, i);
		}

		// rank 0 aguarda resposta individual de cada um dos demais nós: 1..N-1

		for(i=1; i < numtasks; i++) {
			// rank 0 recebe dos demais (MPI_Comm_size -1)

			// int MPI_Recv(void *buf, int count, MPI_Datatype dtype,
			//              int src, int tag, MPI_Comm comm, MPI_Status *stat)
			MPI_Recv(rx_buf,LEN,MPI_CHAR, i, msgtag, MPI_COMM_WORLD,&status);

			printf("%s: msg de resposta recebida do processo %d: %s\n", processor_name, i,rx_buf);
		}

	} else {   // rank != 0: worker nodes: todos recebem de rank 0 e retornam

		// int MPI_Recv(void* buf,int count,MPI_Datatype datatype,
		//              int source, int tag,MPI_Comm comm,MPI_Status *status);
		MPI_Recv(&pid_0,1,MPI_INT,0,msgtag,MPI_COMM_WORLD,&status);
		// printf("%s recebeu: %d\n",processor_name,pid_0);

		// Ranks != 0 enviam msg para rank 0
    // Monta mensagem de texto para resposta
		sprintf(tx_buf,"%s: rank=%d, pid 0=%d",processor_name,rank,pid_0);

		// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest,
		//              int tag, MPI_Comm comm)
		MPI_Send(tx_buf,strlen(tx_buf)+1,MPI_CHAR,0,msgtag,MPI_COMM_WORLD);
	}

	MPI_Finalize();

	return(0);
}

Writing sr.c


In [None]:
!mpicc sr.c -o sr && mpirun --allow-run-as-root -n 4 -host localhost:4 sr

3c1f42ea217e: msg de resposta recebida do processo 1: 3c1f42ea217e: rank=1, pid 0=33042
3c1f42ea217e: msg de resposta recebida do processo 2: 3c1f42ea217e: rank=2, pid 0=33042
3c1f42ea217e: msg de resposta recebida do processo 3: 3c1f42ea217e: rank=3, pid 0=33042


O exemplo a seguir, baseado em https://mpitutorial.com/tutorials/mpi-send-and-receive/, ilustra um modelo de comunicação circular entre os processos da aplicação, também usando as primitivas MPI_Send e MPI_Recv.

In [None]:
%%writefile pipeline.c

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int
main(int argc, char** argv)
{
  int world_rank;
  int world_size;
  int token, prox, ant;

  MPI_Init(NULL, NULL);

  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);

  // Recebe mensagem do processo de rank anterior e envia para o posterior
  // Atenção com o primeiro e o último...
  // Todas as mensagens são enviadas com Tag=0

  prox = (world_rank +1) % world_size;
  ant = (world_rank + world_size -1) % world_size;

  // Quem começa?
  if (world_rank == 0) {
    token = 0;
    // envia token: 1 valor do tipo MPI_INT
    MPI_Send(&token, 1, MPI_INT, prox, 0, MPI_COMM_WORLD);
  }
  // todos agora, inclusive o 0...
  do {
    // espera token...
    MPI_Recv(&token, 1, MPI_INT, ant, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("Rank %d recebeu token %d do rank %d\n", world_rank, token, ant);

    if(token < 3 * world_size -1) {
      // Já que tem o token, poderia usar o recurso agora!
      // Token, na verdade, poderia ser dados que serão manipulados localmente e
      // encaminhados para mais processamentos no próximo nó do pipeline...
      sleep(rand()%3);

      token++;
    }
    // libera o token, passando-o para o próximo no anel
    MPI_Send(&token, 1, MPI_INT, prox, 0, MPI_COMM_WORLD);

  } while (token < 3 * world_size -1);

  printf("Rank %d terminando...\n",world_rank);

  MPI_Finalize();
}

Writing pipeline.c


In [None]:
# Aqui, testamos com 4 processos no mesmo nó. Se houver mais nós bastaria configurá-los no hostfile, ou especificar em linha de comando
! mpicc -Wall pipeline.c -o pipeline && mpirun --allow-run-as-root -n 4 -host localhost:4 pipeline

Rank 1 recebeu token 0 do rank 0
Rank 2 recebeu token 1 do rank 1
Rank 3 recebeu token 2 do rank 2
Rank 0 recebeu token 3 do rank 3
Rank 1 recebeu token 4 do rank 0
Rank 2 recebeu token 5 do rank 1
Rank 3 recebeu token 6 do rank 2
Rank 0 recebeu token 7 do rank 3
Rank 1 recebeu token 8 do rank 0
Rank 2 recebeu token 9 do rank 1
Rank 3 recebeu token 10 do rank 2
Rank 3 terminando...
Rank 0 recebeu token 11 do rank 3
Rank 0 terminando...
Rank 1 recebeu token 11 do rank 0
Rank 1 terminando...
Rank 2 recebeu token 11 do rank 1
Rank 2 terminando...


# MPI_ANY_SOURCE e MPI_ANY_TAG

No exemplo a seguir, o processo de rank 0 envia e recebe mensagens para/de todos os demais processos da aplicação (MPI_COMM_WORLD).

Neste caso, contudo, o recebimento pode ser fora de ordem; ou seja, o identificador do processo que enviou a próxima mensagem na fila não é conhecido previamente. Assim, uma opção é usar-se a constante **MPI_ANY_SOURCE** para identificar a origem da mensagem esperada. Como o **tag** pode variar também, usa-se a constante **MPI_ANY_TAG** para especificar a mensagem.

Também pode ocorrer de o número de itens recebidos na mensagem não ser conhecido previamente.

Vale observar que na operação de recebimento há um parâmetro a mais que no envio, que é um ponteiro para uma estrutura **MPI_Status**. No retorno da chamada, esssa variável vai estar preenchida com informações sobre a mensagem recebida.

```c
MPI_Status {
	int MPI_SOURCE;
	int MPI_TAG;
	int MPI_ERROR;
	int st_length;  // message length // tamanho da mensagem recebida
};
```
Os campos MPI_SOURCE, MPI_TAG e MPI_ERROR podem ser usados diretamente. Entretanto, para saber o número de itens recebidos na mensagem é preciso usar a função [MPI_Get_count](https://www.open-mpi.org/doc/v3.1/man3/MPI_Get_count.3.php), indicando o tipo dos itens contidos na mensagem:

```c
int MPI_Get_count(const MPI_Status *status, MPI_Datatype datatype, int *count);
```
O exemplo a seguir ilustra essa forma de recebimento, com identificação do emissor feita posteriormente.

In [None]:
%%writefile sr-any.c

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mpi.h>

#define LEN 300

int
main( int argc, char *argv[])
{
	int i, rank, count, result, numtasks, namelen, msgtag, pid, pid_0;
	char processor_name[MPI_MAX_PROCESSOR_NAME];
	char tx_buf[LEN], rx_buf[LEN];
	MPI_Status status;

	result = MPI_Init(&argc,&argv);

	if (result != MPI_SUCCESS) {
		printf ("Erro iniciando programa MPI.\n");
		MPI_Abort(MPI_COMM_WORLD, result);
	}

	// Determina número de processos em execução na aplicação
	MPI_Comm_size(MPI_COMM_WORLD,&numtasks);

	// Determina ranking desse processo no grupo
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);

	// Determina nome do host local
	MPI_Get_processor_name(processor_name,&namelen);

	pid=getpid();

	msgtag=1;

	// todos os processos executaram o mesmo código até aqui!

	if(rank==0) { // master node

		// rank 0 envia valor de seu PID para todos os demais: 1..N-1
		for(i=1; i < numtasks; i++) {
			// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest, int tag, MPI_Comm comm)
			MPI_Send(&pid,1,MPI_INT,i,msgtag,MPI_COMM_WORLD);
			// printf("%s enviou: %d\n",processor_name,pid);
		}

		// rank 0 aguarda resposta individual de cada um dos demais nós: 1..N-1

		for(i=1;i<numtasks;i++) {
			// rank 0 recebe dos demais (MPI_Comm_size -1)

			// Uso de MPI_ANY_SOURCE e MPI_ANY_TAG: não se sabe a ordem de envio
			// int MPI_Recv(void *buf, int count, MPI_Datatype dtype,
			//              int src, int tag, MPI_Comm comm, MPI_Status *stat)
			MPI_Recv(rx_buf,LEN,MPI_CHAR,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);

      // Neste caso, como os valores MPI_ANY_SOURCE e MPI_ANY_TAG foram usados,
      // se for preciso identificar o emissor, é preciso verificar as informações
      // retornadas na estrutura status.

			// Campos de MPI_Status. Tamanho da mensagem recebida pode ser consultado via MPI_Get_count
			// MPI_Status {
			//    int MPI_SOURCE;
			//    int MPI_TAG;
			//    int MPI_ERROR;
			//    int st_length;  // message length
			// };

			// int MPI_Get_count(const MPI_Status *status, MPI_Datatype datatype, int *count);
			// Retorna o número de itens recebidos
			result = MPI_Get_count(&status, MPI_CHAR, &count);

			printf("%d @ %s recebeu %d itens do processo %d (%d): %s\n",
          rank, processor_name, count, status.MPI_SOURCE, status.MPI_TAG, rx_buf);
		}

	} else {   // worker nodes

		// todos recebem de rank 0
		// int MPI_Recv(void* buf,int count,MPI_Datatype datatype,
		//              int source, int tag,MPI_Comm comm,MPI_Status *status);
		MPI_Recv(&pid_0,1,MPI_INT,0,msgtag,MPI_COMM_WORLD,&status);
		// printf("%s recebeu: %d\n",processor_name,pid_0);

		// Ranks != 0 enviam msg para rank 0
		sprintf(tx_buf,"%s: rank=%d, pid 0=%d",processor_name,rank,pid_0);

		// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest,
		//              int tag, MPI_Comm comm)
		msgtag=pid;
		MPI_Send(tx_buf,strlen(tx_buf)+1,MPI_CHAR,0,msgtag,MPI_COMM_WORLD);
		// MPI_Send(&pid,1,MPI_INT,0,msgtag,MPI_COMM_WORLD);

	}

	MPI_Finalize();

	return(0);
}

Overwriting sr-any.c


In [None]:
!mpicc -Wall sr-any.c -o sr-any && mpirun --allow-run-as-root -n 4 -host localhost:4 sr-any

0 @ ee13bfcc7386 recebeu 33 itens do processo 3 (4871): ee13bfcc7386: rank=3, pid 0=4866
0 @ ee13bfcc7386 recebeu 33 itens do processo 1 (4867): ee13bfcc7386: rank=1, pid 0=4866
0 @ ee13bfcc7386 recebeu 33 itens do processo 2 (4868): ee13bfcc7386: rank=2, pid 0=4866


# MPI_Probe

(baseado em https://mpitutorial.com/tutorials/dynamic-receiving-with-mpi-probe-and-mpi-status/)


Em algumas aplicações distribuídas, além de o nó receptor não saber previamente o emissor das mensagens que irá receber, o que o obriga a usar a origem MPI_ANY_SOURCE, é comum que mesmo o tamanho das mensagens não seja conhecido.

Neste caso, é possível alocar um *buffer* com o tamanho da maior mensagem prevista.

Entretanto, como as mensagens transmitidas via *socket* pela implementação MPI são armazenadas previamente em *buffers* locais, é possível à aplicação saber previamente o tamanho de uma messagem já recebida pela API, antes chamar a operação MPI_Recv.

Essa consulta pode ser feita com a chamada MPI_Probe:
```c
MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status)
```
De maneira equivalente a MPI_Recv, MPI_Probe bloqueia à espera do recebimento de uma mensagem específica pela implementação da API, sem que o conteúdo seja copiado para algum endereço especificado pela aplicação, contudo.

Quando a chamada retorna, a estrutura *status* pode ser usada com a função MPI_Get_count para saber o número de itens recebidos. É possível então alocar um buffer de tamanho apropriado para o recebimento.

```c
int number_amount;

if (world_rank == 0) {
    const int MAX_NUMBERS = 100;
    int numbers[MAX_NUMBERS];

    // Pick a random amount of integers to send to process one
    srand(time(NULL));
    number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;

    // Send the random amount of integers to process one
    MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
    printf("0 sent %d numbers to 1\n", number_amount);

} else if (world_rank == 1) {
    MPI_Status status;
    // Probe for an incoming message from process zero
    MPI_Probe(0, 0, MPI_COMM_WORLD, &status);

    // When probe returns, the status object has the size and other
    // attributes of the incoming message. Get the message size
    MPI_Get_count(&status, MPI_INT, &number_amount);

    // Allocate a buffer to hold the incoming numbers
    int* number_buf = (int*)malloc(sizeof(int) * number_amount);

    // Now receive the message with the allocated buffer
    MPI_Recv(number_buf,number_amount,MPI_INT,0,0,MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("1 dynamically received %d numbers from 0.\n", number_amount);
    free(number_buf);
}
```

Se a locação e a liberação de memória forem constantes no código, contudo, talvez valha a pena usar um buffer de tamanho máximo :-)


In [None]:
%%writefile sr-any.c

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <mpi.h>

#define LEN 256

int
main( int argc, char *argv[])
{
	int i, rank, result, numtasks, namelen, msgtag;
  double local_time, remote_time, mean_time, time_diff;
	char processor_name[MPI_MAX_PROCESSOR_NAME];

	MPI_Status status;

	result = MPI_Init(&argc,&argv);

	if (result != MPI_SUCCESS) {
		printf ("Erro iniciando programa MPI.\n");
		MPI_Abort(MPI_COMM_WORLD, result);
	}

	// Determina número de processos em execução na aplicação
	MPI_Comm_size(MPI_COMM_WORLD,&numtasks);

	// Determina ranking desse processo no grupo
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);

	// Determina nome do host local
	MPI_Get_processor_name(processor_name,&namelen);

	msgtag=1;

	if(rank==0) { // master node

		// rank 0 envia o instante local para todos os demais: 1..N-1
		for(i=1; i < numtasks; i++) {

      // determina instante atual
      local_time = MPI_Wtime();

			// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest, int tag, MPI_Comm comm)
			MPI_Send(&local_time, 1, MPI_DOUBLE, i, msgtag, MPI_COMM_WORLD);

			// printf("%s enviou: %f\n",processor_name, local_time);
		}

		// rank 0 aguarda resposta individual de cada um dos demais nós: 1..N-1

    mean_time = 0.0;

		for(i=1; i < numtasks;i++) {

			// rank 0 recebe dos demais (MPI_Comm_size -1)

			// Uso de MPI_ANY_SOURCE e MPI_ANY_TAG: não se sabe a ordem de envio
			// int MPI_Recv(void *buf, int count, MPI_Datatype dtype,
			//              int src, int tag, MPI_Comm comm, MPI_Status *stat)
			MPI_Recv(&remote_time,1,MPI_DOUBLE, MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);

      mean_time += remote_time;

			printf("%d @ %s recebeu time_diff do processo %d: %f\n",
          rank, processor_name, status.MPI_SOURCE, remote_time);
		}
    mean_time /= numtasks;

    printf("Atraso médio de propagaçao: %f\n", mean_time);


	} else {   // worker nodes

		// todos recebem de rank 0
		// int MPI_Recv(void* buf,int count,MPI_Datatype datatype,
		//              int source, int tag,MPI_Comm comm,MPI_Status *status);
		MPI_Recv(&remote_time,1,MPI_DOUBLE,0,msgtag,MPI_COMM_WORLD,&status);

		// printf("%s recebeu: %f\n",processor_name,remote_time);

    local_time = MPI_Wtime();
    time_diff = local_time - remote_time;

    // dorme um pouquinho, para gerar envio em ordem aleatória...
    usleep(rand()%100);

		// int MPI_Send(void *buf, int count, MPI_Datatype dtype, int dest,
		//              int tag, MPI_Comm comm)
		MPI_Send(&time_diff, 1, MPI_DOUBLE,0,msgtag,MPI_COMM_WORLD);
	}

	MPI_Finalize();

	return(0);
}

Overwriting sr-any.c


# Envio não bloqueante (imediato)

O exemplo a seguir ilustra a comunicação não bloqueante com as primitivas [MPI_Isnd](https://www.open-mpi.org/doc/v4.0/man3/MPI_Isend.3.php) e [MPI_Irecv](https://www.open-mpi.org/doc/v4.0/man3/MPI_Irecv.3.php). Essas chamadas liberam a biblioteca MPI para escrever nos *buffers* internos associados às transmissões. Posteriormente, é possível   bloquear o prosseguimento do programa até que as operações tenham sido concluídas.

In [None]:
%%writefile isnd.c

#include <stdio.h>
#include <unistd.h>

#include "mpi.h"

int
main(int argc,char *argv[])
{
  int numtasks, rank, next, prev, buf[2], tag1=1, tag2=2;
  MPI_Request reqs[4];   // required variable for non-blocking calls
  MPI_Status stats[4];   // required variable for Waitall routine

  char hostname[MPI_MAX_PROCESSOR_NAME];
  int namelen;


  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Determina nome do host local
  MPI_Get_processor_name(hostname,&namelen);

  // determina nós vizinhos à esquerda e à direita
  prev = rank-1;
  if (rank == 0)
    prev = numtasks - 1;

  next = rank+1;
  if (rank == (numtasks - 1))
    next = 0;

  // int MPI_Irecv(void *buf, int count, MPI_Datatype datatype,
  //               int source, int tag, MPI_Comm comm, MPI_Request *request);
  //
  // Nonblocking calls allocate a communication request object and associate it
  // with the request handle (the argument request). The request can be used
  // later to query the status of the communication or wait for its completion.
  //
  // A nonblocking receive call indicates that the system may start writing data
  // into the receive buffer. The receiver should not access any part of the
  // receive buffer after a nonblocking receive operation is called, until the
  // receive completes.
  //
  // A receive request can be determined being completed by calling the MPI_Wait,
  // MPI_Waitany, MPI_Test, or MPI_Testany with request returned by this function.

  // post non-blocking receives and sends for neighbors
  MPI_Irecv(&buf[0], 1, MPI_INT, prev, tag1, MPI_COMM_WORLD, &reqs[0]);
  MPI_Irecv(&buf[1], 1, MPI_INT, next, tag2, MPI_COMM_WORLD, &reqs[1]);

  // int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest,
  //               int tag, MPI_Comm comm, MPI_Request *request);
  //
  // MPI_Isend starts a standard-mode, nonblocking send. Nonblocking calls
  // allocate a communication request object and associate it with the request
  // handle (the argument request). The request can be used later to query the
  // status of the communication or wait for its completion.
  //
  // A nonblocking send call indicates that the system may start copying data
  // out of the send buffer. The sender should not modify any part of the send
  // buffer after a nonblocking send operation is called, until the send completes.
  //
  // A send request can be determined being completed by calling the MPI_Wait,
  // MPI_Waitany, MPI_Test, or MPI_Testany with request returned by this function.

  MPI_Isend(&rank, 1, MPI_INT, prev, tag2, MPI_COMM_WORLD, &reqs[2]);
  MPI_Isend(&rank, 1, MPI_INT, next, tag1, MPI_COMM_WORLD, &reqs[3]);


  // aqui, devereia haver algo últil a fazer, ao invés de parar à espera das mensagens...


  // int MPI_Waitall(int count, MPI_Request array_of_requests[],
  //                 MPI_Status *array_of_statuses)
  //
  // Blocks until all communication operations associated with active handles in
  // the list complete, and returns the status of all these operations. Both
  // arrays have the same number of valid entries. The ith entry in array_of_statuses
  // is set to the return status of the ith operation. Requests that were created by
  // nonblocking communication operations are deallocated, and the corresponding
  // handles in the array are set to MPI_REQUEST_NULL.
  //
  // When one or more of the communications completed by a call to MPI_Waitall
  // fail, it is desirable to return specific information on each communication.

  MPI_Waitall(4, reqs, stats);

  printf("%s (%d): buf[0]: %d, buf[1]: %d\n",hostname,rank,buf[0],buf[1]);

  MPI_Finalize();

  return(0);
}

Writing isnd.c


In [None]:
!mpicc -Wall isnd.c -o isnd && mpirun --allow-run-as-root -n 4 -host localhost:4 isnd

ee13bfcc7386 (1): buf[0]: 0, buf[1]: 2
ee13bfcc7386 (3): buf[0]: 2, buf[1]: 0
ee13bfcc7386 (2): buf[0]: 1, buf[1]: 3
ee13bfcc7386 (0): buf[0]: 3, buf[1]: 1


Ainda sobre o exemplo anterior, já que cada processo realiza operações e transmissão e recebimento, seria possível ainda substituir essas 2 operações por uma uma chamada a MPI_Sendrecv:

```c
int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
    int dest, int sendtag, void *recvbuf, int recvcount,
    MPI_Datatype recvtype, int source, int recvtag,
    MPI_Comm comm, MPI_Status *status)
```
Esta chamada permite enviar e receber mensagens de nós distintos, com tags, tipos e tamanhos distintos também.

Bem, há várias primitivas de comunicação com MPI. Espero que os exemplos tratados sirvam para entender os mecanismos utilizados e para explorar outras funcionalidades!