# Semi-Automatic Parallelism

У MindSpore есть распределенные операторы и она умеет работать с распределенными тензорами. Пользователю о том, как это работает внутри думать не приходится. При этом MindSpore имеет 3 стратегии для распределения (как я понял, MindSPore может для каждого случая автоматически подбирать подходящую, хотя можно и задать самому). Их можно рассмотреть на примере умножения Тензора input на Тензор weight. 

1) **data parallelism**. При этой стратегии проиходит разрез input, weights не разрезается, задается так: `strategy=((2^N, 1, 1),(1, 1, 1))`*
2) **model parallelism**. При этой стратегии наоборот происходит разрез weights. Задается так: `strategy=((1, 1, 1),(2^N, 1, 1))`*
3) **mixed parallelism**. Разрезается и то, и другое. Задается так: `strategy=((2^N, 1, 1),(1, 1, 2^N))`*

На основе стратегии шардов в распределенном операторе определяется метод модели распределения входного и выходного тензора оператора. Распределенная модель состоит из      `device_matrix` (то, как происходит распределение), `tensor_shape` (рахмерность тензора), `tensor_map`(отношение между размерностями устройства и тензора). Распределенный оператор далее определяет, следует ли вставлять в граф дополнительные вычислительные и коммуникационные операции в соответствии с моделью тензорного распределения, чтобы обеспечить корректность логики работы оператора. (Суть этого предложение можно понять, если прочитать следующий пункт)

\* **Замечание:** 2^N, ибо MindSpore исходит из 2-х принципов: principle of base-2 and uniform distribution. Кортежах под цифрами обозначается, что можно делить и на сколько.

![tensor_parallel](../pictures/Tensor_parallel.png)

### Примеры кода

## <a id="DistributedOperators" style="text-decoration: none; color: #cccccc;">Distributed operators</a>

In [None]:
import mindspore.nn as nn
from mindspore import ops
import mindspore as ms
import numpy as np
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, device_num=4)

init()

class DenseMatMulNet(nn.Cell):
    def __init__(self):
        super(DenseMatMulNet, self).__init__()
        self.matmul1 = ops.MatMul().shard(((4, 1), (1, 1)))
        self.matmul2 = ops.MatMul().shard(((1, 1), (1, 4)))
        
    def construct(self, x, w, v):
        y = self.matmul1(x, w)
        print("y=", y)
        z = self.matmul2(y, v)
        print("z=", z)
        return z
    
if __name__ == "__main__":
    net = DenseMatMulNet()
    matrix = np.array([
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1]
    ], dtype=np.float32)
    
    result = net(ms.Tensor(matrix), ms.Tensor(matrix), ms.Tensor(matrix))

$$
\begin{pmatrix}
    X_1\\
    X_2\\
    X_3\\
    X_4
\end{pmatrix} * W = 
\begin{pmatrix}
    Y_1\\
    Y_2\\
    Y_3\\
    Y_4
\end{pmatrix} => AllGather => Y * \begin{pmatrix} V_1&V_2&V_3&V_4 \end{pmatrix} = \begin{pmatrix} Z_1 &Z_2 & Z_3 & Z_4 \end{pmatrix}
$$

In [None]:
import os
import mindspore as ms
import mindspore.dataset as ds
from mindspore import nn, ops
from mindspore.communication import init
from mindspore.common.initializer import initializer

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_context(max_device_memory="28GB")
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
init()
ms.set_seed(1)

class Network(nn.Cell):
    """Network"""
    def __init__(self):
        super().__init__()
        self.flatten = ops.Flatten()
        self.fc1_weight = ms.Parameter(initializer("normal", [28*28, 512], ms.float32))
        self.fc2_weight = ms.Parameter(initializer("normal", [512, 512], ms.float32))
        self.fc3_weight = ms.Parameter(initializer("normal", [512, 10], ms.float32))
        self.matmul1 = ops.MatMul()
        self.relu1 = ops.ReLU()
        self.matmul2 = ops.MatMul()
        self.relu2 = ops.ReLU()
        self.matmul3 = ops.MatMul()

    def construct(self, x):
        x = self.flatten(x)
        x = self.matmul1(x, self.fc1_weight)
        x = self.relu1(x)
        x = self.matmul2(x, self.fc2_weight)
        x = self.relu2(x)
        logits = self.matmul3(x, self.fc3_weight)
        return logits

