In [7]:
import torch
from torch.quantization.observer import MinMaxObserver, MovingAverageMinMaxObserver, HistogramObserver

# 定义通道数和序列长度
C, L = 3, 4
# 创建一个正太分布随机数生成器
normal = torch.distributions.normal.Normal(0, 1)

# 生成两个随机输入张量
inputs = [normal.sample((C, L)), normal.sample((C, L))]

for x in inputs:
    print(x.shape)

print(inputs)
# [tensor([[-0.0590,  1.1674,  0.7119, -1.1270],
#          [-1.3974,  0.5077, -0.5601,  0.0683],
#          [-0.0929,  0.9473,  0.7159, -0.4574]]]),

# tensor([[-0.0236, -0.7599,  1.0290,  0.8914],
#          [-1.1727, -1.2556, -0.2271,  0.9568],
#          [-0.2500,  1.4579,  1.4707,  0.4043]])]

# 创建观察者对象
observers = [MinMaxObserver(),          # 最小值、最大值 观察者
             MovingAverageMinMaxObserver(),     # 移动平均最小值、最大值 观察者
             HistogramObserver()]

# 遍历观察者对象列表
# 对于每一个观察者 遍历输入数据并且使用观察者对象 观察每一个输入张量
# 打印观察者的类名以及观察之后的计算的量化参数 这些参数包括量化的最小值和最大值


for obs in observers:
    for x in inputs:
        obs(x) # 使用观察者对象 观察输入数据
    print(obs.__class__.__name__, obs.calculate_qparams())
    # MinMaxObserver (tensor([0.0112]), tensor([124], dtype=torch.int32))
    # MovingAverageMinMaxObserver (tensor([0.0101]), tensor([139], dtype=torch.int32))
    # HistogramObserver (tensor([0.0100]), tensor([106], dtype=torch.int32))

torch.Size([3, 4])
torch.Size([3, 4])
[tensor([[ 1.0503, -0.4696, -1.0716,  0.1848],
        [-0.2722, -1.7757, -0.2725,  1.0091],
        [-0.3572,  1.0260, -0.4611, -1.3358]]), tensor([[ 0.5702,  3.0193,  0.7060, -0.6825],
        [-0.1247,  0.5108,  0.0955,  0.3479],
        [ 1.8009,  1.8067,  1.3069, -0.2442]])]
MinMaxObserver (tensor([0.0188]), tensor([94], dtype=torch.int32))
MovingAverageMinMaxObserver (tensor([0.0111]), tensor([159], dtype=torch.int32))
HistogramObserver (tensor([0.0188]), tensor([94], dtype=torch.int32))


In [8]:
for qscheme in [torch.per_tensor_affine, torch.per_tensor_symmetric]:
    # 计算移动最大平均值 和最小平均值
    obs = MovingAverageMinMaxObserver(qscheme=qscheme)
    for x in inputs:
        obs(x)
    print(f"Qscheme: {qscheme} | {obs.calculate_qparams()}")
    # Qscheme: torch.per_tensor_affine | (tensor([0.0101]), tensor([139], dtype=torch.int32))
    # Qscheme: torch.per_tensor_symmetric | (tensor([0.0109]), tensor([128]))

Qscheme: torch.per_tensor_affine | (tensor([0.0111]), tensor([159], dtype=torch.int32))
Qscheme: torch.per_tensor_symmetric | (tensor([0.0138]), tensor([128]))


In [11]:
from torch.quantization.observer import MovingAveragePerChannelMinMaxObserver

# 通道参数量化
obs =MovingAveragePerChannelMinMaxObserver(ch_axis=0)  # 分别计算所有' C '通道的qparams
for x in inputs:
    obs(x)
print(obs.calculate_qparams())
# (tensor([0.0090, 0.0075, 0.0055]), tensor([125, 187,  82], dtype=torch.int32))

(tensor([0.0084, 0.0108, 0.0092]), tensor([127, 162, 143], dtype=torch.int32))


