In [1]:
import cv2
import math
import numpy as np
import torch.utils.data as data
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import os,torch
import cv2
import torch.nn as nn
import torch.optim as optim
import torchvision
import torch.nn.functional as F
from torchvision import models
import argparse
import torchfile
from PIL import Image
from torchvision import datasets
import random

In [2]:
class EmotiWDataset(Dataset):
    """Bag-of-Visual-Words features for a three-class emotion dataset.

    The root directory must contain the sub-directories ``Negative``,
    ``Neutral`` and ``Positive``. Every file inside them is opened with
    ``np.load`` and indexed with the key ``'train_BoW'`` (training split) or
    ``'val_BoW'`` (validation split). Samples are ordered Negative, Neutral,
    Positive, with file names sorted inside each class, and are labelled
    0, 1 and 2 respectively.
    """

    # Sub-directory names in label order: position in the tuple == class label.
    CLASS_DIRS = ('Negative', 'Neutral', 'Positive')

    def __init__(self, BoW_filelist, Train_Flag):
        """
        Args:
            BoW_filelist: Root directory holding the three class
                sub-directories. A trailing path separator is optional.
            Train_Flag: Truthy to read the ``'train_BoW'`` entry of each file,
                falsy to read ``'val_BoW'``.
        """
        self.BoW_filelist = BoW_filelist
        self.Train_Flag = Train_Flag

        self.label = []
        self.file_paths = []
        for class_idx, class_dir in enumerate(self.CLASS_DIRS):
            # os.path.join copes with roots given with or without a trailing
            # separator; plain string concatenation did not.
            class_root = os.path.join(BoW_filelist, class_dir)
            for file_name in sorted(os.listdir(class_root)):
                self.file_paths.append(os.path.join(class_root, file_name))
                self.label.append(np.int64(class_idx))

    def __len__(self):
        """Return the total number of feature files over all three classes."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(feature, label)``.

        ``feature`` is a ``torch.float32`` tensor built from the stored array;
        ``label`` is the ``np.int64`` class index (0, 1 or 2).
        """
        # Truthiness instead of `is True`, so 1 / np.bool_(True) also select
        # the training key.
        key = 'train_BoW' if self.Train_Flag else 'val_BoW'

        # Assumes the files are .npz archives (np.load then returns an NpzFile).
        # The context manager closes the underlying file handle as soon as the
        # array has been read into memory.
        with np.load(self.file_paths[idx]) as archive:
            BoW_feature = archive[key]

        BoW_feature = torch.from_numpy(BoW_feature).to(torch.float32)
        return BoW_feature, self.label[idx]

In [3]:
# Restrict this process to physical GPU 1. The variable is set before any
# tensor is moved to a CUDA device further down.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Run-level constants. EPOCH is the number of passes over the training set.
# NOTE(review): pre_epoch and LR are not referenced by the cells visible here
# (the optimizer cell hard-codes its own learning rate) - confirm before relying on them.
EPOCH = 50
pre_epoch = 0
LR = 0.001

# No options are declared; parse_known_args() is used so the extra arguments a
# notebook kernel is launched with do not raise an error.
parser = argparse.ArgumentParser(description='PyTorch GAF_2 Training')
args = parser.parse_known_args()[0]

# Class names indexed by integer label (0, 1, 2).
classes = ('Negative', 'Neutral', 'Positive')

# Training split: shuffled, 256 samples per batch, loaded in the main process.
train_dataset = EmotiWDataset(
    BoW_filelist='../Bag_of_Visual_Words/GAF_2_Train/',
    Train_Flag=True,
)
trainloader = DataLoader(
    train_dataset,
    batch_size=256,
    shuffle=True,
    num_workers=0,
)

# Validation split: fixed order, 512 samples per batch.
val_dataset = EmotiWDataset(
    BoW_filelist='../Bag_of_Visual_Words/GAF_2_Val/',
    Train_Flag=False,
)
validationloader = DataLoader(
    val_dataset,
    batch_size=512,
    shuffle=False,
    num_workers=0,
)