net = Network()
net.matmul1.shard(((2, 4), (4, 1)))
net.relu1.shard(((4, 1),)) 
net.matmul2.shard(((1, 8), (8, 1)))
net.relu2.shard(((8, 1),))

def create_dataset(batch_size):
    """create dataset"""
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)
optimizer = nn.SGD(net.trainable_params(), 1e-2)
loss_fn = nn.CrossEntropyLoss()

def forward_fn(data, target):
    """forward propagation"""
    logits = net(data)
    loss = loss_fn(logits, target)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)

@ms.jit
def train_step(inputs, targets):
    """train_step"""
    (loss_value, _), grads = grad_fn(inputs, targets)
    optimizer(grads)
    return loss_value

for epoch in range(10):
    i = 0
    for image, label in data_set:
        loss_output = train_step(image, label)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
        i += 1

### 1) Matmul1

$$ 
\begin{pmatrix}
    X_{11} & X_{12} & X_{13} & X_{14}\\
    X_{21} & X_{22} & X_{23} & X_{24}
\end{pmatrix} * \begin{pmatrix} W_{1} \\ W_{2} \\ W_{3} \\ W_{4} \end{pmatrix} =(AllReduce)= \begin{pmatrix} \sum\limits_{i=1}^4X_{1i}W_i  \\ \sum\limits_{i=1}^4X_{1i}W_i \end{pmatrix} = \begin{pmatrix} Y_1 \\ Y_2 \end{pmatrix} = 
$$

### 2) ReLU1
$$
Y_i = \begin{pmatrix}
    Y_{i1} \\ Y_{i2} \\ Y_{i3} \\ Y_{i4}
\end{pmatrix} = (ReLU) = \begin{pmatrix} Y_{i1}' \\ Y_{i2}' \\ Y_{i3}' \\ Y_{i4}'
\end{pmatrix}
$$

**Замечание:** Параллельно задействуется 8 видеокарт, так $i = 1, 2$

### 3) Matmul2
$$
=\begin{pmatrix}
    X_1 & X_2 & X_3 & X_4 & X_5 & X_6 & X_7 & X_8
\end{pmatrix} * \begin{pmatrix}
    V_1 \\ V_2 \\ V_3 \\ V_4 \\ V_5 \\ V_6 \\ V_7 \\ V_8
\end{pmatrix} =(AllReduce)= \sum\limits_{i=1}^8 X_i*V_i = Y
$$

### 4) ReLU2
$$
Y = \begin{pmatrix}
    Y_1 \\ Y_2 \\ Y_3 \\ Y_4 \\ Y_5 \\ Y_6 \\ Y_7 \\ Y_8
\end{pmatrix} = (ReLU) = \begin{pmatrix}
    Y_1' \\ Y_2' \\ Y_3' \\ Y_4' \\ Y_5' \\ Y_6' \\ Y_7' \\ Y_8'
\end{pmatrix} = Y'
$$

