## **0. ModelScope运行环境配置**
首先，参考[ModelScope官方文档](https://www.modelscope.cn/docs)，做好运行环境配置，确定ModelScope 能在本地开发机上运行。在这个基础上进行模型代码的开发和接入工作。

## 1. 标准流程代码开发
首先贡献者需要申请Modelscope library github的代码权限，并在新的开发分支上进行新模型的开发接入工作。modelscope架构设计支持了代码原生接入（只基于tf或者pytorch），还有就是依赖第三方库的接入方式，如cv任务依赖easycv，ofa依赖fairseq。
对于依赖第三方库的方式接入sdk，部分代码需要有动态接入的机制：

1. 代码库打包成whl包， 并在model_card requirement.txt中指定whl包链接， 使用模型时自动安装
2. 代码库打成压缩包，使用模型时，动态下载压缩包加入python搜索路径下， 同时使用requirements.txt指定所依赖的第三方库

我们鼓励贡献者直接基于无第三方库的方式接入modelscope，目前modelscope的pipeline，model，preprocess等各个模块化的组件已经在各个任务上有做大量抽象，可供算法同学复用。对于无法复用的场景，如需要新添加model的时候，只需要将模型实现，前处理，后处理等分别抽象成model，放在ModelScope代码库model相应的代码库里，实现模型定义类，同时完成个部分的注册。

下面我们重点介绍一下pipeline，model 以及preprocessor以及几个重要的模块开发接入方式。
### Pipeline的接入：
我们首先来简单的看一下如何定义一个任务pipeline，从而能够完成一次简单的推理过程。


In [1]:
@PIPELINES.register_module(group_key=Tasks.text_classification, 
                           module_name=Pipelines.my_pipeline)
class MyPipeline(Pipeline):
    def __init__(self, model_dir:str, *args, **kwargs):
        super().__init__(model_dir, *args, **kwargs)
        pass
    
    def __call__(self, input: Union[Input, List[Input]], *args,
                 **kwargs) -> Union[Dict[str, Any], Generator]:
        input_ids = self.preprocessor(input)
        model_output = self.forward(input_ids)
        output = self.postprocessor(model_output)
        return output
    
    # define the preprocess 
    def preprocess(self, inputs: Dict[str, Any]):
        pass
    
    # define the forward pass
    def forward(self, *args, **kwargs) -> Dict[str, Any]:
        pass
      
    # format the outputs from pipeline
    def postprocess(self, input, **kwargs) -> Dict[str, Any]:
        pass





用户新定义的pipeline均来自于基类Pipeline , 基类主要定义了如何实例化preprocessor 和model 两个类，这里后面会讲到，并且如何连接preprocess、 forward 和 postprocess 的方法，形成一个pipeline。
#### 模块的注册
所有包括pipeline，model，preprocessor以及其他需要被管理的模块，上面的装饰器起到的作用是注册模块的作用，加上这个注册器后模块会被自动注册到对应模块字典表里面。这里主要关注在他的两个参数，如上面例子中的group_key=Tasks.text_classification 和module_name=Pipeline.my_pipeline。
首先，拿上面的例子说明，group_key 是注册在名为PIPELINES这个字典表中的一个组，而 module_name 是这个组里面具体的模块名。这两个参数在PIPELINES的字典表里面是能够唯一关联到上述模型类的。 当我们知道这两个参数是什么意思后，接下来会解释为什么分别传入Tasks.text_classification和Pipeline.my_pipeline这两个参数。
一般的，pipeline指代的是具体的细分任务， 譬如，在NLP领域一个文本分类的下游任务（text_classification），它有很多细分二级任务，如sentiment_classification, nli, sentence_similarity等。所以，在注册pipeline模块的时候，某个任务Tasks 的枚举值会被用作组名group_key，具体的Pipelines 的某个枚举值作为module_name关键字进行定义。在实践中，Tasks可以是大类任务，如 Tasks.text_classification ,而在Pipelines的模块可以是小类任务，如 Pipelines.sentiment_classification ，两者是包含关系，前者包含后者。当然对于贡献者来说，也可以把 Pipelines 定义为组织类，如Pipelines.THU_text_classification，均可。用户可以按照如下方法直接注册一个新的任务名或包含了所在机构，或者特殊实现的名称进去。


In [1]:
@MODELS.register_module(group_key=Tasks.text_classification,
                        module_name=Pipelines.THU_text_classification)






完成注册并且调通后续流程后，用户可以通过调用如下命令完成自己pipeline的命令；


In [1]:
from modelscope.pipelines import pipeline
thuClassifier = pipeline('THU_text_classification'，model_id="model_id")
inputs = '今天天气不错，适合出去游玩'
print(thuClassifier(inputs))
###  {'scores': [0.023, 0.977], 'labels':['负向'，'正向']}





#### 快速接入
Pipeline基类中已经实现了将preprocess、 forward 和 postprocess 三个方法串起来的 __call__ 方法，以及 
preprocess中如何调用 preprocessor类的实现，在forward中如何调用 model类的实现，因此在绝大多数情况下我们不需要考虑去变动 __call__、preprocess 和forward的三个方法。
如下接着上面text_classification 的model举例，我们新加入一个 pipeline的时候，实现如下：


In [1]:
@PIPELINES.register_module(
    Tasks.text_classification, module_name=Pipelines.sentiment_classification)
class TextClassificationPipeline(Pipeline):

    def __init__(self,
                 model: Union[BertForTextClassification, str],
                 preprocessor: SequenceClassificationPreprocessor = None,
                 **kwargs):
        """use `model` and `preprocessor` to create a nlp text classification pipeline for prediction

        Args:
            model (BertForSequenceClassification): a model instance
            preprocessor (SequenceClassificationPreprocessor): a preprocessor instance
        """
        assert isinstance(model, str) or isinstance(model, BertForSequenceClassification), \
            'model must be a single str or BertForSequenceClassification'
        sc_model = model if isinstance(
            model,
            BertForSequenceClassification) else Model.from_pretrained(model)
        if preprocessor is None:
            preprocessor = SequenceClassificationPreprocessor(
                sc_model.model_dir,
                first_sequence='sentence',
                second_sequence=None,
                sequence_length=kwargs.pop('sequence_length', 512))
        super().__init__(model=sc_model, preprocessor=preprocessor, **kwargs)

        assert hasattr(self.model, 'id2label'), \
            'id2label map should be initalizaed in init function.'

    def postprocess(self,
                    inputs: Dict[str, Any],
                    topk: int = 5) -> Dict[str, str]:
        """process the prediction results

        Args:
            inputs (Dict[str, Any]): input data dict
            topk (int): return topk classification result.

        Returns:
            Dict[str, str]: the prediction results
        """
        # NxC np.ndarray
        probs = inputs['probs'][0]
        num_classes = probs.shape[0]
        topk = min(topk, num_classes)
        top_indices = np.argpartition(probs, -topk)[-topk:]
        cls_ids = top_indices[np.argsort(probs[top_indices])]
        probs = probs[cls_ids].tolist()

        cls_names = [self.model.id2label[cid] for cid in cls_ids]

        return {OutputKeys.SCORES: probs, OutputKeys.LABELS: cls_names}





实际使用过程中pipeline基本已经定制好，相关已有的pipeline可以参考[文档](https://www.modelscope.cn/docs/%E6%A8%A1%E5%9E%8B%E7%9A%84%E6%8E%A8%E7%90%86Pipeline)

### Model接入：
Modelscope是以model 为核心（Model Centric）构建的。所以，model 的api设计是重点也是核心，如下展示的是一个model的api。


In [1]:
@MODELS.register_module(group_key=Tasks.text_classification, module_name=Models.my_model)
class MyClassificationModel(Model):
    def __init__(self, model_dir:str, *args, **kwargs):
        super().__init__(model_dir, *args, **kwargs)
        pass

        # define the forward pass
    def forward(self, *args, **kwargs) -> Dict[str, Any]:
        pass

        # format the outputs from model
    def postprocess(self, input, **kwargs) -> Dict[str, Any]:
        ...
        return {
            OutputKeys.prediction: pred,
            OutputKeys.probobility: probs,
            OutputKeys.logits: logits
        }





#### 模块的注册
设计相关解释可以参考上面pipeline部分，下面具体解释group_key和module_name在model里面的用法。
一般的，model包括纯主干部分和包含下游任务的主干模型，由此可以看出任务和主干模型是可以解耦的。 譬如，在NLP领域一个文本分类的下游任务（text_classification），它的主干模型可以来自于近年很火的 bert 也可以是来自达摩院的 structbert 或者veco 等模型。 于是，当我们的模型是利用bert，完成一次text_classification任务的时候，模型就可以注册为。


In [1]:
@MODELS.register_module(group_key=Tasks.text_classification,
                        module_name=Models.bert)







#### 快速接入
在上面的类方法中，__init__定义了模型的结构，forward完成了模型的前向计算过程，这里用户可以根据实际算法自行实现。 postprocess主要是进行标准化的输出，以便和下游的pipeline或者finetune任务对齐的适配层，用户可以自行根据实际情况确认。
#### configuration介绍
在加载模型文件的过程中，configuration.json为必选文件，同时也会根据里面的配置去在注册器中自动寻找并加载相应的模块完成任务。以下再简单罗列一些配置，详细可以参考config[设计文档](https://www.modelscope.cn/docs/Configuration%E8%AF%A6%E8%A7%A3)，： 

- framework(必填)： 模型运行所需的框架， 例如pytorch，tensorflow，kaldi等。
- task(必填)： 模型所支持的任务类型，可以是str或者 str列表。
- pipeline(必填): 推理使用的pipeline类型。
- model (可选): 模型实例化相关参数配置，具体参数请直接参考对应模型库的configuration.json示例。
- dataset(可选): 训练评估过程中使用的数据集配置信息。
- preprocessor(可选): 训练评估过程中使用的预处理配置
- train(可选)：用以配置训练过程中的超参数，例如模型保存目录、训练轮数、优化器、学习率等参数。
- evaluation(可选): 用以配置评估过程中数据读取、评估指标等参数。

一个完整的最小配置文件示例如下，仅在调用pipeline推理时可用：


In [1]:
{
    "framework": "pytorch",
    "task": "text-classification",
    "pipeline": {
       "type": "sentiment-classification"
    }
}






一个常见的配置文件示例如下
```
{
    "framework": "pytorch",
    "task": "text-classification",
    "pipeline": {
        "type": "sentiment-classification"
    },
    "preprocessor": {
        "type": "seq-cls-tokenizer",
        "first_sequence": "sentence1",
    },
    "model": {
        "type": "bert",
        "attention_probs_dropout_prob": 0.1,
        "position_embedding_type": "absolute",
        "transformers_version": "4.6.0.dev0",
        "type_vocab_size": 2,
        "use_cache": true,
        "vocab_size": 21128
    },
    "dataset": {
        "train": {
            "name": "modelscope/afqmc_small",
            "split": "train"
        },
        "val": {
            "name": "modelscope/afqmc_small",
            "split": "val"
        }
    }
    "train": {
        "work_dir": "/tmp",
        "max_epochs": 10,
        "dataloader": {
            "batch_size_per_gpu": 2,
            "workers_per_gpu": 1
        },
        "optimizer": {
            "type": "SGD",
            "lr": 0.01,
        },
        "lr_scheduler": {
            "type": "StepLR",
            "step_size": 2,
        },
        "hooks": [{
            "type": "CheckpointHook",
            "interval": 1
        }]
    },
    "evaluation": {
        "dataloader": {
            "batch_size_per_gpu": 2,
            "workers_per_gpu": 1,
            "shuffle": false
        },
        "metrics": ["accuracy"]
    }
}
```
#### 高阶接入
当然，在上述例子中过多的模型对应相同的任务text_classficiation，会有很多代码重复，为了解决这个问题，目前我们已经提供了部分可供复用的任务模型。如下：


In [1]:
@MODELS.register_module(group_key = Tasks.text_classification, 
                        module_name=TaskModels.text_classification)






其中，TaskModels依赖于backbone-head的已有的抽象，仅仅需要改动configuration.json即可。如下为针对上面configuration的改动


In [1]:
{
  "task": "text-classification",
  "framework": "pytorch",
  "preprocessor": {
    "type": "sen-cls-tokenizer",
    "first_sequence": "sentence1"
  },
  "model": {
    "type": "text-classification",
    "backbone": {
      "type": "bert",
      "prefix": "encoder",
      "attention_probs_dropout_prob": 0.1,
      "position_embedding_type": "absolute",
      "transformers_version": "4.6.0.dev0",
      "type_vocab_size": 2,
      "use_cache": true,
      "vocab_size": 21128
    },
    "head": {
      "type": "text-classification",
      "hidden_dropout_prob": 0.1,
      "hidden_size": 768
    }
  },
  "pipeline": {
    "type": "sentiment-classification"
  },
  ...
}





可以看到，原本model.type是bert（即Models.bert）的地方，变换到了model.backbone.type，现在model.type改成 text-classification (即 TaskModels.text_classification)，这与设计吻合。同时还有变化的是多了一个model.head，对应的type是text-classification （即 Heads.text_classification）。
其中backbone和head的抽象如下，目前已经常用的NLP，CV模型已经有实现backbone、head注册，用户可以直接使用。如果需要新接入的话，可以参考以下案例，目前该方法仅支持pytorch的接入方式。


In [1]:
@BACKBONES.register_module(Fields.nlp, module_name=Models.structbert)
class SbertModel(TorchModel, SbertModelTransform):
    def __init__(self, model_dir=None, add_pooling_layer=True, **config):
        """
        Args:
            model_dir (str, optional): The model checkpoint directory. Defaults to None.
            add_pooling_layer (bool, optional): to decide if pool the output from hidden layer. Defaults to True.
        """
        config = SbertConfig(**config)
        super().__init__(model_dir)
        self.config = config
        SbertModelTransform.__init__(self, config, add_pooling_layer)

    def extract_sequence_outputs(self, outputs):
        return outputs['last_hidden_state']

    def extract_pooled_outputs(self, outputs):
        return outputs['pooler_output']

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                position_ids=None,
                head_mask=None,
                inputs_embeds=None,
                encoder_hidden_states=None,
                encoder_attention_mask=None,
                past_key_values=None,
                use_cache=None,
                output_attentions=None,
                output_hidden_states=None,
                return_dict=None,
                **kwargs):
        return SbertModelTransform.forward(
            self, input_ids, attention_mask, token_type_ids, position_ids,
            head_mask, inputs_embeds, encoder_hidden_states,
            encoder_attention_mask, past_key_values, use_cache,
            output_attentions, output_hidden_states, return_dict, **kwargs)






In [1]:
@HEADS.register_module(
    Tasks.text_classification, module_name=Heads.text_classification)
class SequenceClassificationHead(TorchHead):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        config = self.config
        self.num_labels = config.num_labels
        classifier_dropout = (
            config['classifier_dropout'] if config.get('classifier_dropout')
            is not None else config['hidden_dropout_prob'])
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config['hidden_size'],
                                    config['num_labels'])
    def forward(self, inputs=None):
        if isinstance(inputs, dict):
            assert inputs.get('pooled_output') is not None
            pooled_output = inputs.get('pooled_output')
        else:
            pooled_output = inputs
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return {OutputKeys.LOGITS: logits}
    
    def compute_loss(self, outputs: Dict[str, torch.Tensor],
                     labels) -> Dict[str, torch.Tensor]:
        logits = outputs[OutputKeys.LOGITS]
        return {OutputKeys.LOSS: F.cross_entropy(logits, labels)}





### Preprocessor的接入：
#### 模块的注册：
值得注意的是，preocessor的注册组group_key为Fields的枚举值，如 Fields.nlp值得就是nlp领域，一般前处理与任务无关，主要与领域相关，如文本、图像、视频等。在此基础之上，由于前处理较为复杂多样，module_name可以自由定义，或者与任务相关，或者与模型相关，只要唯一确定即可。

#### 快速接入：
由于数据在进入模型推理或者训练前需要进行处理，而这些处理是多种多样的，尤其是在做finetune的时候数据源不同的情况下前处理也会和pipeline有所不同，因此架构设计时候，将preprocessor 单独设计一个模块,以便和model， pipeline和finetune都解藕，用户可以在preprocessor中的__call__ 方法上自己定义如何实现前处理。下面定义了一个 text_classification 的preprocessor例子如下所示，具体可参考相关论文了解具体方法背后原理，这里不过多赘述。 


In [1]:
@PREPROCESSORS.register_module(
    Fields.nlp, module_name=Preprocessors.seq_cls_tokenizer)
class SequenceClassificationPreprocessor(Preprocessor):
    def __init__(self, model_dir: str, *args, **kwargs):
        """preprocess the data
        Args:
            model_dir (str): model path
        """
        super().__init__(*args, **kwargs)
        from easynlp.modelzoo import AutoTokenizer
        self.model_dir: str = model_dir
        self.first_sequence: str = kwargs.pop('first_sequence',
                                              'first_sequence')
        self.second_sequence = kwargs.pop('second_sequence', 'second_sequence')
        self.sequence_length = kwargs.pop('sequence_length', 128)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_dir)
        print(f'this is the tokenzier {self.tokenizer}')
        self.label2id = parse_label_mapping(self.model_dir)

    @type_assert(object, (str, tuple, Dict))
    def __call__(self, data: Union[str, tuple, Dict]) -> Dict[str, Any]:
        feature = super().__call__(data)
        if isinstance(data, str):
            new_data = {self.first_sequence: data}
        elif isinstance(data, tuple):
            sentence1, sentence2 = data
            new_data = {
                self.first_sequence: sentence1,
                self.second_sequence: sentence2
            }
        else:
            new_data = data
        # preprocess the data for the model input

        rst = {
            'id': [],
            'input_ids': [],
            'attention_mask': [],
            'token_type_ids': [],
        }
        max_seq_length = self.sequence_length
        text_a = new_data[self.first_sequence]
        text_b = new_data.get(self.second_sequence, None)
        feature = self.tokenizer(
            text_a,
            text_b,
            padding='max_length',
            truncation=True,
            max_length=max_seq_length)
        rst['id'].append(new_data.get('id', str(uuid.uuid4())))
        rst['input_ids'].append(feature['input_ids'])
        rst['attention_mask'].append(feature['attention_mask'])
        rst['token_type_ids'].append(feature['token_type_ids'])
        return rst







