In [None]:
%matplotlib inline

要训练和测试网络，首先需要定义网络。下面是一个自定义的简单网络：
---

In [None]:
import os
import math
import torch
import torch.nn as nn
import simple_utils

In [None]:
class ACNN(nn.Module):
    """Small hand-built CNN for facial-expression recognition.

    The network takes single-channel 96x96 images (see ``input_size``) and
    produces one score per class.

    Args:
        n_classes: number of output classes (width of the final linear layer).
        apply_softmax: when True (the default, which is the original
            behaviour) ``forward`` returns softmax probabilities. Pass False
            to get raw logits instead; ``nn.CrossEntropyLoss`` applies
            log-softmax itself and expects unnormalised logits, so training
            with that loss should use ``apply_softmax=False``. The flag does
            not change any parameter names, so checkpoints are interchangeable.
    """

    def __init__(self, n_classes=7, apply_softmax=True):
        # Sub-classes of nn.Module must run the parent constructor first.
        super(ACNN, self).__init__()
        self.input_size = 96  # expected input is a 96x96 image
        # Trailing comments give (channels, height, width) after each layer
        # for a 96x96 input.
        self.features = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=9, stride=1),  # 64, 88, 88
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 64, 44, 44
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=7, stride=1),  # 64, 38, 38
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=7, stride=1),  # 64, 32, 32
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=1),  # 64, 31, 31
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=5, stride=1),  # 64, 27, 27
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=5, stride=1),  # 64, 23, 23
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=1),  # 64, 22, 22
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),  # 64, 20, 20
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),  # 64, 18, 18
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=1),  # 64, 17, 17
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),  # 64, 15, 15
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),  # 64, 13, 13
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=1),  # 64, 12, 12
        )
        # Dropout and Linear keep indices 0 and 1 regardless of the softmax
        # flag, so state-dict keys ("classifier.1.*") never change.
        classifier_layers = [
            nn.Dropout(p=0.6),
            nn.Linear(64 * 12 * 12, n_classes),
        ]
        if apply_softmax:
            classifier_layers.append(nn.Softmax(1))
        self.classifier = nn.Sequential(*classifier_layers)
        self._initialize_weights()

    def forward(self, x):
        """Return per-class scores of shape (batch, n_classes) for input ``x``."""
        # Convolutional feature extractor.
        x = self.features(x)
        # Flatten everything except the batch dimension.
        x = x.view(-1, self.num_flat_features(x))
        # Fully connected head (followed by softmax unless it was disabled).
        x = self.classifier(x)
        return x

    def num_flat_features(self, x):
        """Return the number of elements per sample, i.e. the product of all
        dimensions of ``x`` except the leading batch dimension."""
        count = 1
        for dim in x.size()[1:]:
            count *= dim
        return count

    def _initialize_weights(self):
        """Initialise Conv2d and Linear layers; biases are set to zero.

        Conv2d weights are drawn from N(0, sqrt(2 / n)) with
        n = kernel_h * kernel_w * out_channels; Linear weights from N(0, 0.01).
        """
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                n = module.kernel_size[0] * module.kernel_size[1] * module.out_channels
                nn.init.normal_(module.weight, mean=0.0, std=math.sqrt(2. / n))
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0.)
            elif isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, mean=0.0, std=0.01)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0.)


In [None]:
# Build the 7-class network, show its layer structure, then report its
# parameter count through the project helper.
net = ACNN(n_classes=7)
print(net)
simple_utils.num_of_parameters_of_net(net)

网络定义完成后，就可以用封装好的数据集进行训练了
---

In [1]:
# coding=utf-8
import os
import torch
import torch.optim
from torch.autograd import Variable
import torch.nn as nn
import numpy as np
import argparse
import time

from CKPlus_DataSet import CKPlus
from VGG import *
import transforms.transforms as transforms
import simple_utils as utils

# Use the GPU whenever one is available (training is much faster there),
# otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
if use_cuda:
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print('cuda available: ', use_cuda)
print('using DEVICE: ', DEVICE)

cuda available:  True
using DEVICE:  cuda


