GOOGLENET

In [None]:
import torch

'''Define the CNN model''' 
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        # 公用的前3层卷积层
        self.shared_cnn = torch.nn.Sequential(
            torch.nn.Conv2d(1, 8, kernel_size=(50, 50), stride=(5, 40)),
            torch.nn.BatchNorm2d(8),
            torch.nn.LeakyReLU(inplace=True),
            torch.nn.MaxPool2d(kernel_size=2, stride=1),
            torch.nn.Dropout(p=0.2),
            
            torch.nn.Conv2d(8, 16, kernel_size=(2, 3), stride=1),
            torch.nn.BatchNorm2d(16),
            torch.nn.LeakyReLU(inplace=True),
            torch.nn.Dropout(p=0.2),
            
            torch.nn.Conv2d(16, 32, kernel_size=(2, 3), stride=1),
            torch.nn.BatchNorm2d(32),
            torch.nn.LeakyReLU(inplace=True),
        )

        # 插入Google net Inception模块
        self.inceptions = torch.nn.ModuleList([
            torch.nn.Sequential(
                torch.nn.Conv2d(32, 32, kernel_size=(1, 1), stride=1),
                torch.nn.BatchNorm2d(32),
                torch.nn.LeakyReLU(inplace=True)
            ),
            torch.nn.Sequential(
                torch.nn.Conv2d(32, 32, kernel_size=(1, 1), stride=1),
                torch.nn.BatchNorm2d(32),
                torch.nn.LeakyReLU(inplace=True),
                torch.nn.Conv2d(32, 32, kernel_size=(3, 3), stride=1, padding=1),
                torch.nn.BatchNorm2d(32),
                torch.nn.LeakyReLU(inplace=True)
            ),
            torch.nn.Sequential(
                torch.nn.Conv2d(32, 32, kernel_size=(1, 1), stride=1),
                torch.nn.BatchNorm2d(32),
                torch.nn.LeakyReLU(inplace=True),
                torch.nn.Conv2d(32, 32, kernel_size=(5, 5), stride=1, padding=1),
                torch.nn.BatchNorm2d(32),
                torch.nn.LeakyReLU(inplace=True)
            ),
            torch.nn.Sequential(
                torch.nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
                torch.nn.Conv2d(32, 32, kernel_size=(1, 1), stride=1),
                torch.nn.BatchNorm2d(32),
            )
        ])

        # 融合后的全连接层
        self.fc1 = torch.nn.Linear(4832, 1000)
        self.drop_layer = torch.nn.Dropout(p=0.2)
        self.fc2 = torch.nn.Linear(1000, 1)

    def forward(self, x):
        try:
            # 先计算公用卷积层
            shared_out = self.shared_cnn(x)
            
            # 计算Inception模块的输出
            inception_outputs = [inception(shared_out) for inception in self.inceptions]
            combined = torch.cat(inception_outputs, dim=1)  # 在最后一个维度拼接

            # 全连接层
            x = torch.relu(self.fc1(combined))
            x = self.drop_layer(x)
            x = self.fc2(x)
            x = torch.sigmoid(x)  # 修复了这里的错误，去掉了不必要的fc3层
            return x

        except Exception as e:
            print(f"Error occurred during forward pass: {e}")
            return None


RESNET

In [None]:
# resnet-34架构
import torch
import torch.nn as nn
import torch.nn.functional as F

