In [1]:
import torch # 导入的是 torch 而不是 pytorch
print(torch.__version__) # 输出当前pytorch的版本

1.11.0+cu113


In [2]:
import numpy as np
import pandas as pd
 
# 读入数据
file_path = 'fer2013.csv'
df = pd.read_csv(file_path)

In [3]:
# 将数据集划分为训练和测试模块
X_train, Y_train, X_test, Y_test = [], [], [], []

for index, row in df.iterrows():
    val = row['pixels'].split(" ")
    try:
        if 'Training' in row['Usage']:
            X_train.append(np.array(val, 'float32'))
            Y_train.append(row['emotion'])
        elif 'PublicTest' in row['Usage']:
            X_test.append(np.array(val, 'float32'))
            Y_test.append(row['emotion'])
    except:
        print(f"error occured at index :{index} and row:{row}")

batch_size = 64
epochs = 200

X_train = np.array(X_train, 'float32')
Y_train = np.array(Y_train, 'float32')
X_test = np.array(X_test, 'float32')
Y_test = np.array(Y_test, 'float32')

# 将0-6的标签数据转化为onehot数据
def to_onehot(y):
    temp_outs = np.zeros((y.shape[0], np.unique(y).size), dtype=np.float32)
    temp_outs[np.arange(y.shape[0]), np.uint(y)] = 1.0
    return temp_outs
Y_train = to_onehot(Y_train)
Y_test = to_onehot(Y_test)
# print(Y_train)
# print(Y_test)

# normalizing data between 0 and 1
X_train -= np.mean(X_train, axis=0)
X_train /= np.std(X_train, axis=0)

X_test -= np.mean(X_test, axis=0)
X_test /= np.std(X_test, axis=0)

X_train = X_train.reshape(X_train.shape[0], 48, 48, 1)
X_test = X_test.reshape(X_test.shape[0], 48, 48, 1)

In [4]:
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.utils.data as Data
import torch.nn.functional as F

from functools import partial
from typing import Union, List, Dict, Any, Optional, cast

device = torch.device('cuda:0')
cpu = torch.device('cpu')

X_train = torch.tensor(X_train, dtype=torch.float)
X_test = torch.tensor(X_test, dtype=torch.float)
Y_train = torch.tensor(Y_train, dtype=torch.float)
Y_test = torch.tensor(Y_test, dtype=torch.float)

train_set = Data.TensorDataset(X_train, Y_train)
train_loader = Data.DataLoader(dataset=train_set, shuffle=True, batch_size=batch_size, num_workers=2)
test_set = Data.TensorDataset(Y_test, Y_test)

for x,y in train_loader:
    print(y)

