In [1]:
# 准备序列数据及标签
import os
from PIL import Image
import torch
from torchvision import transforms

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

torch.manual_seed(4) # 42

# 若需更换face和eye，需改图片size
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

data_dir = './dataset/DeepfakeTIMIT/eye'

sequences = []
labels = []

for label, class_name in enumerate(['real', 'fake/LQ']):
    label_dir = os.path.join(data_dir, class_name)

    for sequence_idx in range(320):
        if sequence_idx == 25 or sequence_idx == 88 or sequence_idx == 167 or sequence_idx == 195:
            continue
        sequence = []

        for image_idx in range(10):
            image_path = os.path.join(label_dir, f'{sequence_idx}_{image_idx}.png')
            image = Image.open(image_path).convert("RGB")
            if transform:
                image = transform(image)
            sequence.append(image)

        sequences.append(torch.stack(sequence))
        labels.append(label)

sequences = torch.stack(sequences)
labels = torch.tensor(labels)
print(sequences.shape)
print(labels.shape)

torch.Size([632, 10, 3, 128, 128])
torch.Size([632])


In [3]:
# DeepfakeTIMIT sequence
from torch.utils.data import DataLoader, SubsetRandomSampler, TensorDataset
import numpy as np

dataset_size = sequences.size(0)
indices = list(range(dataset_size))
np.random.seed(4) # 42
np.random.shuffle(indices)
print(indices[:10])

sequences = sequences[indices]
labels = labels[indices]

split1 = int(np.floor(0.7 * dataset_size))
split2 = int(np.floor(0.8 * dataset_size))
train_indices, val_indices, test_indices = indices[:split1], indices[split1:split2], indices[split2:]

# split = int(np.floor(0.8 * dataset_size))
# test_size = dataset_size - split
# size = np.ceil(0.5 * test_size).astype(int)
# # size = int(0.5 * test_size)
# m = 3
# n = 4
# indices_left, test_indices, indices_right = indices[:(test_size * m)], indices[(test_size * m):(test_size * n - size)], indices[(test_size * n):]
# indices_left.extend(indices_right)
# train_indices = indices_left

# train_indices, test_indices = indices[:split], indices[split:]
# test_indices, train_indices = indices[:test_size], indices[test_size:]

dataset = TensorDataset(sequences, labels)

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
test_sampler = SubsetRandomSampler(test_indices)

seq_train_loader = DataLoader(dataset, batch_size=1, sampler=train_sampler)
seq_val_loader = DataLoader(dataset, batch_size=1, sampler=val_sampler)
seq_test_loader = DataLoader(dataset, batch_size=1, sampler=test_sampler)

print("Training set length:", len(train_indices))
print("Test set length:", len(test_indices))

[550, 248, 459, 433, 389, 490, 626, 596, 175, 320]
Training set length: 442
Test set length: 127


In [3]:
# 检查标签是否正常
all_labels = []
for data in seq_train_loader:
    inputs, label = data
    all_labels.extend(label.tolist())
all_labels = torch.tensor(all_labels)
print("Total number of labels:", len(all_labels))
num_positive_samples = (all_labels == 1).sum().item()
num_negative_samples = (all_labels == 0).sum().item()
print(f"Number of positive samples: {num_positive_samples}")
print(f"Number of negative samples: {num_negative_samples}")

Total number of labels: 505
Number of positive samples: 245
Number of negative samples: 260


In [4]:
# 网络结构
from models import LRCN

seq_model = LRCN(8192, 1024, 1, 2).to(device) # 8192(eye) 25088(face)
# print(seq_model)

In [5]:
import torch.optim as optim
import torch.nn as nn
# seq_model = torch.load('./model/DeepfakeTIMIT/HQ/lrcn_eye_model_5.pth')

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(seq_model.parameters(), lr=0.001)
# optimizer = optim.SGD(seq_model.parameters(), lr=0.01, momentum=0.9)
# torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # 设置 clipnorm
# torch.nn.utils.clip_grad_value_(model.parameters(), 0.5)  # 设置 clipvalue

