# torch.compile的不同mode，dynamic设置的差异展示
PyTorch 2.x 以后引入了一个新的，强大的功能————torch.compile。它是一个简单，易用的能对pytorch模型进行编译优化的函数，在这里本人（笨人）试图展示一下compile这个函数不同的参数（dynamic，mode）设置对应的不同效果。

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
# 库引入

## dynamic？True or False?
dynamic，意味动态的，这个参数是一个bool类型的参数，可设置为True或False，它实际上指的是，我在优化中是按照“输入是否可变”为标准去进行不同的优化方案。

举个例子，当我的输入固定为（B,C,H,W），每个维度都指定固定的值，比如B=32，C=3，H=64，W=64。那么这种情况下我的输入就是固定的，是一种“静态的”。

但如果，我的某个参数可能发生改变，比如B有时是32，有时是64，那么这个时候我的输入就是不定的，是“动态的”。

显而易见，前者最好用dynamic=False去compile，后者最好用dynamic=True去compile。

那么这两种参数设置方式有什么优缺点呢。

对于dynamic=True的情况下，优点在于我们用一种动态的方式去编译，它允许我们的可变输入和可变控制流，限制较为宽松。缺点在于性能略差一些。

对于dynamic=False的情况下，优点在于编译后的运行性能更好，但是缺点在于输入不可变，可能不支持可变控制流，输入一但变化，可能就要重新编译，引发新的编译耗时。

In [3]:
# 定义一个简单的小网络，以便后边进行展示
class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(512, 512)
        self.relu = nn.ReLU()
        self.output = nn.Linear(512, 10)
    def forward(self, x):
        x = self.linear(x)
        x = self.relu(x)
        x = self.output(x)
        return x

In [4]:
net = SimpleNet().to("cuda")  # 将模型放到GPU上
static_compile = torch.compile(net, dynamic=False)
dynamic_compile = torch.compile(net, dynamic=True)
# 这里的dynamic_compile和static_compile是两个不同的编译版本
# 下边我们用随机数做一个伪输入
input_32 = torch.randn(32, 512).to("cuda")  # 输入数据也放到GPU上
input_64 = torch.randn(64, 512).to("cuda")  # 这里我们也可以准备一个批次为64的输入
# 预热（预热这里其实有很重要的概念，稍后会讲解，这里可以先不在意这个地方）
_ = static_compile(input_32)
_ = dynamic_compile(input_32) 
# 先不使用批次为64的输入，我们来看输入不变的情况下，使用dynamic与否 编译后模型性能的差异
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
# 进行一次前向计算
_ = static_compile(input_32)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_static = start_event.elapsed_time(end_event)
print(f"静态编译模型前向传播用时{inference_time_static:.4f} ms")

start_event.record()
_ = dynamic_compile(input_32)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_dynamic = start_event.elapsed_time(end_event)
print(f"动态编译模型前向传播用时{inference_time_dynamic:.4f} ms")
# 这里从结果我们可以看出，静态编译的模型前向传播用时更少



静态编译模型前向传播用时0.3697 ms
动态编译模型前向传播用时1.0393 ms


这里需要打断一下了，一个很有意思的点在于，我们刚刚提到dynamic=False是静态去编译，dynamic=True是动态去编译，但是，我们似乎在调用compile函数本身的时候，并没有显式指定我们的输入的形状，换言之，调用compile的时候，模型并不知道它要静态编译为输入batchsize为32的情况。那么它是怎么知道它要为batch size 32 去编译的呢。

这里就和预热有关系，我们在调用compile的时候，其实并没有完全对这个函数/模型去进行编译，而是配置了相关的编译环境，编译参数。当我们第一次运行这个模型（在上边就是预热），它会依据输入去进行实际的编译（lazy compile）。也就是说，调用了compile后模型的首次运行，包含了编译时间和推理时间两部分，第二次及以后调用，就只有推理时间了。

我们刚刚运行过（预热过），所以static的编译版本现在已经固定了为32的batch size去编译优化。我们把输入换成64的batch size去输入，看看会发生什么