In [2]:
# 如果在 .py 文件中，则可以如下定义；从命令行执行的时候，通过 --参数名（例如 --bs 8）来指定参数
# parser = argparse.ArgumentParser(description='PyTorch CNN Training With JAFFE')
# # 存储的模型序号
# parser.add_argument('--save_number', default=1, type=int, help='save_number')
# # 批次大小
# parser.add_argument('--bs', default=4, type=int, help='batch_size')
# # 学习率
# parser.add_argument('--lr', default=0.01, type=float, help='learning rate')
# # epoch
# parser.add_argument('--epoch', default=200, type=int, help='training epoch num')
# # 每次获得到更优的准确率后，会进行一次存储，此选项选择是否从上次存储位置继续
# parser.add_argument('--resume', default=True, type=bool, help='resume training from last checkpoint')
# 表示默认从第 $lrd_se 次epoch开始进行lr的递减
# parser.add_argument('--lrd_se', default=180, type=int, help='learning rate decay start epoch')
# 表示默认每经过2次epoch进行一次递减
# parser.add_argument('--lrd_s', default=2, type=int, help='learning rate decay step')
# 表示每次的lr的递减率，默认每递减一次乘一次0.9
# parser.add_argument('--lrd_r', default=0.9, type=float, help='learning rate decay rate')
# opt = parser.parse_args()

# 在这里使用一个类进行替代
class OPT:
    """Plain container of training options, standing in for argparse in the notebook.

    Attributes:
        save_number: index of the saved-model slot (used in the output path).
        bs: mini-batch size.
        lr: initial learning rate.
        epoch: total number of training epochs.
        resume: whether to continue from the last stored checkpoint.
        dataset: dataset name (used in the output path and log messages).
        lrd_se: first epoch at which learning-rate decay is applied.
        lrd_s: number of epochs between successive decay steps.
        lrd_r: multiplicative factor applied at each decay step.
    """

    def __init__(self):
        self.save_number = 1      # which model slot to write to
        self.bs = 4               # batch size
        self.lr = 0.008           # base learning rate
        self.epoch = 200          # number of epochs to train
        self.resume = False       # start fresh instead of loading a checkpoint
        self.dataset = "CKPlus"
        # Learning-rate schedule: from epoch 100 on, multiply by 0.9 once
        # every 10 epochs.
        self.lrd_se, self.lrd_s, self.lrd_r = 100, 10, 0.9
opt = OPT()

# Best accuracy seen so far and the epoch it occurred in (-1 = none yet).
train_acc_map = dict(best_acc=0, best_acc_epoch=-1)
test_acc_map = dict(best_acc=0, best_acc_epoch=-1)
# Accuracy (in percent) of the most recent train / test pass.
Train_acc = 0.
Test_acc = 0.

In [3]:
print("------------Preparing Data...----------------")
# Build the training split first, then the test split, from the same folder.
train_data, test_data = (
    CKPlus(is_train=is_train_split, img_dir_pre_path="data/CK+")
    for is_train_split in (True, False)
)
print("------------%s Data Already be Prepared------------" % opt.dataset)

------------Preparing Data...----------------
train_num:  906  test_num: 75
train_num:  906  test_num: 75
------------CKPlus Data Already be Prepared------------


In [4]:
print("------------Preparing Model...----------------")
n_classes = 7
# Checkpoints live in Saved_Models/<save_number>/<dataset>_<save_number>/.
net_to_save_dir = "Saved_Models"
net_to_save_path = os.path.join(net_to_save_dir, str(opt.save_number),
                                opt.dataset + '_' + str(opt.save_number))
saved_model_name = "Best_model.t7"            # best-accuracy checkpoint
saved_temp_model_name = "Best_model_temp.t7"  # periodic checkpoint used for resuming
model_over_flag_name = "__%d_success__" % (opt.epoch)  # marker file: training finished
history_file_name = "history.txt"

over_flag = False  # set to True when a finished-training marker is found
TEMP_EPOCH = 5  # write the periodic checkpoint every TEMP_EPOCH epochs
temp_internal = TEMP_EPOCH  # countdown until the next periodic checkpoint

