# mobilenetV1

In [1]:
%matplotlib inline
import torch
from torch import nn
from torch.nn import functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.optim as optim
from tqdm import tqdm
import os
print(torch.__version__)

1.10.2


# 核心理念就是深度可分离卷积

也就是dw,pw卷积

![image.png](mobilenetv1_stru.png)

In [2]:
class DPConv(nn.Module):
    '''
    Depthwise conv + Pointwise conv
    '''
    def __init__(self,in_channel,out_channel,size=3,pad=1,strides=1):
        super().__init__()
        # 不理解这里为什么bias=False不带偏置项了
        self.conv1=nn.Conv2d(in_channel,in_channel,kernel_size=size,groups=in_channel,stride=strides,padding=1,bias=False)
        self.bn1=nn.BatchNorm2d(in_channel)
        # 1x1合并层,没有groups,也没有bias
        self.conv2=nn.Conv2d(in_channel,out_channel,kernel_size=1,stride=1,padding=0,bias=False)
        self.bn2=nn.BatchNorm2d(out_channel)
    def forward(self,x):
        out=self.conv1(x)
        out=F.relu(self.bn1(out))
        out=F.relu(self.bn2(self.conv2(out)))
        return out
    

In [3]:
class MobileNet(nn.Module):
    '''
    定义默认网络结构
    输入图片默认大小224x224x3
    输入要是32的倍数
    '''
    # 括号(128,2)表示输出通道128,stride=2
    cfg=(64, (128,2),128, (256,2), 256, (512,2), 
           512, 512, 512, 512, 512, (1024,2), 1024)
    # 第一层卷积的输出
    first_conv=32
    def __init__(self,input_channel=3,num_class=10):
        super().__init__()
        self.conv1=nn.Conv2d(input_channel,self.first_conv,kernel_size=3,padding=1,stride=2,bias=False)
        self.bn1=nn.BatchNorm2d(self.first_conv)
        self.bone=self.make_layers(self.first_conv)
        self.avg=nn.AdaptiveAvgPool2d((1,1))
        self.line=nn.Linear(1024,num_class)
    def make_layers(self,in_channel):
        layer=[]
        for x in self.cfg:
            if isinstance(x,tuple):
                layer.append(DPConv(in_channel,x[0],strides=x[1]))
                in_channel=x[0]
            elif isinstance(x,int):
                layer.append(DPConv(in_channel,x))
                in_channel=x
        return nn.Sequential(*layer)
    def forward(self,x):
        out=F.relu(self.bn1(self.conv1(x)))
        out=self.bone(out)
        out=self.line(self.avg(out).view(out.size(0),-1))
        return out

In [4]:
# 测试
device='cuda' if torch.cuda.is_available() else "cpu"
print('use device',device)
x=torch.randn(1,3,224,224)
print('input shape',x.shape)
net=MobileNet(num_class=10)
# print(net)
out=net(x)
print('out shape',out.shape)
net=net.to(device)
net

use device cuda
input shape torch.Size([1, 3, 224, 224])
out shape torch.Size([1, 10])