In [6]:
# 进行一次前向计算
start_event.record()
_ = static_compile(input_64)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_static = start_event.elapsed_time(end_event)
print(f"静态编译模型前向传播用时{inference_time_static:.4f} ms")

start_event.record()
_ = dynamic_compile(input_64)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_dynamic = start_event.elapsed_time(end_event)
print(f"动态编译模型前向传播用时{inference_time_dynamic:.4f} ms")
# 这里从结果我们可以看出，静态编译的模型前向传播用时更少

静态编译模型前向传播用时1.2831 ms
动态编译模型前向传播用时0.5662 ms


不难发现，如果我们违背了静态长度编译的使用条件（长度不变），我们就会强制打破编译的条件，这时我们会触发静态编译的重新编译，导致它花费更长的时间去推理（因为里边包含了重新编译的时间）

In [8]:
# 这里再补充一个对比，我们看一下编译前后模型的性能差异
start_event.record()
_ = net(input_32)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_static = start_event.elapsed_time(end_event)
print(f"无编译模型前向传播用时{inference_time_static:.4f} ms")

start_event.record()
_ = dynamic_compile(input_32)
end_event.record()
torch.cuda.synchronize() # 等待所有CUDA操作完成
inference_time_dynamic = start_event.elapsed_time(end_event)
print(f"动态编译模型前向传播用时{inference_time_dynamic:.4f} ms")
# 我们发现即便是和编译效果稍差的动态编译对比，他们之间的性能差距都是非常显著的。

无编译模型前向传播用时1.2370 ms
动态编译模型前向传播用时0.7288 ms


## mode
这里介绍三种主要的mode，
default，reduce-overhead，max-autotune