### 模型文件准备
接下来，贡献者先按照model hub[模型创建以及上传指导](https://www.modelscope.cn/docs/%E6%A8%A1%E5%9E%8B%E7%9A%84%E5%88%9B%E5%BB%BA%E4%B8%8E%E6%96%87%E4%BB%B6%E4%B8%8A%E4%BC%A0)，完成相关模型文件的上传。这里给出几个标准模型文件layout供参考。
note 1: top-level文件夹无需创建，直接将文件上传到模型repo的root即可）
note 2: README.ipynb文件的组织和撰写，参见Section 4.
note 3: 对于有C++额外环境需求的模型，后续将提供更加详细的代码/binaries接入方法。
上传完成后，可以获得一个model_id，用于关联使用该模型。

- PyTorch Bert 模型：


In [1]:
bert-base/
├── README.ipynb
├── label_mapping.json
├── configuration.json
├── pytorch_model.pt
├── train_config.json
└── vocab.txt






-  Tensorflow Image-Matting模型 (frozen graph)


In [1]:
image_matting_person/
├── README.ipynb
├── configuration.json
└── tf_graph.pb






- Tensorflow Image-Matting模型 (saved model格式)


In [1]:
image_matting_person/
├── README.ipynb
├── configuration.json
├── saved_model.pb
└── variables
    ├── variables.data-00000-of-00001
    └── variables.index






#### 版本管理
目前ModelScope上模型的版本体系在逐步完善中。对于hub上的模型，默认使用的是hub的master分支上的最新版本。而对于贡献者**已经发布**的ModelScope SDK/library版本是针对master的一个历史版本做的测试，对于更新hub上模型文件（包括模型checkpoint，配置文件等等）的需求，分为两种场景：

- 向前兼容的更新，这种情况目前可以直接更新hub的master head。
- 模型文件的breadking change（包括模型结构的修改，配置文件定义的改动等等），需要对应模型使用（pipeline/finetune等）代码的改动，而代码的改动只能在library的master分支上更新。这种情况下，hub上文件的直接更新，可能会导致线上已经发布的SDK library以及镜像等被break而不可用。

为了防止第二种情况的出现，目前的解决方案是对于breaking的改动：

- 单独在hub上拉一个git分支来更新（比如新加一个'beta'分支），这样子hub master上的 binaries保持不变。
- library 的master分支上最新的代码，则去适配新hub分支（比如'beta'）的模型文件：对于模型的使用，比如测试里构建pipeline/finetune逻辑，都指定REVISION='beta'，通过这种方式来定制化新代码和更新的binaries的适配。更多的具体例子，可以参考当前library里已有的用法（搜索'beta'关键词）。
- library的新代码发布到线上后，则hub上的beta分支就可以并入master分支。代码里的REVISION指定逻辑也可以去掉。

当然这种方式目前假设的是，用户在使用ModelScope的library SDK时，只有更新到最新的发布版本，才保障模型的可用性。这对于MLOps的诉求是不够的，ModelScope上更完善的模型版本管理机制正在设计实现中。

### finetune的接入（待补充）：
待添加

## 2. 大模型代码开发
本文档适用于使用** Megatron mpu **进行训练的大模型接入 modelscope。
对于单 GPU 显存无法加载的大模型，在单机多卡环境下 tensor 并行相比于 pipeline 并行显存效率要更高，模型本身使用 mpu 训练的代码基本不需改动即可便利地接入到 modelscope 中。目前将模型 inference 接入modelscope pipeline 时主要使用**单机多卡 tensor 并行**。
由于 Megatron 更新过程中对算子的替换和代码的更新，V1 与 V3 版本存在部分不兼容的问题。为了兼容使用不同版本 mpu 训练的大模型，我们将 V1 与 V3 中一部分 Megatron 与 mpu 相关的调用打包到** megatron-util **在 modelscope 中使用，可以通过以下命令安装。


In [1]:
pip install "modelscope[nlp]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html





### pipeline接入
大模型接入时，pipeline 中的 preprocessor 与 postprocessor 部分与中小模型接入没有区别，只跟模型本身相关，请参考上述标准模型接入方案。
模型接入 modelscope 需要提供使用 pipeline 的调用方式，为保持 modelscope pipeline 调用的一致性，大模型不需使用** torchrun / python -m torch.distributed.launch **等常用多进程调用方式，对于多进程的调度将由 DistributedPipeline 进行封装。
#### DistributedPipeline介绍
大模型接入 pipeline 时可继承此类实现自己的 Pipeline 类。此文档对于接入时的使用方法进行简单的介绍，以下为 DistributedPipeline 的主要代码结构：


In [1]:
class DistributedPipeline(Pipeline):
    def __init__(self,
                 model: str = None,
                 preprocessor: Union[Preprocessor, List[Preprocessor]] = None,
                 auto_collate=True,
                 **kwargs):
        # ...
        if torch.multiprocessing.get_start_method(allow_none=True) is None:
            torch.multiprocessing.set_start_method('spawn')

        ranks = list(range(self.world_size))
        self.model_pool = Pool(self.world_size)
        master_ip = '127.0.0.1' if 'master_ip' not in kwargs else kwargs[
            'master_ip']
        master_port = '29500' if 'master_port' not in kwargs else kwargs[
            'master_port']
        if not _is_free_port(int(master_port)):
            master_port = str(_find_free_port())
        self.model_pool.map(
            partial(
                self.__class__._instantiate_one,
                model_dir=self.model_dir,
                master_ip=master_ip,
                master_port=master_port,
                **self.cfg.model,
                **kwargs), ranks)

    @classmethod
    def _instantiate_one(cls, rank, model_dir, **kwargs):
        """Instantiate one model piece.

        @param rank: The model rank.
        @param model_dir: The model_dir in the node.
        @param kwargs: Any extra args.
        @return: None. The model handler should be kept in the class field.
        """
        pass

    def forward(self, inputs: Dict[str, Any],
                **forward_params) -> Dict[str, Any]:
        inputs = {
            'inputs': inputs,
            'forward_params': forward_params,
        }
        res = self.model_pool.map(self.__class__._forward_one,
                                  [inputs] * self.world_size)
        return res[0]

    @classmethod
    def _forward_one(cls, inputs):
        """Forward the inputs to one model piece.

        Use the model handler kept in the class field to forward.

        @param inputs: The inputs after the preprocessing.
        @return: The forward results.
        """
        pass





DistributedPipeline 的初始化过程包含了必要的 rank，进程池，同步通讯使用的 ip 及端口的配置，并在各个子进程中进行了_instantiate_one 方法的调用。
在继承后实现 pipeline 时，**需要注意保持 _instantiate_one 为类方法**，目的是使进程池中的每个子进程均初始化完成对应 rank 的模型，得到 cls.model，每片模型的生命周期均与当前进程相同。_instantiate_one 函数无返回值，调用后此进程的 cls.model 初始化完成，可以在 _forward_one 中对应地调用。
主进程中调用 forward 即为在进程池中分别调用子进程的 _forward_one。
同理**需要保持  _forward_one 为类方法**，通常对应各进程中模型 cls.model 的 forward 方法，根据 pipeline 对应的 task 不同，由开发者自行定义和实现。
#### 单片模型的初始化与调用
此部分以 GPT3 大模型为例简单介绍模型的初始化与调用方法：


In [1]:
from modelscope.utils.nlp.distributed import initialize_distributed
from modelscope.utils.nlp.load_checkpoint import pre_load
from modelscope.utils.torch_utils import set_random_seed_mpu

@PIPELINES.register_module(
    Tasks.text_generation, module_name=Pipelines.gpt3_generation)
class DistributedGPT3Pipeline(DistributedPipeline):
    @classmethod
    def _instantiate_one(cls, rank, model_dir, **kwargs):
        cls.model = DistributedGPT3(model_dir, rank, **kwargs)
        cls.model.eval()

class DistributedGPT3(TorchModel):
    def __init__(self,
                 model_dir,
                 rank,
                 path_load_tag='model',
                 *args,
                 **kwargs):
        super().__init__(model_dir, *args, **kwargs)
        initialize_distributed(rank, mpu, kwargs['world_size'],
                               kwargs['model_parallel_size'],
                               kwargs['master_ip'], kwargs['master_port'])
        seed = 0 if 'seed' not in kwargs else kwargs['seed']
        set_random_seed_mpu(seed)

        self.config = GPT3Config.from_pretrained(model_dir)
        # Build model.
        model = GPT3Model(self.config)

        # GPU allocation.
        model.cuda(torch.cuda.current_device())
                     
        self.dist_model = model
        load_model = pre_load(mpu, model_dir, tag=path_load_tag)
        self.dist_model.load_state_dict(load_model)





如不考虑 DistributedGPT3 的封装， _instantiate_one 实际上调用了 initialize_distributed 与set_random_seed_mpu，构造了 GPT3Model，移动到对应的 GPU 上，且加载了模型参数。
initialize_distributed 函数需传入 mpu 作为参数，内部调用 torch.distributed 及 mpu 对各个子进程进行了初始化，参考已有代码即可。


In [1]:
@PIPELINES.register_module(
    Tasks.text_generation, module_name=Pipelines.gpt3_generation)
class DistributedGPT3Pipeline(DistributedPipeline):

    @classmethod
    def _forward_one(cls, inputs: Dict[str, Any]) -> Dict[str, Any]:
        tokens = inputs['inputs']['input_ids'].cuda(
            torch.cuda.current_device())
        return cls.model.generate(tokens)





相比之下 _forward_one 要更为简单，由于与模型的拆分与通讯全部使用 mpu 管理，_forward_one 与单卡调用相同即可。
#### megatron-util 与 mpu 的使用
由于 Megatron V3 版本更新时间较短，现有大模型大多基于 V1 版本构建，因此目前 megatron-util 中提供的mpu 相关 api 主要基于 V1 版本，因此**如果原有代码基于 V1 则不需进行修改**，以下列出基于 V3 的大模型接入时需要修改的常用类与函数：

| Megatron-V3 | megatron-util |
| --- | --- |
| ColumnParallelLinear | ColumnParallelLinearV3 |
| RowParallelLinear | RowParallelLinearV3 |
| get_model_parallel_rank | get_tensor_model_parallel_rank |
| get_model_parallel_src_rank | get_tensor_model_parallel_src_rank |
| get_model_parallel_world_size | get_tensor_model_parallel_world_size |

其他如 get_data_parallel_rank 等多数函数对于基于 V1 或 V3 的大模型是通用的。megatron-util 库目前还在开发中，如果缺少您需要的代码功能，请联系项目组。
如果接入的大模型代码中使用了 Megatron-V3 更新的部分算子，如 LayerNorm，bias_gelu_impl，FusedScaleMaskSoftmax 等，可像在 megatron 中一样直接 import。


In [1]:
# 一个例子
from megatron.model import (AttnMaskType, LayerNorm,
                            bias_gelu_impl)
from megatron.model.fused_softmax import FusedScaleMaskSoftmax





第一次运行 import 时会自动为这些算子编译对应的 cpp 或 cu 代码，若已编译过会跳过并正常调用。
 如果接入的大模型需要使用 fp16 或 bf16，可以使用 Float16Module 进行包装：


In [1]:
from megatron.model import Float16Module

assert not (args.fp16 and args.bf16) and (args.fp16 or args.bf16)
model = Float16Module(model, args)





此后大多数情况下可以当作直接使用 model 本身即可，如需调用 model 的其他方法，可以通过 model.module 进行调用。
实际调用的时候参考方式同其他pipeline一样，传入任务Tasks，和model_id即可。

### finetune接入（待补充）：

## 3. 代码测试合并流程（待补充）
在新的分支上完成代码功能开发和测试后，可以参考标准github上的标准流程，提交merge request进master，在完成code reiew流程并被接受后可以合并进ModelScope lib代码库。

## 4. Model Card的完整与质量（待补充）
README.ipynb是ModelScope用户，即模型的使用者了解模型的重要途径，其内容除了提供模型必要的meta信息以外，还会用来渲染ModelScope网页上模型的landing page。所以每个Modelscope上的模型，除了加载模型所必要的bianries和配置文件以外，README.ipynb对于模型能被正常使用也具有至关重要的作用。在一个模型正式被ModelScope接受之前，请务必保证README.ipynb文件参照如下规范填写。