MobileNet(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bone): Sequential(
    (0): DPConv(
      (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): DPConv(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64, bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (2):

## 数据加载模块

In [5]:
#prepare dataset and preprocessing
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    # 至少要加上下面这句ToTensor
    transforms.ToTensor(),
    # ciaf10固有均值标准差
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

transform_test = transforms.Compose([
    # 至少要加上下面这句ToTensor
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

In [6]:
batch=64
lr=0.01

In [7]:
# minist数据集
# 正常来说,train_set这个类是需要自己定义的,但是在官方数据集中已经给定义好了
data_dir="D:/data/image/"
if not os.path.exists(data_dir):
    # 尝试mac的文件夹
    data_dir="~/data"
    if not os.path.exists(data_dir):
            raise FileExistsError("data source not exist!")
train_set=datasets.CIFAR10(root=data_dir,
                        transform=transform_train,
                        train=True,
                        download=True)

val_set=datasets.CIFAR10(root=data_dir,
                        transform=transform_test,
                        train=False,
                        download=True)
train_set_len=len(train_set)
val_set_len=len(val_set)
print('train data',train_set_len)
print('val data',val_set_len)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch,
                                         shuffle=True, num_workers=6)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch,
                                         shuffle=False, num_workers=6)

Files already downloaded and verified
Files already downloaded and verified
train data 50000
val data 10000


显示结构图

```shell
# 使用之前先运行
~/PythonCode/selfProject/CommonNetwork   main ●  tensorboard --logdir ./mobilenet
```

In [8]:
# from torch.utils.tensorboard import SummaryWriter
# with SummaryWriter(log_dir='./', comment='mobilenet') as writer:
#     writer.add_graph(net, x)

## 求损失,单独写一个类来求

In [9]:
class CalcLoss(nn.Module):
    def __init__(self,num_classes=10):
        super().__init__()
        self.criterion=nn.CrossEntropyLoss()
    def forward(self,y_true,y_pred):
        return self.criterion(y_true,y_pred)


In [10]:
class TrainModel(object):
    _defaults={
        "eopch":2,
    }
    def __init__(self,net,loss,train_dataloder,optimizer,**kwargs):
        '''
        还支持传入字典参数
        '''
        self.__dict__.update(self._defaults)
        self.net=net
        self.loss=loss
        self.dataloder=train_dataloder
        self.optimizer=optimizer
        for name, value in kwargs.items():
            setattr(self, name, value)
        self.device='cuda' if torch.cuda.is_available() else "cpu"
    def train(self,train_total_len,batch_size):
        self.net.train()
        # 需要注意这里写的是train_set的长度,如果写错成train_loader,返回的是数据集一共有多少个batch
        with tqdm(total=train_total_len,desc=f'Train:') as pbar:
            for idx,data in enumerate(self.dataloder):
                data,label=data
                data,label=data.to(self.device),label.to(self.device)
                self.optimizer.zero_grad()
                # forward
                outputs=self.net(data)
                loss=self.loss(outputs,label)
                loss.backward()
                self.optimizer.step()
                # 更新进度条
                pbar.update(batch_size)


In [11]:
class TestModel(object):
    _defaults={
    "eopch":1,
    }
    def __init__(self,net,loss,val_dataloder,**kwargs):
        '''
        还支持传入字典参数
        '''
        self.__dict__.update(self._defaults)
        self.net=net
        self.loss=loss
        self.dataloder=val_dataloder
        for name, value in kwargs.items():
            setattr(self, name, value)
        self.device='cuda' if torch.cuda.is_available() else "cpu"
    def test(self,total_num,batch_size):
        # eval 模式下,dropout失效,bn层参数采用之前训练的,不更新
        self.net.eval()
        val_loss=0
        correct=0
        with tqdm(total=total_num,desc=f'Validation:') as pbar:
            # 不计算损失,这样速度更快
            with torch.no_grad():
                for idx,data in enumerate(self.dataloder):
                    x,y=data
                    x,y=x.to(self.device),y.to(self.device)
                    y_pre=self.net(x)
                    val_loss+=self.loss(y_pre,y).item()
                    # max 第一个返回的是元素值,第二个为索引值
                    # 求第一个维度的max,因此结果返回的是batch维度的max
                    # 返回是一个第一个元素为值,第二个元素为idx的tuple
                    pred=torch.max(y_pre,dim=1)[1]

                    # pred维度为batch,每个元素为索引
                    correct+=pred.eq(y).sum().item()
                    # 更新进度条
                    pbar.update(batch_size)
            # 格式化打印直接有% 带f%这种{:.2f%}是错的格式,format这种有点坑
            print("test loss {},accuracy {:.2%}".format(val_loss,correct/total_num))
        # 返回损失和准确率
        return (val_loss,correct/total_num)

In [12]:
optimizer=optim.SGD(net.parameters(),lr=lr,momentum=0.9)

In [13]:
loss=CalcLoss().to(device)
train=TrainModel(net,loss,train_loader,optimizer)
test=TestModel(net,loss,val_loader)

In [14]:
total_epoch=1
for i in range(total_epoch):
    train.train(train_set_len,batch)
    test.test(val_set_len,batch)

Train:: 50048it [00:22, 2228.65it/s]                                                                                                 
Validation:: 10048it [00:07, 1306.63it/s]                                                                                            

test loss 254.89687299728394,accuracy 39.92%





In [15]:
# torch.save(net.state_dict(),"loss100_accuracy78.pth")