In [None]:
# 小的模型结果不是很稳定，这里叠甲用个大一点的模型
# PS: claude直接给我设计了一个挺复杂的模型,我们不需要在意这个模型的具体架构
class ComplexBlock(nn.Module):
    def __init__(self, in_channels, out_channels, bottleneck_ratio=0.5):
        super().__init__()
        bottleneck_channels = int(out_channels * bottleneck_ratio)
        
        # 主路径
        self.conv1 = nn.Conv2d(in_channels, bottleneck_channels, 1)
        self.bn1 = nn.BatchNorm2d(bottleneck_channels)
        
        # 多分支结构
        self.branch1 = nn.Conv2d(bottleneck_channels, bottleneck_channels//4, 3, padding=1)
        self.branch2 = nn.Conv2d(bottleneck_channels, bottleneck_channels//4, 5, padding=2)
        self.branch3 = nn.Conv2d(bottleneck_channels, bottleneck_channels//4, 7, padding=3)
        self.branch4 = nn.Conv2d(bottleneck_channels, bottleneck_channels//4, 1)
        
        # 分组卷积
        self.group_conv = nn.Conv2d(bottleneck_channels, bottleneck_channels, 
                                   kernel_size=3, padding=1, groups=4)
        
        # SE注意力块
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(bottleneck_channels, bottleneck_channels//16, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(bottleneck_channels//16, bottleneck_channels, 1),
            nn.Sigmoid()
        )
        
        # 输出投影
        self.conv2 = nn.Conv2d(bottleneck_channels, out_channels, 1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # 残差连接
        self.skip = nn.Sequential()
        if in_channels != out_channels:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1),
                nn.BatchNorm2d(out_channels)
            )
        
    def forward(self, x):
        # 主路径
        out = F.relu(self.bn1(self.conv1(x)))
        
        # 多分支
        b1 = self.branch1(out)
        b2 = self.branch2(out)
        b3 = self.branch3(out)
        b4 = self.branch4(out)
        
        # 合并分支
        out = torch.cat([b1, b2, b3, b4], dim=1)
        
        # 分组卷积
        out = F.relu(self.group_conv(out))
        
        # SE注意力
        se_weight = self.se(out)
        out = out * se_weight
        
        # 输出投影
        out = self.bn2(self.conv2(out))
        
        # 残差连接
        out += self.skip(x)
        out = F.relu(out)
        
        return out

class ComplexModel(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        
        # 初始卷积层
        self.stem = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        # 复杂块堆叠
        self.block1 = ComplexBlock(64, 128)
        self.block2 = ComplexBlock(128, 256)
        self.block3 = ComplexBlock(256, 512)
        self.block4 = ComplexBlock(512, 1024)
        
        # 高级特征处理
        self.context_block = nn.Sequential(
            nn.Conv2d(1024, 512, kernel_size=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1, dilation=2),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True)
        )
        
        # 全局池化和分类器
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )
        
    def forward(self, x):
        # 输入预处理和特征提取
        x = self.stem(x)
        
        # 复杂块级联
        x = self.block1(x)
        x = F.max_pool2d(x, 2)
        x = self.block2(x)
        x = F.max_pool2d(x, 2)
        x = self.block3(x)
        x = F.max_pool2d(x, 2)  
        x = self.block4(x)
        
        # 上下文建模
        x = self.context_block(x)
        
        # 全局特征和分类
        x = self.global_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        
        return x
    
input_tensor = torch.randn(32, 3, 224, 224).to("cuda") 
complex_net = ComplexModel().to("cuda")

### default
默认模式，编译时间短，性能适中，适合大多数的使用场景

In [20]:
n = 20
compile_time_list = []
compiled_net = torch.compile(complex_net, mode='default')
start_event.record()
_ = compiled_net(input_tensor)  # 预热编译
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
warm_time = start_event.elapsed_time(end_event)
print(f"预热编译用时{warm_time:.4f} ms")
# 下边是default模式下编译后的模型前向传播的用时测试
for i in range(n):
    start_event.record()
    _ = compiled_net(input_tensor)
    end_event.record()
    torch.cuda.synchronize()  # 等待所有CUDA操作完成
    compile_time = start_event.elapsed_time(end_event)
    compile_time_list.append(compile_time)
print(f"default模式下编译后的模型前向传播用时{sum(compile_time_list) / n:.4f} ms")


预热编译用时133613.0156 ms
default模式下编译后的模型前向传播用时8.4707 ms


### reduce-overhead
使用 CUDA Graphs 减少 Python 和 GPU 的交互开销，适合小模型或低延迟推理
限制在于：
CUDA Graphs 不支持动态形状（除非显式配置）。
内存使用略高（存储 Graph 状态）。

In [21]:
n = 20
compile_time_list = []
compiled_net = torch.compile(complex_net, mode='reduce-overhead')
start_event.record()
_ = compiled_net(input_tensor)  # 预热编译
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
warm_time = start_event.elapsed_time(end_event)
print(f"预热编译用时{warm_time:.4f} ms")
# 下边是default模式下编译后的模型前向传播的用时测试
for i in range(n):
    start_event.record()
    _ = compiled_net(input_tensor)
    end_event.record()
    torch.cuda.synchronize()  # 等待所有CUDA操作完成
    compile_time = start_event.elapsed_time(end_event)
    compile_time_list.append(compile_time)
print(f"reduce-overhead模式下编译后的模型前向传播用时{sum(compile_time_list) / n:.4f} ms")

预热编译用时108352.1016 ms




reduce-overhead模式下编译后的模型前向传播用时10.6827 ms


### max-autotune
通过性能分析选择最佳内核配置，追求最大性能，适合大模型或高吞吐量场景
编译时间最长，性能最优，适合大模型。适合静态长度编译。

In [22]:
n = 20
compile_time_list = []
compiled_net = torch.compile(complex_net, mode='max-autotune', dynamic=False)
start_event.record()
_ = compiled_net(input_tensor)  # 预热编译
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
warm_time = start_event.elapsed_time(end_event)
print(f"预热编译用时{warm_time:.4f} ms")
# 下边是default模式下编译后的模型前向传播的用时测试
for i in range(n):
    start_event.record()
    _ = compiled_net(input_tensor)
    end_event.record()
    torch.cuda.synchronize()  # 等待所有CUDA操作完成
    compile_time = start_event.elapsed_time(end_event)
    compile_time_list.append(compile_time)
print(f"max-autotune模式下编译后的模型前向传播用时{sum(compile_time_list) / n:.4f} ms")

AUTOTUNE convolution(32x3x224x224, 64x3x7x7)
  convolution 0.2755 ms 100.0% 
  triton_convolution2d_3 0.5868 ms 46.9% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=128, BLOCK_N=64, GROUPS=1, KERNEL_H=7, KERNEL_W=7, PADDING_H=3, PADDING_W=3, STRIDE_H=2, STRIDE_W=2, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_1 0.5888 ms 46.8% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=256, BLOCK_N=64, GROUPS=1, KERNEL_H=7, KERNEL_W=7, PADDING_H=3, PADDING_W=3, STRIDE_H=2, STRIDE_W=2, UNROLL=False, num_stages=2, num_warps=4
  triton_convolution2d_4 0.6069 ms 45.4% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=64, BLOCK_N=64, GROUPS=1, KERNEL_H=7, KERNEL_W=7, PADDING_H=3, PADDING_W=3, STRIDE_H=2, STRIDE_W=2, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_0 0.6267 ms 44.0% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=64, BLOCK_N=64, GROUPS=1, KERNEL_H=7, KERNEL_W=7, PADDING_H=3, PADDING_W=3, STRIDE_H=2, STRIDE_W=2, UNROLL=False, num_stages=2, num_warps=4
  triton_convolution2d_5 0.7124 ms 38.7% ALLOW_TF

预热编译用时157431.0781 ms




max-autotune模式下编译后的模型前向传播用时12.3890 ms


忧郁脸……
总结一下，按理来说模型编译后的性能理应是max-autotune > reduce-overhead > default，但在这里似乎并不能展示出来这种理论结果。
不过这也另一方面展示，其实不同模式的编译效果也略有些不稳定，本人（笨人）刚刚做实验也发现这一点，他们的编译效果是有不小浮动的。

算了，我开摆了……

先不摆了，师兄说他见到的一般是针对函数compile，下边再探索一下针对函数compile和针对整个model compile的效果区别

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import time

# 定义一些计算密集的操作，适合单独编译优化
def complex_activation(x):
    """一个计算密集的自定义激活函数"""
    # 使用多个连续操作模拟复杂计算
    a = torch.sin(x)
    b = torch.pow(x, 2) * torch.tanh(x)
    c = torch.exp(-torch.abs(x))
    d = torch.sigmoid(x) * torch.log(torch.clamp(x.abs(), min=1e-10))
    return a + b - c + d

def feature_enhancement(x):
    """特征增强函数"""
    # 复杂的特征处理
    b, c, h, w = x.shape
    # 创建通道注意力
    channel_avg = F.adaptive_avg_pool2d(x, 1)
    channel_weight = torch.sigmoid(channel_avg * 3)
    # 空间注意力
    spatial_avg = torch.mean(x, dim=1, keepdim=True)
    spatial_max, _ = torch.max(x, dim=1, keepdim=True)
    spatial_attn = torch.sigmoid(torch.cat([spatial_avg, spatial_max], dim=1))
    spatial_weight = torch.sigmoid(F.conv2d(spatial_attn, torch.ones(1, 2, 3, 3).to(x.device), padding=1))
    
    return x * channel_weight * spatial_weight

# 主网络类
class LocalOptimizedNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        
        # 基础卷积层
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        
        # 特征提取块
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        
        # 特征融合块
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        
        # 分类器
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, 10)
        
        # 特征增强函数 - 可以单独编译
        self.enhance_features = feature_enhancement
        
        # 自定义激活 - 可以单独编译
        self.activation = complex_activation
        
    def forward(self, x):
        # 初始特征提取
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        
        # 特征处理块 1
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.activation(x)  # 使用计算密集的自定义激活
        
        # 特征处理块 2
        x = self.conv3(x)
        x = self.bn3(x)
        x = F.relu(x)
        
        # 特征增强 - 计算密集操作
        x = self.enhance_features(x)
        
        # 最终处理
        x = self.conv4(x)
        x = self.bn4(x)
        x = F.relu(x)
        
        # 分类
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        
        return x

# 测试和比较不同编译策略
def test_compilation_strategies():
    # 创建模型和测试数据
    model = LocalOptimizedNetwork().to("cuda")
    x = torch.randn(16, 3, 224, 224).to("cuda")
    
    # 1. 无编译基准
    start = time.time()
    for _ in range(10):
        _ = model(x)
    torch.cuda.synchronize()
    base_time = (time.time() - start) / 10
    print(f"基准性能 (无编译): {base_time*1000:.2f} ms/批次")
    
    # 2. 仅编译计算密集函数
    model_func_compiled = LocalOptimizedNetwork().to("cuda")
    # 单独编译计算密集函数
    model_func_compiled.activation = torch.compile(complex_activation, mode='max-autotune')
    model_func_compiled.enhance_features = torch.compile(feature_enhancement, mode='max-autotune')
    
    # 预热
    _ = model_func_compiled(x)
    
    start = time.time()
    for _ in range(10):
        _ = model_func_compiled(x)
    torch.cuda.synchronize()
    func_time = (time.time() - start) / 10
    print(f"函数级别编译: {func_time*1000:.2f} ms/批次")
    
    # 3. 整个模型编译
    model_full = LocalOptimizedNetwork().to("cuda")
    model_compiled = torch.compile(model_full, mode='max-autotune')
    
    # 预热
    _ = model_compiled(x)
    
    start = time.time()
    for _ in range(10):
        _ = model_compiled(x)
    torch.cuda.synchronize()
    full_time = (time.time() - start) / 10
    print(f"整个模型编译: {full_time*1000:.2f} ms/批次")
    
    # 比较
    func_speedup = base_time / func_time
    full_speedup = base_time / full_time
    print(f"\n函数级别编译加速: {func_speedup:.2f}x")
    print(f"整个模型编译加速: {full_speedup:.2f}x")
    print(f"整个模型相比函数级别额外加速: {full_speedup/func_speedup:.2f}x")

# 调用此函数即可测试
test_compilation_strategies()

基准性能 (无编译): 83.72 ms/批次


AUTOTUNE convolution(16x2x224x224, 1x2x3x3)
  convolution 0.0246 ms 100.0% 
  triton_convolution2d_2 0.0850 ms 28.9% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=1024, BLOCK_N=16, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=1, num_warps=8
  triton_convolution2d_1 0.1055 ms 23.3% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=256, BLOCK_N=16, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=4
  triton_convolution2d_4 0.1065 ms 23.1% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=256, BLOCK_N=16, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_3 0.1137 ms 21.6% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=128, BLOCK_N=16, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_0 0.1300 ms 18.9% ALLOW_

函数级别编译: 75.58 ms/批次


AUTOTUNE convolution(16x128x224x224, 256x128x3x3)
  convolution 6.8065 ms 100.0% 
  triton_convolution2d_31 9.6205 ms 70.8% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=256, BLOCK_N=64, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=4
  triton_convolution2d_36 11.4135 ms 59.6% ALLOW_TF32=True, BLOCK_K=32, BLOCK_M=256, BLOCK_N=64, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_33 15.7204 ms 43.3% ALLOW_TF32=True, BLOCK_K=32, BLOCK_M=128, BLOCK_N=128, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=2, num_warps=8
  triton_convolution2d_32 19.5881 ms 34.7% ALLOW_TF32=True, BLOCK_K=16, BLOCK_M=1024, BLOCK_N=16, GROUPS=1, KERNEL_H=3, KERNEL_W=3, PADDING_H=1, PADDING_W=1, STRIDE_H=1, STRIDE_W=1, UNROLL=False, num_stages=1, num_warps=8
  triton_convolution2d_34 23.5182

整个模型编译: 47.04 ms/批次

函数级别编译加速: 1.11x
整个模型编译加速: 1.78x
整个模型相比函数级别额外加速: 1.61x


这里看起来非常有意思，我们直接对整个模型compile似乎能达到最高性能，而单独针对部分函数进行compile 似乎并不能达到最佳性能增幅，下边我们再尝试不同的例子来验证我们的猜想

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
# 下边这个网络是本人（笨人）直接做某个project写出来的网络
# 接下来我们尝试两种编译方案，第一种是将整个网络编译，第二种是将网络
# 串联的feature和classifier两部分单独编译，我们看一下之后的效果
class MixBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MixBlock, self).__init__()
        self.channel1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel2 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel3 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=5, padding=2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # 第四路也用卷积 + 池化，保证输入输出都是 4D
        self.channel4 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=7, padding=3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fusion = nn.Sequential(
            nn.Conv2d(out_channels * 4, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
        self._initialize_weights()
        
    def forward(self, x):
        x1 = self.channel1(x)
        x2 = self.channel2(x)
        x3 = self.channel3(x)
        x4 = self.channel4(x)
        
        # 在通道维度上拼接
        x_cat = torch.cat([x1, x2, x3, x4], dim=1)
        # 融合
        x_cat = self.fusion(x_cat)
        
        return x_cat
        
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

features = nn.Sequential(
            MixBlock(3, 32),  # 输入通道数为3，输出通道数为32
            MixBlock(32, 64),  # 输入通道数为32，输出通道数为64
            MixBlock(64, 128),  # 输入通道数为64，输出通道数为128
        )

classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128 * 8 * 8, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, 10)
        )

