# MPI Collective Communication

Previously, we used point-to-point communication (i.e. **Send** and **Recv**) to sum the results across all ranks:

```python
    if my_rank == 0:
        world_sum = sum
        for i in range( 1, world_size ):
            sum_np = np.empty( 1 )
            world_comm.Recv( [sum_np, MPI.DOUBLE], source=i, tag=77 )
            world_sum += sum_np[0]
        average = world_sum / N
    else:
        sum_np = np.array( [sum] )
        world_comm.Send( [sum_np, MPI.DOUBLE], dest=0, tag=77 )
```

MPI provides many collective communication functions. These handle common communication patterns that are tedious and error-prone to write using only point-to-point calls.

## Reduce function:

In particular, the **Reduce** function allows us to sum a value across all ranks, without all of the above code. Replace the above with:

```python
    sum = np.array( [sum] )
    world_sum = np.zeros( 1 )
    world_comm.Reduce( [sum, MPI.DOUBLE], [world_sum, MPI.DOUBLE], op = MPI.SUM, root = 0 )
    average = world_sum / N
```

The **op** argument specifies which operation is applied to the data being reduced. Setting it to **MPI.SUM**, as we do above, sums the values from all ranks and stores the result on the root process. MPI provides many other operations, including **MPI.MAX**, **MPI.MIN**, and **MPI.PROD**.
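As a rough illustration of what the **op** argument controls, the following serial sketch models a reduction without MPI. Each entry of the hypothetical `rank_values` list stands in for the value held by one rank, and `model_reduce` is a helper invented for this illustration, not part of mpi4py. The comments name the mpi4py constant that each operator corresponds to.

```python
from functools import reduce
import operator

def model_reduce(rank_values, op):
    """Serial stand-in for Reduce: combine one value per rank with a binary op."""
    return reduce(op, rank_values)

# Hypothetical partial results held by ranks 0..3
rank_values = [2.0, 5.0, 3.0, 4.0]

total = model_reduce(rank_values, operator.add)    # like op=MPI.SUM
product = model_reduce(rank_values, operator.mul)  # like op=MPI.PROD
largest = model_reduce(rank_values, max)           # like op=MPI.MAX
smallest = model_reduce(rank_values, min)          # like op=MPI.MIN

print(total, product, largest, smallest)
```

In a real MPI program only the root rank would hold these results after the call; the other ranks keep whatever was in their receive buffer.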




Note that collective operations do more than simplify the code. They also tend to be faster than hand-written point-to-point equivalents, because the MPI library can choose a communication pattern suited to the number of ranks and the underlying hardware.
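One reason a collective can be faster is that the MPI library is free to choose its own communication pattern. The sketch below is a serial model and does not claim to show how any particular MPI implementation works. It compares the number of sequential steps in the hand-written loop (the root receives from every other rank in turn) with a pairwise tree reduction, where in each round some ranks pass their partial sum to a partner. The function names are invented for this illustration.

```python
def linear_steps(world_size):
    """Root receives from every other rank, one after another."""
    return world_size - 1

def tree_reduce(values):
    """Pairwise tree sum; returns (total, number of rounds)."""
    values = list(values)
    rounds = 0
    stride = 1
    while stride < len(values):
        # In each round, rank i + stride "sends" its partial sum to rank i
        for i in range(0, len(values) - stride, 2 * stride):
            values[i] += values[i + stride]
        stride *= 2
        rounds += 1
    return values[0], rounds

partial_sums = [float(r + 1) for r in range(8)]  # hypothetical per-rank sums
total, rounds = tree_reduce(partial_sums)
print(total, rounds, linear_steps(len(partial_sums)))
```

With 8 ranks, the loop needs 7 receives in sequence, while the tree finishes in 3 rounds because the transfers within a round can happen at the same time. The gap widens as the number of ranks grows.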

After replacing the point-to-point block with the **Reduce** call, **example_mpi10.py** should look like this:

```python
from mpi4py import MPI
import numpy as np

if __name__ == "__main__":

    # get basic information about the MPI communicator
    world_comm = MPI.COMM_WORLD
    world_size = world_comm.Get_size()
    my_rank = world_comm.Get_rank()

    N = 10000000

    # determine the workload of each rank
    workloads = [ N // world_size for i in range(world_size) ]
    for i in range( N % world_size ):
        workloads[i] += 1
    my_start = 0
    for i in range( my_rank ):
        my_start += workloads[i]
    my_end = my_start + workloads[my_rank]

    # initialize a
    start_time = MPI.Wtime()
    a = np.ones( workloads[my_rank] )
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Initialize a time: " + str(end_time-start_time))

    # initialize b
    start_time = MPI.Wtime()
    b = np.zeros( workloads[my_rank] )
    for i in range( workloads[my_rank] ):
        b[i] = 1.0 + ( i + my_start )
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Initialize b time: " + str(end_time-start_time))

    # add the two arrays
    start_time = MPI.Wtime()
    for i in range( workloads[my_rank] ):
        a[i] = a[i] + b[i]
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Add arrays time: " + str(end_time-start_time))

    # average the result
    start_time = MPI.Wtime()
    sum = 0.0
    for i in range( workloads[my_rank] ):
        sum += a[i]
    sum = np.array( [sum] )
    world_sum = np.zeros( 1 )
    world_comm.Reduce( [sum, MPI.DOUBLE], [world_sum, MPI.DOUBLE], op = MPI.SUM, root = 0 )
    average = world_sum / N
    end_time = MPI.Wtime()
    if my_rank == 0:
        print("Average result time: " + str(end_time-start_time))
        print("Average: " + str(average))
```

Then, we run the code again:

```
wladimir$ mpirun -np 2 python example_mpi10.py
Initialize a time: 0.031435
Initialize b time: 1.088035
Add arrays time: 1.8827329999999998
Average result time: 1.1045690000000001
Average: [5000001.5]
```
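The printed average can be checked by hand. After the addition step, the element with global index $i$ holds $1 + (1 + i) = i + 2$ for $i = 0, \dots, N-1$, so with $N = 10000000$:

$$
\text{average} = \frac{1}{N}\sum_{i=0}^{N-1}(i+2) = \frac{N-1}{2} + 2 = \frac{9999999}{2} + 2 = 5000001.5
$$

The square brackets in the output appear because `world_sum` is a one-element NumPy array, so `average` is one as well.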

## Broadcast (MPI_Bcast):

- MPI_Bcast sends the same data from one root process to all other processes in a communicator.

- A single piece of data is duplicated and sent to every process.

- MPI_Bcast is typically used to distribute configuration parameters, initial data, or any other information that every process needs its own copy of.

### Example on broadcast:

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank

if rank == 0:
    data = {'a':1,'b':2,'c':3}
else:
    data = None

data = comm.bcast(data, root=0)
print('rank',rank,data)
```


## Scatter (MPI_Scatter):

- MPI_Scatter distributes different chunks of data from one root process to each process in a communicator.

- The root process has an array (or a similar data structure), divides it into chunks, and sends a different chunk to each process.

- MPI_Scatter is typically used to divide data for parallel processing, where each process works on a distinct portion of the data.

```python 
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = [(2*x+1)**x for x in range(size)]
    print('we will be scattering: ',data)
else:
    data = None

data = comm.scatter(data, root=0)
print('rank '+ str(rank) + ' has data: ' + str(data))
```
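The example above hands `comm.scatter` a list with exactly one element per rank, which is what the lowercase `scatter` expects. When the root holds more items than there are ranks, one possible approach is to group the items into `size` sublists first, spreading any remainder over the lowest ranks in the same way as the workload calculation in **example_mpi10.py**. The sketch below runs serially so the chunking can be inspected without MPI; `split_into_chunks` is a helper name invented here, and `size = 4` is an assumed value.

```python
def split_into_chunks(items, size):
    """Split items into `size` contiguous chunks whose lengths differ by at most one."""
    base, extra = divmod(len(items), size)
    chunks = []
    start = 0
    for rank in range(size):
        # The first `extra` ranks take one additional item
        count = base + (1 if rank < extra else 0)
        chunks.append(items[start:start + count])
        start += count
    return chunks

size = 4                  # assumed number of ranks
items = list(range(10))   # 10 items do not divide evenly by 4
chunks = split_into_chunks(items, size)
print(chunks)

# On the root, `chunks` could then be passed to comm.scatter(chunks, root=0),
# so that each rank receives one sublist.
```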

## Gather (MPI_Gather):

- Gather is a collective communication operation used to collect data from all processes in a communicator into a single process (the root process).

- Gather is designed to bring together pieces of data from multiple processes into a single, combined dataset on one designated process. 

### Example on Scatter and Gather:

```python
# Import the MPI library
from mpi4py import MPI  

# Create a world communicator
comm = MPI.COMM_WORLD 

# Get the total number of processes and the rank of this process in the communicator
size = comm.Get_size()  
rank = comm.Get_rank()

# If the current process is the root process (rank 0)
if rank == 0:
    
    # List of data with (x+1)^x.
    data = [(x + 1) ** x for x in range(size)]
    
    # Print the data that will be scattered
    print('we will be scattering:', data)  

else:
    # Initialize data to None, non-root processes will receive data via scatter
    data = None  

# Distribute the data list from the root process to all processes, each receiving one element.
data = comm.scatter(data, root=0)  

# Every process, including the root, adds 10 to the element it received
data += 10  

# Print the rank and the modified data on each process
print('rank', rank, 'has data:', data)

# Gather the modified data from all processes to the root process
newData = comm.gather(data, root=0)  

# If the current process is the root process
if rank == 0:
    # Print the gathered data on the root process
    print('master:', newData)  
    
```
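To see what output to expect, the serial sketch below traces the same steps without MPI. It assumes a run with 4 ranks (`size = 4` is an assumption, as are the variable names `per_rank` and `gathered`), and each list position stands in for one rank.

```python
size = 4  # assumed number of ranks

# What rank 0 builds before the scatter
data = [(x + 1) ** x for x in range(size)]

# After scatter, rank r holds data[r]; each rank then adds 10 locally
per_rank = [data[r] + 10 for r in range(size)]

# gather returns the per-rank values to the root, ordered by rank
gathered = list(per_rank)

print('we will be scattering:', data)
print('master:', gathered)
```

In the real program the `rank ... has data:` lines can appear in any order because the ranks print independently, while the gathered list on rank 0 is always ordered by rank. On every rank other than the root, `comm.gather` returns `None`.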