# ResNet类实现
class ResNet(nn.Module):
    # 初始化方法，设置ResNet的构造函数
    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        self.inplanes = 64  # 初始化inplanes
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(64, 3)
        self.layer2 = self._make_layer(128, 4, stride=2)
        self.layer3 = self._make_layer(256, 6, stride=2)
        self.layer4 = self._make_layer(512, 3, stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)  # 这里不需要乘法

    # 创建残差块的方法
    def _make_layer(self, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * 4:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * 4, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * 4)
            )
        layers = []
        layers.append(self._make_block(planes, stride, downsample))  # 添加第一个块
        for _ in range(1, blocks):
            layers.append(self._make_block(planes))  # 添加后续块
        return nn.Sequential(*layers)  # 将layers转换为Sequential

    def _make_block(self, planes, stride=1, downsample=None):
        layers = []
        layers.append(nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=1, bias=False))
        layers.append(nn.BatchNorm2d(planes))
        layers.append(nn.ReLU(inplace=True))
        layers.append(nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False))
        layers.append(nn.BatchNorm2d(planes))
        layers.append(nn.ReLU(inplace=True))
        layers.append(nn.Conv2d(planes, planes * 4, kernel_size=1, stride=1, bias=False))
        layers.append(nn.BatchNorm2d(planes * 4))
        
        if downsample is not None:
            layers.append(downsample)
        
        self.inplanes = planes * 4
        return nn.Sequential(*layers)  # 将block转换为Sequential

    # 前向传播方法
    def forward(self, x):
        if not isinstance(x, torch.Tensor):
            raise ValueError("Input must be a Torch tensor.")
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x


In [None]:
# InceptionResNetV2架构的示例代码
import torch



In [None]:
# resnext架构的示例代码
import torch
import torch.nn as nn
import torch.nn.functional as F

# 定义ResNeXt类，继承自nn.Module
class ResNeXt(nn.Module):
    # 初始化方法，设置网络结构及参数
    def __init__(self, block, layers, num_classes=1000):
        super(ResNeXt, self).__init__()
        self.inplanes = 64
        
        # 定义卷积层和其他层
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # 使用自适应平均池化
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # 权重初始化
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    # 构建网络的层
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes * block.expansion
        layers.extend(block(self.inplanes, planes) for _ in range(1, blocks))  # 使用列表推导式简化代码

        return nn.Sequential(*layers)

    # 定义前向传播方法
    def forward(self, x):
        try:
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
            x = self.maxpool(x)

            # 依次经过各层
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.layer4(x)

            # 使用自适应平均池化，避免硬编码
            x = self.avgpool(x)
            x = torch.flatten(x, 1)  # 使用torch.flatten简化代码
            x = self.fc(x)

            return x
        except Exception as e:
            print(f"在前向传播中发生错误: {e}")
            return None  # 这里返回None或根据需要返回其他值

In [None]:
# 余弦退火算法的实现
import math
import torch
from torch.optim.lr_scheduler import _LRScheduler

