In [1591]:
# 固定numpy torch 生成的随机数
import numpy as np
import torch
from sklearn import datasets

seed = 129

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

In [1592]:
# 生成一个简单的数据集
N = 4000
BS = 128


def makeToyData(n=10000):
    x, t = datasets.make_swiss_roll(n, noise=5., random_state=6)
    # 标准化x
    l = torch.nn.functional.normalize(torch.tensor(x, dtype=torch.float32), dim=1)
    A = torch.rand(3, 512)
    toyA = l @ A

    # 生成OneHot label [n, 7] 
    toyLabel = torch.randint(0, 7, (n,))
    toyLabelOnehot = torch.nn.functional.one_hot(toyLabel, num_classes=7)
    B = torch.rand(7, 512)
    toyLabelOnehot = toyLabelOnehot.to(torch.float32) @ B

    # 生成e
    toyE = torch.rand(n, 512)

    # 生成最终数据
    f = toyA + 0.1 * toyLabelOnehot + 0.01 * toyE

    return f, toyLabel

In [1593]:
toyData, toyLabel = makeToyData(N + 1000)

# 分离训练集和测试集
testData, testLabel = toyData[N:], toyLabel[N:]
toyData, toyLabel = toyData[:N], toyLabel[:N]

In [1594]:
from torch import nn


class SimpleClassifier(torch.nn.Module):
    def __init__(self):
        super(SimpleClassifier, self).__init__()
        self.out = nn.Sequential(

            nn.TransformerEncoder(
                nn.TransformerEncoderLayer(
                    d_model=512, nhead=8, dim_feedforward=1024, batch_first=True
                ),
                num_layers=6,
            ),
            nn.ReLU(),
            nn.Linear(512, 7),
            nn.Softmax(dim=-1),

        )

    def forward(self, x):
        x = self.out(x)

        return x


class AdversarialAutoencoder(torch.nn.Module):
    def __init__(self):
        super(AdversarialAutoencoder, self).__init__()

        self.r = nn.Sequential(

            nn.TransformerEncoder(
                nn.TransformerEncoderLayer(
                    d_model=512, nhead=8, dim_feedforward=1024, batch_first=True
                ),
                num_layers=6,
            ),
            nn.ReLU(),
            nn.Linear(512, 512),
        )

    def forward(self, f):
        r = self.r(f)
        return r

In [1595]:
classifierModel = SimpleClassifier()

In [1596]:
import torch.optim as optim
import torch.nn.functional as F

optimizer = optim.Adam(classifierModel.parameters(), lr=0.0001)

In [1597]:
step = 0
for epoch in range(10):
    for i in range(0, N, BS):
        optimizer.zero_grad()

        end = min(i + BS, N)

        output = classifierModel(toyData[i:end])
        classifierLoss = F.cross_entropy(output, toyLabel[i:end])
        classifierLoss.backward()
        optimizer.step()

        step += 1
        if step % 20 == 0:
            print(f"epoch {epoch} loss {classifierLoss.item():.4f}")

# bs 为1的测试
_correct, _sum = 0, 0
# 使用testData进行测试
test_length = len(testData)
for i in range(test_length):
    output = classifierModel(testData[i:i + 1])
    # print(output.shape, testLabel[i].shape)
    _, predicted = torch.max(output, dim=1)
    if predicted == testLabel[i:i + 1]:
        _correct += 1
    _sum += 1

print(f"Accuracy: {_correct / _sum * 100:.4f}%")

epoch 0 loss 1.9396
epoch 1 loss 1.9545
epoch 1 loss 1.8878
epoch 2 loss 1.4678
epoch 3 loss 1.2435
epoch 3 loss 1.1767
epoch 4 loss 1.1758
epoch 4 loss 1.1712
epoch 5 loss 1.1684
epoch 6 loss 1.1675
epoch 6 loss 1.1673
epoch 7 loss 1.1670
epoch 8 loss 1.1668
epoch 8 loss 1.1667
epoch 9 loss 1.1666
epoch 9 loss 1.1667
Accuracy: 30.1000%


In [1598]:
import pandas

# 关于表情耦合特征
clip = pandas.read_parquet("RAF_clip_openai.parquet", engine="pyarrow")

# image_name	label	feature
# 处理label=label-1
clip["label"] = clip["label"] - 1
# 采样70%的数据作为训练集
RAF_clip_train = clip.sample(frac=0.7)
# 剩下的数据作为测试集
RAF_clip_test = clip.drop(RAF_clip_train.index)

In [1599]:
import numpy as np
import torch


class DecoupleDataSet(torch.utils.data.Dataset):
    def __init__(self, clip: pandas.DataFrame):
        self.feature = clip["feature"].tolist()
        self.label = clip["label"].tolist()

    def __len__(self):
        return len(self.feature)

    def __getitem__(self, idx):
        # 模型输入
        # x, identity_label=None, expression_label=None
        return self.feature[idx], self.label[idx]

    def collate_fn(self, batch: list[tuple[np.ndarray, int, np.ndarray, int]]):
        # 从batch中提取出数据和标签
        feature, label = zip(*batch)
        # 转换为tensor
        feature = torch.tensor(feature, dtype=torch.float32)
        # 标准化
        feature = torch.nn.functional.normalize(feature, dim=1)
        label = torch.tensor(label, dtype=torch.long)

        # lda

        return feature, label


class Dataloader(torch.utils.data.DataLoader):
    def __init__(self, dataset: DecoupleDataSet, batch_size=256, **kwargs):
        super(Dataloader, self).__init__(
            dataset, batch_size=batch_size, collate_fn=dataset.collate_fn, **kwargs
        )