class ConvNetMix(nn.Module):
    '''
    多尺度特征融合
    '''
    def __init__(self, features, classifier):
        super().__init__()
        # 特征提取部分
        self.features = features
        
        # 分类器部分
        self.classifier = classifier
        
        # 初始化权重
        self._initialize_weights()
        
    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)  # 扁平化，保留批次维度
        x = self.classifier(x)
        return x
    
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
                
# 方案一：整个网络编译
net = ConvNetMix(features, classifier).to("cuda")
compiled_net = torch.compile(net)
# 预热编译
input_tensor = torch.randn(32, 3, 64, 64).to("cuda")  # 假设输入是64x64的RGB图像
_ = compiled_net(input_tensor)
# 测试编译后的模型性能
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
_ = compiled_net(input_tensor)
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
inference_time = start_event.elapsed_time(end_event)
print(f"整个网络编译后的模型前向传播用时{inference_time:.4f} ms")
# 方案二：将特征提取和分类器部分单独编译
features_compiled = torch.compile(features)
classifier_compiled = torch.compile(classifier)
# 创建一个新的模型，将编译后的特征提取和分类器组合起来
net_separate = ConvNetMix(features_compiled, classifier_compiled).to("cuda")
# 预热编译
_ = net_separate(input_tensor)
# 测试编译后的模型性能
start_event.record()
_ = net_separate(input_tensor)
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
inference_time_separate = start_event.elapsed_time(end_event)
print(f"特征提取和分类器部分单独编译后的模型前向传播用时{inference_time_separate:.4f} ms")
# 比较两种编译方案的性能
print(f"整个网络编译的前向传播用时: {inference_time:.4f} ms")
print(f"特征提取和分类器部分单独编译的前向传播用时: {inference_time_separate:.4f} ms")
# 这里我们可以看到，整个网络编译的前向传播用时通常会更短，这一点很符合直觉。