### Операторы, поддерживающие параллелизм
[все операторы, поддерживающие параллелизм](https://www.mindspore.cn/docs/en/r2.2/note/operator_list_parallel.html)

**AllReduce**

In [None]:
import numpy as np
from mindspore.communication import init, get_rank
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops

init()
class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.all_reduce_sum = ops.AllReduce(ops.ReduceOp.SUM, group="nccl_world_group")

    def construct(self, x):
        return self.all_reduce_sum(x)

value = get_rank()
input_x = ms.Tensor(np.array([[value]]).astype(np.float32))
net = Net()
output = net(input_x)
print(output)

![AllReduce](../pictures/OperatorParallelism/AllRefuce.png)

**AllGather**

In [None]:
import numpy as np
import mindspore.ops as ops
import mindspore.nn as nn
from mindspore.communication import init, get_rank
import mindspore as ms

ms.set_context(mode=ms.GRAPH_MODE)
init()
class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.all_gather = ops.AllGather()

    def construct(self, x):
        return self.all_gather(x)

value = get_rank()
input_x = ms.Tensor(np.array([[value]]).astype(np.float32))
net = Net()
output = net(input_x)
print(output)

![AllGather](../pictures/OperatorParallelism/AllGather.png)

**ReduceScatter**

In [None]:
import mindspore as ms
from mindspore.communication import init, get_rank
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

ms.set_context(mode=ms.GRAPH_MODE)
init()
class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.reduce_scatter = ops.ReduceScatter(ops.ReduceOp.SUM)

    def construct(self, x):
        return self.reduce_scatter(x)

input_x = ms.Tensor(np.array([[0], [1], [2], [3]]).astype(np.float32))
net = Net()
output = net(input_x)
print(output)

операция сначала суммирует входные данные каждой карты, а затем разбивает данные на количество карт и распределяет данные по соответствующей карте.

![ReduceScatter](../pictures/OperatorParallelism/ReduceScatter.png)

**Broadcast**

In [None]:
import mindspore as ms
from mindspore.communication import init
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

ms.set_context(mode=ms.GRAPH_MODE)
init()
class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.broadcast = ops.Broadcast(0)

    def construct(self, x):
        return self.broadcast((x,))

input_x = ms.Tensor(np.array([[0]]).astype(np.int32))
net = Net()
output = net(input_x)
print(output)

![Broadcast](../pictures/OperatorParallelism/Broadcast.png)

**NeighborExchange**

In [None]:
import os
import mindspore as ms
from mindspore.communication import init
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

class Net0(nn.Cell):
    def __init__(self):
        super(Net0, self).__init__()
        self.neighbor_exchange = ops.NeighborExchange(send_rank_ids=[1], recv_rank_ids=[1], recv_shapes=([2, 2],), send_shapes=([3, 3],), recv_type=ms.float32)

    def construct(self, x):
        out = self.neighbor_exchange((x,))
        return out[0]

class Net1(nn.Cell):
    def __init__(self):
        super(Net1, self).__init__()
        self.neighbor_exchange = ops.NeighborExchange(send_rank_ids=[0], recv_rank_ids=[0], recv_shapes=([3, 3],), send_shapes=([2, 2],), recv_type=ms.float32)

    def construct(self, x):
        out = self.neighbor_exchange((x,))
        return out[0]

ms.set_context(mode=ms.GRAPH_MODE)
init()
rank_id = int(os.getenv("RANK_ID"))
if (rank_id % 2 == 0):
    input_x = ms.Tensor(np.ones([3, 3]), dtype = ms.float32)
    net = Net0()
    output = net(input_x)
    print(output)
else:
    input_x = ms.Tensor(np.ones([2, 2]) * 2, dtype = ms.float32)
    net = Net1()
    output = net(input_x)
    print(output)

Операция `NeighborExchange` предоставит набор данных, которые будут отправлены на каждую из других конкретных карт при получении данных с
конкретной карты. Например, на приведенном ниже рисунке ранг 0 отправляет тензор с формой [16,16] в ранг 1 и получает тензор с формой
[32,32] из ранга 1. ранг 1 отправляет тензор с формой [32,32] в ранг 0 и получает тензор с формой [16,16] из ранга 0.

![NeighborExchange](../pictures/OperatorParallelism/NeighborExchange.png)

**NeighborExchangeV2**

In [None]:
import os
import mindspore as ms
from mindspore.communication import init
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

class Net0(nn.Cell):
    def __init__(self):
        super(Net0, self).__init__()
        self.neighbor_exchangev2 = ops.NeighborExchangeV2(send_rank_ids=[-1, -1, -1, -1, 1, -1, -1, -1], send_lens=[0, 1, 0, 0], recv_rank_ids=[-1, -1, -1, -1, 1, -1, -1, -1], recv_lens=[0, 1, 0, 0], data_format="NCHW")

    def construct(self, x):
        out = self.neighbor_exchangev2(x)
        return out

class Net1(nn.Cell):
    def __init__(self):
        super(Net1, self).__init__()
        self.neighbor_exchangev2 = ops.NeighborExchangeV2(send_rank_ids=[0, -1, -1, -1, -1, -1, -1, -1], send_lens=[1, 0, 0, 0], recv_rank_ids=[0, -1, -1, -1, -1, -1, -1, -1], recv_lens=[1, 0, 0, 0], data_format="NCHW")

    def construct(self, x):
        out = self.neighbor_exchangev2(x)
        return out

ms.set_context(mode=ms.GRAPH_MODE)
init()
rank_id = int(os.getenv("RANK_ID"))
if (rank_id % 2 == 0):
    input_x = ms.Tensor(np.ones([1, 1, 2, 2]), dtype = ms.float32)
    net = Net0()
    output = net(input_x)
    print(output)
else:
    input_x = ms.Tensor(np.ones([1, 1, 2, 2]) * 2, dtype = ms.float32)
    net = Net1()
    output = net(input_x)
    print(output)
    
'''
мы используем оператора NeighborExchangeV2 для обмена данными между картой 0 и картой 1,
отправляя данные, указанные под картой 0, на карту 1 и получая данные с карты 1, прошитой ниже.
Карта 1 отправляет верхнюю часть данных на карту 0 и получает данные с карты 0, прошитой сверху. Наконец, каждая карта выводит полученные данные.
'''

Операция `NeighborExchangeV2` отправляет часть данных из тензора на окружающие 8 карт в соответствии с настройками атрибута и
получает данные с окружающих 8 карт и объединяет их в новый тензор, который часто используется в сценариях, когда большой тензор
разбивается на несколько карт для распределенных сверточных операций. Атрибуты send_rank_ids и recv_rank_ids представляют собой 8 чисел, соответственно,
указывающих на отправку/получение идентификатора ранжирования в 8 направлениях, а заполнение -1 означает отсутствие отправки/получения. 
Атрибуты send_lens и recv_lens - это четыре числа, которые представляют длину передачи/приема в
четырех направлениях [вверху, внизу, слева, справа] соответственно. Например, на рисунке 1 ниже показан пример с 16 карточками, в качестве примера возьмем 10-й ранг
, установим send_rank_ids=[6,7,11,15,14,13,9,5], данные 10-го ранга будут вырезаны и отправлены в rank. 5, 6, 7, 11, 15, 14, 13, 9 соответственно,
например, красный цвет на рисунке переводится в ранг 5, красный, желтый и синий - в ранг 6, синий - в ранг 7 и т.д. Установка recv_rank_ids=[6,7,11,15,14,13,9,5],
в то же время rank10 получает некоторые данные с каждой из этих карточек, прошитых в соответствующем направлении, для формирования нового тензорного
вывода, как показано на рисунке с rank10 и светло-зеленой частью.

**Замечание:** Как я понял, мы по сути создаем определенную топологию наших карт для последующей передачи на них определенных частей тензора, которые нужны

![NeighborExchangeV2](../pictures/OperatorParallelism/NeighborExchangeV2.png)

**AlltoAll**

In [None]:
import os
import mindspore as ms
from mindspore.communication import init
import mindspore.nn as nn
import mindspore.ops as ops
import numpy as np

class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.all_to_all = ops.AlltoAll(split_count = 8, split_dim = -2, concat_dim = -1)

    def construct(self, x):
        out = self.all_to_all(x)
        return out

ms.set_context(mode=ms.GRAPH_MODE, device_target='Ascend')
init()
net = Net()
rank_id = int(os.getenv("RANK_ID"))
input_x = ms.Tensor(np.ones([1, 1, 8, 1]) * rank_id, dtype = ms.float32)
output = net(input_x)
print(output)
"""
мы используем оператор AlltoAll для обмена данными с 8 картами.
Разрезаем тензор на карте по 2 размерности с конца.
Затем отправляем данные о срезе на другие карты по порядку, а также получаем данные от других карт
и сшиваем их по 1 размерности с конца. Наконец, каждая карта выводит сшитые данные.
Должно получиться такое: [[[[0. 1. 2. 3. 4. 5. 6. 7.]]]]
"""

Операция `AlltoAll` разбивает входные данные на определенное количество блоков по определенной размерности и отправляет их на другие карты по
порядку, одновременно получая входные данные с других карт и объединяя данные по определенной размерности по порядку. Например, на приведенном ниже
рисунке тензор разрезается на 5 частей по 1-й размерности, одновременно получая данные из других карт и объединяя их по 2-й размерности, и, наконец, выводя переключенные данные.

![AlltoAll](../pictures/OperatorParallelism/AlltoAll.png)

### Еще примеры, связанные с операторами

 `Z=(X*W)*V`

Предположим, что сначала были разделены именно input данные. Тогда для их сбори применяется оператор `AllGather`

![AllGather](../pictures/Sample1-Dictributed_Transformation.png)

В следующем примере, наоборот используется model parallel. Для сборки и разделения, соответственно, используется оператор `AlltoAll`

![alt_all](../pictures/Sample2-Distributed_Transformation.png)

В последнем примере у нас между операциями с размерностями все хорошо. Однако затем необходимо уменьшить размерность. Для этого применяется оператор `AllReduce`

![AllReduce](../pictures/Sample3-Distributed_Transformation.png)

## Pipeline Parallelism

**$\underline{\text{В MindSpore 2.2.14 планировщик 1F1B}}$**


При большом количестве устройств кластера, если используется только **operator parallelism**, требуется обмен данными через область обмена данными всего кластера, что может сделать обмен данными неэффективным и, следовательно, снизить общую производительность. Конвейерный параллелизм может разделить структуру нейронной сети на несколько этапов, и каждый этап выполняется в определенной части устройства, что ограничивает область коллективного взаимодействия этой частью устройства, в то время как на промежуточном этапе используется двухточечная связь. 

**Базовый принцип**. Параллельный конвейер — это разделение операторов в нейронной сети на несколько этапов, а затем сопоставление этапов с разными устройствами, чтобы разные устройства могли вычислять разные части нейронной сети. Сеть из 4 слоев MatMul разбивается на 4 этапа и распределяется по 4 устройствам. При прямых вычислениях каждая машина отправляет результат следующей машине через оператора связи после вычисления MatMul на машине, и в это же время следующая машина получает (Receive) результат MatMul предыдущей машины через оператора связи, и начинает рассчитывать MatMul на машине; При обратном расчете, после того как градиент последней машины вычислен, результат отправляется предыдущей машине, и в то же время предыдущая машина получает результат градиента последней машины и начинает вычислять обратный результат текущей машины.

![piepline_parallelism](../pictures/Piepline_parallelism-basic_principe.png)

Простое разделение модели на несколько устройств не приведет к повышению производительности, поскольку в линейной структуре модели одновременно работает только одно устройство, в то время как другие устройства ожидают, что приводит к пустой трате ресурсов. Чтобы повысить эффективность, параллельный конвейер дополнительно делит малую партию (MiniBatch) на более мелкие микропакеты (MicroBatch) и принимает последовательность выполнения конвейера в микропакете, чтобы достичь цели повышения эффективности

**GPipe schreduler**.Малые партии разрезаются на 4 микропакета, а 4 микропакета выполняются на 4 группы, образуя конвейер. Для обновления параметров используется градиентная агрегация микропартии, где каждое устройство только хранит и обновляет параметры соответствующей группы. где порядковый номер белого цвета представляет индекс микропартии.

![GPipe](../pictures/Pipeline_parallelism-GPipe.png)

**1F1B schreduler**.В параллельной реализации конвейера MindSpore порядок выполнения был скорректирован для лучшего управления памятью. Обратный микропакет с номером 0 выполняется сразу после его прямого выполнения, так что память промежуточного результата микропакета с номером 0 освобождается раньше, тем самым гарантируя, что пиковое использование памяти будет ниже

![1F1B](../pictures/Pipeline_parallelism-1F1B.png)

**Interleaved Pipeline Schreduler**.Чтобы повысить эффективность параллелизма конвейеров и уменьшить долю простоев, компания Megatron LM предлагает новую систему параллельного планирования конвейеров под названием "чередующийся конвейер". Традиционный конвейерный параллелизм обычно предусматривает размещение нескольких последовательных слоев модели (например, слоев трансформера) на одной рабочей площадке. При планировании конвейера с чередованием на каждом этапе выполняются вычисления с чередованием на неперерывных слоях модели, чтобы еще больше уменьшить долю простоев с большей связью. Например, при традиционном конвейерном параллелизме каждая стадия имеет 2 слоя модели, а именно: стадия 0 имеет слои 0 и 1, стадия 1 имеет слои 2 и 3, стадия 3 имеет слои 4 и 5, а стадия 4 имеет слои 6 и 7, в то время как в конвейере с чередованием на стадии 0 есть слои 0 и 4, стадия 1 содержит слои 1 и 5, стадия 2 содержит слои 2 и 6, а стадия 3 содержит слои 3 и 7.

![Intervalled+1F1B](../pictures/Pipeline_patallelism-Intervalled_and_1F1B.png)

**MindStore schreduler**

![MindStore](../pictures/Pipeline_parallelism-MindStore_screduler.png)


In [None]:
import os
import mindspore as ms
import mindspore.dataset as ds
from mindspore import nn, train
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, pipeline_stages=2)
init()
ms.set_seed(1)