In [12]:
backend = 'fbgemm' if x86 else 'qnnpack'
qconfig = torch.quantization.get_default_qconfig(backend)  
torch.backends.quantized.engine = backend

NameError: name 'x86' is not defined

In [13]:
my_qconfig = torch.quantization.QConfig(
  activation=MovingAverageMinMaxObserver.with_args(qscheme=torch.per_tensor_affine),
  weight=MovingAveragePerChannelMinMaxObserver.with_args(qscheme=torch.qint8)
)
# >>>>>
# QConfig(activation=functools.partial(<class 'torch.ao.quantization.observer.MovingAverageMinMaxObserver'>, qscheme=torch.per_tensor_affine){}, weight=functools.partial(<class 'torch.ao.quantization.observer.MovingAveragePerChannelMinMaxObserver'>, qscheme=torch.qint8){})

In [19]:
import torch

x = torch.randn(2, 2, dtype=torch.float32)
# tensor([[ 0.9872, -1.6833],
#         [-0.9345,  0.6531]])

print(x)

# 公式1(量化)：xq = round(x / scale + zero_point)
# 使用给定的scale和 zero_point 来把一个float tensor转化为 quantized tensor
xq = torch.quantize_per_tensor(x, scale=0.5, zero_point=8, dtype=torch.quint8)
# tensor([[ 1.0000, -1.5000],
#         [-1.0000,  0.5000]], size=(2, 2), dtype=torch.quint8,
#        quantization_scheme=torch.per_tensor_affine, scale=0.5, zero_point=8)

# print(xq.int_repr())  # 给定一个量化的张量，返回一个以 uint8_t 作为数据类型的张量
# tensor([[10,  5],
#         [ 6,  9]], dtype=torch.uint8)

# 公式2(反量化)：xdq = (xq - zero_point) * scale
# 使用给定的scale和 zero_point 来把一个 quantized tensor 转化为 float tensor
xdq = xq.dequantize()
# tensor([[ 1.0000, -1.5000],
#         [-1.0000,  0.5000]])

print(xdq)

tensor([[-0.5235,  0.8499],
        [-3.1486,  0.0299]])
tensor([[-0.5000,  1.0000],
        [-3.0000,  0.0000]])


In [20]:
import torch
from torch import nn


class F32Model(nn.Module):
    def __init__(self):
        super(F32Model,self).__init__()
        self.fc = nn.Linear(3,2,bias=False)
        self.relu = nn.ReLU(inplace=False)
        
    def forward(self,x):
        x = self.fc(x)
        x = self.relu(x)
        
        return x
    
    
model_fp32 = F32Model()

print(model_fp32)


model_fp32_fused = torch.quantization.fuse_modules(model_fp32,[['fc','relu']])

print(model_fp32_fused)


F32Model(
  (fc): Linear(in_features=3, out_features=2, bias=False)
  (relu): ReLU()
)
F32Model(
  (fc): LinearReLU(
    (0): Linear(in_features=3, out_features=2, bias=False)
    (1): ReLU()
  )
  (relu): Identity()
)


In [21]:

#如果要部署在ARM上
model_fp32.qconfig = torch.quantization.get_default_qconfig('qnnpack')

In [22]:
model_fp32_prepared = torch.quantization.prepare(model_fp32_fused)





In [29]:
import torch
from torch import nn


class F32Model(torch.nn.Module):
    def __init__(self):
        super(F32Model,self).__init__()
        
        self.quant = torch.quantization.QuantStub()  # 转换张量从浮点到量化
        
        
        self.conv =  nn.Conv2d(1,1,1)
        self.fc = nn.Linear(2,2,bias = False)
        self.relu = nn.ReLU()
        
        # 将量化张量  转换为浮点
        self.dequant = torch.quantization.DeQuantStub()
        
    def forward(self,x):
        x = self.quant(x)
        x = self.conv(x)
        x = self.fc(x)
        x = self.relu(x)
        x = self.dequant(x)
        
        return x
    
model_fp32 = F32Model()