整个网络编译后的模型前向传播用时5.6893 ms


  torch.has_cuda,
  torch.has_cudnn,
  torch.has_mps,
  torch.has_mkldnn,


特征提取和分类器部分单独编译后的模型前向传播用时5.9003 ms
整个网络编译的前向传播用时: 5.6893 ms
特征提取和分类器部分单独编译的前向传播用时: 5.9003 ms


唔……那既然如此，所有情况下都是直接编译全局网络会得到最优性能吗，我们考虑引入多个分支试试，还是上边的网络。

In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class MixBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MixBlock, self).__init__()
        self.channel1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel2 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.channel3 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=5, padding=2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # 第四路也用卷积 + 池化，保证输入输出都是 4D
        self.channel4 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=7, padding=3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fusion = nn.Sequential(
            nn.Conv2d(out_channels * 4, out_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
        self._initialize_weights()
        
    def forward(self, x):
        x1 = self.channel1(x)
        x2 = self.channel2(x)
        x3 = self.channel3(x)
        x4 = self.channel4(x)
        
        # 在通道维度上拼接
        x_cat = torch.cat([x1, x2, x3, x4], dim=1)
        # 融合
        x_cat = self.fusion(x_cat)
        
        return x_cat
        
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

features = nn.Sequential(
            MixBlock(3, 32),  # 输入通道数为3，输出通道数为32
            MixBlock(32, 64),  # 输入通道数为32，输出通道数为64
            MixBlock(64, 128),  # 输入通道数为64，输出通道数为128
        )


classifier_list = nn.ModuleList([nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128 * 8 * 8, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, 10)
            ) for _ in range(5)])  