num_epochs = 5
for epoch in range(num_epochs):
    running_loss = 0.0
    correct = 0
    total = 0
    for i, data in enumerate(seq_train_loader, 0):
        inputs, label = data
        inputs, label = inputs.to(device), label.to(device)

        seq_model.train()

        optimizer.zero_grad()

        # 前向传播
        outputs = seq_model(inputs)
        loss = criterion(outputs, label)

        # 反向传播和优化
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += label.size(0)
        correct += (predicted == label).sum().item()
    
    loss = running_loss / len(seq_train_loader)
    accuracy = 100 * correct / total
    print('Epoch %d, loss: %.3f, accuracy: %.2f %%' % (epoch + 1, loss, accuracy))
    

    # 在每个 epoch 结束后进行模型评估
    # seq_model.eval()  # 将模型设置为评估模式，关闭 Dropout 和 BatchNormalization
    # val_correct = 0
    # val_total = 0
    # val_running_loss = 0.0
    # all_labels = []
    # all_predictions = []
    # with torch.no_grad():  # 在验证阶段不需要计算梯度
    #     for val_data in seq_val_loader:
    #         val_inputs, val_labels = val_data
    #         val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)

    #         val_outputs = seq_model(val_inputs)
    #         val_loss = criterion(val_outputs, val_labels)
    #         # val_loss = F.cross_entropy(val_outputs, val_labels)
    #         val_running_loss += val_loss.item()

    #         _, val_predicted = torch.max(val_outputs.data, 1)
    #         val_total += val_labels.size(0)
    #         val_correct += (val_predicted == val_labels).sum().item()

    # val_accuracy = 100 * val_correct / val_total
    # avg_val_loss = val_running_loss / len(seq_val_loader)

    # print('Epoch %d, loss: %.3f, accuracy: %.2f %%. Validation, loss: %.3f, accuracy: %.2f %%' %
    #     (epoch + 1, loss, accuracy, avg_val_loss, val_accuracy))
    
    # if epoch == 0:
    #     best_val_accuracy = val_accuracy
    # # 保存最佳模型
    # if val_accuracy >= best_val_accuracy:
    #     best_val_accuracy = val_accuracy
    #     best_model_weights = model.state_dict()

Epoch 1, loss: 0.097, accuracy: 97.29 %
Epoch 2, loss: 0.007, accuracy: 99.55 %
Epoch 3, loss: 0.000, accuracy: 100.00 %
Epoch 4, loss: 0.000, accuracy: 100.00 %
Epoch 5, loss: 0.000, accuracy: 100.00 %


In [6]:
torch.save(seq_model, './model/DeepfakeTIMIT/LQ/lrcn_eye_model_712(4).pth')

In [7]:
# 测试
# 用一个数据集的模型去测试另一个数据集的效果怎么样
from sklearn.metrics import roc_auc_score
# seq_model = torch.load('./model/DeepfakeTIMIT/LQ/lrcn_eye_model_811(4).pth')
correct = 0
total = 0
all_labels = []
all_predictions = []
with torch.no_grad():
    seq_model.eval()
    for inputs, label in seq_test_loader:
        inputs, label = inputs.to(device), label.to(device)
        outputs = seq_model(inputs)

        _, predicted = torch.max(outputs, 1)
        total += label.size(0)
        correct += (predicted == label).sum().item()
        
        predictions = torch.sigmoid(outputs[:, 1])
        all_labels.extend(label.cpu().numpy())
        all_predictions.extend(predictions.cpu().detach().numpy())

test_accuracy = 100 * correct / total
auc = roc_auc_score(all_labels, all_predictions)
print("Test Accuracy: %.2f %%, AUC: %.4f" % (test_accuracy, auc))

Test Accuracy: 100.00 %, AUC: 1.0000


In [4]:
# 序列对图片的提升
img_model = torch.load('./model/DeepfakeTIMIT/LQ/cnn_eye_model_5.pth')
correct = 0
all_labels = []
all_predictions = []
with torch.no_grad():
    seq_model.eval()
    img_model.eval()
    for data in seq_test_loader:
        input, label = data
        input, label = input.to(device), label.to(device)
        img_output = []
        seq_output = seq_model(input)
        for t in range(10):
            img_output = img_model(input[:, t, :, :, :])

            img_output += seq_output

            _, predicted = torch.max(img_output, 1)
            correct += (predicted == label).sum().item()

            predictions = torch.sigmoid(img_output[:, 1])
            all_labels.extend(label.cpu().numpy())
            all_predictions.extend(predictions.cpu().detach().numpy())

total = 10 * len(seq_test_loader)
test_accuracy = 100 * correct / total
auc = roc_auc_score(all_labels, all_predictions)
print("Test Accuracy: %.2f %%, AUC: %.4f" % (test_accuracy, auc))

Test Accuracy: 100.00 %, AUC: 1.0000


In [6]:
# Two stream(SVM)
# img_model = torch.load('./model/DeepfakeTIMIT/HQ/cnn_eye_model_5.pth')
img_train_output = []
img_train_label = []