class CosineAnnealingWarmupRestarts(_LRScheduler):
    """
        optimizer (Optimizer): Wrapped optimizer.
        first_cycle_steps (int): First cycle step size.
        cycle_mult(float): Cycle steps magnification. Default: -1.
        max_lr(float): First cycle's max learning rate. Default: 0.1.
        min_lr(float): Min learning rate. Default: 0.001.
        warmup_steps(int): Linear warmup step size. Default: 0.
        gamma(float): Decrease rate of max learning rate by cycle. Default: 1.
        last_epoch (int): The index of last epoch. Default: -1.
    """
    
    def __init__(self,
                 optimizer : torch.optim.Optimizer,
                 first_cycle_steps : int,
                 cycle_mult : float = 1.,
                 max_lr : float = 0.1,
                 min_lr : float = 0.001,
                 warmup_steps : int = 0,
                 gamma : float = 1.,
                 last_epoch : int = -1
        ):
        assert warmup_steps < first_cycle_steps
        
        self.first_cycle_steps = first_cycle_steps # first cycle step size
        self.cycle_mult = cycle_mult # cycle steps magnification
        self.base_max_lr = max_lr # first max learning rate
        self.max_lr = max_lr # max learning rate in the current cycle
        self.min_lr = min_lr # min learning rate
        self.warmup_steps = warmup_steps # warmup step size
        self.gamma = gamma # decrease rate of max learning rate by cycle
        
        self.cur_cycle_steps = first_cycle_steps # first cycle step size
        self.cycle = 0 # cycle count
        self.step_in_cycle = last_epoch # step size of the current cycle
        
        super(CosineAnnealingWarmupRestarts, self).__init__(optimizer, last_epoch)
        
        # set learning rate min_lr
        self.init_lr()
    
    def init_lr(self):
        self.base_lrs = []
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.min_lr
            self.base_lrs.append(self.min_lr)
    
    def get_lr(self):
        if self.step_in_cycle == -1:
            return self.base_lrs
        elif self.step_in_cycle < self.warmup_steps:
            return [(self.max_lr - base_lr)*self.step_in_cycle / self.warmup_steps + base_lr for base_lr in self.base_lrs]
        else:
            return [base_lr + (self.max_lr - base_lr) \
                    * (1 + math.cos(math.pi * (self.step_in_cycle-self.warmup_steps) \
                                    / (self.cur_cycle_steps - self.warmup_steps))) / 2
                    for base_lr in self.base_lrs]

    def step(self, epoch=None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle = self.step_in_cycle + 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                self.cycle += 1
                self.step_in_cycle = self.step_in_cycle - self.cur_cycle_steps
                self.cur_cycle_steps = int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult) + self.warmup_steps
        else:
            if epoch >= self.first_cycle_steps:
                if self.cycle_mult == 1.:
                    self.step_in_cycle = epoch % self.first_cycle_steps
                    self.cycle = epoch // self.first_cycle_steps
                else:
                    n = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                    self.cycle = n
                    self.step_in_cycle = epoch - int(self.first_cycle_steps * (self.cycle_mult ** n - 1) / (self.cycle_mult - 1))
                    self.cur_cycle_steps = self.first_cycle_steps * self.cycle_mult ** (n)
            else:
                self.cur_cycle_steps = self.first_cycle_steps
                self.step_in_cycle = epoch
                
        self.max_lr = self.base_max_lr * (self.gamma**self.cycle)
        self.last_epoch = math.floor(epoch)
        for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            param_group['lr'] = lr

In [None]:
# 使用 SGD 优化器并添加余弦退火学习率调度器
optimizer_CosineLR = optim.SGD(net.parameters(), lr=0.0001, momentum=0.9)
CosineLR = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer_CosineLR, T_max=100, eta_min=0.000001)