class Network(nn.Cell):
    """Network"""
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layer1 = nn.Dense(28*28, 512)
        self.relu1 = nn.ReLU()
        self.layer2 = nn.Dense(512, 512)
        self.relu2 = nn.ReLU()
        self.layer3 = nn.Dense(512, 10)

    def construct(self, x):
        x = self.flatten(x)
        x = self.layer1(x)
        x = self.relu1(x)
        x = self.layer2(x)
        x = self.relu2(x)
        logits = self.layer3(x)
        return logits

net = Network()
net.layer1.pipeline_stage = 0
net.relu1.pipeline_stage = 0
net.layer2.pipeline_stage = 1
net.relu2.pipeline_stage = 1
net.layer3.pipeline_stage = 1

def create_dataset(batch_size):
    """create dataset"""
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)

optimizer = nn.SGD(net.trainable_params(), 1e-2)
loss_fn = nn.CrossEntropyLoss()
loss_cb = train.LossMonitor()
net_with_grads = nn.PipelineCell(nn.WithLossCell(net, loss_fn), 4)
model = ms.Model(net_with_grads, optimizer=optimizer)
model.train(3, data_set, callbacks=[loss_cb], dataset_sink_mode=True)

"""
В этом коде используется разделение на 4 MicroBatch. При этом используется конвеер всего из 2-х карт. 
"""

