In [8]:
import torch
from torch import nn
import torch.nn.functional as F
import math

In [2]:
# Channel shuffle
def channels_shuffle(x, group):
    """Interleave the channels of a 4-D tensor across ``group`` groups.

    The channel axis is split into ``(group, C // group)``, the two new axes
    are swapped, and the result is flattened back to a single channel axis.

    Args:
        x: tensor unpacked as ``(batch, channels, height, width)``.
        group: number of groups the channel axis is split into.

    Returns:
        A tensor with the same four dimensions as ``x`` and shuffled channels.
    """
    n, c, height, width = x.shape
    per_group = c // group
    # Split the channel axis into (group, per_group) ...
    grouped = x.view(n, group, per_group, height, width)
    # ... swap the two axes; transpose breaks contiguity, so copy before the next view ...
    swapped = grouped.transpose(1, 2).contiguous()
    # ... and merge them back into one channel axis.
    return swapped.view(n, -1, height, width)

In [3]:
class Bottleneck(nn.Module):
    """ShuffleNet unit, modelled on the ResNet bottleneck block.

    Layout: 1x1 (group) conv -> channel shuffle -> 3x3 depthwise conv ->
    1x1 group conv, followed by a shortcut connection.

    * ``stride == 2``: the shortcut is a 3x3 average pool with stride 2 and
      the two branches are concatenated, so the block outputs
      ``out_planes + in_planes`` channels.
    * otherwise: the shortcut is the identity and the branches are added,
      so ``in_planes`` has to match ``out_planes``.

    Args:
        in_planes: number of input channels.
        out_planes: number of channels produced by the convolutional branch.
        stride: stride of the depthwise convolution (2 selects the
            downsampling/concatenation variant).
        groups: number of groups for the 1x1 group convolutions.
        stem_planes: width of the stem convolution's output. A block whose
            ``in_planes`` equals this value uses an ordinary (``groups=1``)
            first 1x1 convolution. Defaults to 24, the value that was
            previously hard-coded; pass the scaled stem width when the
            network is built with a channel multiplier.
    """

    def __init__(self, in_planes, out_planes, stride, groups, stem_planes=24):
        super(Bottleneck, self).__init__()
        self.stride = stride

        # The bottleneck width is 1/4 of the branch output (same ratio as ResNet).
        mid_planes = out_planes // 4
        # The block fed directly by the stem does not group its first 1x1 conv.
        self.groups = 1 if in_planes == stem_planes else groups
        self.conv1 = nn.Conv2d(in_planes, mid_planes, kernel_size=1, groups=self.groups, bias=False)
        self.bn1 = nn.BatchNorm2d(mid_planes)
        # groups == channels makes this a depthwise convolution.
        self.conv2 = nn.Conv2d(mid_planes, mid_planes, kernel_size=3, stride=stride, padding=1,
                               groups=mid_planes, bias=False)
        self.bn2 = nn.BatchNorm2d(mid_planes)
        self.conv3 = nn.Conv2d(mid_planes, out_planes, kernel_size=1, groups=groups, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)
        if stride == 2:
            # Downsampling shortcut; only created for the stride-2 variant.
            self.shortcut = nn.Sequential(nn.AvgPool2d(3, stride=2, padding=1))

    @staticmethod
    def shuffle_channels(x, groups):
        """Channel shuffle: [N,C,H,W] -> [N,g,C/g,H,W] -> [N,C/g,g,H,W] -> [N,C,H,W].

        The C channels are reshaped to (g, C/g), transposed to (C/g, g) and
        flattened back to C, which interleaves the channels of the g groups.
        """
        N, C, H, W = x.size()
        # permute() leaves the tensor non-contiguous, so contiguous() is
        # required before the final view().
        return x.view(N, groups, C // groups, H, W).permute(0, 2, 1, 3, 4).contiguous().view(N, C, H, W)

    def forward(self, x):
        """Run the unit on ``x`` and return the activated block output."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.shuffle_channels(out, self.groups)
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        if self.stride == 2:
            res = self.shortcut(x)
            # The downsampling unit concatenates with the shortcut instead of adding.
            out = F.relu(torch.cat([out, res], 1))
        else:
            out = F.relu(out + x)
        return out

In [9]:
class ShuffleNet(nn.Module):
    """ShuffleNet backbone with an optional classification head.

    Args:
        out_planes: output width of every stage before scaling.
        num_blocks: number of ``Bottleneck`` units in every stage.
        groups: group count forwarded to every ``Bottleneck``.
        num_classes: when given, a global average pool and a linear
            classifier are appended and ``forward`` returns logits; when
            ``None``, ``forward`` returns the list of per-stage feature maps.
        depth_multiplier: scale factor applied to the stem width (24) and to
            every entry of ``out_planes``. Scaled widths are truncated with
            ``int``.
    """

    def __init__(self, out_planes, num_blocks, groups, num_classes=None, depth_multiplier=1.):
        super(ShuffleNet, self).__init__()
        self.num_classes = num_classes
        # The stem convolution outputs 24 channels before scaling.
        self.in_planes = int(24 * depth_multiplier)
        self.out_planes = [int(depth_multiplier * x) for x in out_planes]

        # NOTE(review): the stem conv has no padding, so a 224-pixel input
        # becomes 111 pixels wide here; confirm this is intended.
        self.conv1 = nn.Conv2d(3, self.in_planes, kernel_size=3, stride=2, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Build the stages from the *scaled* widths so that depth_multiplier
        # affects the whole network and layer_channels matches reality.
        stages = [
            self._make_layer(stage_planes, stage_blocks, groups)
            for stage_planes, stage_blocks in zip(self.out_planes, num_blocks)
        ]
        self.layers = nn.ModuleList(stages)  # registers every stage as a sub-module
        if num_classes is not None:
            # Global *average* pooling, as the attribute name says.
            self.avgpool = nn.AdaptiveAvgPool2d(1)
            self.fc = nn.Linear(self.out_planes[-1], num_classes)

    def _make_layer(self, out_planes, num_blocks, groups):
        """Build one stage of ``num_blocks`` units ending at ``out_planes`` channels.

        The first unit downsamples (stride 2) and concatenates its input, so
        its convolutional branch only produces ``out_planes - in_planes``
        channels; the remaining units are stride-1 residual units.
        Updates ``self.in_planes`` to the width seen by the next unit.
        """
        layers = []
        for i in range(num_blocks):
            stride = 2 if i == 0 else 1
            # Channels contributed by the concatenated shortcut of the first unit.
            cat_planes = self.in_planes if i == 0 else 0
            layers.append(Bottleneck(self.in_planes, out_planes - cat_planes, stride=stride, groups=groups))
            self.in_planes = out_planes  # every following unit sees out_planes channels
        return nn.Sequential(*layers)

    @property
    def layer_channels(self):
        """Output width of every stage (after ``depth_multiplier`` scaling)."""
        return self.out_planes

    def forward(self, x):
        """Return class logits, or the list of stage outputs when there is no head."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        stage_outputs = []
        for stage in self.layers:
            x = stage(x)
            stage_outputs.append(x)

        if self.num_classes is not None:
            x = self.avgpool(x)
            x = x.view(x.size(0), -1)
            return self.fc(x)
        # No classifier: expose the feature map of every stage.
        return stage_outputs

In [10]:
def shufflenet(**kwargs):
    """Build the three-stage ShuffleNet with ``groups=3``.

    The original author notes that g = 3 performed best in the paper's
    experiments. Stage widths are 240/480/960 with 4/8/4 units each; any
    keyword arguments are forwarded to ``ShuffleNet``.
    """
    stage_widths = [240, 480, 960]
    stage_depths = [4, 8, 4]
    return ShuffleNet(stage_widths, stage_depths, groups=3, **kwargs)

In [11]:
# Build the default model. No num_classes is passed, so no classifier head is
# created and the forward pass returns the list of per-stage feature maps.
Shufflenet = shufflenet()

In [12]:
# Display the module tree to inspect the layer configuration.
Shufflenet

ShuffleNet(
  (conv1): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), bias=False)
  (bn1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layers): ModuleList(
    (0): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(24, 54, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(54, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(54, 54, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=54, bias=False)
        (bn2): BatchNorm2d(54, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(54, 216, kernel_size=(1, 1), stride=(1, 1), groups=3, bias=False)
        (bn3): BatchNorm2d(216, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (shortcut): Sequential(
          (0): AvgPool2d(kernel_size=3, stride=2, padding=1)
        )
      )

In [14]:
# Push one random tensor of shape (1, 3, 224, 224) through the network.
# The model has no classifier head, so `out` is a list with one feature map per stage.
data = torch.randn(1, 3, 224, 224)
out = Shufflenet(data)

In [21]:
import numpy as np

# `out` holds one feature map per stage and the maps have different shapes.
# np.array() on such a ragged list relies on implicit object-array creation,
# which newer NumPy releases reject with a ValueError, so build the 1-D object
# array explicitly and store each tensor as a single element.
stage_features = list(out)
out = np.empty(len(stage_features), dtype=object)
for stage_idx, stage_feature in enumerate(stage_features):
    out[stage_idx] = stage_feature

In [24]:
# `out` has one entry per network stage, hence a 1-D array of length 3.
out.shape

(3,)