In [None]:
# EMA-attention-module
import torch
from torch import nn
#GitHub地址：https://github.com/YOLOonMe/EMA-attention-module
#论文地址：https://arxiv.org/abs/2305.13563v2
class EMA(nn.Module):
    def __init__(self, channels, factor=8):
        super(EMA, self).__init__()
        self.groups = factor
        assert channels // self.groups > 0
        self.softmax = nn.Softmax(-1)
        self.agp = nn.AdaptiveAvgPool2d((1, 1))
        self.pool_h = nn.AdaptiveAvgPool2d((None, 1))
        self.pool_w = nn.AdaptiveAvgPool2d((1, None))
        self.gn = nn.GroupNorm(channels // self.groups, channels // self.groups)
        self.conv1x1 = nn.Conv2d(channels // self.groups, channels // self.groups, kernel_size=1, stride=1, padding=0)
        self.conv3x3 = nn.Conv2d(channels // self.groups, channels // self.groups, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        b, c, h, w = x.size()
        group_x = x.reshape(b * self.groups, -1, h, w)  # b*g,c//g,h,w
        x_h = self.pool_h(group_x)
        x_w = self.pool_w(group_x).permute(0, 1, 3, 2)
        hw = self.conv1x1(torch.cat([x_h, x_w], dim=2))
        x_h, x_w = torch.split(hw, [h, w], dim=2)
        x1 = self.gn(group_x * x_h.sigmoid() * x_w.permute(0, 1, 3, 2).sigmoid())
        x2 = self.conv3x3(group_x)
        x11 = self.softmax(self.agp(x1).reshape(b * self.groups, -1, 1).permute(0, 2, 1))
        x12 = x2.reshape(b * self.groups, c // self.groups, -1)  # b*g, c//g, hw
        x21 = self.softmax(self.agp(x2).reshape(b * self.groups, -1, 1).permute(0, 2, 1))
        x22 = x1.reshape(b * self.groups, c // self.groups, -1)  # b*g, c//g, hw
        weights = (torch.matmul(x11, x12) + torch.matmul(x21, x22)).reshape(b * self.groups, 1, h, w)
        return (group_x * weights.sigmoid()).reshape(b, c, h, w)


# 输入 B C H W,  输出 B C H W
if __name__ == '__main__':
    block = EMA(64).cuda()
    input = torch.rand(1, 64, 64, 64).cuda()
    output = block(input)
    print(input.size(), output.size())

In [None]:
# CBAM: Convolutional Block Attention Module
import torch
import torch.nn as nn
import math
class CBAM(nn.Module):
    def __init__(self,in_channel,reduction=16,kernel_size=7):
        super(CBAM, self).__init__()
        #通道注意力机制
        self.max_pool=nn.AdaptiveMaxPool2d(output_size=1)
        self.avg_pool=nn.AdaptiveAvgPool2d(output_size=1)
        self.mlp=nn.Sequential(
            nn.Linear(in_features=in_channel,out_features=in_channel//reduction,bias=False),
            nn.ReLU(),
            nn.Linear(in_features=in_channel//reduction,out_features=in_channel,bias=False)
        )
        self.sigmoid=nn.Sigmoid()
        #空间注意力机制
        self.conv=nn.Conv2d(in_channels=2,out_channels=1,kernel_size=kernel_size ,stride=1,padding=kernel_size//2,bias=False)

    def forward(self,x):
        #通道注意力机制
        maxout=self.max_pool(x)
        maxout=self.mlp(maxout.view(maxout.size(0),-1))
        avgout=self.avg_pool(x)
        avgout=self.mlp(avgout.view(avgout.size(0),-1))
        channel_out=self.sigmoid(maxout+avgout)
        channel_out=channel_out.view(x.size(0),x.size(1),1,1)
        channel_out=channel_out*x
        #空间注意力机制
        max_out,_=torch.max(channel_out,dim=1,keepdim=True)
        mean_out=torch.mean(channel_out,dim=1,keepdim=True)
        out=torch.cat((max_out,mean_out),dim=1)
        out=self.sigmoid(self.conv(out))
        out=out*channel_out
        return out
    
# 拆分版，提升效率
class ChannelAttention(nn.Module):
    def __init__(self, in_channel, reduction=16):
        super(ChannelAttention, self).__init__()
        self.max_pool = nn.AdaptiveMaxPool2d(output_size=1)
        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=1)
        self.mlp = nn.Sequential(
            nn.Linear(in_features=in_channel, out_features=in_channel // reduction, bias=False),
            nn.ReLU(),
            nn.Linear(in_features=in_channel // reduction, out_features=in_channel, bias=False)
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        max_out = self.max_pool(x)
        max_out = self.mlp(max_out.view(max_out.size(0), -1))
        avg_out = self.avg_pool(x)
        avg_out = self.mlp(avg_out.view(avg_out.size(0), -1))
        channel_out = self.sigmoid(max_out + avg_out).view(x.size(0), x.size(1), 1, 1)
        return channel_out * x


class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        self.conv = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=kernel_size, stride=1,
                              padding=kernel_size // 2, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        mean_out = torch.mean(x, dim=1, keepdim=True)
        out = torch.cat((max_out, mean_out), dim=1)
        return out * self.sigmoid(self.conv(out))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # ... 其他层定义 ...
        self.channel_attention = ChannelAttention(64)
        self.spatial_attention = SpatialAttention()

    def forward(self, x):
        x = self.layers1(x)
        x = self.channel_attention(x)  # 使用通道注意力
        x = self.spatial_attention(x)   # 使用空间注意力
        x = self.layer2(x)
        # ... 其他操作 ...
        return x


In [None]:
from torch import nn


class SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)