# A look behind the curtain: some internals for the experts 

In this section, we'll go through some Heat-specific functionalities that simplify the implementation of a data-parallel application in Python. We'll demonstrate them on small arrays and 4 processes on a single cluster node, but the functionalities are meant for multi-node setups with huge arrays that cannot be processed on a single node.

Your IPython cluster should still be running. Let's check it out.

In [1]:
from ipyparallel import Client
rc = Client(profile="default")
rc.ids

if len(rc.ids) == 0:
    print("No engines found")
else:
    print(f"{len(rc.ids)} engines found")

4 engines found


If no engines are found, go back to the [Intro](0_setup/0_setup_local.ipynb) for instructions.

We already mentioned that the DNDarray object is "MPI-aware". Each DNDarray is associated with an MPI communicator: it is aware of the number of processes in the communicator, and it knows the rank of the local process. 

We will use the `%%px` magic in every cell that executes MPI code.

In [2]:
%%px
import torch
import heat as ht

a = ht.random.randn(7,4,3, split=0)
a.comm

Out[1:11]: <heat.core.communication.MPICommunication at 0x7f24294c4940>

Out[0:11]: <heat.core.communication.MPICommunication at 0x7fcf24220940>

Out[2:11]: <heat.core.communication.MPICommunication at 0x7f3681074970>

Out[3:11]: <heat.core.communication.MPICommunication at 0x7f9f26470940>

In [3]:
%%px
# MPI size = total number of processes 
size = a.comm.size

print(f"a is distributed over {size} processes")
print(f"a is a distributed {a.ndim}-dimensional array with global shape {a.shape}")

[stdout:2] a is distributed over 4 processes
a is a distributed 3-dimensional array with global shape (7, 4, 3)


[stdout:3] a is distributed over 4 processes
a is a distributed 3-dimensional array with global shape (7, 4, 3)


[stdout:0] a is distributed over 4 processes
a is a distributed 3-dimensional array with global shape (7, 4, 3)


[stdout:1] a is distributed over 4 processes
a is a distributed 3-dimensional array with global shape (7, 4, 3)


In [4]:
%%px
# MPI rank = rank of each process
rank = a.comm.rank
# Local shape = shape of the data on each process
local_shape = a.lshape
print(f"Rank {rank} holds a slice of DNDarray 'a' with local shape {local_shape}")

[stdout:0] Rank 0 holds a slice of DNDarray 'a' with local shape (2, 4, 3)


[stdout:2] Rank 2 holds a slice of DNDarray 'a' with local shape (2, 4, 3)


[stdout:1] Rank 1 holds a slice of DNDarray 'a' with local shape (2, 4, 3)


[stdout:3] Rank 3 holds a slice of DNDarray 'a' with local shape (1, 4, 3)


### Distribution map

When building a memory-distributed pipeline, it is often convenient for each rank to know which slice of the distributed array every rank holds. 

The `lshape_map` attribute of a DNDarray gathers (or, if possible, calculates) this information from all processes and stores it as metadata of the DNDarray. Because it is meant for internal use, it is stored as a torch tensor, not a DNDarray. 

The `lshape_map` tensor is a 2D tensor with one row per process and one column per array dimension: row `i` contains the local shape of the array on process `i`. 
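To make the layout concrete, here is a minimal plain-Python sketch (no MPI, no torch) that builds such a map for a balanced split. It assumes that the split axis is divided as evenly as possible and that the remainder goes to the lowest ranks; this assumption is consistent with the 2, 2, 2, 1 distribution of 7 rows over 4 processes in the output of the next cell, but the helper `balanced_lshape_map` is hypothetical and not part of Heat's API.

```python
def balanced_lshape_map(gshape, split, nprocs):
    """Hypothetical helper: local shapes of a balanced split, one row per rank."""
    base, remainder = divmod(gshape[split], nprocs)
    lshape_map = []
    for rank in range(nprocs):
        local = list(gshape)
        # assumption: the first `remainder` ranks get one extra element along the split axis
        local[split] = base + (1 if rank < remainder else 0)
        lshape_map.append(local)
    return lshape_map


print(balanced_lshape_map((7, 4, 3), split=0, nprocs=4))
# [[2, 4, 3], [2, 4, 3], [2, 4, 3], [1, 4, 3]]
```

Only the column of the split axis varies between rows; all other columns repeat the global extent of the array.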

In [5]:
%%px
lshape_map = a.lshape_map
lshape_map

Out[2:14]:
tensor([[2, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [1, 4, 3]])

Out[3:14]:
tensor([[2, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [1, 4, 3]])

Out[1:14]:
tensor([[2, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [1, 4, 3]])

Out[0:14]:
tensor([[2, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [1, 4, 3]])

Go back to where we created the DNDarray and create `a` with a different split axis. See how the `lshape_map` changes.

### Modifying the DNDarray distribution

In a distributed pipeline, it is sometimes necessary to change the distribution of a DNDarray because it is not distributed in the most convenient way for the next operation or algorithm.

Depending on your needs, you can choose between:
- `DNDarray.redistribute_()`: This method keeps the original split axis, but redistributes the data of the DNDarray according to a "target map".
- `DNDarray.resplit_()`: This method changes the split axis of the DNDarray. This is a more expensive operation, and should be used only when absolutely necessary. Depending on your needs and available resources, in some cases it might be wiser to keep a copy of the DNDarray with a different split axis.
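Because `redistribute_()` keeps the split axis, the data movement boils down to shifting contiguous blocks along that axis. The plain-Python sketch below compares the prefix sums of the current and target counts along the split axis to find which global index ranges each rank has to send to which other rank. The helper `redistribution_plan` is hypothetical: it only illustrates the bookkeeping and is not how Heat implements `redistribute_()` internally.

```python
from itertools import accumulate


def redistribution_plan(current_counts, target_counts):
    """Hypothetical helper: list of (source_rank, dest_rank, start, stop) moves.

    Both arguments give the number of elements each rank holds along the split
    axis; global indices [start, stop) travel from source_rank to dest_rank.
    """
    if sum(current_counts) != sum(target_counts):
        raise ValueError("current and target maps must cover the same global size")
    # global start offset of each rank's block (prefix sums with a leading 0)
    cur_offsets = [0, *accumulate(current_counts)]
    tgt_offsets = [0, *accumulate(target_counts)]
    moves = []
    for src in range(len(current_counts)):
        for dst in range(len(target_counts)):
            # overlap between the block src holds now and the block dst should hold
            start = max(cur_offsets[src], tgt_offsets[dst])
            stop = min(cur_offsets[src + 1], tgt_offsets[dst + 1])
            if start < stop and src != dst:
                moves.append((src, dst, start, stop))
    return moves


# the change used in the example below: [2, 2, 2, 1] -> [1, 2, 2, 2]
print(redistribution_plan([2, 2, 2, 1], [1, 2, 2, 2]))
# [(0, 1, 1, 2), (1, 2, 3, 4), (2, 3, 5, 6)]
```

For this target map, each rank forwards exactly one row to its right-hand neighbor. A resplit, in contrast, in general requires every rank to exchange data with every other rank, which is why it is the more expensive operation.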

Let's see some examples.

In [6]:
%%px
# redistribute: start from a copy of the current map and change the split-axis column
target_map = a.lshape_map.clone()
target_map[:, a.split] = torch.tensor([1, 2, 2, 2])
# in-place redistribution (see ht.redistribute for out-of-place)
a.redistribute_(target_map=target_map)

# new lshape map after redistribution
a.lshape_map

Out[1:15]:
tensor([[1, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [2, 4, 3]])

Out[2:15]:
tensor([[1, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [2, 4, 3]])

Out[0:15]:
tensor([[1, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [2, 4, 3]])

Out[3:15]:
tensor([[1, 4, 3],
        [2, 4, 3],
        [2, 4, 3],
        [2, 4, 3]])

In [None]:
%%px
# local arrays after redistribution
a.larray

Out[3:27]:
tensor([[[ 0.5730, -1.0918, -0.8577],
         [ 0.6610, -0.4874,  0.9850],
         [ 1.0930, -0.8518, -0.7061],
         [-0.7625,  0.6767,  0.1940]],

        [[-1.1230,  0.2482,  0.7127],
         [-0.3202, -0.3510, -1.2052],
         [-1.0595, -0.5830,  0.4192],
         [ 0.5600, -1.2777, -0.1323]]])

Out[2:27]:
tensor([[[ 1.6286,  0.4707, -0.5730],
         [ 0.3841, -0.4789, -0.8033],
         [ 0.1299, -0.6602, -2.0182],
         [ 0.5541, -0.1653, -0.4314]],

        [[ 1.1544, -0.8126, -0.7634],
         [-0.0817, -1.5430, -0.6341],
         [ 0.0291,  0.9677,  0.1294],
         [-0.3747, -1.4987, -0.1063]]])

Out[1:27]:
tensor([[[-0.0919, -0.7646,  0.1660],
         [-0.9814,  0.9445, -1.8339],
         [-1.0218,  0.8454, -0.6050],
         [-0.4161, -0.0764,  0.4383]],

        [[ 0.3151, -2.1761,  0.9970],
         [ 0.9423,  0.7667,  0.6834],
         [ 1.9586, -0.0994,  0.0186],
         [-0.0961, -0.3901,  1.2133]]])

Out[0:45]:
tensor([[[-0.1776, -0.8116, -0.6636],
         [ 0.3238,  2.4110,  0.4005],
         [-0.7808, -2.0984,  1.7691],
         [ 0.9370,  0.0141,  0.6934]]])

In [7]:
%%px
# resplit
a.resplit_(axis=1)

a.lshape_map

Out[3:16]:
tensor([[7, 1, 3],
        [7, 1, 3],
        [7, 1, 3],
        [7, 1, 3]])

Out[0:16]:
tensor([[7, 1, 3],
        [7, 1, 3],
        [7, 1, 3],
        [7, 1, 3]])

Out[1:16]:
tensor([[7, 1, 3],
        [7, 1, 3],
        [7, 1, 3],
        [7, 1, 3]])

Out[2:16]:
tensor([[7, 1, 3],
        [7, 1, 3],
        [7, 1, 3],
        [7, 1, 3]])

You can use the `resplit_` method (in-place) or `ht.resplit` (out-of-place) not only to change the distribution axis, but also to set it to `None`. The latter corresponds to an Allgather operation that gathers the entire array on every process. This is useful once the data has become small enough to be processed on a single device and you want to avoid further communication overhead.
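As a single-process illustration of what `resplit_(axis=None)` achieves, the sketch below uses NumPy to stand in for MPI. `np.array_split` plays the role of the `split=0` distribution (like the distribution shown earlier, it gives the leading chunks one extra row when the length does not divide evenly), and concatenating the chunks in rank order mimics the gather that leaves a full copy on every process. The helper `simulated_allgather` is hypothetical and no communication takes place.

```python
import numpy as np


def simulated_allgather(local_blocks, axis=0):
    """Hypothetical stand-in for an Allgather: every 'rank' receives the
    concatenation of all local blocks, in rank order."""
    full = np.concatenate(local_blocks, axis=axis)
    # one copy per rank, mirroring that each process ends up holding the whole array
    return [full.copy() for _ in local_blocks]


rng = np.random.default_rng(0)
global_array = rng.standard_normal((7, 4, 3))

# local blocks of a split=0 distribution over 4 "processes"
local_blocks = np.array_split(global_array, 4, axis=0)
print([block.shape for block in local_blocks])
# [(2, 4, 3), (2, 4, 3), (2, 4, 3), (1, 4, 3)]

gathered = simulated_allgather(local_blocks, axis=0)
print(all(np.array_equal(g, global_array) for g in gathered))  # True
```

The list of copies makes the cost visible: after the gather, every process stores the full array, so this step only pays off once the array is small.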

In [8]:
%%px
# "un-split" distributed array
a.resplit_(axis=None)
# each process now holds a copy of the entire array

Out[0:17]:
DNDarray([[[ 0.5357, -1.9890, -0.4163],
           [ 0.0807,  0.0103, -0.8651],
           [ 0.5091, -0.9029, -0.2220],
           [-1.3162,  0.4313,  2.0407]],

          [[ 0.5421, -1.1846, -1.2136],
           [-0.2742,  1.2303,  0.3469],
           [ 2.3878,  0.6656, -0.0703],
           [-0.7895,  0.2615,  1.4271]],

          [[ 0.4421,  0.8812,  0.0758],
           [-0.0802, -1.0479,  1.0972],
           [ 0.3463,  0.7280, -0.4285],
           [-0.9275, -2.3051, -0.4425]],

          [[-0.3667,  1.9827,  1.5161],
           [ 0.5978, -0.1475, -0.4847],
           [-0.8409, -1.1828, -0.4474],
           [-0.3909, -0.5872, -0.2087]],

          [[-1.5586,  0.7291, -0.8386],
           [ 0.1649, -0.7093,  0.0767],
           [ 0.5224, -1.7311, -0.6501],
           [-1.1856, -0.5394,  1.0237]],

          [[ 0.9554,  2.0103,  1.3122],
           [ 0.1958, -0.4657, -0.4318],
           [-1.0655, -1.0135, -0.7660],
           [-0.6943,  0.2370,  1.6383]],

     







The opposite is not true, i.e. you cannot use `resplit_` to distribute an array with split=None. In that case, you must use the `ht.array()` factory function:

In [9]:
%%px
# make `a` split again
a = ht.array(a, split=0)

### Making disjoint data into a global DNDarray

Another common occurrence in a data-parallel pipeline: you have addressed the embarrassingly parallel part of your algorithm with any array framework, each process working independently of the others. You now want to perform a non-embarrassingly-parallel operation on the entire dataset, with Heat as a backend.

You can use the `ht.array` factory function with the `is_split` argument to create a DNDarray from a disjoint (on each MPI process) set of arrays. The `is_split` argument indicates the axis along which the disjoint data is to be "joined" into a global, distributed DNDarray.
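The shape bookkeeping behind `is_split` can be sketched in plain Python: local shapes may differ only along the split axis, and the global length along that axis is the sum of the local lengths. The helper `joined_global_shape` is hypothetical. Heat has to collect the local shapes from all processes, whereas here they are simply passed in as a list.

```python
def joined_global_shape(local_shapes, is_split):
    """Hypothetical helper: global shape obtained by joining per-process local
    shapes along axis `is_split`. Non-split axes must agree on every process."""
    reference = local_shapes[0]
    for shape in local_shapes:
        if len(shape) != len(reference):
            raise ValueError("all local arrays must have the same number of dimensions")
        for axis, (n, ref) in enumerate(zip(shape, reference)):
            if axis != is_split and n != ref:
                raise ValueError(f"local shapes disagree along non-split axis {axis}")
    global_shape = list(reference)
    # only the split axis grows: its global length is the sum of the local lengths
    global_shape[is_split] = sum(shape[is_split] for shape in local_shapes)
    return tuple(global_shape)


# four processes, each holding a (3, 4) array, as in the next cell
print(joined_global_shape([(3, 4)] * 4, is_split=0))  # (12, 4)
print(joined_global_shape([(3, 4)] * 4, is_split=1))  # (3, 16)
# local lengths may differ along the split axis
print(joined_global_shape([(3, 4), (1, 4), (2, 4)], is_split=0))  # (6, 4)
```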

In [10]:
%%px
# create some random local arrays on each process
import numpy as np
local_array = np.random.rand(3, 4)

# join them into a distributed array
a_0 = ht.array(local_array, is_split=0)
a_0.shape

Out[3:19]: (12, 4)

Out[0:19]: (12, 4)

Out[1:19]: (12, 4)

Out[2:19]: (12, 4)

Change the cell above and join the arrays along a different axis. Note that the shapes of the local arrays must be consistent along the non-split axes. They can differ along the split axis.

The `ht.array` function accepts as input any data object that can be converted to a torch tensor. 

Once you've made your disjoint data into a DNDarray, you can apply any Heat operation or algorithm to it and exploit the cumulative RAM of all the processes in the communicator. 
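A back-of-the-envelope estimate makes the "cumulative RAM" argument concrete. The sketch below computes the largest per-process footprint of an array, assuming a balanced split with the remainder on the lowest ranks (as in the distributions shown earlier) and a 4-byte float32 item size by default. `max_local_bytes` is a hypothetical helper, not a Heat function.

```python
def max_local_bytes(gshape, split, nprocs, itemsize=4):
    """Hypothetical helper: largest per-process memory footprint, in bytes.

    Assumes a balanced split along `split`; split=None means that every
    process stores the full array (as after `resplit_(axis=None)`).
    """
    total = 1
    for n in gshape:
        total *= n
    if split is None:
        return total * itemsize
    if gshape[split] == 0:
        return 0
    # number of elements per index along the split axis (product of the other dimensions)
    per_index = total // gshape[split]
    # the largest local chunk holds ceil(gshape[split] / nprocs) indices
    largest_chunk = (gshape[split] + nprocs - 1) // nprocs
    return per_index * largest_chunk * itemsize


shape = (100_000, 1_000)  # 10**8 float32 values, about 400 MB in total
print(max_local_bytes(shape, split=0, nprocs=4))     # 100000000 (about 100 MB per process)
print(max_local_bytes(shape, split=None, nprocs=4))  # 400000000 (full copy on every process)
```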

You can access the MPI communication functionalities of the DNDarray through the `comm` attribute, i.e.:

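```python
# illustrative only: `b` stands for a receive buffer of suitable shape
from mpi4py import MPI

a.comm.Allreduce(a, b, op=MPI.SUM)  # element-wise sum over all processes
a.comm.Allgather(a, b)              # collect every process's data on all processes
a.comm.Isend(a, dest=1, tag=0)      # non-blocking send to rank 1
```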

etc.

### Exercise 1: Mapping Local Shapes to Global Indices

This exercise uses `lshape_map` and the process rank to determine the **start and end indices** of the DNDarray slice held by the current process. This is vital for algorithms that require local knowledge of the global data structure.

* **Background**: When a DNDarray is split, the `lshape_map` attribute gives the local shape of the data on each process. We can use it to calculate the global start and end indices for the current rank along the split axis (`axis=0`).
    * **Start Index** = sum of the local shapes (along the split axis) on all preceding ranks.
    * **End Index** = Start Index + local shape (along the split axis) on the current rank.

* **Task**:
    1.  Create a $7 \times 4 \times 3$ DNDarray distributed along `axis=0`.
    2.  Calculate and print the global **start index** (`start_idx`) and **end index** (`end_idx`) of the slice held by the current process.

### Exercise 2: Comparing Redistribution and Resplit

This exercise explores the difference between `DNDarray.redistribute_()` and `DNDarray.resplit_()`.

* **Task**:
    1.  Create a $10 \times 4$ DNDarray (**A**) distributed along `axis=0`.
    2.  Use `DNDarray.resplit_()` to redistribute a copy of **A** along `axis=1` (**B**).
    3.  Create a **target map** to redistribute a copy of **A** along `axis=0` such that rank 0 holds 1 row and all other ranks hold 3 rows (with 4 processes, 1 + 3 + 3 + 3 = 10). Apply `DNDarray.redistribute_()` (**C**).
* **Check**: Print the `lshape` and `split` of **A**, **B**, and **C** on each rank to confirm the new distributions.

In the next notebooks, we'll show you how we use Heat's distributed-array infrastructure to scale complex data analysis workflows to large datasets and high-performance computing resources.

- [Data loading and preprocessing](3_loading_preprocessing.ipynb)
- [Matrix factorization algorithms](4_matrix_factorizations.ipynb)
- [Clustering algorithms](5_clustering.ipynb)

In [11]:
%%px
## solution exercise 1
import heat as ht
import torch

# 1. Create a distributed DNDarray
a = ht.random.randn(7, 4, 3, split=0)

# Get rank, split axis, and local shape map
rank = a.comm.rank
split_axis = a.split
lshape_map = a.lshape_map.numpy() # Convert to numpy for slicing/summing (local operation)

# Calculate the start index: sum of local shapes (along split axis) for all preceding ranks
start_idx = lshape_map[:rank, split_axis].sum()

# Calculate the end index: start index + local shape (along split axis) for the current rank
end_idx = start_idx + lshape_map[rank, split_axis]

# 2. Print the results
print(f"Rank {rank}/{a.comm.size}: Global Indices along axis={split_axis}: [{start_idx} : {end_idx}]")


In [12]:
%%px
## solution exercise 2
import heat as ht
import torch

# 1. Create original DNDarray A (split=0)
A = ht.random.randn(10, 4, split=0)
rank = A.comm.rank

# 2. Resplit a copy of A to split=1 (creating B)
B = A.copy()  # Use copy to keep A untouched for redistribution test
B.resplit_(axis=1)

# 3. Create target map and redistribute a copy of A (creating C)
# Original size: 10 rows. Split=0. Ranks: 0, 1, 2, 3 (this solution assumes 4 processes).
# Target distribution (rows along axis 0): [1, 3, 3, 3]
target_map = A.lshape_map.clone()  # torch tensors are copied with .clone(), not .copy()
# The split axis is 0, so column 0 of lshape_map holds the row counts
target_map[:, A.split] = torch.tensor([1, 3, 3, 3])

C = A.copy()
C.redistribute_(target_map=target_map)

# Check: Print properties
print(f"--- Rank {rank} ---")
print(f"A (Original): Split={A.split}, lshape={A.lshape}")
print(f"B (Resplit A to axis=1): Split={B.split}, lshape={B.lshape}")
print(f"C (Redistribute A): Split={C.split}, lshape={C.lshape}")
