In [1]:
import os
# Show the kernel's current working directory; the relative paths used later
# in this notebook ('fer2013.csv', 'model.pth') resolve against it.
os.getcwd()

'C:\\Users\\Administrator\\FTP\\20230715'

In [2]:
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader,Dataset
from torchvision.transforms import transforms
from torchvision.datasets import ImageFolder

In [3]:
class Reshape(nn.Module):
    """Flatten every dimension after the first into a single axis.

    Any positional constructor arguments are accepted and ignored.
    """

    def __init__(self, *args):
        super().__init__()

    def forward(self, x):
        """Return a view of ``x`` shaped ``(x.shape[0], -1)``."""
        leading = x.shape[0]
        flattened = x.view(leading, -1)
        return flattened


class Residual(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by both 3x3 convolutions.
        use_1x1conv: when True, the skip path goes through a 1x1 convolution
            (with the same ``stride``) so that its channel count and spatial
            size match the main path; when False the input is added as-is.
        stride: stride of the first 3x3 convolution (and of the 1x1
            convolution on the skip path, if enabled).
    """

    def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, padding=1, stride=stride
        )
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1
        )
        # Optional projection on the skip path; None means identity shortcut.
        if use_1x1conv:
            self.conv3 = nn.Conv2d(
                in_channels, out_channels, kernel_size=1, stride=stride
            )
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, X):
        """Return ``relu(main_path(X) + shortcut(X))``."""
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        # Test for presence explicitly rather than relying on the truthiness
        # of an nn.Module instance.
        if self.conv3 is not None:
            X = self.conv3(X)
        return F.relu(Y + X)


def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
    """Build one stage made of ``num_residuals`` Residual units.

    Unless ``first_block`` is True, the first unit maps ``in_channels`` to
    ``out_channels`` with a 1x1-conv shortcut and stride 2; every other unit
    maps ``out_channels`` to ``out_channels`` with the default settings.
    When ``first_block`` is True, ``in_channels`` must equal ``out_channels``.

    Returns:
        An ``nn.Sequential`` containing the units in order.
    """
    if first_block:
        assert in_channels == out_channels
    units = [
        Residual(in_channels, out_channels, use_1x1conv=True, stride=2)
        if (idx == 0 and not first_block)
        else Residual(out_channels, out_channels)
        for idx in range(num_residuals)
    ]
    return nn.Sequential(*units)


class GlobalAvgPool2d(nn.Module):
    """Average-pool with a kernel spanning all dimensions from index 2 onward."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Pool ``x`` using a kernel equal to its trailing (index >= 2) sizes."""
        window = x.shape[2:]
        return F.avg_pool2d(x, kernel_size=window)



In [4]:
class CustomDataset(Dataset):
    """Dataset over an indexable collection of ``(image, label)`` pairs.

    Args:
        data: indexable, sized collection whose items unpack to two values.
        transform: optional callable applied to the first value of each pair.
    """

    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        """Number of stored pairs."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the pair at ``index``, with the transform applied if set."""
        sample, target = self.data[index]
        sample = self.transform(sample) if self.transform else sample
        return sample, target

In [5]:
# Preprocessing pipeline: resize to 48x48, convert to a single-channel
# grayscale image, then convert to a tensor.
preprocess_steps = [
    transforms.Resize((48, 48)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocess_steps)

In [6]:
# Load the FER2013 CSV file (path is relative to the working directory).
data = pd.read_csv('fer2013.csv')

# Build the (image, label) sample lists, split by the 'Usage' column.
train_data = []
test_data = []

# Walk the three columns in lockstep instead of doing a per-row
# data['col'][i] lookup: it is much faster and does not depend on the
# frame's index labels.
for emotion, pixel_str, usage in zip(data['emotion'], data['pixels'], data['Usage']):
    if usage == 'Training':
        bucket = train_data
    elif usage in ('PublicTest', 'PrivateTest'):
        # Both test partitions are pooled into a single test set.
        bucket = test_data
    else:
        # Rows with any other 'Usage' value are not used.
        continue

    # 'pixels' is a whitespace-separated string of integer intensities;
    # fill them into a 48x48 8-bit grayscale ('L' mode) image.
    image = Image.new('L', (48, 48))
    image.putdata([int(p) for p in pixel_str.split()])

    # Store the label as a plain Python int.
    bucket.append((image, int(emotion)))

# Wrap the sample lists in datasets that apply the preprocessing transform.
train_dataset = CustomDataset(train_data, transform=transform)
test_dataset = CustomDataset(test_data, transform=transform)

# Batch loaders: shuffle only the training set.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [7]:
# Hyperparameters for the training run.
learning_rate = 0.01  # step size passed to the Adam optimizer
num_epochs = 10  # number of passes over the training loader

In [8]:
# Stem: 7x7 stride-2 convolution on the 1-channel input, batch norm, ReLU,
# then a 3x3 stride-2 max-pool. These keep the default Sequential names
# "0".."3".
stem_layers = [
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
]
model = nn.Sequential(*stem_layers)

# Four residual stages of two units each: (module name, in, out, first_block).
stage_specs = [
    ("resnet_block1", 64, 64, True),
    ("resnet_block2", 64, 128, False),
    ("resnet_block3", 128, 256, False),
    ("resnet_block4", 256, 512, False),
]
for stage_name, stage_in, stage_out, is_first in stage_specs:
    model.add_module(
        stage_name, resnet_block(stage_in, stage_out, 2, first_block=is_first)
    )

# Head: global average pooling, flatten, then a linear layer with 7 outputs.
model.add_module("global_avg_pool", GlobalAvgPool2d())
model.add_module("fc", nn.Sequential(Reshape(), nn.Linear(512, 7)))

# Run on the GPU when one is available, otherwise on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model.to(device)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train for num_epochs epochs; after each epoch evaluate on the test loader
# and print the per-sample average loss and the accuracy for both sets.
for epoch in range(num_epochs):
    # ---- Training pass ----
    train_loss = 0.0
    train_correct = 0
    model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Predicted class = index of the largest logit. argmax is not
        # differentiable, so there is no need to go through the legacy
        # `.data` attribute.
        predicted = outputs.argmax(dim=1)
        train_correct += (predicted == labels).sum().item()
        # Weight the batch loss by the batch size so that the division by
        # the dataset size below yields a per-sample average even when the
        # last batch is smaller.
        train_loss += loss.item() * inputs.size(0)

    train_loss = train_loss / len(train_dataset)
    train_accuracy = train_correct / len(train_dataset)

    # ---- Evaluation pass (no gradient tracking, eval-mode layers) ----
    model.eval()
    test_loss = 0.0
    test_correct = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            predicted = outputs.argmax(dim=1)
            test_correct += (predicted == labels).sum().item()
            test_loss += loss.item() * inputs.size(0)

    test_loss = test_loss / len(test_dataset)
    test_accuracy = test_correct / len(test_dataset)

    print(
        f"Epoch [{epoch+1}/{num_epochs}], "
        f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, "
        f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}"
    )

Epoch [1/10], Train Loss: 1.8331, Train Accuracy: 0.2448, Test Loss: 1.7903, Test Accuracy: 0.2471
Epoch [2/10], Train Loss: 1.7020, Train Accuracy: 0.3087, Test Loss: 1.7043, Test Accuracy: 0.3168


In [None]:
# Save the model's state_dict to 'model.pth' (relative to the working
# directory).
trained_weights = model.state_dict()
torch.save(trained_weights, 'model.pth')