In [1]:

import paddle
import numpy as np
import types
import paddleslim
import copy

import paddle
import paddle.nn.functional as F
from paddle.io import Dataset
from paddle.nn import Conv2D, Linear, ReLU, Sequential
from paddle.quantization import QAT, QuantConfig
from paddle.quantization.quanters import FakeQuanterWithAbsMaxObserver
from paddle.quantization.quanters.abs_max import (
    FakeQuanterWithAbsMaxObserverLayer,
)
from paddle.vision.models import resnet18




In [2]:
model = resnet18()

# def _visit(model):
    
#     for _name, _layer in model.named_sublayers():
#         print(f"name: {_name}; layer type: {type(_layer)}; layer full name: {_layer.full_name()}")
#         _visit(_layer)
        
# _visit(model)
        

In [3]:

def _find_parent_layer_and_sub_name(model, layer):
    for _name, _sub_layer in model.named_sublayers():
        if layer.full_name() == _sub_layer.full_name():
            return model, _name
        else:
            result = _find_parent_layer_and_sub_name(_sub_layer, layer)
            if result is not None:
                return result

def replace_layer(model, source, target):
    
    parent_layer, sub_name = _find_parent_layer_and_sub_name(model, source)
    parent_layer._sub_layers[sub_name] = target
    setattr(parent_layer, sub_name, target)
    #print(f"replace {sub_name} of {parent_layer.full_name()} from {type(source)} to {type(target)}")

class ConvBNWrapper(paddle.nn.Layer):
    def __init__(self, conv, bn):
        super(ConvBNWrapper, self).__init__()
        self._conv = conv
        self._bn = bn
    def forward(self, inputs):
        return self._bn(self._conv(inputs))
    
    

class Constraint():
    """在量化训练或离线量化过程中，需要遵循的约束。可以是且不限于以下几种约束：
    1. Operators Fusion: 将多个Operators当做一个融合的Operator，只量化融合Operator的输入和输出
    2. 多个Tensors的量化相互影响：比如多个Tensors量化参数需要保持一致，或需要满足更复杂的要求
    3. 统计量化参数的过程与常规的forward流程不一样，需要特殊处理
    """
    
    def apply(self, model, qconfig):
        """将约束应用到目标模型上，并更新量化配置信息。应该在量化训练和离线量化校准操作前执行该方法。
        该方法会直接inplace地对model和qconfig进行操作。
        该方法为抽象方法，所有继承Constraint的子类都应该实现该方法。
        """
        pass
        
    
class FusionConstraint(Constraint):
    """Define some functoins used to fuse operators.
    """
    
    def replace_layer(model, source, target):
    
        parent_layer, sub_name = _find_parent_layer_and_sub_name(model, source)
        parent_layer._sub_layers[sub_name] = target
        setattr(parent_layer, sub_name, target)
    
    def _fuse_ops(self, model, fused_layer_type, pair):
        fused_op = fused_layer_type(*pair)
        for _layer in pair[1:]:
            replace_layer(model, _layer, paddle.nn.Identity())
            del self._config._layer2config[_layer]
            
        replace_layer(model, pair[0], fused_op)
        self._config._layer2config[fused_op] = self._config._layer2config[pair[0]]
    
class FreezedConvBNConstraint(FusionConstraint):
    
    
class TIConvBiasConstraint(FusionConstraint):
    

class TIQAT(QAT):
    def __init__(self, q_config):
        super(TIQAT, self).__init__(q_config)


    def _analysis_and_fuse_ops(self, model, inputs):
        assert inputs is not None
        tracer = paddleslim.core.GraphTracer(model)
        tracer(inputs)
        graph = tracer.graph

        if self._config.need_fuse_conv_bn:
            conv_bn_pairs = graph.find_conv_bn()
            for pair in conv_bn_pairs:
                pair = [node._layer for node in pair]
                self._fuse_ops(model, ConvBNWrapper, pair)
                self._config.add_qat_layer_mapping(ConvBNWrapper, paddleslim.quant.nn.QuantedConv2DBatchNorm)

    def _fuse_ops(self, model, fused_layer_type, pair):
        fused_op = fused_layer_type(*pair)
        for _layer in pair[1:]:
            replace_layer(model, _layer, paddle.nn.Identity())
            del self._config._layer2config[_layer]
            
        replace_layer(model, pair[0], fused_op)
        self._config._layer2config[fused_op] = self._config._layer2config[pair[0]]

    def quantize(self, model: paddle.nn.Layer, inplace=False, inputs=None):
        _model = model if inplace else copy.deepcopy(model)
        self._config._specify(_model)
        self._analysis_and_fuse_ops(_model, inputs)
        self._convert_to_quant_layers(_model, self._config)
        self._insert_activation_observers(_model, self._config)
        return _model