# net = ACNN(n_classes=n_classes).to(DEVICE)
net = vgg11_bn(n_classes=n_classes).to(DEVICE)

start_epoch = 0
if opt.resume:
    # Load checkpoint.
    print('==> Loading Model Parameters...')
    temp_checkpoint_path = os.path.join(net_to_save_path, saved_temp_model_name)
    if os.path.exists(temp_checkpoint_path):
        if os.path.exists(os.path.join(net_to_save_path, model_over_flag_name)):
            print("Model trained over flag checked!")
            over_flag = True
        # map_location lets a checkpoint written on a GPU machine be loaded
        # on a CPU-only one (and vice versa).
        checkpoint = torch.load(temp_checkpoint_path, map_location=DEVICE)
        saved_net = checkpoint['net']
        # Checkpoints written without CUDA may hold the whole module rather
        # than its state dict; accept both forms.
        if isinstance(saved_net, nn.Module):
            saved_net = saved_net.state_dict()
        net.load_state_dict(saved_net)
        test_acc_map['best_acc'] = checkpoint['best_test_acc']
        test_acc_map['best_acc_epoch'] = checkpoint['best_test_acc_epoch']
        # Restore the matching train-side record when the checkpoint has it,
        # so the final summary is not reset by a resume.
        train_acc_map['best_acc'] = checkpoint.get('best_train_acc', train_acc_map['best_acc'])
        train_acc_map['best_acc_epoch'] = checkpoint.get('best_train_acc_epoch',
                                                         train_acc_map['best_acc_epoch'])
        start_epoch = checkpoint['cur_epoch'] + 1
    else:
        print("Checkout File not Found, No initialization.")
print("------------Model Already be Prepared------------")

------------Preparing Model...----------------
------------Model Already be Prepared------------


In [5]:
input_img_size = net.input_size

IMG_MEAN = [0.5]
IMG_STD = [0.225]


def _build_transform(augment):
    """Return the preprocessing pipeline; ``augment`` adds random flip and rotation."""
    steps = [
        # Resize scales the shorter side to input_img_size, so a non-square
        # input stays non-square; the centre crop then makes it square.
        transforms.Resize(input_img_size),
        transforms.CenterCrop(input_img_size),
    ]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
        steps.append(transforms.RandomRotation(30))
    steps.append(transforms.ToTensor())
    steps.append(transforms.Normalize(IMG_MEAN, IMG_STD))
    return transforms.Compose(steps)


transform_train = _build_transform(augment=True)
transform_test = _build_transform(augment=False)
train_data.set_transform(transform_train)
test_data.set_transform(transform_test)

# Only the training set is shuffled.
train_loader = torch.utils.data.DataLoader(train_data, batch_size=opt.bs, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=opt.bs, shuffle=False)

# Cross-entropy loss, optimised with SGD (momentum + weight decay).
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=opt.lr, momentum=0.9, weight_decay=5e-4)

