## Megatron-LM Tensor Parallelism

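Megatron is a large, powerful transformer developed by the Applied Deep Learning Research team at NVIDIA. The Megatron-LM project targets research on training large transformer language models at scale. Its main contribution is the idea of tensor parallelism: splitting the model horizontally, that is, partitioning the weight matrices inside each layer across GPUs.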
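
To make the idea concrete, here is a minimal sketch in plain Python. It uses no GPUs and no Megatron: two "ranks" are simulated in one process, and all helper names are hypothetical. It splits one linear layer's weight W in the two ways tensor parallelism relies on. With a column split, each rank computes a slice of the output and the slices are concatenated. With a row split, each rank computes a partial product and the partial results are added.

```python
# Simulate two tensor-parallel "ranks" in a single process with plain lists.

def matmul(a, b):
    """Multiply an (m x k) matrix by a (k x n) matrix."""
    return [[sum(a[i][p] * b[p][j] for p in range(len(b)))
             for j in range(len(b[0]))]
            for i in range(len(a))]


def split_columns(w, parts):
    """Split W (k x n) into `parts` blocks of columns."""
    n = len(w[0]) // parts
    return [[row[r * n:(r + 1) * n] for row in w] for r in range(parts)]


def split_rows(w, parts):
    """Split W (k x n) into `parts` blocks of rows."""
    k = len(w) // parts
    return [w[r * k:(r + 1) * k] for r in range(parts)]


x = [[1, 2, 3, 4]]        # one token with hidden size 4
w = [[1, 0, 2, 1],
     [0, 1, 1, 3],
     [2, 1, 0, 1],
     [1, 1, 1, 0]]        # 4 x 4 weight
full = matmul(x, w)       # unsharded reference

# Column split: rank r holds W[:, block r] and produces output block r;
# concatenating the blocks (an all-gather) recovers X @ W.
col_parts = [matmul(x, w_r) for w_r in split_columns(w, 2)]
col_result = [ra + rb for ra, rb in zip(col_parts[0], col_parts[1])]

# Row split: rank r holds W[block r, :] and the matching columns of X;
# adding the partial products (an all-reduce) recovers X @ W.
x_parts = split_columns(x, 2)
row_parts = [matmul(x_r, w_r) for x_r, w_r in zip(x_parts, split_rows(w, 2))]
row_result = [[a + b for a, b in zip(ra, rb)]
              for ra, rb in zip(row_parts[0], row_parts[1])]

print(full)  # [[11, 9, 8, 10]]
```

Both `col_result` and `row_result` equal `full`. The difference is which collective the real implementation needs afterwards: a gather for the column split, a sum for the row split.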

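Further reading on tensor parallelism (in Chinese):

- NVIDIA China: https://zhuanlan.zhihu.com/p/420908718
- A Zhihu post with diagrams and pseudocode of the partitioning scheme: https://zhuanlan.zhihu.com/p/366906920
- 罗西的思考 (Rossi's blog) on cnblogs: https://www.cnblogs.com/rossiXYZ/p/15840803.html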

This notebook shows how to use Megatron to quickly deploy a fairly simple model with tensor parallelism.

First, install the Megatron library:

In [None]:
!git clone https://github.com/NVIDIA/Megatron-LM.git
# `!cd` only affects a temporary subshell; the %cd magic changes the notebook's working directory
%cd Megatron-LM
%pip install -v -e .

Check that the installation succeeded:

In [3]:
import megatron

print(megatron)

<module 'megatron' from '/mnt/configblob/users/ruizhe/Megatron-LM/megatron/__init__.py'>


### Unit tests

The code below is adapted from Megatron's unit tests: https://github.com/NVIDIA/Megatron-LM/tree/main/tests/unit_tests

First, import the Megatron APIs used to initialize model parallelism:

In [None]:
import os
import torch
import megatron.core.parallel_state as ps
from megatron.core.tensor_parallel.data import broadcast_data


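# Utils provides helpers to initialize the distributed environment and to initialize and destroy the model-parallel state
class Utils:

    # Assumes a single node with one process per GPU
    world_size = torch.cuda.device_count()
    # Launch with torchrun: otherwise LOCAL_RANK is not set in the environment and this line raises an error
    rank = int(os.environ['LOCAL_RANK'])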

    @staticmethod
    def initialize_distributed():
        print(f'Initializing torch.distributed with rank: {Utils.rank}, world_size: {Utils.world_size}')
        torch.cuda.set_device(Utils.rank % torch.cuda.device_count())
        init_method = 'tcp://'
        master_ip = os.getenv('MASTER_ADDR', 'localhost')
        master_port = os.getenv('MASTER_PORT', '6000')
        init_method += master_ip + ':' + master_port
        torch.distributed.init_process_group(backend='nccl', world_size=Utils.world_size, rank=Utils.rank, init_method=init_method)
        
    @staticmethod
    def destroy_model_parallel():
        ps.destroy_model_parallel()
        torch.distributed.barrier()

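    # initialize_model_parallel: set up the model-parallel state
    #   tensor_model_parallel_size: tensor-parallel size
    #   pipeline_model_parallel_size: pipeline-parallel size
    #   virtual_pipeline_model_parallel_size: number of virtual pipeline stages (interleaved schedule)
    #   pipeline_model_parallel_split_rank: for encoder-decoder models, the pipeline rank where the decoder starts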
    @staticmethod
    def initialize_model_parallel(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1, virtual_pipeline_model_parallel_size = None, pipeline_model_parallel_split_rank = None):
        ps.destroy_model_parallel()
        if not torch.distributed.is_initialized():
            Utils.initialize_distributed()
        ps.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size, virtual_pipeline_model_parallel_size, pipeline_model_parallel_split_rank)
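
Which ranks communicate with each other is decided by `initialize_model_parallel`. The sketch below computes the groups in plain Python, assuming the default layout in which tensor-parallel ranks are consecutive and pipeline stages are strided, as in the group construction of earlier Megatron-Core versions. Newer versions derive the layout with a more general rank generator and add further parallel dimensions, so check your version. `parallel_groups` is a hypothetical helper, not a Megatron API. It explains the expectations used in the tests below: with TP=4 and PP=2, ranks 0-3 and 4-7 form the two tensor-parallel groups.

```python
def parallel_groups(world_size, tp, pp):
    """Return (dp_size, tp_groups, pp_groups) under the assumed default layout."""
    assert world_size % (tp * pp) == 0, "world_size must be divisible by tp * pp"
    dp = world_size // (tp * pp)
    # Tensor-parallel groups: blocks of `tp` consecutive ranks.
    tp_groups = [list(range(i * tp, (i + 1) * tp)) for i in range(world_size // tp)]
    # Pipeline groups: ranks strided by world_size // pp, one rank per stage.
    stride = world_size // pp
    pp_groups = [list(range(i, world_size, stride)) for i in range(stride)]
    return dp, tp_groups, pp_groups


for tp, pp in [(2, 4), (4, 2)]:
    dp, tp_groups, pp_groups = parallel_groups(8, tp, pp)
    print(f"TP={tp}, PP={pp}, DP={dp}: tensor groups {tp_groups}, pipeline groups {pp_groups}")
```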


Next, we can define some unit tests. For example, the code below tests Megatron's **tensor broadcast** function:

In [None]:
# Test broadcast: set up tensor-parallel size 2 and pipeline-parallel size 4, create some data, and check what broadcast_data does
def test_broadcast_data():
    Utils.initialize_model_parallel(2,4)
    input_data = {
        0 : torch.ones((8,8)).cuda() * 0.0,
        1 : torch.ones((8,8)).cuda() * 1.0,
        2 : torch.ones((8,8)).cuda() * 2.0,
        3 : torch.ones((8,8)).cuda() * 3.0,
        4 : torch.ones((8,8)).cuda() * 4.0,
        5 : torch.ones((8,8)).cuda() * 5.0,
        6 : torch.ones((8,8)).cuda() * 6.0,
        7 : torch.ones((8,8)).cuda() * 7.0
        }
    dtype = torch.float32
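    # broadcast_data: broadcast the entries for keys [0, 1] from the source rank of each tensor-parallel group (its first rank) to the other ranks in that group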
    actual_output = broadcast_data([0,1],input_data, dtype)
    assert(torch.equal(actual_output[0], input_data[0]))
    assert(torch.equal(actual_output[1], input_data[1]))
    
    if Utils.rank == 0:
        print("Broadcast assertion passed")
    Utils.destroy_model_parallel()

In [12]:
!TEST_BROAD=1 torchrun --nproc_per_node=8 test_megatron.py

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Initializing torch.distributed with rank: 0, world_size: 8
Initializing torch.distributed with rank: 2, world_size: 8
Initializing torch.distributed with rank: 6, world_size: 8
Initializing torch.distributed with rank: 5, world_size: 8
Initializing torch.distributed with rank: 4, world_size: 8
Initializing torch.distributed with rank: 1, world_size: 8
Initializing torch.distributed with rank: 3, world_size: 8
Initializing torch.distributed with rank: 7, world_size: 8
Broadcast assertion passed
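
Because every rank builds identical `input_data` in the test above, the assertions pass no matter which rank is the source. The plain-Python simulation below (a hypothetical helper, not Megatron code) uses rank-dependent data to make the semantics visible: each rank ends up with the values for the requested keys held by the first rank of its tensor-parallel group. The source-rank formula `(rank // tp_size) * tp_size` assumes the consecutive tensor-parallel layout.

```python
def simulate_broadcast_data(keys, per_rank_data, tp_size):
    """Each rank receives `keys` from the first rank of its tensor-parallel group."""
    out = []
    for rank in range(len(per_rank_data)):
        src = (rank // tp_size) * tp_size   # source rank of this rank's TP group
        out.append({k: per_rank_data[src][k] for k in keys})
    return out


# Rank-dependent data so the source of each value is visible.
per_rank = [{0: 10 * r, 1: 10 * r + 1, 2: -r} for r in range(8)]
received = simulate_broadcast_data([0, 1], per_rank, tp_size=2)
print(received[3])  # {0: 20, 1: 21}: rank 3 receives rank 2's values
```

Keys that are not requested (here `2`) are not sent at all.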


Next, test how data is collected across the ranks of a tensor-parallel group:

In [14]:
# Test copy_to_model_parallel_region: tensor-parallel size 4, pipeline-parallel size 2

from megatron.core.tensor_parallel import mappings
def test_CopyToModelParallelRegion():
    Utils.initialize_model_parallel(4,2)
    input_data = torch.ones((1)).cuda()*Utils.rank
    output_data = mappings._CopyToModelParallelRegion.backward(None, input_data)
    result = torch.ones(1).cuda()
    
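    # Expected result: with tensor-parallel size 4, GPUs 0, 1, 2, 3 form one group and exchange data;
    # their values (0, 1, 2, 3) sum to 6 on every rank. GPUs 4, 5, 6, 7 (the second pipeline stage)
    # form the other group; their values (4, 5, 6, 7) sum to 22.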
    result = result * 22 if Utils.rank >= 4 else result * 6
    assert(torch.equal(output_data, result))
    print(f'rank: {Utils.rank}, input_data: {input_data}, output_data: {output_data}')
    assert(torch.equal(input_data, mappings.copy_to_tensor_model_parallel_region(input_data)))
    assert(torch.equal(input_data, mappings._CopyToModelParallelRegion.symbolic(None, input_data)))
    
    
    # print_in_rank_zero is not defined in this notebook; print on rank 0 explicitly
    if Utils.rank == 0:
        print("Copy to model parallel region test passed")
    Utils.destroy_model_parallel()

In [17]:
!TEST_COPY=1 torchrun --nproc_per_node=8 test_megatron.py

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Initializing torch.distributed with rank: 5, world_size: 8
Initializing torch.distributed with rank: 2, world_size: 8
Initializing torch.distributed with rank: 6, world_size: 8
Initializing torch.distributed with rank: 0, world_size: 8
Initializing torch.distributed with rank: 7, world_size: 8
Initializing torch.distributed with rank: 1, world_size: 8
Initializing torch.distributed with rank: 3, world_size: 8
Initializing torch.distributed with rank: 4, world_size: 8
rank: 2, input_data: tensor([6.], device='cuda:2'), output_data: tensor([6.], device='cuda:2')
rank: 3, input_data: tensor([6.], device='cuda:3'), output_data: tensor([6.], device='cuda:3')
rank: 1, input_data: tensor([6.], device='cu

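The backward pass of `_CopyToModelParallelRegion` tested above is an all-reduce within the tensor-parallel group. This is also why the printed `input_data` already shows the reduced value: the underlying `torch.distributed.all_reduce` operates in place. The `all_reduce` test is very similar and is not repeated here; set TEST_REDUCE=1 at the start of the command to see its result.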
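
The copy and reduce regions are conjugate operations. Megatron's `mappings` module defines `_CopyToModelParallelRegion` as the identity in the forward pass and an all-reduce over the tensor-parallel group in the backward pass, and `_ReduceFromModelParallelRegion` as the reverse. The sketch below models both with plain lists holding one value per rank (all function names are hypothetical) and reproduces the 6/22 result expected above for tensor-parallel size 4.

```python
def all_reduce(values, tp_size):
    """Every rank receives the sum over its tensor-parallel group."""
    out = []
    for rank in range(len(values)):
        start = (rank // tp_size) * tp_size
        out.append(sum(values[start:start + tp_size]))
    return out


# "Copy" region: identity forward, all-reduce backward.
def copy_forward(x, tp_size):
    return list(x)


def copy_backward(grad, tp_size):
    return all_reduce(grad, tp_size)


# "Reduce" region: all-reduce forward, identity backward.
def reduce_forward(x, tp_size):
    return all_reduce(x, tp_size)


def reduce_backward(grad, tp_size):
    return list(grad)


grads = list(range(8))                  # rank r holds the value r, as in the test
print(copy_backward(grads, tp_size=4))  # [6, 6, 6, 6, 22, 22, 22, 22]
```

Pairing the two regions around a tensor-parallel block is what keeps both the forward activations and the backward gradients consistent across the group.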

Next, test `gather_split_1d_tensor`, which all-gathers 1-D tensor chunks:

In [None]:
import megatron.core.tensor_parallel.utils as util

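# Test gather_split_1d_tensor (an all-gather within the tensor-parallel group)
def test_gather_split_1d_tensor():
    rank = Utils.rank
    Utils.initialize_model_parallel(tensor_model_parallel_size=2, pipeline_model_parallel_size=4)
    input_tensor = torch.ones((2,4)).cuda() * rank
    actual_output_tensor = util.gather_split_1d_tensor(input_tensor)
    print(f'rank: {rank}, input_tensor: {input_tensor}; output_tensor: {actual_output_tensor}')
    # With tensor-parallel size 2, ranks (0, 1), (2, 3), ... form groups: the output is the
    # flattened tensor of the even rank followed by that of the odd rank
    if rank % 2 == 0:
        expected_output_tensor = torch.concat((input_tensor.flatten(), input_tensor.flatten() + 1))
    else:
        expected_output_tensor = torch.concat((input_tensor.flatten() - 1, input_tensor.flatten()))
    assert(torch.equal(actual_output_tensor, expected_output_tensor))
    Utils.destroy_model_parallel()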

In [18]:
!TEST_GATHER_SPLIT=1 torchrun --nproc_per_node=8 test_megatron.py

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
Initializing torch.distributed with rank: 1, world_size: 8
Initializing torch.distributed with rank: 7, world_size: 8
Initializing torch.distributed with rank: 0, world_size: 8
Initializing torch.distributed with rank: 4, world_size: 8
Initializing torch.distributed with rank: 5, world_size: 8
Initializing torch.distributed with rank: 2, world_size: 8
Initializing torch.distributed with rank: 6, world_size: 8
Initializing torch.distributed with rank: 3, world_size: 8
rank: 1, input_tensor: tensor([[1., 1., 1., 1.],
        [1., 1., 1., 1.]], device='cuda:1'); output_tensor: tensor([0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1.],
       device='cuda:1')
rank: 7, input_tensor: tenso

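The all-gather result shows that, with tensor-parallel size 2 and pipeline-parallel size 4, each tensor is split horizontally across two GPUs (tensor parallelism). `test_gather_split_1d_tensor()` therefore gathers only horizontally, within a tensor-parallel group such as ranks 0 and 1, and does not involve the pipeline-parallel dimension.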
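
The gather can be reproduced with lists. In this sketch (hypothetical helpers, ranks simulated in one loop), every rank flattens its tensor, and each rank then receives the concatenation of the flattened tensors from its tensor-parallel group in rank order. `split_1d_equal_chunk` illustrates the opposite direction, which Megatron's utilities provide as `split_tensor_into_1d_equal_chunks`.

```python
def flatten(matrix):
    return [v for row in matrix for v in row]


def gather_split_1d(per_rank_tensors, tp_size):
    """All-gather the flattened tensors within each tensor-parallel group."""
    out = []
    for rank in range(len(per_rank_tensors)):
        start = (rank // tp_size) * tp_size
        gathered = []
        for member in range(start, start + tp_size):
            gathered.extend(flatten(per_rank_tensors[member]))
        out.append(gathered)
    return out


def split_1d_equal_chunk(flat, tp_size, tp_rank):
    """Opposite direction: keep only this rank's equal-sized chunk."""
    chunk = len(flat) // tp_size
    return flat[tp_rank * chunk:(tp_rank + 1) * chunk]


# Each of 8 ranks holds a 2 x 4 tensor filled with its rank id.
tensors = [[[float(r)] * 4 for _ in range(2)] for r in range(8)]
gathered = gather_split_1d(tensors, tp_size=2)
print(gathered[1])  # eight 0.0 values followed by eight 1.0 values, as in the output above
```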

### Model deployment

Now apply Megatron-LM's tensor parallelism directly to a simple transformer model. This part follows the official Megatron-LM quick start guide: https://docs.nvidia.com/megatron-core/developer-guide/latest/user-guide/index.html#quick-start

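Because Megatron-LM reimplements the forward and backward passes of the FFN and self-attention for tensor parallelism, it is hard to make an existing model tensor-parallel by adding a few statements. Data parallelism, covered earlier, is different: it uses native PyTorch APIs, so an existing training script can be made data-parallel with a simple distributed setup and a few necessary changes. Another notable difference is that Megatron's model parallelism is currently designed only for transformer-architecture models, which is in line with the current mainstream of neural network research.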
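
The heart of this reimplementation is the MLP partitioning described in the Megatron-LM paper. The first weight A is split by columns (column-parallel), so each rank can apply the nonlinearity to its own slice without communicating. The second weight B is split by rows (row-parallel), so the partial outputs need only one all-reduce. The plain-Python sketch below uses hypothetical helper names, ReLU as a stand-in for GeLU, and ranks simulated in one loop, and checks that the sharded MLP matches the unsharded one. Splitting A by rows instead would require a sum before the nonlinearity, since relu(u + v) generally differs from relu(u) + relu(v).

```python
def matmul(a, b):
    return [[sum(a[i][p] * b[p][j] for p in range(len(b)))
             for j in range(len(b[0]))] for i in range(len(a))]


def relu(m):
    # Elementwise stand-in for GeLU; any elementwise activation works the same way.
    return [[max(0, v) for v in row] for row in m]


def mlp_reference(x, a, b):
    """Unsharded MLP: relu(X @ A) @ B."""
    return matmul(relu(matmul(x, a)), b)


def mlp_tensor_parallel(x, a, b, tp):
    """A split by columns, B split by rows, one sum (all-reduce) at the end."""
    n = len(a[0]) // tp
    partials = []
    for r in range(tp):
        a_r = [row[r * n:(r + 1) * n] for row in a]  # column shard of A
        b_r = b[r * n:(r + 1) * n]                   # matching row shard of B
        y_r = relu(matmul(x, a_r))                   # local: no communication needed
        partials.append(matmul(y_r, b_r))            # partial sum of the output
    # All-reduce: add the partial outputs from all ranks.
    return [[sum(p[i][j] for p in partials) for j in range(len(b[0]))]
            for i in range(len(x))]


x = [[1, -2], [3, 1]]                   # 2 tokens, hidden size 2
a = [[1, -1, 2, 0], [0, 1, -1, 2]]      # 2 x 4 (FFN hidden size 4)
b = [[1, 0], [2, -1], [0, 1], [1, 1]]   # 4 x 2
print(mlp_reference(x, a, b) == mlp_tensor_parallel(x, a, b, tp=2))  # True
```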

Besides being shown in this notebook, all of the following code is also contained in ./simple_megatron_transformer.py, which is again launched from the command line with torchrun as a distributed training script.

In [None]:
# Initialize the distributed environment. Both parallel sizes default to 1; the main script below runs on 2 GPUs with tensor_model_parallel_size=2
import os
import torch
from megatron.core import parallel_state

def initialize_distributed(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1):
    # Torch setup for distributed training
    rank = int(os.environ['LOCAL_RANK'])
    world_size = torch.cuda.device_count()
    torch.cuda.set_device(rank)
    torch.distributed.init_process_group(world_size=world_size, rank=rank)

    # Megatron core distributed training initialization
    parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)

#### Build a simple 2-layer transformer model with Megatron's API

In [None]:
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec

def model_provider():
    """Build the model."""

    transformer_config = TransformerConfig(
        num_layers=2,
        hidden_size=12,
        num_attention_heads=4,
        use_cpu_initialization=True,
        pipeline_dtype=torch.float32)

    gpt_model = GPTModel(
        config=transformer_config,
        transformer_layer_spec=get_gpt_layer_local_spec(),
        vocab_size=100,
        max_sequence_length=64)

    return gpt_model
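
With `tensor_model_parallel_size=2`, each GPU holds only a slice of this model. The sketch below estimates the per-rank sizes for the configuration above under stated assumptions: the FFN hidden size defaults to 4 × hidden_size and the head dimension to hidden_size / num_attention_heads (the usual TransformerConfig defaults), and attention heads, FFN hidden units and vocabulary rows are each divided evenly across tensor-parallel ranks. `per_rank_sizes` is a hypothetical helper for intuition, not Megatron's own validation logic.

```python
def per_rank_sizes(hidden_size, num_attention_heads, vocab_size, tp, ffn_hidden_size=None):
    """Per-rank sizes under tensor parallelism (defaults assumed, see lead-in)."""
    if ffn_hidden_size is None:
        ffn_hidden_size = 4 * hidden_size          # assumed TransformerConfig default
    head_dim = hidden_size // num_attention_heads  # assumed default kv_channels
    for name, size in [("num_attention_heads", num_attention_heads),
                       ("ffn_hidden_size", ffn_hidden_size),
                       ("vocab_size", vocab_size)]:
        if size % tp != 0:
            raise ValueError(f"{name}={size} is not divisible by tp={tp}")
    heads_per_rank = num_attention_heads // tp
    return {
        "heads_per_rank": heads_per_rank,
        "head_dim": head_dim,
        "qkv_columns_per_rank": 3 * heads_per_rank * head_dim,
        "ffn_hidden_per_rank": ffn_hidden_size // tp,
        "vocab_rows_per_rank": vocab_size // tp,
    }


print(per_rank_sizes(hidden_size=12, num_attention_heads=4, vocab_size=100, tp=2))
# {'heads_per_rank': 2, 'head_dim': 3, 'qkv_columns_per_rank': 18, 'ffn_hidden_per_rank': 24, 'vocab_rows_per_rank': 50}
```

This is also why small toy settings such as `num_attention_heads=4` are chosen to be divisible by the tensor-parallel size.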

#### Generate a mock dataset for training

In [None]:
from torch.utils.data import DataLoader
from megatron.core.datasets.utils import Split
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset

def get_train_data_iterator():
    config = GPTDatasetConfig(
        is_built_on_rank=lambda:(parallel_state.is_pipeline_last_stage() or parallel_state.is_pipeline_first_stage()),
        random_seed = 0,
        sequence_length = 64,
        blend=[],
        mock=True,
        reset_position_ids=False,
        reset_attention_mask=False,
        eod_mask_loss=False,
        tokenizer="dummy")

    training_data= MockGPTDataset(Split.train, config)

    train_dataloader = DataLoader(training_data, batch_size=8, shuffle=True)

    train_iterator = iter(train_dataloader)
    return train_iterator
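
Each sample yielded by the DataLoader is a dict with the keys read by the forward step below (`tokens`, `labels`, `loss_mask`, `position_ids`, `attention_mask`). The sketch below shows how such a next-token-prediction sample is typically laid out, assuming the GPT dataset conventions that labels are the tokens shifted by one and that the boolean attention mask is True at masked (future) positions. `make_gpt_sample` is a hypothetical illustration on plain lists, not the MockGPTDataset implementation, which returns tensors. The comments tie each field to the config flags above.

```python
def make_gpt_sample(text):
    """Build one next-token-prediction sample from seq_len + 1 token ids."""
    tokens = text[:-1]   # model input
    labels = text[1:]    # target: the next token at each position
    seq_len = len(tokens)
    return {
        "tokens": tokens,
        "labels": labels,
        "loss_mask": [1.0] * seq_len,          # eod_mask_loss=False: no masking
        "position_ids": list(range(seq_len)),  # reset_position_ids=False
        # Causal mask, True where attention is NOT allowed (position j > i).
        "attention_mask": [[j > i for j in range(seq_len)] for i in range(seq_len)],
    }


sample = make_gpt_sample([5, 6, 7, 8])
print(sample["tokens"], sample["labels"])  # [5, 6, 7] [6, 7, 8]
```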

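#### Define the forward step function

The forward step function takes an input argument `data_iterator`, an iterator obtained from the DataLoader, together with the model, and returns the forward output along with the loss function to apply to it. This input/output design is somewhat unusual, but it lets Megatron's schedule control when each micro-batch is fetched and apply the loss only where it is needed (the last pipeline stage); Megatron's developers regard this design as more efficient.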

In [None]:
from functools import partial

def forward_step_func(data_iterator, model):

    def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):

        losses = output_tensor.float()
        loss_mask = loss_mask.view(-1).float()
        loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
        # If you have data parallel reduce loss across data parallel groups.
        # If pipeline parallel, loss computation is done only in last stage.

        return loss, {'lm loss': loss}

    data = next(data_iterator)
    # `device` is the global torch.device("cuda") defined in the __main__ block below
    tokens = data['tokens'].to(device)
    attention_mask = data['attention_mask'].to(device)
    position_ids = data['position_ids'].to(device)
    labels = data['labels'].to(device)
    loss_mask = data['loss_mask'].to(device)

    output_tensor = model(tokens, position_ids, attention_mask,
                          labels=labels)

    return output_tensor, partial(loss_func, loss_mask)
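
The contract is easier to see with a toy schedule. In the sketch below, written in plain Python, `toy_schedule`, `toy_forward_step` and `fake_model` are hypothetical stand-ins, not Megatron's `get_forward_backward_func`. The schedule pulls one micro-batch per call to the forward step, and only the last pipeline stage applies the returned loss function. The masked mean is the same formula as `loss_func` above.

```python
from functools import partial


def loss_func(loss_mask, output_tensor):
    """Masked mean of per-token losses, as in the notebook's loss_func."""
    loss = sum(value * m for value, m in zip(output_tensor, loss_mask)) / sum(loss_mask)
    return loss, {"lm loss": loss}


def fake_model(tokens):
    # Pretend the model returns one loss value per token.
    return [float(t) for t in tokens]


def toy_forward_step(data_iterator, model):
    batch = next(data_iterator)
    output = model(batch["tokens"])
    return output, partial(loss_func, batch["loss_mask"])


def toy_schedule(forward_step_func, data_iterator, model, num_microbatches,
                 is_last_stage=True):
    reports = []
    for _ in range(num_microbatches):
        output, loss_fn = forward_step_func(data_iterator, model)
        if is_last_stage:  # earlier pipeline stages would send `output` onward instead
            _, report = loss_fn(output)
            reports.append(report)
    return reports


batches = iter([{"tokens": [1, 2, 3], "loss_mask": [1, 1, 0]},
                {"tokens": [4, 4, 4], "loss_mask": [1, 0, 0]}])
print(toy_schedule(toy_forward_step, batches, fake_model, num_microbatches=2))
# [{'lm loss': 1.5}, {'lm loss': 4.0}]
```

Returning the loss as a deferred `partial` is what allows a non-final stage to skip it entirely.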

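#### Saving and loading distributed checkpoints

The functions below save and load distributed checkpoints. Megatron's checkpoint format is efficient and lets users adjust a checkpoint's parallel layout flexibly: for example, a model saved with tensor-parallel size 2 can be loaded with tensor-parallel size 4.

Note: this code may require the extra dependencies zarr and tensorstore: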
```sh
pip install tensorstore==0.1.45
pip install zarr
```

In [None]:
from megatron.core import dist_checkpointing

def save_distributed_checkpoint(checkpoint_path, gpt_model):
    sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
    dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)

def load_distributed_checkpoint(checkpoint_path, gpt_model):
    sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
    checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
    gpt_model.load_state_dict(checkpoint)
    return gpt_model
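
Why can a checkpoint saved with tensor-parallel size 2 be loaded with size 4? Because each shard is stored together with its position in the global tensor, not just as "rank k's slice". The sketch below illustrates that idea with plain lists and hypothetical helpers. Megatron's `dist_checkpointing` records similar metadata (global shape and offsets) for its sharded tensors, but its storage format and loading logic differ from this toy, which naively rebuilds the whole tensor.

```python
def save_sharded(weight, tp):
    """Split a 1-D weight into `tp` shards, each saved with its global offset."""
    n = len(weight) // tp
    return [{"global_offset": r * n, "data": weight[r * n:(r + 1) * n]}
            for r in range(tp)]


def load_resharded(saved_shards, new_tp, new_rank):
    """Reassemble the global weight from its shards, then cut this rank's slice."""
    total = sum(len(s["data"]) for s in saved_shards)
    full = [None] * total
    for s in saved_shards:
        start = s["global_offset"]
        full[start:start + len(s["data"])] = s["data"]
    n = total // new_tp
    return full[new_rank * n:(new_rank + 1) * n]


weight = list(range(8))
shards_tp2 = save_sharded(weight, tp=2)   # saved by 2 tensor-parallel ranks
print([load_resharded(shards_tp2, new_tp=4, new_rank=r) for r in range(4)])
# [[0, 1], [2, 3], [4, 5], [6, 7]]
```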

#### Main training function

In [None]:
from pathlib import Path
from torch.optim import Adam
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed

if __name__ == "__main__":
    initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
    model_parallel_cuda_manual_seed(123)

    gpt_model = model_provider()
    device = torch.device("cuda")
    gpt_model.to(device)

    optim = Adam(gpt_model.parameters())

    train_iterator = get_train_data_iterator()

    forward_backward_func = get_forward_backward_func()

    # Running the model for 5 iterations
    for _ in range(5):
        optim.zero_grad()

        losses_reduced = forward_backward_func(
            forward_step_func=forward_step_func,
            data_iterator=train_iterator,
            model=gpt_model,
            num_microbatches=1,
            seq_length=64,
            micro_batch_size=8,
            decoder_seq_length=64,
            forward_only=False)

        optim.step()

        print(f'Losses reduced :{losses_reduced}')

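    # Saving the model (dist_checkpointing.save expects the target directory to exist)
    ckpt_path = '/workspace/ckpt'
    Path(ckpt_path).mkdir(parents=True, exist_ok=True)
    save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path=ckpt_path)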

    # Loading the model
    gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
    gpt_model.to(device)
    print('Successfully loaded the model')
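
In this loop, `forward_backward_func` calls `forward_step_func` once per micro-batch, and each call pulls one batch from the DataLoader. This is why the DataLoader's `batch_size` (8) matches `micro_batch_size=8`. The arithmetic below is a small sketch assuming the usual Megatron relation: samples per iteration = micro batch size × number of micro-batches × data-parallel size. On 2 GPUs with tensor-parallel size 2 there is a single data-parallel replica, so each of the 5 iterations consumes 8 samples. The helper names are hypothetical.

```python
def data_parallel_size(world_size, tp, pp):
    if world_size % (tp * pp) != 0:
        raise ValueError("world_size must be divisible by tp * pp")
    return world_size // (tp * pp)


def samples_per_iteration(micro_batch_size, num_microbatches, dp):
    # Each data-parallel replica processes num_microbatches micro-batches per step.
    return micro_batch_size * num_microbatches * dp


dp = data_parallel_size(world_size=2, tp=2, pp=1)
print(dp, samples_per_iteration(micro_batch_size=8, num_microbatches=1, dp=dp))  # 1 8
```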

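#### Launch the distributed script

Once everything is ready, launch the distributed script and check the output. Make sure the required dependencies are installed before launching.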

In [None]:
!torchrun --nproc-per-node=2 simple_megatron_transformer.py