# 创建多个分类器，但最后只会使用一个


class ConvNetMix(nn.Module):
    '''
    多尺度特征融合
    '''
    def __init__(self, features, classifier_list):
        super().__init__()
        # 特征提取部分
        self.features = features
        
        # 分类器部分
        self.classifiers = classifier_list
        
        # 初始化权重
        self._initialize_weights()
        
    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)  # 扁平化，保留批次维度
        x = self.classifiers[np.random.randint(0, len(self.classifiers))](x)  # 随机选择一个分类器
        return x
    
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
                
'''
# 方案一：整个网络编译
net = ConvNetMix(features, classifier_list).to("cuda")
compiled_net = torch.compile(net, dynamic=True, mode='reduce-overhead')
# 预热编译
input_tensor = torch.randn(32, 3, 64, 64).to("cuda")  # 假设输入是64x64的RGB图像
_ = compiled_net(input_tensor)
# 测试编译后的模型性能
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
_ = compiled_net(input_tensor)
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
inference_time = start_event.elapsed_time(end_event)
print(f"整个网络编译后的模型前向传播用时{inference_time:.4f} ms")
'''
# 方案二：将特征提取和分类器部分单独编译
features_compiled = torch.compile(features)
classifier_compiled = nn.ModuleList([torch.compile(classifier) for classifier in classifier_list])
# 创建一个新的模型，将编译后的特征提取和分类器组合起来
net_separate = ConvNetMix(features_compiled, classifier_compiled).to("cuda")
# 预热编译
_ = net_separate(input_tensor)
# 测试编译后的模型性能
start_event.record()
_ = net_separate(input_tensor)
end_event.record()
torch.cuda.synchronize()  # 等待所有CUDA操作完成
inference_time_separate = start_event.elapsed_time(end_event)
print(f"特征提取和分类器部分单独编译后的模型前向传播用时{inference_time_separate:.4f} ms")


特征提取和分类器部分单独编译后的模型前向传播用时1522.7024 ms


这里发现了一个很有意思的地方，我丧心病狂的用numpy随机数决定我下游使用什么分类器，然后在我直接对整个网络进行编译的时候————编译失败了！
也就是说，尽管整个网络进行编译，更多时候能达到最佳性能，但是当网络内部包含复杂的控制流等逻辑的时候，似乎更可能导致编译失败，所以函数级别，小网络级别的编译，虽然可能达不到最佳性能，但是更灵活更稳定！

一些个人分析：
当网络包含复杂的控制流的时候，可能会涉及一些非torch原生的操作，编译后端可能不能很好的处理控制节点上下游的网络的编译优化，因而在这里会容易出现编译失败。但如果单独编译节点上下游的网络，一般就更容易编译成功

这下可以开摆了~
有误请[指正](mailto:12410615@mail.sustech.edu.cn)