# 量化需要开启验证模式
model_fp32.eval()

# 将模型部署在arm
model_fp32.qconfig = torch.quantization.get_default_qconfig('qnnpack')

#  将网络的一些层进行融合

model_fp32_fused = torch.quantization.fuse_modules(model_fp32,[['fc','relu']])

#  准备模型  插入观察对象    观察activation 和weight
model_fp32_prepared = torch.quantization.prepare(model_fp32_fused)


# 代表性数据集 获取与数据的分布特点  来更好的计算 及或者的scale 和zp

# batch x channel x h x w
input_fp32 = torch.randn(1,1,2,2)

#  喂数据  计算参数
model_fp32_prepared(input_fp32)


# 量化模型
model_int8 = torch.quantization.convert(model_fp32_prepared)

# 运行模型  计算都以int8来计算

import time

# 测量float32模型的执行时间
start_time_fp32 = time.time()
output_fp32 = model_fp32(input_fp32)
# end_time_fp32 = 
execution_time_fp32 = time.time() - start_time_fp32

# 测量int8模型的执行时间
start_time_int8 = time.time()
output_int8 = model_int8(input_fp32)
# end_time_int8 = 
execution_time_int8 = time.time() - start_time_int8

print("Execution time (float32): {:.8f} seconds".format(execution_time_fp32))
print("Execution time (int8): {:.8f} seconds".format(execution_time_int8))

Execution time (float32): 0.00096154 seconds
Execution time (int8): 0.00000000 seconds


In [39]:
torch.save(model_int8.state_dict(), "./state_dict.pth")
model_int8.load_state_dict(torch.load("./state_dict.pth"))
print(model_int8)

F32Model(
  (quant): Quantize(scale=tensor([0.0068]), zero_point=tensor([45]), dtype=torch.quint8)
  (conv): QuantizedConv2d(1, 1, kernel_size=(1, 1), stride=(1, 1), scale=0.006129027809947729, zero_point=0)
  (fc): QuantizedLinearReLU(in_features=2, out_features=2, scale=0.004328194074332714, zero_point=0, qscheme=torch.per_tensor_affine)
  (relu): Identity()
  (dequant): DeQuantize()
)


  device=storage.device,


In [44]:
print(model_int8.fc.weight().int_repr())
print(model_int8.fc.bias())

tensor([[127, 123],
        [ 97,   0]], dtype=torch.int8)
None


In [35]:
import torch

from torch import nn
import copy

# 部署的后端计算引擎  运行在x86 芯片
backend = "fbgemm"


model = nn.Sequential(
    nn.Conv2d(2,64,3),
    nn.ReLU(),
    nn.Conv2d(64,128,3),
    nn.ReLU()
)

m = copy.deepcopy(model)

m.eval()

#  然后 开始融合模型
torch.quantization.fuse_modules(m,['0','1'],inplace=True)
torch.quantization.fuse_modules(m,['2','3'],inplace=True)

# 插入Stub

m = nn.Sequential(
    torch.quantization.QuantStub(),
    m,
    torch.quantization.DeQuantStub()
)

# 设置后端
m.qconfig = torch.quantization.get_default_qconfig(backend)

#  插入观察对象
torch.quantization.prepare(m,inplace = True)


# 喂数据  计算scale和zero_point

#  推理模式  没有反向传播计算
with torch.inference_mode():
    for _ in range(10):
        x = torch.rand(1,2,28,28)
        m(x)
    
# 转换为int8量化模型 
torch.quantization.convert(m,inplace=True)

# 检查一下 权重参数是不是Int8 

# print(m[[1]].weight().element_size())


"""Check"""
print(m[1][0].weight().element_size()) # 1 byte instead of 4 bytes for FP32

from torch.quantization import quantize_fx


m = copy.deepcopy(model)
m.eval()








1


In [38]:
torch.save(m.state_dict(), "./state_dict.pth")
model_int8.load_state_dict(torch.load("./state_dict.pth"))
print(model_int8)

KeyError: 'conv.weight'