$$2 \ pipeline: \ Dev_0(layer1,\  relu1), \ Dev_1(layer2,\  relu2,\ layer3)$$

$$
D = \cup_{k=1}^n D_k = \cup_{k=1}^n\cup_{i=1}^4B_{ki} \\ (D_k - MiniBatch; \ B_{ki} - MicroBatch) 
$$

![SchreduleScheme](../pictures/myPictures//SchredulerScheme.png)



**Зачем нам при конвеерном параллелизме использовать именно разделение на microbatch?** 

Дело в том, что, что, если мы разделим на microbatch, то у нас сразу на видеокарте будет несколько обернутных для конвеера батчей. В случае, если не делить, то нам каждый раз надо будет создавать новую обертку, что существенно может снизить нашу производительность

Для доказательства этого можно провети ряд экспериментов. Ниже представлены результаты (первая цифра в кортеже показывает размер батча, вторая - количество микробатчей, через ":" представлено время, затраченное на обучение в секундах):
$$
(4, 1): 193.36 \\
(8, 2): 166.19 \\
(16, 4): 155.26 \\
(32, 8): 145.04 \\
$$
Из представленных результатов видно, что время, затраченное на обучение, уменьшается.

Возникает гипотеза о том, что чем больше количество microbatch тем быстрее будет обучение. Однако, исходя из представленных результатов ниже, это не так:
$$
(32, 1): 52.57 \\
(32, 2): 67.21 \\
(32, 4): 97.68 \\
(32, 8): 145.04 \\
(32, 16): 275.40 \\
(32, 32): 528.28 \\
$$

**Вывод (гипотеза):** за счет разделение на microbatch действительно уменьшается время, затрачиваемое на обучение. Однако, если MiniBatch не слишком большой, то не следует его разделять на больше, чем количество видеокарт в системе 

**Замечание:** Стоит также отметить, что при разделении на microbatch работа видеокарты более стабильна. Также стоит отметить, что при большом разделении, оперативная память в некоторых случаях загружалась сильнее, хотя и не очень значительно. 

Нерешенная вопрос: Почему растет Loss?

![Loss](../pictures/myPictures/TableLoss.png)


## Optimizer parallelism

При обучении параллелизму данных или параллелизму операторов одна и та же копия параметров модели может существовать на нескольких устройствах, что позволяет оптимизатору выполнять избыточные вычисления на нескольких устройствах при обновлении этого веса. В этом случае вычисления оптимизатора могут быть распределены по нескольким устройствам за счет параллелизма оптимизатора. Его преимущества заключаются в снижении потребления статической памяти и объема вычислений в оптимизаторе

**Замечание:** В режиме AUTO_PARALLEL или SEMI_AUTO_PARALLEL включается optimizer parallelism, если параметры после стратегии нарезки имеют повторяющиеся срезы между машинами, а максимальный размер фигуры делится на количество повторяющихся срезов, фреймворк сохраняет параметры как минимальные срезы и обновляет их в оптимизаторе. В этом режиме поддерживаются все оптимизаторы

**Что мы хотим сделать и зачем это надо?**

Традиционная параллельная модель данных хранит копии параметров модели на каждом устройстве, разрезает обучающие данные, синхронизирует информацию о градиенте после каждой итерации с помощью операторов связи и, наконец, обновляет параметры с помощью вычислений оптимизатора. Параллелизм данных, хотя и эффективен для повышения производительности обучения, не позволяет максимально эффективно использовать машинные ресурсы. Оптимизатор вводит избыточную память и вычисления, устранение этих избыточностей является точкой оптимизации, на которой следует сосредоточиться.

В обучающей итерации параллелизм данных вводит операцию связи для синхронизации градиентов по нескольким карточкам для сбора градиентов параметров, созданных различными выборками на каждой карте. Поскольку параллелизм модели не используется, операции оптимизатора на каждой карте фактически обновляются на основе одних и тех же параметров и в том же направлении. Основная идея устранения избыточности оптимизатора заключается в том, чтобы распределить эту память и вычисления по картам для достижения прироста памяти и производительности.

**Межслойное деление**. Одной из групп весов является межслойное деление параметров и градиентов внутри оптимизатора, а общий поток обучения показан на рисунке 1. Параметры и градиенты группируются на различных картах для обновления, а затем обновленные веса распространяются между устройствами с помощью операции коммуникационной трансляции. Прирост памяти и производительности решения зависит от группы с наибольшей долей параметров. Когда параметры разделены поровну, теоретические положительные выигрыши равны (N-1)/N времени выполнения оптимизатора и динамической памяти и (N-1)/N объема памяти для параметров состояния оптимизатора, где N обозначает количество устройств. А отрицательное преимущество — это время связи, которое наступает при совместном использовании весов сети.

![Inner-layer](../pictures/Optimizer_parallelism-inner-layer.png)


**Внутрислойное деление (Ее использует MindSpore)**. Еще один способ реализации нарезки параметров — это внутрислойное разделение параметров, и для каждого параметра берется соответствующий срез и градиент в соответствии с номером устройства. После обновления параметров и градиентов вызывается операция агрегации связи для совместного использования параметров между устройствами. Преимущество этой схемы в том, что она естественно поддерживает балансировку нагрузки, т.е. количество параметров и вычислений согласовано на каждой карте, а недостаток в том, что форма параметра должна быть кратна количеству устройств. Теоретические преимущества этой схемы согласуются с группировкой параметров, и в систему были внесены следующие усовершенствования с целью расширения преимуществ.

Во-первых, разделение весов в сети может еще больше уменьшить статическую память. Однако для этого также требуется выполнить операцию с общим весом в конце итерации перед прямым началом следующей итерации, гарантируя, что исходная форма тензора останется неизменной после перехода к прямым и обратным операциям. Кроме того, основным проигрышем от параллельной работы оптимизатора является время связи общих весов, которое может принести выигрыш в производительности, если мы сможем его уменьшить или скрыть. Одним из преимуществ перекрестной итерации связи является то, что операции связи могут выполняться вперемежку с прямой сетью путем объединения операторов связи в соответствующие группы, тем самым максимально скрывая затраты времени на связь. Затраты времени на общение также связаны с объемом общения. Для сети со смешанной точностью, если мы сможем использовать связь fp16, объем связи уменьшится вдвое по сравнению с fp32.

![MindStore](../pictures/Optimizer_parallelism-MindStore.png)

**Замечание:** Что такое перерестаня итерация?

**Перекрестная итерация** (cross-iteration) представляет собой технику, которая позволяет различным частям нейронной сети обмениваться информацией на протяжении процесса обучения. Это особенно полезно при работе с большими и сложными моделями, которые разбиваются на несколько частей для параллельной обработки.

Как работает перекрестная итерация?
1) Разбиение сети: Нейронная сеть делится на несколько более мелких подсетей.
2) Параллельная обработка: Каждая подсеть обрабатывает свою часть данных независимо.
3) Обмен информацией: После определенного числа итераций (или эпох) обучения, подсети обмениваются информацией о своих текущих весах или активациях.
4) Обновление весов: На основе полученной информации, каждая подсеть обновляет свои веса, чтобы лучше согласовать свою работу с другими подсетями.