In [1600]:
train_dataset = DecoupleDataSet(RAF_clip_train)
train_dataloader = Dataloader(train_dataset, batch_size=256, shuffle=True)

test_dataset = DecoupleDataSet(RAF_clip_test)
test_dataloader = Dataloader(test_dataset, batch_size=256)

In [1601]:
classifierModel = SimpleClassifier()

optimizer = optim.Adam(classifierModel.parameters(), lr=0.0001)

step = 0
for epoch in range(10):
    for i, (feature, label) in enumerate(train_dataloader):
        optimizer.zero_grad()
        output = classifierModel(feature)
        classifierLoss = F.cross_entropy(output, label)
        classifierLoss.backward()
        optimizer.step()

        step += 1
        if step % 20 == 0:
            print(f"epoch {epoch} loss {classifierLoss.item():.4f}")

# bs 为1的测试
_correct, _sum = 0, 0
# 使用testData进行测试
for i, (feature, label) in enumerate(test_dataloader):
    output = classifierModel(feature)
    # print(output.shape, testLabel[i].shape)
    _, predicted = torch.max(output, dim=1)
    _correct += (predicted == label).sum().item()
    _sum += len(label)

print(f"Accuracy: {_correct / _sum * 100:.4f}%")

epoch 0 loss 1.8246
epoch 1 loss 1.7767
epoch 1 loss 1.6794
epoch 2 loss 1.5597
epoch 2 loss 1.5469
epoch 3 loss 1.4638
epoch 4 loss 1.4018
epoch 4 loss 1.4140
epoch 5 loss 1.3842
epoch 5 loss 1.3613
epoch 6 loss 1.3276
epoch 7 loss 1.3229
epoch 7 loss 1.3173
epoch 8 loss 1.3364
epoch 8 loss 1.3346
epoch 9 loss 1.2953
epoch 9 loss 1.3213
Accuracy: 67.0742%


In [1602]:
# Adversarial Autoencoder
aaeModel = AdversarialAutoencoder()
classifierModel = SimpleClassifier()

In [1603]:
# 分别优化两个模型
optimizer = optim.Adam(aaeModel.parameters(), lr=0.00001)
classifierOptimizer = optim.Adam(classifierModel.parameters(), lr=0.0001)

In [1604]:
step = 0
# 进行对抗训练
for epoch in range(10):
    for i, (feature, label) in enumerate(train_dataloader):
        optimizer.zero_grad()
        classifierOptimizer.zero_grad()

        r = aaeModel(feature)
        c = classifierModel(feature + r)
        classifierLoss = F.cross_entropy(c, label)
        classifierLoss.backward()
        classifierOptimizer.step()

        # 对抗训练
        r = aaeModel(feature)
        c = classifierModel(feature + r)

        reconstruction_loss = F.mse_loss(feature + r, feature)

        # 计算判断对的概率,变成二分类问题
        adversarial_c_max_index = torch.argmax(c, dim=1)

        adversarial_label = (adversarial_c_max_index == label).to(torch.long)

        adversarial_c = torch.zeros(c.shape[0], 2)
        adversarial_c[:, 0] = c[torch.arange(c.shape[0]), label]
        adversarial_c[:, 1] = 1 - adversarial_c[:, 0]

        adversarial_loss = F.cross_entropy(adversarial_c, adversarial_label)

        aaeLoss = (reconstruction_loss + adversarial_loss) / 2
        # 
        # # 查看模型梯度
        # for name, param in aaeModel.named_parameters():
        #     print(name, param.grad)
        # for name, param in classifierModel.named_parameters():
        #     print(name, param.grad)

        aaeLoss.backward()
        optimizer.step()

        step += 1
        if step % 20 == 0:
            print(
                f"epoch {epoch} classifier loss {classifierLoss.item():.4f} aae loss {aaeLoss.item():.4f} "
                f"reconstruction loss {reconstruction_loss.item():.4f} adversarial loss {adversarial_loss.item():.4f}"
            )

# bs 为1的测试
_correct, _sum = 0, 0
# 使用testData进行测试
for i, (feature, label) in enumerate(test_dataloader):
    r = aaeModel(feature)
    output = classifierModel(feature)
    # print(output.shape, testLabel[i].shape)
    _, predicted = torch.max(output, dim=1)
    _correct += (predicted == label).sum().item()
    _sum += len(label)

print(f"Accuracy: {_correct / _sum * 100:.4f}%")

epoch 0 classifier loss 1.8173 aae loss 0.6920 reconstruction loss 0.0810 adversarial loss 1.3030
epoch 1 classifier loss 1.8366 aae loss 0.6748 reconstruction loss 0.0429 adversarial loss 1.3067
epoch 1 classifier loss 1.7941 aae loss 0.6744 reconstruction loss 0.0460 adversarial loss 1.3028
epoch 2 classifier loss 1.7961 aae loss 0.6605 reconstruction loss 0.0609 adversarial loss 1.2601
epoch 2 classifier loss 1.7000 aae loss 0.6311 reconstruction loss 0.0723 adversarial loss 1.1899
epoch 3 classifier loss 1.6682 aae loss 0.6596 reconstruction loss 0.0727 adversarial loss 1.2465
epoch 4 classifier loss 1.6630 aae loss 0.6467 reconstruction loss 0.0827 adversarial loss 1.2106
epoch 4 classifier loss 1.6422 aae loss 0.6445 reconstruction loss 0.0792 adversarial loss 1.2098
epoch 5 classifier loss 1.5986 aae loss 0.5958 reconstruction loss 0.0572 adversarial loss 1.1344
epoch 5 classifier loss 1.5952 aae loss 0.6516 reconstruction loss 0.0570 adversarial loss 1.2462
epoch 6 classifier l