In [1]:
import torch     
import torch.nn as nn    
import torchvision    
from torch.utils.data import Dataset, DataLoader        
from torchvision import transforms, utils     
from PIL import Image  

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# 定义超参数
# RNN的输入是一个序列，sequence_length为序列长度，input_size为序列每个长度。
sequence_length = 28*3
input_size = 28
# 定义RNN隐含单元的大小。
hidden_size = 128
# 定义rnn的层数
num_layers = 2
# 识别的类别数量
num_classes = 7
# 批的大小
batch_size = 32
# 定义迭代次数
num_epochs = 50
# 定义学习率
learning_rate = 0.001

In [4]:
def default_loader(path):        
    # 注意要保证每个batch的tensor大小时候一样的。        
    return Image.open(path).convert('RGB')   

In [5]:
class MyDataset(Dataset):        
    def __init__(self, txt, transform=None, target_transform=None, loader=default_loader):        
        fh = open(txt, 'r')        
        imgs = []        
        for line in fh:        
            line = line.strip('\n')        
            # line = line.rstrip()        
            words = line.split(' ')        
            imgs.append((words[0],int(words[1])))        
        self.imgs = imgs        
        self.transform = transform        
        self.target_transform = target_transform        
        self.loader = loader        
            
    def __getitem__(self, index):        
        fn, label = self.imgs[index]        
        img = self.loader(fn)        
        if self.transform is not None:        
            img = self.transform(img)        
        return img,label        
            
    def __len__(self):        
        return len(self.imgs)  

In [6]:
def get_loader(dataset='/mnt/emotion_recognition/dataset/basic/Image/rnntrain_label.txt', crop_size=128, image_size=28, batch_size=2, mode='train', num_workers=1):        
    """Build and return a data loader."""        
    transform = []        
    if mode == 'train':        
        transform.append(transforms.RandomHorizontalFlip())        
    transform.append(transforms.CenterCrop(crop_size))        
    transform.append(transforms.Resize(image_size))        
    transform.append(transforms.ToTensor())        
    transform.append(transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)))        
    transform = transforms.Compose(transform)        
    train_data=MyDataset(txt=dataset, transform=transform)        
    data_loader = DataLoader(dataset=train_data,        
                                  batch_size=batch_size,        
                                  shuffle=(mode=='train'),        
                                  num_workers=num_workers)        
    return data_loader 

In [7]:
# 注意要保证每个batch的tensor大小时候一样的。        
# data_loader = DataLoader(train_data, batch_size=2,shuffle=True)        
train_loader = get_loader('/mnt/emotion_recognition/dataset/basic/Image/rnntrain_label.txt', batch_size=batch_size)        
print(len(train_loader))        
test_loader = get_loader('/mnt/emotion_recognition/dataset/basic/Image/rnntest_label.txt', batch_size=batch_size)        
print(len(test_loader))  
print(train_loader)

384
96
<torch.utils.data.dataloader.DataLoader object at 0x7fe5606dc880>


In [13]:
# 定义RNN（LSTM）
class VRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(VRNN, self).__init__()
        self.maxpool1 = nn.Sequential(
            nn.Conv2d(3,64,kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64,64,kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2,stride=2)
            )
        self.maxpool2 = nn.Sequential(
            nn.Conv2d(64,128,kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128,128,kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2,stride=2)
            )
        self.maxpool3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
            )
        
        self.maxpool4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
            )
        
        self.maxpool5= nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3,stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
            )
        
        

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.dense = nn.Sequential(
            nn.Linear(512 * 5 * 5, 4096),
            nn.ReLU(),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Linear(4096, 1000)
            )
    def forward(self, x):
        pool1=self.maxpool1(x)
        pool2=self.maxpool2(pool1)
        pool3=self.maxpool3(pool2)
        pool4=self.maxpool4(pool3)
        pool5=self.maxpool5(pool4)
        
        flat = pool5.view(pool5.size(0), -1)
        class_ = self.dense(flat)

        # Set initial hidden and cell states 
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

In [14]:
model = VRNN(input_size, hidden_size, num_layers, num_classes).to(device)

In [15]:
print(model)

VRNN(
  (maxpool1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (maxpool2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (maxpool3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): ReLU(inplace=True)
    (6): Ma

In [16]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [18]:
# 训练模型 
train_acc=[]
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
#         images = images.reshape(-1, sequence_length, input_size).to(device)
        images = images.to(device)
#         CUDA_LAUNCH_BLOCKING=1
        labels = labels.to(device)
        
        # 前向传播+计算loss  
        with torch.cuda.amp.autocast():
            outputs=model(images)
            loss=criterion(outputs,labels)
#         outputs = model(images)
#         loss = criterion(outputs, labels)
        
        # 后向传播+调整参数 
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        _,argmax=torch.max(outputs,1)
        acc=(labels==argmax.squeeze()).float().mean()
        train_acc.append(acc.item())
        # 每100个batch打印一次数据    
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f},Accuracy:{:.3f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item(),acc.item()))


RuntimeError: Given input size: (512x1x1). Calculated output size: (512x0x0). Output size is too small