with torch.no_grad():
    seq_model.eval()
    img_model.eval()
    for data in seq_train_loader:
        input, label = data
        input, label = input.to(device), label.to(device)
        output = []
        seq_output = seq_model(input)
        for t in range(10):
            output = img_model(input[:, t, :, :, :])

            output += seq_output
            
            # output = torch.cat((output, seq_output), dim=1)
            # output = torch.sigmoid(output)
            img_train_output.append(output)
            img_train_label.append(label)
print(len(img_train_output))

5050


In [7]:
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.calibration import CalibratedClassifierCV

img_train_output_tensor = torch.stack(img_train_output)
img_train_label_tensor = torch.cat(img_train_label)

img_train_output_flattened = img_train_output_tensor.view(img_train_output_tensor.size(0), -1).cpu().numpy()
img_train_label_flattened = img_train_label_tensor.cpu().numpy()

# svm_classifier = svm.SVC(kernel='linear', probability=True)

# # 使用径向基函数核（RBF kernel）并设置惩罚参数C和gamma
# svm_classifier = svm.SVC(kernel='rbf', C=0.1, gamma='scale', probability=True) # 0.1

# 使用多项式核函数并设置阶数和惩罚参数C
# svm_classifier = svm.SVC(kernel='poly', degree=3, C=0.1, probability=True) # 0.1

# 使用Sigmoid核函数并设置coef0和惩罚参数C
svm_classifier = svm.SVC(kernel='sigmoid', coef0=0.0, C=0.9, probability=True) # 0.2

calibrated_svm = CalibratedClassifierCV(svm_classifier)

calibrated_svm.fit(img_train_output_flattened, img_train_label_flattened)

y_train_pred = calibrated_svm.predict(img_train_output_flattened)

accuracy = accuracy_score(img_train_label_flattened, y_train_pred)
print(f"Accuracy of SVM classifier on train dataset: {accuracy:.4f}")

Accuracy of SVM classifier on train dataset: 1.0000


In [8]:
import joblib
# 保存训练好的 SVM 模型到文件
model_filename = './model/DeepfakeTIMIT/HQ/svm/fusion_eye_5_s.pkl'
joblib.dump(calibrated_svm, model_filename)

['./model/DeepfakeTIMIT/HQ/svm/fusion_eye_5_s.pkl']

In [9]:
test_output = []
test_label = []

with torch.no_grad():
    seq_model.eval()
    img_model.eval()
    for data in seq_test_loader:
        input, label = data
        input, label = input.to(device), label.to(device)
        output = []
        seq_output = seq_model(input)
        for t in range(10):
            output = img_model(input[:, t, :, :, :])

            output += seq_output
            
            # output = torch.cat((output, seq_output), dim=1)
            # output = torch.sigmoid(output)
            test_output.append(output)
            test_label.append(label)
print(len(test_output))

640


In [10]:
# 加载保存的 SVM 模型
# model_filename = './model/UADFV/svm/fusion_face_2.pkl'
loaded_model = joblib.load(model_filename)

img_output_tensor = torch.stack(test_output)
img_label_tensor = torch.cat(test_label)
print(len(img_label_tensor))

img_output_flattened = img_output_tensor.view(img_output_tensor.size(0), -1).cpu().numpy()
img_label_flattened = img_label_tensor.cpu().numpy()

y_pred = loaded_model.predict(img_output_flattened)
# prob_scores = loaded_model.predict_proba(img_output_flattened)

accuracy = accuracy_score(img_label_flattened, y_pred)
print(f"Accuracy of SVM classifier on test dataset: {accuracy:.4f}")

640
Accuracy of SVM classifier on test dataset: 0.9984


In [11]:
# 加载保存的 SVM 模型
import joblib
from sklearn import svm
from sklearn.metrics import accuracy_score, roc_auc_score
# model_filename = './model/DeepfakeTIMIT/HQ/svm/fusion_eye_5(1).pkl'
loaded_model = joblib.load(model_filename)

img_output_tensor = torch.stack(img_output)
img_label_tensor = torch.cat(img_label)
print(len(img_label_tensor))

img_output_flattened = img_output_tensor.view(img_output_tensor.size(0), -1).cpu().numpy()
img_label_flattened = img_label_tensor.cpu().numpy()

y_pred = loaded_model.predict(img_output_flattened)
# svm_scores = loaded_model.decision_function(img_output_flattened)
# svm_scores = torch.sigmoid(svm_scores)
# print(svm_scores[0])
# predictions = torch.sigmoid(img_output[:, 1])

accuracy = accuracy_score(img_label_flattened, y_pred)
# auc = roc_auc_score(img_label_flattened, svm_scores)
print(f"Accuracy of SVM classifier on test dataset: {accuracy:.4f} AUC:{auc:.4f}")

630
Accuracy of SVM classifier on test dataset: 0.9984 AUC:0.9970