In [4]:
quanter = FakeQuanterWithAbsMaxObserver(moving_rate=0.9)
q_config = QuantConfig(activation=quanter, weight=quanter)
q_config.need_fuse_conv_bn = True
qat = TIQAT(q_config)
x = paddle.rand([1, 3, 224, 224])
quant_model = qat.quantize(model, inputs=x)
#paddle.jit.save(quant_model, "./qat_model", input_spec=[x])
# convert_model = qat.convert(quant_model)
# paddle.jit.save(convert_model, "./fp_infer", input_spec=[x])
# out = quant_model(x)
# out.backward()



self._qat_layer_mapping: {<class 'paddle.nn.quant.stub.Stub'>: <class 'paddle.nn.quant.stub.QuanterStub'>, <class 'paddle.nn.layer.common.Linear'>: <class 'paddle.nn.quant.qat.linear.QuantedLinear'>, <class 'paddle.nn.layer.conv.Conv2D'>: <class 'paddle.nn.quant.qat.conv.QuantedConv2D'>, <class '__main__.ConvBNWrapper'>: <class 'paddleslim.quant.nn.conv_bn.QuantedConv2DBatchNorm'>}
self._qat_layer_mapping: {<class 'paddle.nn.quant.stub.Stub'>: <class 'paddle.nn.quant.stub.QuanterStub'>, <class 'paddle.nn.layer.common.Linear'>: <class 'paddle.nn.quant.qat.linear.QuantedLinear'>, <class 'paddle.nn.layer.conv.Conv2D'>: <class 'paddle.nn.quant.qat.conv.QuantedConv2D'>, <class '__main__.ConvBNWrapper'>: <class 'paddleslim.quant.nn.conv_bn.QuantedConv2DBatchNorm'>}
self._qat_layer_mapping: {<class 'paddle.nn.quant.stub.Stub'>: <class 'paddle.nn.quant.stub.QuanterStub'>, <class 'paddle.nn.layer.common.Linear'>: <class 'paddle.nn.quant.qat.linear.QuantedLinear'>, <class 'paddle.nn.layer.conv.C

In [5]:
print(quant_model)

ResNet(
  (conv1): QuantedConv2DBatchNorm(
    (bn): BatchNorm(
      (_bn): BatchNorm2D(num_features=64, momentum=0.9, epsilon=1e-05)
    )
    (weight_quanter): FakeQuanterWithAbsMaxObserverLayer()
    (activation_quanter): FakeQuanterWithAbsMaxObserverLayer()
  )
  (bn1): Identity()
  (relu): ObserveWrapper(
    (_observer): FakeQuanterWithAbsMaxObserverLayer()
    (_observed): ReLU()
  )
  (maxpool): ObserveWrapper(
    (_observer): FakeQuanterWithAbsMaxObserverLayer()
    (_observed): MaxPool2D(kernel_size=3, stride=2, padding=1)
  )
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): QuantedConv2DBatchNorm(
        (bn): BatchNorm(
          (_bn): BatchNorm2D(num_features=64, momentum=0.9, epsilon=1e-05)
        )
        (weight_quanter): FakeQuanterWithAbsMaxObserverLayer()
        (activation_quanter): FakeQuanterWithAbsMaxObserverLayer()
      )
      (bn1): Identity()
      (relu): ObserveWrapper(
        (_observer): FakeQuanterWithAbsMaxObserverLayer()
        (_o

### 导出ONNX格式模型

In [None]:
!paddle2onnx --model_dir ./ \
            --model_filename fp_infer.pdmodel \
            --params_filename fp_infer.pdiparams \
            --save_file fp_infer.onnx \
            --opset_version 13

In [None]:
import paddleslim
print(dir(paddleslim.quant))
print(paddleslim.__file__)