In [6]:
# Training
def train(epoch):
    """Train ``net`` for one epoch over ``train_loader``.

    Side effects: steps ``optimizer``, updates the global ``Train_acc`` with
    the epoch's accuracy (percent) and appends a 'Train' line to the history
    file through ``write_history``.

    :param epoch: zero-based index of the current epoch; drives the
        learning-rate decay schedule configured in ``opt``.
    :return: None
    """
    global Train_acc

    # Step decay: once epoch reaches opt.lrd_se (and decay is enabled, i.e.
    # lrd_se > 0), lr = opt.lr * lrd_r ** (((epoch - lrd_se) // lrd_s) + 1).
    if epoch >= opt.lrd_se > 0:
        frac = ((epoch - opt.lrd_se) // opt.lrd_s) + 1
        decay_factor = opt.lrd_r ** frac
        current_lr = opt.lr * decay_factor
        utils.set_lr(optimizer, current_lr)  # set the learning rate
    else:
        current_lr = opt.lr
    print('learning_rate: %s' % str(current_lr))

    net.train()
    train_loss = 0.
    correct = 0
    total = 0
    num_batches = 0
    cur_train_acc = 0.
    time_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()  # clear gradients from the previous step
        # Always move to DEVICE and cast the labels: CrossEntropyLoss needs
        # integer (long) class indices on CPU as well as on GPU.
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE, torch.long)
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        # Project helper, presumably clips gradients against explosion; the
        # threshold is tied to the current learning rate.
        # See https://blog.csdn.net/u010814042/article/details/76154391
        utils.clip_gradient(optimizer, 2 * current_lr)
        optimizer.step()

        train_loss += loss.item()
        num_batches = batch_idx + 1
        # Index of the highest score per sample = predicted class.
        predicted = outputs.detach().argmax(dim=1)

        total += targets.size(0)
        correct += int(predicted.eq(targets).sum().item())
        cur_train_acc = float(correct) / float(total) * 100.

        duration = time.time() - time_start
        utils.progress_bar(batch_idx, len(train_loader), 'Time: %.2fs | Loss: %.3f | Acc: %.3f%% (%d/%d)' %
                           (duration, train_loss / num_batches, cur_train_acc, correct, total))

        # Drop references to per-batch tensors so their memory can be reclaimed
        # before the next batch is processed.
        del loss, inputs, outputs, predicted
    Train_acc = cur_train_acc
    # max(..., 1) keeps the average well-defined if the loader yielded nothing.
    write_history('Train', epoch, cur_train_acc, train_loss / max(num_batches, 1), None)

# Testing
def test(epoch):
    """Evaluate ``net`` on ``test_loader`` and checkpoint it on a new best accuracy.

    Side effects: updates the global ``Test_acc``; when the accuracy beats
    ``test_acc_map['best_acc']`` it updates both accuracy maps and saves a
    checkpoint to ``net_to_save_path``; always appends a 'Test' line to the
    history file through ``write_history``.

    :param epoch: zero-based index of the current epoch (stored in the
        checkpoint and the history line).
    :return: None
    """
    global Test_acc
    net.eval()
    private_test_loss = 0.
    correct = 0
    total = 0
    num_batches = 0
    cur_test_acc = 0.
    # Number of correctly classified samples per class.
    correct_map = [0] * n_classes
    time_start = time.time()
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(test_loader):
            # Always move to DEVICE and cast the labels: CrossEntropyLoss
            # needs integer (long) class indices on CPU as well as on GPU.
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE, torch.long)
            outputs = net(inputs)

            loss = criterion(outputs, targets)
            private_test_loss += loss.item()
            num_batches = batch_idx + 1
            predicted = outputs.argmax(dim=1)

            hits = predicted.eq(targets)
            # Count each correct prediction under its class index.
            for class_idx in predicted[hits].tolist():
                correct_map[class_idx] += 1

            total += targets.size(0)
            correct += int(hits.sum().item())
            cur_test_acc = float(correct) / float(total) * 100.

            duration = time.time() - time_start
            utils.progress_bar(batch_idx, len(test_loader), 'Time: %.2fs | Loss: %.3f | Acc: %.3f%% (%d/%d)' %
                               (duration, private_test_loss / num_batches, cur_test_acc, correct, total))

            # Drop references to per-batch tensors so their memory can be
            # reclaimed before the next batch is processed.
            del loss, inputs, outputs, predicted, hits

    Test_acc = cur_test_acc
    if test_acc_map['best_acc'] < Test_acc:
        train_acc_map['best_acc'] = Train_acc
        train_acc_map['best_acc_epoch'] = epoch
        test_acc_map['best_acc'] = Test_acc
        test_acc_map['best_acc_epoch'] = epoch
        print('Saving net to %s' % net_to_save_path)
        print('best_acc: %0.3f' % test_acc_map['best_acc'])
        print('correct_map: %s' % correct_map)
        # Always store the state dict (never the module object) so the
        # checkpoint can be read back with net.load_state_dict on any device.
        state = {'net': net.state_dict(),
                 'best_test_acc': test_acc_map['best_acc'],
                 'best_test_acc_epoch': test_acc_map['best_acc_epoch'],
                 'best_train_acc': train_acc_map['best_acc'],
                 'best_train_acc_epoch': train_acc_map['best_acc_epoch'],
                 'cur_epoch': epoch,
                 'correct_map': correct_map,
                 }
        torch.save(state, os.path.join(net_to_save_path, saved_model_name))
    # max(..., 1) keeps the average well-defined if the loader yielded nothing.
    write_history('Test', epoch, cur_test_acc, private_test_loss / max(num_batches, 1), correct_map)