In [4]:
class Net(nn.Module):
    """Fully connected classifier mapping 1600 input features to 3 class scores.

    Two hidden stages (1600 -> 4096 and 4096 -> 128), each Linear + ReLU +
    Dropout(0.5), are followed by a bare Linear layer (128 -> 3) that returns
    raw, un-normalised scores.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and the module order inside each Sequential define
        # the state_dict keys, so both are kept exactly as they were.
        self.fc1_layer = self._hidden_stage(1600, 4096)
        self.fc2_layer = self._hidden_stage(4096, 128)
        self.fc3_layer = nn.Sequential(nn.Linear(in_features=128, out_features=3))

    @staticmethod
    def _hidden_stage(n_in, n_out):
        """Build one Linear -> ReLU -> Dropout(0.5) stage."""
        return nn.Sequential(
            nn.Linear(in_features=n_in, out_features=n_out),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

    def forward(self, x):
        """Apply the three stages in order and return the class scores."""
        for stage in (self.fc1_layer, self.fc2_layer, self.fc3_layer):
            x = stage(x)
        return x

In [5]:
# Build the classifier and place its parameters on the selected device.
net = Net().to(device)

In [6]:
# Loss: cross-entropy over the class scores; it takes integer class labels directly.
criterion = nn.CrossEntropyLoss()
# Optimizer: plain mini-batch SGD. No momentum or weight decay is configured here.
optimizer = optim.SGD(net.parameters(), lr=1e-1)

# Multiply the learning rate by 0.99 after every epoch.
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)


def _percent(hits, count):
    """Return 100 * hits / count as a float, or 0.0 when count is zero."""
    return 100.0 * hits / count if count else 0.0


# Training
if __name__ == "__main__":
    # A checkpoint is written only when validation accuracy (percent) exceeds this.
    best_acc = 30
    num_classes = len(classes)
    batches_per_epoch = len(trainloader)  # constant, so computed once

    # acc.txt: one validation-accuracy line per epoch.
    # log.txt: one running loss/accuracy line per training batch.
    with open("acc.txt", "w") as f, open("log.txt", "w") as f2:
        for epoch in range(EPOCH):
            print('\nEpoch: %d' % (epoch + 1))
            net.train()
            sum_loss = 0.0
            correct = 0
            total = 0

            # The loop variable is named `batch`, not `data`, so that it does
            # not overwrite the `torch.utils.data as data` module alias.
            for i, batch in enumerate(trainloader):
                # Prepare the batch
                BoW_feature, labels = batch
                BoW_feature, labels = BoW_feature.to(device), labels.to(device)

                # forward + backward
                optimizer.zero_grad()
                outputs = net(BoW_feature)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # Report the running mean loss and running accuracy after every batch.
                sum_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                progress = (
                    epoch + 1,
                    i + 1 + epoch * batches_per_epoch,  # global iteration counter
                    sum_loss / (i + 1),
                    _percent(correct, total),
                )
                print('[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% ' % progress)
                f2.write('%03d  %05d |Loss: %.03f | Acc: %.3f%% |' % progress)
                f2.write('\n')
                f2.flush()

            scheduler.step()
            # Release cached GPU blocks once per epoch; doing this on every
            # batch only slows training down.
            torch.cuda.empty_cache()

            # Evaluate on the validation set after every epoch.
            print("Waiting Test!")
            net.eval()  # disables dropout for the whole validation pass
            correct = 0
            total = 0
            class_correct = [0] * num_classes  # correctly classified samples per class
            class_total = [0] * num_classes    # validation samples seen per class
            with torch.no_grad():
                for batch in validationloader:
                    BoW_feature, labels = batch
                    BoW_feature, labels = BoW_feature.to(device), labels.to(device)
                    predicted = net(BoW_feature).argmax(dim=1)
                    hits = predicted == labels
                    total += labels.size(0)
                    correct += hits.sum().item()
                    # Per-class tallies. Iterating over plain Python lists treats a
                    # batch of one sample like any other, so class_total is always
                    # updated together with class_correct.
                    for class_idx, hit in zip(labels.tolist(), hits.tolist()):
                        class_total[class_idx] += 1
                        class_correct[class_idx] += int(hit)

            # True division keeps the decimals that '%.3f' prints; a class with
            # no validation samples reports 0 instead of dividing by zero.
            for j in range(num_classes):
                print('Accuracy of %5s : %.3f %%' % (
                    classes[j], _percent(class_correct[j], class_total[j])))
            acc = _percent(correct, total)
            print('验证集分类准确率为：%.3f%%' % acc)

            # Append this epoch's validation accuracy to acc.txt right away.
            f.write("EPOCH=%03d,Accuracy= %.3f%%" % (epoch + 1, acc))
            f.write('\n')
            f.flush()

            if acc > best_acc:
                print('Saving model......')
                torch.save(net.state_dict(), 'GAF_2_best_net_BoW.pth')
                # Record the best validation accuracy so far in best_acc.txt.
                with open("best_acc.txt", "w") as f3:
                    f3.write("EPOCH=%d,best_acc= %.3f%%" % (epoch + 1, acc))
                best_acc = acc

        print("Training Finished, TotalEPOCH=%d" % EPOCH)


Epoch: 1
[epoch:1, iter:1] Loss: 1.107 | Acc: 26.562% 
[epoch:1, iter:2] Loss: 1.103 | Acc: 30.078% 
[epoch:1, iter:3] Loss: 1.101 | Acc: 32.422% 
[epoch:1, iter:4] Loss: 1.102 | Acc: 31.348% 
[epoch:1, iter:5] Loss: 1.103 | Acc: 31.016% 
[epoch:1, iter:6] Loss: 1.102 | Acc: 31.250% 


KeyboardInterrupt: 