![OptimiserScheme](../pictures/myPictures/OptimiserScheme.png)

In [None]:
import os
import mindspore as ms
import mindspore.dataset as ds
from mindspore import nn
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)
init()
ms.set_seed(1)

class Network(nn.Cell):
    """Network"""
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layer1 = nn.Dense(28*28, 512)
        self.layer2 = nn.Dense(512, 512)
        self.layer3 = nn.Dense(512, 10)
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.flatten(x)
        x = self.layer1(x)
        x = self.relu(x)
        x = self.layer2(x)
        x = self.relu(x)
        logits = self.layer3(x)
        return logits

net = Network()
net.layer1.set_comm_fusion(0)
net.layer2.set_comm_fusion(1)
net.layer3.set_comm_fusion(2)

def create_dataset(batch_size):
    """create dataset"""
    dataset_path = os.getenv("DATA_PATH")
    dataset = ds.MnistDataset(dataset_path)
    image_transforms = [
        ds.vision.Rescale(1.0 / 255.0, 0),
        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
        ds.vision.HWC2CHW()
    ]
    label_transform = ds.transforms.TypeCast(ms.int32)
    dataset = dataset.map(image_transforms, 'image')
    dataset = dataset.map(label_transform, 'label')
    dataset = dataset.batch(batch_size)
    return dataset

data_set = create_dataset(32)
optimizer = nn.SGD(net.trainable_params(), 1e-2)
loss_fn = nn.CrossEntropyLoss()

def forward_fn(data, target):
    """forward propagation"""
    logits = net(data)
    loss = loss_fn(logits, target)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)

@ms.jit
def train_step(inputs, targets):
    """train_step"""
    (loss_value, _), grads = grad_fn(inputs, targets)
    optimizer(grads)
    return loss_value

for epoch in range(10):
    i = 0
    for image, label in data_set:
        loss_output = train_step(image, label)
        if i % 10 == 0:
            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
        i += 1