def write_history(train_or_test, epoch, acc, loss, predictions):
    '''
    Append one result line to the history file inside net_to_save_path.

    The line has the form "<phase> <epoch> <acc> <loss> [<predictions>]".
    :param train_or_test: phase label, 'Train' or 'Test'
    :param epoch: epoch index
    :param acc: accuracy, written with three decimals
    :param loss: loss, written with three decimals
    :param predictions: per-class counts; left out of the line when falsy
        (None or empty)
    :return: None
    '''
    history_path = os.path.join(net_to_save_path, history_file_name)
    with open(history_path, "a+", encoding="utf-8") as log:
        stats = " %d %.3f %.3f " % (epoch, acc, loss)
        tail = str(predictions) if predictions else ""
        log.write(train_or_test + stats + tail + "\n")
        log.flush()


def save_over_flag():
    '''
    Write the marker file that signals training has finished.

    The file holds the train and test accuracy maps, one per line.
    :return: None
    '''
    flag_path = os.path.join(net_to_save_path, model_over_flag_name)
    with open(flag_path, "w+", encoding="utf-8") as flag_file:
        for acc_map in (train_acc_map, test_acc_map):
            flag_file.write(str(acc_map))
            flag_file.write("\n")
        flag_file.flush()

In [7]:
# Create the checkpoint folder together with any missing parent folders.
os.makedirs(net_to_save_path, exist_ok=True)

if not over_flag:
    for epoch in range(start_epoch, opt.epoch, 1):
        print('\n------------Epoch: %d-------------' % epoch)
        train(epoch)
        test(epoch)
        # Countdown to the next periodic checkpoint (every TEMP_EPOCH epochs).
        temp_internal -= 1
        if temp_internal <= 0:
            temp_internal = TEMP_EPOCH
            print("Saving Temp Model...")
            # Always store the state dict (never the module object) so the
            # resume path can feed it to net.load_state_dict on any device.
            state = {'net': net.state_dict(),
                     'best_test_acc': test_acc_map['best_acc'],
                     'best_test_acc_epoch': test_acc_map['best_acc_epoch'],
                     'best_train_acc': train_acc_map['best_acc'],
                     'best_train_acc_epoch': train_acc_map['best_acc_epoch'],
                     'cur_epoch': epoch,
                     }
            torch.save(state, os.path.join(net_to_save_path, saved_temp_model_name))
    print(train_acc_map)
    print(test_acc_map)
    # Leave a marker file so a later resume knows training already completed.
    save_over_flag()
print("Trained Over")


------------Epoch: 0-------------
learning_rate: 0.008
Saving net to Saved_Models\1\CKPlus_1
best_acc: 28.000
correct_map: [0, 0, 0, 0, 0, 0, 21]

------------Epoch: 1-------------
learning_rate: 0.008

------------Epoch: 2-------------
learning_rate: 0.008

------------Epoch: 3-------------
learning_rate: 0.008

------------Epoch: 4-------------
learning_rate: 0.008
Saving Temp Model...

------------Epoch: 5-------------
learning_rate: 0.008

------------Epoch: 6-------------
learning_rate: 0.008

------------Epoch: 7-------------
learning_rate: 0.008

------------Epoch: 8-------------
learning_rate: 0.008

------------Epoch: 9-------------
learning_rate: 0.008
Saving Temp Model...

------------Epoch: 10-------------
learning_rate: 0.008

------------Epoch: 11-------------
learning_rate: 0.008

------------Epoch: 12-------------
learning_rate: 0.008

------------Epoch: 13-------------
learning_rate: 0.008

------------Epoch: 14-------------
learning_rate: 0.008
Saving Temp Model...



KeyboardInterrupt: 