<a href="https://colab.research.google.com/github/JacobDowns/CSCI-491-591/blob/main/lecture6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced mpi4py Features

* In this lecture we'll cover a handful of advanced features in mpi4py
* We'll discuss persistent communication, one-sided communication, and parallel I/O

### Persistent Communication
* In some of our previous examples, such as the parallel heat equation, we sent the same type of message repeatedly in a loop
* In such cases, communication can be optimized with persistent communication, a special case of nonblocking communication that reduces the overhead of repeatedly setting up the same communication
* For point-to-point communication, persistent communication is used by setting up requests with `Send_init` and `Recv_init`
* In each loop iteration, you would then call `Start` or `Startall` and subsequently `Wait` or `Waitall`



#### 1. Usage Pattern
* Create a request one time
```python
req_s = comm.Send_init(sendbuf, dest=..., tag=...)    # use separate send and receive buffers
req_r = comm.Recv_init(recvbuf, source=..., tag=...)
```
* In each loop iteration, start both requests and then wait for them to complete
```python
req_s.Start()
req_r.Start()
MPI.Request.Waitall([req_s, req_r])
```
* When you're finished sending messages, clean up with
```python
req_s.Free()
req_r.Free()
```
* It's a little funky to have to free something in a Python program, but a persistent communication creates a `request` object that holds onto
  * A pointer to the buffer
  * Datatype description
  * The communicator and tag
* `Free()` tells MPI you're done with these resources so they can be released
* This is another reminder of how MPI is a lower level library being wrapped in Python


#### 2. Example: Sending Data in a Ring!
* Below, let's look at an example where each rank sends some data to its right neighbor (`rank + 1`) and receives from its left neighbor (`rank - 1`), with the last rank wrapping around to rank 0
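
The neighbor arithmetic is worth checking on its own: Python's `%` returns a non-negative result for a positive modulus, so `(0 - 1) % size` wraps to the last rank. The plain-Python sketch below involves no MPI, and the helper names `ring_neighbors` and `expected_recv` are hypothetical; it computes each rank's neighbors and the value the example should print for a given iteration, which is a handy way to sanity-check the output.

```python
def ring_neighbors(rank, size):
    """Return (left, right) neighbors of `rank` in a periodic ring of `size` ranks."""
    right = (rank + 1) % size   # the rank we send to
    left = (rank - 1) % size    # the rank we receive from; -1 % size == size - 1
    return left, right

def expected_recv(rank, size, it):
    """Value `rank` should receive at iteration `it` in the ring example."""
    left, _ = ring_neighbors(rank, size)
    return left + 100 * it      # the left neighbor sends its rank + 100*it

for r in range(4):
    print(r, ring_neighbors(r, 4), expected_recv(r, 4, 1))
```

For 4 ranks at iteration 1 this reproduces the recorded output, e.g. rank 0 expects `103` from rank 3.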

In [None]:
%%bash
cat > hello_mpi.py <<'PY'
from mpi4py import MPI
import numpy as np

comm  = MPI.COMM_WORLD
rank  = comm.Get_rank()
size  = comm.Get_size()

right = (rank + 1) % size
left  = (rank - 1) % size

sendbuf = np.array(rank, dtype='i')   # Will send our rank
recvbuf = np.array(-1, dtype='i')     # Will receive from 'left'

# Build persistent requests once
send_req = comm.Send_init(sendbuf, dest=right, tag=0)
recv_req = comm.Recv_init(recvbuf,  source=left,  tag=0)

n_iters = 5
for it in range(n_iters):
    # Optionally update what we send each iter
    sendbuf[...] = rank + 100*it

    # Start both; then wait for both
    send_req.Start()
    recv_req.Start()
    MPI.Request.Waitall([send_req, recv_req])

    print(f"[iter {it}] rank {rank} got {recvbuf} from {left}")

send_req.Free()
recv_req.Free()
PY

In [None]:
!mpiexec -n 4 python hello_mpi.py

[iter 0] rank 0 got 3 from 3
[iter 1] rank 0 got 103 from 3
[iter 2] rank 0 got 203 from 3
[iter 3] rank 0 got 303 from 3
[iter 4] rank 0 got 403 from 3
[iter 0] rank 1 got 0 from 0
[iter 1] rank 1 got 100 from 0
[iter 2] rank 1 got 200 from 0
[iter 3] rank 1 got 300 from 0
[iter 4] rank 1 got 400 from 0
[iter 0] rank 2 got 1 from 1
[iter 1] rank 2 got 101 from 1
[iter 2] rank 2 got 201 from 1
[iter 3] rank 2 got 301 from 1
[iter 4] rank 2 got 401 from 1
[iter 0] rank 3 got 2 from 2
[iter 1] rank 3 got 102 from 2
[iter 2] rank 3 got 202 from 2
[iter 3] rank 3 got 302 from 2
[iter 4] rank 3 got 402 from 2


* What fun, we sent some data in a ring!
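* Persistent requests can reduce the per-message setup overhead, though the performance gain depends on the MPI implementation and is often modest; they can also make repeated communication patterns cleaner to write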


# One-Sided Communication
* We have stated that MPI doesn't use a shared memory paradigm, hence the necessity of sending messages
* However, MPI supports a one-sided communication model using **Remote Memory Access** (RMA), which behaves similarly to a shared memory model
    * In RMA, one process (the origin) directly reads from or writes into memory exposed by another process (the target)
    * The target process doesn't need to call a send / receive on the other end
* In mpi4py, one-sided operations are performed on windows of exposed memory, represented by the `MPI.Win` class
* The main operations on windows are:
    * `Put`: write data into a target's window
    * `Get`: read data from a target's window
    * `Accumulate`: atomically combine data into the target's window (sum, max, etc.)


### Why is it Useful
* Why would we use one-sided communication as opposed to normal two way communication?

* Decoupling synchronization

  * In two-sided communication, both sender and receiver must call matching operations (e.g., `Send`, `Recv`).
  * With one-sided, only the origin process issues the operation; the target doesn't need to explicitly participate
  * This leads to simpler control flow for some applications
  * Great when receivers don't know in advance who will send them data.
* Potential for lower latency and higher throughput
  * RMA can take advantage of specialized network hardware for direct memory access (RDMA) when communicating across nodes
  * Avoids extra protocol overhead from matching send/recv calls.

* With this said, two-sided communication is often simpler and preferred over one-sided communication, as it works well for many problems and is easier to debug
* Moreover, mpi4py's two-sided communication can send arbitrary picklable Python objects (with lowercase methods such as `send` and `recv`), whereas one-sided operations only work on buffer-like objects such as NumPy arrays

## 1. Synchronization
* You'll probably notice that this paradigm is much like a shared memory model for a multithreaded application
* This comes with many of the same challenges as threading including race conditions. To contend with this we need some way of handling synchronization
* RMA has two commonly used synchronization mechanisms:
    * **Fence**: collective barrier-like synchronization
    * **Lock / Unlock**: finer control for accessing one target at a time
* RMA is not a true shared memory model, but it behaves much like one

## 2. Example

* This first example is very basic
* Each process exposes a single integer which can be read or written to
* Rank 0 writes into rank 1's window
* `Fence` is used for synchronization much like `comm.Barrier()`

In [None]:
%%bash
cat > hello_mpi.py <<'PY'
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

buf = np.array(rank, dtype='i')
win = MPI.Win.Create(buf, comm=comm)

if rank == 0:
    data = np.array(42, dtype='i')
    win.Fence()
    win.Put([data, MPI.INT], 1)
    win.Fence()
elif rank == 1:
    win.Fence()
    win.Fence()
    print(f"Rank 1 sees {buf.item()}")

win.Free()
PY

In [None]:
!mpiexec -n 2 python hello_mpi.py

Rank 1 sees 42


## 3. Synchronization in RMA

Synchronization in RMA is a bit complex and requires knowing some terminology.

1. **Origin and Target**

* When we use the term **origin** or origin process, we mean the rank that initiates the RMA operation
* This process calls functions like `Put`, `Get`, and `Accumulate`
* In contrast, the **target** or target process is the rank that exposes some memory in a window
* Its buffer, created with `Win.Create`, will be read or written to


2. **Epochs**
* RMA operations must take place during well-defined periods, called epochs, in which a window is open for remote access
* **Access epoch**: from the origin process's perspective (issuing Put, Get, etc.).
* **Exposure epoch**: from the target process's perspective (memory is exposed to remote access).
* An epoch is always initiated (opened) and terminated (closed) by synchronization calls.

3. **Active Target Synchronization**
* Both the origin and target participate in starting / ending the epoch
* The most typical way of using active target synchronization is with `Fence`
* The first call to `win.Fence()` collectively opens an epoch for all ranks
* The second call to `win.Fence()` collectively closes it
* While a fence epoch is open, any rank can read from or write to the window of any other rank


4. **Passive Target Synchronization**
* In passive target synchronization, only the origin participates while the target is unaware
* The epoch is initiated with `win.Lock(target, lock_type)` and closed with `win.Unlock(target)`
* When the epoch is open you can do `Put`, `Get`, and `Accumulate` on the target rank's exposed memory
* There are two types of locks you can use:
    * With `MPI.LOCK_SHARED`, multiple origins can hold a shared lock on the same target window at the same time
    * This is best used in cases where you're doing read-only access (`Get`) or you're doing writes to disjoint regions that don't conflict with each other
    * The alternative is `MPI.LOCK_EXCLUSIVE`, in which only one origin can hold an exclusive lock on a target at a time
    * MPI ensures no other origin can acquire a lock on the target until it's released
    * This is best used in cases when you want to update memory where conflicts are possible
* Locks aren't mutexes, so they do not prevent the target process from accessing its own window memory


### Example of Passive Target Synchronization

* We already saw an example using active target synchronization with `win.Fence()`
* Let's look at an example of passive synchronization using `Lock` and `Unlock`

In [None]:
%%bash
cat > hello_mpi.py <<'PY'
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

buf = np.array(rank, dtype='i')
win = MPI.Win.Create(buf, comm=comm)

# Passive target synchronization
if rank == 0:
    val = np.array(123, dtype='i')
    win.Lock(1, MPI.LOCK_SHARED)        # open epoch with rank 1
    win.Put([val, MPI.INT], 1)   # origin=0, target=1
    win.Flush(1)                        # ensure target sees it
    win.Unlock(1)                       # close epoch

comm.Barrier()  # make sure rank 0's update is complete before rank 1 reads buf
print(f"[Lock/Unlock] rank {rank} buf = {buf.item()}")
win.Free()
PY

In [None]:
!mpiexec -n 2 python hello_mpi.py

[Lock/Unlock] rank 0 buf = 0
[Lock/Unlock] rank 1 buf = 123


* In this example, rank 0 uses `Lock` to open an access epoch on rank 1's window, writes into it with `Put`, and closes the epoch with `Unlock`
* After the `Flush` operation, we know that the value has been written into rank 1's buffer (`Unlock` also completes all pending operations, so the `Flush` here mainly makes the intent explicit)


### Atomic Operations
1. In addition to `Get` and `Put`, mpi4py provides several atomic RMA operations
* We have already mentioned one of them, `Accumulate`
* An atomic operation is one that appears to happen all at once, without interference from other processes
* In MPI RMA, atomicity is guaranteed between accumulate-style operations (`Accumulate`, `Get_accumulate`, `Fetch_and_op`, `Compare_and_swap`) that target the same memory with the same datatype; a plain `Put` to the same location is not atomic with respect to them
* This can be used to avoid race conditions where multiple origins try to update the same target memory at the same time

2. What are they used for?
* In one-sided communication, multiple ranks may read or write the same window at once.
* Without atomics, you'd risk lost updates (e.g., two processes increment a counter and one increment gets overwritten).
* Atomics can be used to ensure correctness for:
    * Distributed counters
    * Locks / flags
    * Queues or ticket dispensers
    * Reductions (sum, max, etc.)
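
The lost-update problem can be illustrated without MPI. The sketch below is a plain-Python model, not mpi4py code, and the function names are hypothetical. It models several origins incrementing one counter: in the non-atomic version each origin does a separate read (like `Get`) and write (like `Put`), using the worst-case interleaving where every read happens before any write; in the atomic version each read-modify-write is a single indivisible step, as with `Accumulate`.

```python
def non_atomic_increments(counter, n_procs):
    """Model n_procs origins doing Get, add 1, Put, with all Gets happening first."""
    reads = [counter for _ in range(n_procs)]   # every origin reads the same old value
    for value in reads:
        counter = value + 1                     # each Put overwrites the previous one
    return counter

def atomic_increments(counter, n_procs):
    """Model n_procs Accumulate(op=SUM) calls: each add sees the current value."""
    for _ in range(n_procs):
        counter += 1                            # read-modify-write as one step
    return counter

print(non_atomic_increments(0, 3))  # 1: two of the three increments were lost
print(atomic_increments(0, 3))      # 3
```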

3. Examples:

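**Accumulate** combines the origin's data into the target memory using an operation such as `MPI.SUM`, `MPI.MAX`, or `MPI.REPLACE`. The snippets below assume `win` exposes an integer counter at the start of rank `ROOT`'s window and that an epoch is already open (e.g., with `Fence` or `Lock`).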

```python
# Increment a shared counter at root
one = np.array(1, dtype='i')
win.Accumulate([one, MPI.INT], ROOT, op=MPI.SUM)
```

**Get_accumulate** fetches the old value at the target and then combines the origin's value into it with the given operation.
```python
# Increment the shared counter at ROOT and store its previous value in oldval
oldval = np.zeros(1, dtype='i')
one = np.array(1, dtype='i')
win.Get_accumulate([one, MPI.INT], [oldval, MPI.INT],
                   ROOT, op=MPI.SUM)
```

**Compare_and_swap** compares the value at the target with a `compare` value and, only if they are equal, replaces it with the origin's value. The value that was in the target before the call is always returned in the result buffer.
```python
# Replace the value at ROOT with newval if it equals expected. The previous value at ROOT
# is returned in oldval regardless of whether the comparison succeeded
expected = np.array(5, dtype='i')
newval   = np.array(4, dtype='i')
oldval   = np.zeros(1, dtype='i')

win.Compare_and_swap([newval, MPI.INT],
                     [expected, MPI.INT],
                     [oldval, MPI.INT],
                     ROOT, 0)
```
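
To make the semantics concrete, here is a single-process Python model of what `Compare_and_swap` does at the target location. It is an illustration only: in MPI the whole compare-and-replace executes atomically at the target, which ordinary Python code like this does not guarantee. The function name and arguments are hypothetical, chosen to mirror the origin, compare, and result buffers above.

```python
def compare_and_swap(memory, index, compare, new):
    """Model of CAS semantics: store `new` only if memory[index] == compare.

    Always returns the value that was in memory before the call, like the
    result buffer of Compare_and_swap.
    """
    old = memory[index]
    if old == compare:
        memory[index] = new
    return old

mem = [5]
print(compare_and_swap(mem, 0, compare=5, new=4), mem)  # 5 [4]  (match: swapped)
print(compare_and_swap(mem, 0, compare=5, new=3), mem)  # 4 [4]  (no match: unchanged)
```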


### Example: Work Stealing Queue

* We've seen the use of `ProcessPoolExecutor` in Python, where a queue of tasks is farmed out to different processes
* Let's consider how this kind of process pool pattern could be implemented in `mpi4py`
* To coordinate different processes, the main idea is that there will be a set of tickets (represented by integers) corresponding to the tasks to be accomplished
* Processes will independently try to claim a ticket, which gives them the privilege of doing a task
* In this case, the task is really trivial: the process will just retrieve an integer from the root, square it, and write the result back to the root
* Squaring some numbers is a silly task to accomplish, but it illustrates a common scheduling pattern in parallel computing
* The tasks could easily be replaced with something computationally expensive

In [None]:
%%bash
cat > hello_mpi.py <<'PY'
from mpi4py import MPI
import numpy as np

comm  = MPI.COMM_WORLD
rank  = comm.Get_rank()

ROOT    = 0
N_TASKS = 8

i32 = np.dtype('i')   # MPI.INT
f64 = np.dtype('d')   # MPI.DOUBLE
i32B = i32.itemsize
f64B = f64.itemsize

def log(msg):
    print(f"[rank {rank}] {msg}", flush=True)

# -------------------------
# Windows & initialization
# -------------------------
if rank == ROOT:
    # pool[0] = counter (remaining tasks)
    # pool[1:] = task payloads (just integers here)
    pool = np.empty(N_TASKS + 1, dtype=i32)
    pool[0] = N_TASKS
    pool[1:] = np.arange(1, N_TASKS + 1, dtype=i32)
    results = np.full(N_TASKS, np.nan, dtype=f64)
else:
    pool = None       # workers expose no memory for pool
    results = None    # workers expose no memory for results

win_pool = MPI.Win.Create(pool,    comm=comm)
win_res  = MPI.Win.Create(results, comm=comm)

if rank == ROOT:
    log(f"init: counter={pool[0]}, tasks={pool[1:].tolist()}")

# -----------------------------------------
# Atomic claim (CAS) to avoid overshoot
# -----------------------------------------
def claim_ticket(win, target, dtype):
    """
    Atomically decrement pool[0] at ROOT only if > 0.
    Return ticket index in [0..N_TASKS-1], else -1.
    """
    # Working buffers for the CAS operation
    cur    = np.zeros(1, dtype=dtype)   # will hold the current counter value read from ROOT
    expect = np.zeros(1, dtype=dtype)   # the value we expect to see (if counter hasn't changed)
    new    = np.zeros(1, dtype=dtype)   # the value we want to write if compare succeeds (expect-1)
    old    = np.zeros(1, dtype=dtype)   # CAS will return whatever was actually in the target

    while True:
        # Step 1: read the current counter on ROOT into cur
        win.Lock(target, MPI.LOCK_SHARED)
        win.Get([cur, MPI.INT], target, 0)   # displacement=0 bytes -> pool[0]
        win.Flush(target)
        win.Unlock(target)

        c = int(cur[0])   # current number of remaining tasks
        if c <= 0:
            return -1     # no tasks left

        # Step 2: prepare expected and new values
        expect[0] = c     # we think the counter is still c
        new[0]    = c - 1 # we want to write back c-1 (decrement by one)

        # Step 3: attempt CAS at ROOT
        win.Lock(target, MPI.LOCK_EXCLUSIVE)
        win.Compare_and_swap([new, MPI.INT],     # proposed new value
                             [expect, MPI.INT],  # expected current value
                             [old, MPI.INT],     # gets the actual old value
                             target, 0)
        win.Flush(target)
        win.Unlock(target)

        # Step 4: check whether CAS succeeded
        if int(old[0]) == c:
            # success: we got the ticket corresponding to this claim
            return c - 1
        # else: someone else beat us to it, retry loop

# -----------------------------------------
# Workers do the work
# -----------------------------------------
if rank != ROOT:
    while True:
        ticket = claim_ticket(win_pool, ROOT, i32)
        if ticket < 0:
            log("no work left; exiting")
            break

        # pool[1 + ticket] holds the payload for this task
        disp_bytes = (1 + ticket) * i32B
        task = np.zeros(1, dtype=i32)

        win_pool.Lock(ROOT, MPI.LOCK_SHARED)
        win_pool.Get([task, MPI.INT], ROOT, disp_bytes)
        win_pool.Flush(ROOT)
        win_pool.Unlock(ROOT)

        payload = int(task[0])
        log(f"claimed ticket={ticket}, task={payload}")

        # Do some work (here: just square it)
        result_val = float(payload * payload)

        # Write result back to ROOT's results[ticket]
        res_disp = ticket * f64B
        res_buf  = np.array([result_val], dtype=f64)

        win_res.Lock(ROOT, MPI.LOCK_SHARED)
        win_res.Put([res_buf, MPI.DOUBLE], ROOT, res_disp)
        win_res.Flush(ROOT)
        win_res.Unlock(ROOT)

        log(f"processed task {payload} -> result {result_val:.1f}")

comm.Barrier()

if rank == ROOT:
    filled = int(np.count_nonzero(~np.isnan(results)))
    log(f"final counter={pool[0]} (should be 0)")
    log(f"results filled: {filled}/{N_TASKS}")
    log(f"results: {results.tolist()}")

win_pool.Free()
win_res.Free()

PY


In [None]:
!mpiexec -n 4 python hello_mpi.py

[rank 0] init: counter=8, tasks=[1, 2, 3, 4, 5, 6, 7, 8]
[rank 1] claimed ticket=6, task=7
[rank 1] processed task 7 -> result 49.0
[rank 1] claimed ticket=4, task=5
[rank 1] processed task 5 -> result 25.0
[rank 2] claimed ticket=5, task=6
[rank 2] processed task 6 -> result 36.0
[rank 3] claimed ticket=7, task=8
[rank 3] processed task 8 -> result 64.0
[rank 3] claimed ticket=3, task=4
[rank 3] processed task 4 -> result 16.0
[rank 2] claimed ticket=2, task=3
[rank 1] claimed ticket=1, task=2
[rank 2] processed task 3 -> result 9.0
[rank 1] processed task 2 -> result 4.0
[rank 2] no work left; exiting
[rank 3] claimed ticket=0, task=1
[rank 1] no work left; exiting
[rank 3] processed task 1 -> result 1.0
[rank 3] no work left; exiting
[rank 0] final counter=0 (should be 0)
[rank 0] results filled: 8/8
[rank 0] results: [1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0, 64.0]


This example is a little bit more complex, so let's walk through it. First, this will initialize some constants.

```python
ROOT    = 0
N_TASKS = 8

i32 = np.dtype('i')   # MPI.INT
f64 = np.dtype('d')   # MPI.DOUBLE
i32B = i32.itemsize
f64B = f64.itemsize
```
Here, `ROOT` defines the root rank, which will serve as the ticket master and data keeper. `N_TASKS` is the number of tasks to accomplish. The sizes (in bytes) of the integer and floating-point data types will be used to compute byte displacements, so MPI knows where to read and write in the remote memory buffers.
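
Because the windows in this example are created with `Win.Create`'s default displacement unit of one byte, every target displacement passed to `Get` and `Put` is a byte offset. A minimal plain-Python sketch of that arithmetic follows; it uses no MPI, `struct.calcsize` stands in for NumPy's `itemsize`, and the helper names are hypothetical.

```python
import struct

I32B = struct.calcsize("i")   # size of a native C int, like np.dtype('i').itemsize
F64B = struct.calcsize("d")   # size of a native C double, like np.dtype('d').itemsize

def pool_disp(ticket):
    """Byte offset of pool[1 + ticket]; pool[0] holds the counter."""
    return (1 + ticket) * I32B

def result_disp(ticket):
    """Byte offset of results[ticket] in the float64 results window."""
    return ticket * F64B

for t in range(3):
    print(t, pool_disp(t), result_disp(t))
```

On common platforms (4-byte `int`, 8-byte `double`), ticket 2 maps to byte 12 in the pool window and byte 16 in the results window.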



On the root, we create an array of integers called `pool`. The first element of `pool` contains the number of remaining tasks, and the subsequent elements contain the workload (i.e., the numbers we want to square). We also create a `results` buffer on the root, where we'll store the result of each task (the square of each number). On all other ranks, `pool` and `results` are set to `None`, which tells `Win.Create` that these ranks expose no memory.
```python
if rank == ROOT:
    # pool[0] = counter (remaining tasks)
    # pool[1:] = task payloads (just integers here)
    pool = np.empty(N_TASKS + 1, dtype=i32)
    pool[0] = N_TASKS
    pool[1:] = np.arange(1, N_TASKS + 1, dtype=i32)
    results = np.full(N_TASKS, np.nan, dtype=f64)
else:
    pool = None       # workers expose no memory for pool
    results = None    # workers expose no memory for results
```

Note that although some ranks may not expose any memory, all ranks need to call
```python
win_pool = MPI.Win.Create(pool,    comm=comm)
win_res  = MPI.Win.Create(results, comm=comm)
```
because `Win.Create` is a collective operation over `comm`.

Next, let's break down the `claim_ticket` function. In this function, each process rushes to claim a ticket that can be redeemed for a unit of work. If the rank is successful, the function returns the claimed ticket number, an integer in `[0, N_TASKS - 1]` that also serves as an index into the task payloads and results. Otherwise, it returns -1, meaning there is no additional work to complete.

The arguments to this function include a window `win`, which is needed to communicate with the root process. As the `target`, we pass in `ROOT`, since each process needs to get information from the root. The `dtype` argument is the data type of the `pool` buffer; it is used to allocate local working buffers whose element type matches the counter on the root.

```python
def claim_ticket(win, target, dtype):
    """
    Atomically decrement pool[0] at ROOT only if > 0.
    Return ticket index in [0..N_TASKS-1], else -1.
    """
    # Working buffers for the CAS operation
    cur    = np.zeros(1, dtype=dtype)   # will hold the current counter value read from ROOT
    expect = np.zeros(1, dtype=dtype)   # the value we expect to see (if counter hasn't changed)
    new    = np.zeros(1, dtype=dtype)   # the value we want to write if compare succeeds (expect-1)
    old    = np.zeros(1, dtype=dtype)   # CAS will return whatever was actually in the target
```

The buffers `cur`, `expect`, `new`, and `old` will be used in the ticket claiming logic, which is contained in the while loop below. At every iteration of this loop, the first step is to read the current value of the ticket counter on the root, and store this value in `c`. We then check if this value is $\leq$ 0, in which case we know that there are no more tasks to complete so we can return.

```python
# Step 1: read the current counter on ROOT into cur
win.Lock(target, MPI.LOCK_SHARED)
win.Get([cur, MPI.INT], target, 0)   # displacement=0 bytes -> pool[0]
win.Flush(target)
win.Unlock(target)

c = int(cur[0])   # current number of remaining tasks
if c <= 0:
    return -1     # no tasks left
```

We then need some logic to deal with the fact that multiple processes could potentially be vying to claim the same ticket. For a given rank, once we've retrieved the number of tickets remaining from root, we can check in with the root again to see if the ticket counter value has changed. If it has, some other process has claimed that ticket before us. Otherwise, we need to decrement the number of tickets on the root, which will serve to claim the ticket.

We'll accomplish this using some code that uses the `Compare_and_swap` operation (CAS).

```python
# Step 2: prepare expected and new values
expect[0] = c     # we think the counter is still c
new[0]    = c - 1 # we want to write back c-1 (decrement by one)

# Step 3: attempt CAS at ROOT
win.Lock(target, MPI.LOCK_EXCLUSIVE)
win.Compare_and_swap([new, MPI.INT],     # proposed new value
                     [expect, MPI.INT],  # expected current value
                     [old, MPI.INT],     # gets the actual old value
                     target, 0)
win.Flush(target)
win.Unlock(target)
```

For a given rank, we expect that when we check back in with the root after reading the counter, the value is still the same; that is, the counter value on the root is still equal to `c`. If so, then great! That means we can claim the ticket, decrement the ticket counter, and move on to doing some work. Checking whether the value is unchanged and decrementing the counter happen in a single atomic operation.

But what happens if the counter value on the root changed between when we initially read it and when we checked in again? In that case, some other process claimed the ticket and we missed our chance, so we don't want to decrement the counter on the root.

This is the logic of using `Compare_and_swap` in the above code. If the value on the root is what we expect, the counter is decremented and we return the ticket number. Otherwise, the counter is left untouched, and we keep trying to claim a ticket until there's no more work to be done. After the `Compare_and_swap`, we just need to check whether the value on the root had changed and act accordingly.

```python
# Step 4: check whether CAS succeeded
if int(old[0]) == c:
    # success: we got the ticket corresponding to this claim
    return c - 1
```
If our rank successfully claimed the ticket, the value returned by the CAS operation in `old` equals the counter value `c` we read earlier. In that case, we return `c - 1` as our ticket. This ticket is an index rather than a raw counter value: the task payload lives at `pool[1 + ticket]` on the root, and the result goes into `results[ticket]`.
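
The retry logic can be exercised without MPI. The sketch below is a shared-memory analogy, not mpi4py code: threads stand in for worker ranks, and a `threading.Lock` makes the hypothetical `Counter.compare_and_swap` indivisible the way MPI makes `Compare_and_swap` indivisible at the target. Whatever order the threads run in, every ticket should be claimed exactly once and the counter should end at zero.

```python
import threading

class Counter:
    """Stand-in for pool[0] on ROOT, with an atomic compare-and-swap."""

    def __init__(self, value):
        self.value = value
        self._lock = threading.Lock()

    def read(self):
        return self.value                # like the Get in Step 1

    def compare_and_swap(self, new, compare):
        with self._lock:                 # the compare and the replace form one step
            old = self.value
            if old == compare:
                self.value = new
            return old

def claim_ticket(counter):
    """Same retry loop as the MPI version, against the in-process counter."""
    while True:
        c = counter.read()
        if c <= 0:
            return -1                    # no tasks left
        if counter.compare_and_swap(c - 1, c) == c:
            return c - 1                 # our CAS won: ticket c - 1 is ours
        # another worker changed the counter first; retry

def worker(counter, claimed):
    while (ticket := claim_ticket(counter)) >= 0:
        claimed.append(ticket)           # list.append is thread-safe in CPython

N_TASKS = 8
counter = Counter(N_TASKS)
claimed = []
threads = [threading.Thread(target=worker, args=(counter, claimed)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(claimed), counter.value)
```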

Now that we understand `claim_ticket`, let's take a look at the main work loop. When `claim_ticket` returns, there are two possibilities. If it returns -1, there is no work left, so we can exit the loop.
```python
while True:
    ticket = claim_ticket(win_pool, ROOT, i32)
    if ticket < 0:
        log("no work left; exiting")
        break
```
Otherwise, we will need to get the work unit associated with the ticket, which, in this case, is just an integer to square. The `Get` function needs the offset in bytes in the pool buffer on root to read the correct integer.

```python
# pool[1 + ticket] holds the payload for this task
disp_bytes = (1 + ticket) * i32B
task = np.zeros(1, dtype=i32)

win_pool.Lock(ROOT, MPI.LOCK_SHARED)
win_pool.Get([task, MPI.INT], ROOT, disp_bytes)
win_pool.Flush(ROOT)
win_pool.Unlock(ROOT)

payload = int(task[0])
log(f"claimed ticket={ticket}, task={payload}")
```

Once we have the payload, we can do the "work":
```python
# Do some work (here: just square it)
result_val = float(payload * payload)
```

The result can be written into the results buffer on the root. Here, we need to account for the fact that the results buffer holds floats (8-byte doubles) rather than integers, so the displacement is computed with `f64B`.
```python
# Write result back to ROOT's results[ticket]
res_disp = ticket * f64B
res_buf  = np.array([result_val], dtype=f64)

win_res.Lock(ROOT, MPI.LOCK_SHARED)
win_res.Put([res_buf, MPI.DOUBLE], ROOT, res_disp)
win_res.Flush(ROOT)
win_res.Unlock(ROOT)

log(f"processed task {payload} -> result {result_val:.1f}")
```

Now the resulting squared integer is stored in its correct slot in the results array!

# Parallel I/O
* To wrap up our discussion about mpi4py, we'll briefly discuss parallel I/O
* mpi4py provides an interface for parallel access to files
* Instead of each rank opening its own file, or opening a single file and scattering the data across ranks, all ranks can cooperatively open a single file handle
* Benefits:
    * Results can be written in parallel without corrupting data
    * Big datasets can be read in parallel without creating a bottleneck on one process
* mpi4py's parallel I/O works with raw binary files


### Basic Functionality
* The `MPI.File` object represents a file opened in parallel by multiple MPI ranks
* To open a file collectively, you can use the syntax `MPI.File.Open(comm, "filename.bin", amode)`, where `amode` is a combination of flags such as `MPI.MODE_CREATE | MPI.MODE_WRONLY`
* The `MPI.File` object provides methods for both collective and individual I/O operations
* Collective operations include `File.Read_all`, `File.Write_all`, `File.Read_at_all`, and `File.Write_at_all`
* These methods can significantly improve performance by allowing the MPI library to optimize data transfer and access patterns
* Alternatively, one can use individual I/O operations such as `File.Read`, `File.Write`, `File.Read_at`, and `File.Write_at`
* As an analogy, collective I/O operations are like collective communication
    * Every rank calls the function and participates in the I/O operation
    * This is often preferred as it leads to better performance
* Individual I/O operations are called independently by each rank, without coordinating with the other ranks



### Example
* Let's see a basic example in action
* Here every rank is going to have its own buffer
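* It will write the buffer to the shared file beginning at a given byte offset
* Because each rank's offset is different and the regions don't overlap, this avoids conflicts among the different ranks
* To read the data back in parallel, each rank starts reading at the same byte offset it wrote to
* This example uses the individual operations `Write_at` and `Read_at`; the collective `Write_at_all` and `Read_at_all` take the same arguments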
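
Why `rank * data.nbytes` keeps ranks from overwriting each other can be seen with ordinary sequential file I/O. The sketch below is plain Python with no MPI: a temporary file stands in for `datafile.bin`, a loop stands in for the ranks, and `rank_data` is a hypothetical helper. It writes each simulated rank's block at the same offsets as the `Write_at` example and reads the blocks back; because the blocks are disjoint, the order of the writes doesn't matter.

```python
import os
import struct
import tempfile

SIZE = 4                                          # number of simulated ranks
N_PER_RANK = 5                                    # ints per rank, as in the example
BLOCK_BYTES = N_PER_RANK * struct.calcsize("i")   # plays the role of data.nbytes

def rank_data(rank):
    """Values each rank writes in the example: 100*rank + [0, 1, 2, 3, 4]."""
    return [100 * rank + k for k in range(N_PER_RANK)]

path = os.path.join(tempfile.mkdtemp(), "datafile.bin")

# Write phase: visit ranks in reverse order to show write order doesn't matter
with open(path, "wb") as f:
    for rank in reversed(range(SIZE)):
        f.seek(rank * BLOCK_BYTES)                # same offset rule as fh.Write_at
        f.write(struct.pack(f"{N_PER_RANK}i", *rank_data(rank)))

# Read phase: each simulated rank reads back only its own block
read_back = {}
with open(path, "rb") as f:
    for rank in range(SIZE):
        f.seek(rank * BLOCK_BYTES)                # same offset rule as fh.Read_at
        read_back[rank] = list(struct.unpack(f"{N_PER_RANK}i", f.read(BLOCK_BYTES)))

for rank, values in read_back.items():
    print(f"Rank {rank} read {values}")
```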

In [3]:
%%bash
cat > hello_mpi.py <<'PY'

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank prepares some data
data = np.arange(5, dtype='i') + 100*rank

# Open file for parallel write
fh = MPI.File.Open(comm, "datafile.bin",
                   MPI.MODE_CREATE | MPI.MODE_WRONLY)

# Each rank writes at its own offset (rank * bytes_per_rank)
offset = rank * data.nbytes
fh.Write_at(offset, data)
fh.Close()

# Reopen and read back
buf = np.zeros_like(data)
fh = MPI.File.Open(comm, "datafile.bin", MPI.MODE_RDONLY)
fh.Read_at(offset, buf)
fh.Close()

print(f"Rank {rank} read {buf}")
PY

In [4]:
!mpiexec -n 4 python hello_mpi.py

Rank 0 read [0 1 2 3 4]
Rank 1 read [100 101 102 103 104]
Rank 2 read [200 201 202 203 204]
Rank 3 read [300 301 302 303 304]


### Example 2: Saving a 2D Array + Header
* Okay, to see where file views come into play, let's look at a more complex example
* In this example, each rank has its own 2D array, which represents some rows of a global 2D array
* In this case, the rank's array just contains floats all set to its rank value
* To represent this in a binary file, we create a parallel file handle
* We'll first write a global header to the file that contains the global array size `(ny_global, nx_global)`
* After writing the global header, each rank writes its own chunk of the 2D array using a file view: set with `File.Set_view(disp, etype, filetype)`, it makes the rank see only the parts of the file (starting `disp` bytes in) described by `filetype`, here a subarray datatype for the rank's block
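
With the header and row blocks in place, each global element sits at a fixed byte offset. The sketch below is plain Python with no MPI (the helper names are hypothetical): it computes those offsets and builds the same layout sequentially, a 16-byte header of two int64s in native byte order followed by a row-major array of doubles. Because each rank owns whole rows, its block happens to be one contiguous byte range; the subarray filetype is what lets MPI handle the general case where a rank's block is not contiguous in the file (for example, a block of columns).

```python
import os
import struct
import tempfile

HEADER_BYTES = 16                   # two int64 values: ny_global, nx_global
DOUBLE_BYTES = 8                    # float64
num_rows, nx_global, size = 3, 10, 4
ny_global = size * num_rows         # 12 rows in total

def element_offset(i, j):
    """Byte offset of global element (i, j): header, then row-major doubles."""
    return HEADER_BYTES + (i * nx_global + j) * DOUBLE_BYTES

def block_offset(rank):
    """Byte offset where a rank's block of num_rows full rows starts."""
    return element_offset(rank * num_rows, 0)

path = os.path.join(tempfile.mkdtemp(), "rows_fixed.bin")
with open(path, "wb") as f:
    f.write(struct.pack("=2q", ny_global, nx_global))    # header, native byte order
    for rank in range(size):
        values = [float(rank)] * (num_rows * nx_global)  # every element equals rank
        f.seek(block_offset(rank))
        f.write(struct.pack(f"={len(values)}d", *values))

with open(path, "rb") as f:
    header = struct.unpack("=2q", f.read(HEADER_BYTES))
    f.seek(element_offset(7, 4))                         # row 7 is owned by rank 7 // 3 == 2
    value = struct.unpack("=d", f.read(DOUBLE_BYTES))[0]

print(header, value, os.path.getsize(path))
```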

In [6]:
%%bash
cat > hello_mpi.py <<'PY'

# Run with: mpiexec -n 4 python hello_mpi.py
from mpi4py import MPI
import numpy as np

comm  = MPI.COMM_WORLD
rank  = comm.Get_rank()
size  = comm.Get_size()

# Each rank is going to have some given number of rows in the final 2d array
num_rows  = 3                 # rows per rank 
nx_global = 10                # columns
ny_global = size * num_rows   # total rows (exact multiple)

dtype = np.float64
etype = MPI.DOUBLE
HEADER_BYTES = 16             # two int64: ny_global, nx_global
FNAME = "rows_fixed.bin"

# Local block coordinates in the global array
i0 = rank * num_rows          # starting row for this rank
ny_local = num_rows           # Number of rows owned by this rank
nx_local = nx_global          # Number of columns (elements per row)

# Local data: every element equals 'rank' as a float
arr_local = np.full((ny_local, nx_local), float(rank), dtype=dtype)

# Helper: make a 2D subarray type describing this rank’s block in the global array
# This will be used for the file view!
def make_filetype_2d(ny, nx, i0, nloc, j0, mloc, etype):
    sizes    = (ny, nx)          # full array shape
    subsizes = (nloc, mloc)      # this rank's block shape
    starts   = (i0, j0)          # top-left of this block in the global array
    ft = etype.Create_subarray(sizes, subsizes, starts, order=MPI.ORDER_C)
    ft.Commit()
    return ft

# Write the header
fh = MPI.File.Open(comm, FNAME, MPI.MODE_CREATE | MPI.MODE_WRONLY)

if rank == 0:
    hdr = np.array([ny_global, nx_global], dtype=np.int64)
    fh.Write_at(0, hdr)          # rank-0-only write

# After the 16-byte header, the file is a ny_global×nx_global double array
disp = HEADER_BYTES
filetype = make_filetype_2d(ny_global, nx_global, i0, ny_local, 0, nx_local, etype)
fh.Set_view(disp, etype, filetype, datarep="native")

# Collective write of our contiguous local block
fh.Write_all(arr_local)

fh.Close()
filetype.Free()

if rank == 0:
    print(f"[write] Wrote {ny_global}x{nx_global} (rows_per_rank={num_rows}) to {FNAME}")
    
PY

In [7]:
!mpiexec -n 4 python hello_mpi.py

[write] Wrote 12x10 (rows_per_rank=3) to rows_fixed.bin


* Now we've written the file in parallel, so let's look at the code to read it back in in parallel 
* This is going to look very similar using file views 

In [1]:
%%bash
cat > hello_mpi.py <<'PY'

from mpi4py import MPI
import numpy as np

comm  = MPI.COMM_WORLD
rank  = comm.Get_rank()
size  = comm.Get_size()

dtype = np.float64
etype = MPI.DOUBLE
HEADER_BYTES = 16
FNAME = "rows_fixed.bin"

# Same filetype helper function as before
def make_filetype_2d(ny, nx, i0, nloc, j0, mloc, etype):
    sizes    = (ny, nx)
    subsizes = (nloc, mloc)
    starts   = (i0, j0)
    ft = etype.Create_subarray(sizes, subsizes, starts, order=MPI.ORDER_C)
    ft.Commit()
    return ft

# Open the file collectively; only rank 0 reads the header
fh = MPI.File.Open(comm, FNAME, MPI.MODE_RDONLY)

# Broadcast the header to all ranks
hdr = np.zeros(2, dtype=np.int64)
if rank == 0:
    fh.Read_at(0, hdr)
comm.Bcast(hdr, root=0)

ny_global, nx_global = map(int, hdr)

# In this simplified layout: each rank has ny_global/size rows
assert ny_global % size == 0, "File assumes equal rows per rank."
num_rows = ny_global // size

# Local block coordinates and shape
i0 = rank * num_rows
ny_local = num_rows
nx_local = nx_global

# Set the same file view and read our block collectively
disp = HEADER_BYTES
filetype = make_filetype_2d(ny_global, nx_global, i0, ny_local, 0, nx_local, etype)
fh.Set_view(disp, etype, filetype, datarep="native")

buf = np.empty((ny_local, nx_local), dtype=dtype)
fh.Read_all(buf)

fh.Close()
filetype.Free()

# Quick sanity check: data should equal 'rank' everywhere in this block
ok = np.allclose(buf, float(rank))
print(f"[read] rank {rank}: block {buf.shape}, i0={i0}, ok={ok}, first row={buf[0]}")

PY

In [2]:
!mpiexec -n 4 python hello_mpi.py

[read] rank 0: block (3, 10), i0=0, ok=True, first row=[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[read] rank 3: block (3, 10), i0=9, ok=True, first row=[3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]
[read] rank 1: block (3, 10), i0=3, ok=True, first row=[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[read] rank 2: block (3, 10), i0=6, ok=True, first row=[2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]


### Support for HDF5 and Other File Types
* Numerous libraries wrap the basic file I/O functionality in MPI so you can use it for other file types
* For example, **HDF5** files and **NetCDF** files can be written and loaded in parallel in Python
* Using hdf5 files in particular can make a task like the above a little easier
* For example, we could write a similar 2d array to an h5 file in parallel:

In [3]:

%%bash
cat > hello_mpi.py <<'PY'

from mpi4py import MPI
import h5py, numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

ny, nx = 12, 8

with h5py.File("state.h5", "w", driver="mpio", comm=comm) as f:
    dset = f.create_dataset("u", (ny, nx), dtype="f4")   # 32-bit floats

    base, rem = divmod(ny, size)
    ny_loc = base + (1 if rank < rem else 0)
    i0 = rank * base + min(rank, rem)

    local = np.full((ny_loc, nx), float(rank), dtype="f4")

    # with dset.collective:
    dset[i0:i0+ny_loc, :] = local            # parallel slice write

comm.Barrier()
if rank == 0:
    print("Wrote state.h5:/u")
    
PY

In [4]:
!mpiexec -n 4 python hello_mpi.py

Traceback (most recent call last):
  File "/workspaces/CSCI-491-591/hello_mpi.py", line 3, in <module>
    import h5py, numpy as np
ModuleNotFoundError: No module named 'h5py'

* All four ranks fail with this same error (their interleaved tracebacks are shown once here) because `h5py` is not installed in this environment
* Running the example requires an h5py build with parallel HDF5 support, since the `driver="mpio"` option is only available in such builds
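
Unlike the raw MPI-IO example, the h5py code does not require `ny` to be divisible by the number of ranks: it splits the rows with `divmod` so the first `rem` ranks get one extra row. The same arithmetic, pulled out into a hypothetical helper so it can be checked without MPI or h5py, is:

```python
def row_block(rank, size, ny):
    """Rows [i0, i0 + ny_loc) owned by `rank` when `ny` rows are split over `size` ranks.

    The first `rem` ranks get one extra row, so block sizes differ by at most one.
    """
    base, rem = divmod(ny, size)
    ny_loc = base + (1 if rank < rem else 0)
    i0 = rank * base + min(rank, rem)
    return i0, ny_loc

print([row_block(r, 5, 12) for r in range(5)])
```

For 12 rows over 5 ranks this gives blocks of 3, 3, 2, 2, 2 rows starting at rows 0, 3, 6, 8, 10, which tile the array with no gaps or overlaps.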