tensor([[0., 0., 0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0., 0., 1.],
        [1., 0., 0., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 0., 0., 1.],
        [0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0.],
        [0.,

class VGG(nn.Module):
    def __init__(
        self, features: nn.Module, num_classes: int = 1000, init_weights: bool = True, dropout: float = 0.5
    ) -> None:
        super().__init__()
        torch._C._log_api_usage_once(self)
        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(p=dropout),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(p=dropout),
            nn.Linear(4096, num_classes),
        )
        if init_weights:
            for m in self.modules():
                if isinstance(m, nn.Conv2d):
                    nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                    if m.bias is not None:
                        nn.init.constant_(m.bias, 0)
                elif isinstance(m, nn.BatchNorm2d):
                    nn.init.constant_(m.weight, 1)
                    nn.init.constant_(m.bias, 0)
                elif isinstance(m, nn.Linear):
                    nn.init.normal_(m.weight, 0, 0.01)
                    nn.init.constant_(m.bias, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

def make_layers(cfg: List[Union[str, int]], batch_norm: bool = False) -> nn.Sequential:
    layers: List[nn.Module] = []
    in_channels = 3
    for v in cfg:
        if v == "M":
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        else:
            v = cast(int, v)
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv2d, nn.ReLU(inplace=True)]
            in_channels = v
    return nn.Sequential(*layers)

cfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M']
model = VGG(make_layers(cfg, batch_norm=True))
print(model)

In [6]:
# for x,y in train_loader:
#     print(y)

class VGGNet(nn.Module):
    def __init__(self):
        super(VGGNet, self).__init__()
        # self.classifier = nn.Sequential(
        #     nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(64),
        #     nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(64),
        #     nn.MaxPool2d(kernel_size=2, stride=2),
        #     nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(128),
        #     nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(128),
        #     nn.MaxPool2d(kernel_size=2, stride=2), nn.Dropout2d(0.2),
        #     nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(256),
        #     nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(256),
        #     nn.MaxPool2d(kernel_size=2, stride=2), nn.Dropout2d(0.25),
        #     nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(512),
        #     nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1), nn.ReLU(True), nn.BatchNorm2d(512), 
        #     nn.MaxPool2d(kernel_size=2, stride=2), nn.Dropout2d(0.25),
        #     nn.Flatten(), 
        #     nn.Linear(3*3*512, 1024), nn.BatchNorm1d(1024), nn.ReLU(True), nn.Dropout2d(0.45),
        #     nn.Linear(1024, 1024), nn.BatchNorm1d(1024), nn.ReLU(True), nn.Dropout2d(0.45),
        #     nn.Linear(1024, 7), 
        #     nn.Softmax(),
        # )
        self.conv1_1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1)
        self.conv1_2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)

        self.conv2_1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv2_2 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)

        self.conv3_1 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.conv3_2 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)

        self.conv4_1 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.conv4_2 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(3*3*512, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.fc3 = nn.Linear(1024, 7)

        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(256)
        self.bn4 = nn.BatchNorm2d(512)
        self.bn5 = nn.BatchNorm1d(1024)

    # 定义前向传播过程，输入为x
    def forward(self, x):
        x = self.bn1(F.relu(self.conv1_1(x)))
        x = self.bn1(F.relu(self.conv1_2(x)))
        x = self.pool(x)

        x = self.bn2(F.relu(self.conv2_1(x)))
        x = self.bn2(F.relu(self.conv2_2(x)))
        x = F.dropout2d(self.pool(x), 0.2)

        x = self.bn3(F.relu(self.conv3_1(x)))
        x = self.bn3(F.relu(self.conv3_2(x)))
        x = F.dropout2d(self.pool(x), 0.25)

        x = self.bn4(F.relu(self.conv4_1(x)))
        x = self.bn4(F.relu(self.conv4_2(x)))
        x = F.dropout2d(self.pool(x), 0.25)

        x = x.reshape(-1, 3*3*512)
        # x = x.flatten()
        x = self.fc1(x)
        # print(x.shape)
        x = F.dropout(F.relu(self.bn5(x)), 0.45)        
        x = self.fc2(x)
        # print(x.shape)
        x = F.dropout(F.relu(self.bn5(x)), 0.45)
        x = F.softmax(self.fc3(x))
        # x = self.classifier(x)
        return x

net = VGGNet().to(device)
print(net)


VGGNet(
  (conv1_1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv1_2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2_1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2_2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3_1): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3_2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4_1): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4_2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=4608, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=1024, bias=True)
  (fc3): Linear(in_features=1024, out_features=7, bias=True)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running

In [7]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

losses = []
for epoch in range(epochs):
    for i, (batch_x, batch_y) in enumerate(train_loader):
        batch_x = batch_x.reshape(-1,1,48,48).to(device)
        batch_y = batch_y.to(device)
        output = net(batch_x).to(device)
        # print(output.shape, batch_y.shape)
        loss = criterion(output, batch_y)
        # print(batch_y)
        batch_x.to(cpu)
        batch_y.to(cpu)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        print ("Epoch {}/{}, loss = {}".format(epoch + 1, epochs, loss.item()))

torch.save(net, "model.h5")

  x = F.softmax(self.fc3(x))


Epoch 1/200, loss = 1.945941686630249
Epoch 1/200, loss = 1.9353113174438477
Epoch 1/200, loss = 1.9145647287368774
Epoch 1/200, loss = 1.9282373189926147
Epoch 1/200, loss = 1.9579873085021973
Epoch 1/200, loss = 1.92964768409729
Epoch 1/200, loss = 1.874953269958496
Epoch 1/200, loss = 1.928072214126587
Epoch 1/200, loss = 1.946770429611206
Epoch 1/200, loss = 1.9642643928527832
Epoch 1/200, loss = 1.9187560081481934
Epoch 1/200, loss = 1.9161121845245361
Epoch 1/200, loss = 1.9226232767105103
Epoch 1/200, loss = 1.9089947938919067
Epoch 1/200, loss = 1.89217209815979
Epoch 1/200, loss = 1.87032949924469
Epoch 1/200, loss = 1.933304786682129
Epoch 1/200, loss = 1.8709888458251953
Epoch 1/200, loss = 1.9560188055038452
Epoch 1/200, loss = 1.865186333656311
Epoch 1/200, loss = 1.986936330795288
Epoch 1/200, loss = 1.961665391921997
Epoch 1/200, loss = 1.8716280460357666
Epoch 1/200, loss = 1.9152734279632568
Epoch 1/200, loss = 1.935766339302063
Epoch 1/200, loss = 1.8984284400939941
E