## FSH 預測

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from collections import defaultdict

# Detector 1 (temporal): every usable line is "<face path> <score> <label>".
detector1_file_path = '/home/jovyan/temporaldeepfake/mmaction2/auc_results_fsh_onlytrain_full.txt'
video_frame_scores = defaultdict(list)

with open(detector1_file_path, 'r') as file:
    for raw_line in file:
        fields = raw_line.strip().split()
        if len(fields) != 3:
            continue  # only three-column records are used
        full_path = fields[0]
        score = float(fields[1])
        ground_truth = int(fields[2])  # parsed but not used; labels are derived from the video id below

        # Video id and frame id are the 3rd- and 2nd-to-last path components
        # (e.g. video 000_003, frame 008).
        segments = full_path.split('/')
        video_frame_scores[(segments[-3], segments[-2])].append(score)

# Frame score = highest score recorded for that frame; gather frame scores per video.
video_scores_detector1 = defaultdict(list)
for (vid, _frame), face_scores in video_frame_scores.items():
    video_scores_detector1[vid].append(max(face_scores))

# Detector-1 video score = mean of its frame scores.
final_video_scores_detector1 = {}
for vid, frame_scores in video_scores_detector1.items():
    final_video_scores_detector1[vid] = sum(frame_scores) / len(frame_scores)

# Detector 2: every usable line is "<video id> <score>".
detector2_file_path = '/home/jovyan/temporaldeepfake/mmaction2/SBI_supcon_contrast0.5_2opt0.01_without45aug_base_07_17_12_58_19_weights_46_0.7688_val.tar_FF-FH_predict.txt'
video_scores_detector2 = {}

with open(detector2_file_path, 'r') as file:
    for raw_line in file:
        fields = raw_line.strip().split()
        if len(fields) != 2:
            continue
        video_scores_detector2[fields[0]] = float(fields[1])

# Collect, per video, the scores of whichever detectors reported it
# (detector 1 first, then detector 2).
combined_video_scores = defaultdict(list)
for per_detector in (final_video_scores_detector1, video_scores_detector2):
    for vid, value in per_detector.items():
        combined_video_scores[vid].append(value)

# Fused score = mean of the available detector scores for the video.
final_combined_scores = {vid: np.mean(values) for vid, values in combined_video_scores.items()}

# A video id containing an underscore is labelled 1, anything else 0.
true_labels = [1 if '_' in vid else 0 for vid in final_combined_scores]
pred_scores = list(final_combined_scores.values())

# Video-level AUC of the fused scores.
auc = roc_auc_score(true_labels, pred_scores)
print(f"AUC: {auc}")



## Celebdf

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from collections import defaultdict

# Celeb-DF: sweep the fusion weight k between the temporal detector (weight k)
# and the spatial detector (weight 1 - k) and report the weight with the best AUC.
# Both prediction files are independent of k, so they are parsed once up front
# instead of once per sweep step.

# --- Temporal detector: each usable line is "<face path> <score> <label>" ---
detector1_file_path = "/home/jovyan/temporaldeepfake/mmaction2/20240925_071635_CDF_random_margin_5frames.txt"
video_frame_scores = defaultdict(list)
with open(detector1_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) == 3:
            full_path = parts[0]
            score = float(parts[1])

            # Video id and frame id are the 3rd- and 2nd-to-last path components.
            path_parts = full_path.split('/')
            video_id = path_parts[-3]
            frame_id = path_parts[-2]

            video_frame_scores[(video_id, frame_id)].append(score)

# Frame score = FIRST score recorded for the frame.
# NOTE(review): the FSH cell uses max(scores) here and this value used to be
# called max_score; confirm that taking scores[0] is intended.
video_scores_detector1 = defaultdict(list)
for (video_id, frame_id), scores in video_frame_scores.items():
    video_scores_detector1[video_id].append(scores[0])

# Temporal video score = mean of its frame scores.
final_video_scores_detector1 = {video_id: sum(scores) / len(scores) for video_id, scores in video_scores_detector1.items()}

# --- Spatial detector: the first two columns are "<video id> <score>" ---
detector2_file_path = '/home/jovyan/temporaldeepfake/auc_results/spatial/SBI_base_09_02_13_37_12_weights_63_0.7780_val.tar_Celeb-DF-v2_predict_top128_predictions.txt'
video_scores_detector2 = {}

with open(detector2_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) < 2:
            continue  # skip blank/short lines instead of raising IndexError
        video_scores_detector2[parts[0]] = float(parts[1])

best_auc = 0
best_k = 0

for k in np.arange(0, 1.1, 0.1):
    # Weighted contributions per video: k * temporal, then (1 - k) * spatial.
    combined_video_scores = defaultdict(list)

    for video_id, score in final_video_scores_detector1.items():
        combined_video_scores[video_id].append(k * score)

    for video_id, score in video_scores_detector2.items():
        combined_video_scores[video_id].append((1 - k) * score)

    # Fused score = sum of the weighted contributions.
    # NOTE(review): a video present in only one file keeps just that single
    # weighted term, which is not on the same scale as fully fused videos.
    final_combined_scores = {video_id: np.sum(scores) for video_id, scores in combined_video_scores.items()}

    # A video id containing the substring "id" exactly twice is labelled 1, else 0.
    true_labels = []
    pred_scores = []

    for video_id, score in final_combined_scores.items():
        label = 1 if video_id.count("id") == 2 else 0
        true_labels.append(label)
        pred_scores.append(score)

    # AUC is only defined when both classes are present.
    if len(set(true_labels)) > 1:
        auc = roc_auc_score(true_labels, pred_scores)
        print(auc, len(true_labels))
        if auc > best_auc:
            best_auc = auc
            best_k = k
    else:
        print(f"top{k}: Not enough different labels to calculate AUC")
print(f"Highest AUC: {best_auc} at temporal ratio: {best_k}, spatial ratio: {1-best_k}")

## only temporal

In [None]:
from collections import defaultdict
from sklearn.metrics import roc_auc_score

# Temporal-detector predictions: each line is "<path> <score> <label>".
file_path = "/home/jovyan/temporaldeepfake/mmaction2/20240903_141415_CDF_random_margin_5frames.txt"

# Scores grouped by the 3rd-to-last path component, and the label first seen for each group.
grouped_scores = defaultdict(list)
labels = {}

with open(file_path, 'r') as file:
    data = file.readlines()

for entry in data:
    columns = entry.split()
    path = columns[0]
    score = float(columns[1])
    label = int(columns[2])

    parent_folder = path.split('/')[-3]
    grouped_scores[parent_folder].append(score)

    # Keep the first label encountered for this folder.
    labels.setdefault(parent_folder, label)

# One mean score and one label per folder, in first-seen order.
average_scores = [sum(folder_scores) / len(folder_scores) for folder_scores in grouped_scores.values()]
average_labels = [labels[folder] for folder in grouped_scores]

# AUC needs at least two distinct label values.
if len(set(average_labels)) <= 1:
    print("AUC cannot be computed (insufficient label variance)")
else:
    auc = roc_auc_score(average_labels, average_scores)
    print(f"AUC: {auc}")


## only spatial

In [None]:
from collections import defaultdict
import numpy as np
from sklearn.metrics import roc_auc_score
import ast

# Per-video list of scores and per-video label.
video_predictions = defaultdict(list)

labels_dict = {}

# Spatial-detector predictions: column 0 is the video id, column 2 the score.
file_path = '/home/jovyan/temporaldeepfake/auc_results/spatial/SBI_base_09_02_13_37_12_weights_63_0.7780_val.tar_Celeb-DF-v2_predict.txt'
with open(file_path, 'r') as file:
    lines = file.readlines()

for line in lines:
    columns = line.split()
    video_id = columns[0]
    probabilities = columns[2]
    video_predictions[video_id].append(float(probabilities))
    # A video id containing the substring 'id' exactly twice is labelled 1, else 0.
    labels_dict.setdefault(video_id, 1 if video_id.count('id')==2 else 0)
print(len(video_predictions))

# Scores of each video in descending order; the ranking does not depend on k.
ranked_predictions = {vid: sorted(preds, reverse=True) for vid, preds in video_predictions.items()}

# For each k: average the k highest scores per video, write "<id> <score> <label>"
# lines to a per-k file and print the resulting AUC.
for k in [1 , 32 , 64 , 128]:
    topk_averages = {vid: np.mean(ranked[:k]) for vid, ranked in ranked_predictions.items()}

    rows = [(vid, avg_score, labels_dict[vid]) for vid, avg_score in topk_averages.items()]
    labels = [row[2] for row in rows]
    scores = [row[1] for row in rows]
    with open(f'SBI_base_09_02_13_37_12_weights_63_0.7780_val.tar_Celeb-DF-v2_predict_top{k}_predictions.txt', 'w') as output_file:
        output_file.writelines(f'{vid} {avg_score} {label}\n' for vid, avg_score, label in rows)

    auc = roc_auc_score(labels, scores)
    print(f"K={k}, AUC: {auc}")


In [None]:
## spatial topk + temporal topk

In [None]:
from collections import defaultdict
import numpy as np
from sklearn.metrics import roc_auc_score
import ast

############################# spatial ################################
# Per-video list of scores (one per input line) and per-video label.
video_predictions = defaultdict(list)
spatial_predictions = defaultdict(int)  # not used in this cell
labels = {}

# Spatial-detector predictions: "<video id> <field> <prediction literal>".
file_path = '/home/jovyan/temporaldeepfake/auc_results/spatial/only_train_without45_base_08_05_01_40_56_weights_22_0.7602_val.tar_Celeb-DF-v2_predict.txt'
with open(file_path, 'r') as file:
    lines = file.readlines()

for line in lines:
    # Split off only the first two fields so a list literal containing spaces stays intact.
    parts = line.split(maxsplit=2)
    if len(parts) < 3:
        continue  # skip blank/short lines instead of raising IndexError
    video_id = parts[0]
    probabilities = ast.literal_eval(parts[2])
    # A list of predictions is reduced to its maximum; a scalar is used as is.
    if isinstance(probabilities, list):
        max_prediction = max(probabilities)
    else:
        max_prediction = probabilities
    video_predictions[video_id].append(max_prediction)
    # A video id containing the substring 'id' exactly twice is labelled 1, else 0.
    if video_id not in labels:
        labels[video_id] = 1 if video_id.count('id')==2 else 0

# Descending scores per video; computed once because the order does not depend on k.
sorted_video_predictions = {
    video_id: sorted(predictions, reverse=True)
    for video_id, predictions in video_predictions.items()
}

# Report the video-level AUC of the top-k average for every k from 1 to 129.
# The scoring and AUC steps must run inside this loop: previously they were
# dedented, so only a single result (k = 129) was ever produced.
for k in range(1, 130):
    topk_scores = []
    topk_labels = []

    for video_id, sorted_predictions in sorted_video_predictions.items():
        top_predictions = sorted_predictions[:k]
        # Divide by the number of scores actually taken, so videos with fewer
        # than k predictions are not artificially scaled down.
        topk_avg = sum(top_predictions) / len(top_predictions)
        topk_scores.append(topk_avg)
        topk_labels.append(labels[video_id])

    # AUC is only defined when both classes are present.
    if len(set(topk_labels)) > 1:
        auc_score = roc_auc_score(topk_labels, topk_scores)
        print(f"Top-{k} AUC: {auc_score:.4f}")
    else:
        print(f"Top-{k} AUC cannot be computed due to insufficient label variance")
        
###########################################temporal############################################

## DFDC 預測

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from collections import defaultdict
import pandas as pd

def read_gt_csv(csv_path):
    """Load a label CSV and return the video names split by label.

    The CSV must provide a ``filename`` and a ``label`` column. Each file
    name is cut at its first ``"."`` so the extension is dropped.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        A ``(real_list, fake_list)`` tuple: names of the rows whose label
        is 0, and names of the rows whose label is 1, both in file order.
    """
    table = pd.read_csv(csv_path)

    def _names_with_label(wanted):
        # Select the filenames carrying this label and keep the part before the first dot.
        filenames = table.loc[table['label'] == wanted, 'filename'].tolist()
        return [name.split(".")[0] for name in filenames]

    return _names_with_label(0), _names_with_label(1)

# DFDC: sweep the weight k applied to the temporal detector and report the best AUC.
real_list, fake_list = read_gt_csv("/home/jovyan/dataset/DFDC/test/labels.csv")
# Set copy of the label-0 names so the per-video membership test below is O(1).
real_videos = set(real_list)

best_auc = 0
best_k = 0

# --- Temporal detector: each usable line is "<face path> <score> <label>". ---
# Parsed once; nothing here depends on k.
detector1_file_path = '/home/jovyan/temporaldeepfake/mmaction2/20240925_075400_CDF_random_margin_5frames.txt'
video_frame_scores = defaultdict(list)

with open(detector1_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) == 3:
            full_path = parts[0]
            score = float(parts[1])

            # Video id and frame id are the 3rd- and 2nd-to-last path components.
            path_parts = full_path.split('/')
            video_id = path_parts[-3]
            frame_id = path_parts[-2]

            video_frame_scores[(video_id, frame_id)].append(score)

# Frame score = first score recorded for the frame; video score = mean of frame scores.
video_scores_detector1 = defaultdict(list)
for (video_id, frame_id), scores in video_frame_scores.items():
    video_scores_detector1[video_id].append(scores[0])

final_video_scores_detector1 = {video_id: sum(scores) / len(scores) for video_id, scores in video_scores_detector1.items()}

# --- Spatial detector: each usable line is "<video id> <score>". Parsed once. ---
detector2_file_path = '/home/jovyan/temporaldeepfake/auc_results/spatial/SBI_supcon_contrast0.5_2opt0.01_without45aug_base_07_17_12_58_19_weights_46_0.7688_val.tar_DFDC_predict.txt'
video_scores_detector2 = {}

with open(detector2_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) == 2:
            video_id = parts[0]
            score = float(parts[1])
            video_scores_detector2[video_id] = score

for k in np.arange(0, 1.1, 0.1):
    combined_video_scores = defaultdict(list)

    for video_id, score in final_video_scores_detector1.items():
        combined_video_scores[video_id].append(k * score)

    # NOTE(review): the spatial contribution is disabled, so every score is just
    # k * temporal. AUC is unchanged by a positive scale factor, so all k > 0
    # give the same AUC and the "spatial ratio" printed at the end does not
    # describe an actual fusion. Re-enable the two lines below to fuse.
    # for video_id, score in video_scores_detector2.items():
    #     combined_video_scores[video_id].append((1 - k) * score)

    final_combined_scores = {video_id: np.mean(scores) for video_id, scores in combined_video_scores.items()}

    # A video listed with label 0 in the CSV is labelled 0; every other video is labelled 1.
    true_labels = []
    pred_scores = []

    for video_id, score in final_combined_scores.items():
        label = 0 if video_id in real_videos else 1
        true_labels.append(label)
        pred_scores.append(score)

    # AUC is only defined when both classes are present.
    if len(set(true_labels)) > 1:
        auc = roc_auc_score(true_labels, pred_scores)
        if auc > best_auc:
            best_auc = auc
            best_k = k
        print(k , auc)
    else:
        print(f"top{k}: Not enough different labels to calculate AUC")

print(f"Highest AUC: {best_auc} at temporal ratio: {best_k}, spatial ratio: {1-best_k}")
 

## DFD with identity

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from collections import defaultdict

# Detector 1 (temporal): each usable line is "<face path> <score> <label>".
detector1_file_path = '/home/jovyan/temporaldeepfake/mmaction2/20240813_082409_CDF_wotest_woid.txt'
video_frame_scores = defaultdict(list)
# Plain dict (not defaultdict(int)) so a video without a known label can never
# silently be evaluated as label 0.
video_frame_ground_truth = {}
with open(detector1_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) == 3:
            full_path = parts[0]
            score = float(parts[1])
            ground_truth = int(parts[2])

            # Video id and frame id are the 3rd- and 2nd-to-last path components
            # (e.g. video 000_003, frame 008).
            path_parts = full_path.split('/')
            video_id = path_parts[-3]
            frame_id = path_parts[-2]

            video_frame_scores[(video_id, frame_id)].append(score)
            video_frame_ground_truth[video_id] = ground_truth

# Frame score = highest score recorded for the frame; gather frame scores per video.
video_scores_detector1 = defaultdict(list)
for (video_id, frame_id), scores in video_frame_scores.items():
    max_score = max(scores)
    video_scores_detector1[video_id].append(max_score)


# Detector-1 video score = mean of its frame scores.
final_video_scores_detector1 = {video_id: sum(scores) / len(scores) for video_id, scores in video_scores_detector1.items()}


# Detector 2 (spatial top-k averages). The top-k cell of this notebook writes
# these files as "<video id> <avg score> <label>", so the score is column 1.
# The previous code read column 2, i.e. it fed the label into the fused score.
# NOTE(review): confirm top60_predictions.txt really has this three-column layout.
detector2_file_path = '/home/jovyan/temporaldeepfake/top60_predictions.txt'
video_scores_detector2 = {}

with open(detector2_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) < 3:
            continue  # skip blank/short lines instead of raising IndexError
        video_id = parts[0]
        video_scores_detector2[video_id] = float(parts[1])
        # Use this file's label only for videos the temporal file did not cover.
        video_frame_ground_truth.setdefault(video_id, int(parts[2]))

# Collect, per video, the scores of whichever detectors reported it.
combined_video_scores = defaultdict(list)

for video_id, score in final_video_scores_detector1.items():
    combined_video_scores[video_id].append(score)

for video_id, score in video_scores_detector2.items():
    combined_video_scores[video_id].append(score)

# Fused score = mean of the available detector scores for the video.
final_combined_scores = {video_id: np.mean(scores) for video_id, scores in combined_video_scores.items()}

# Labels come from the prediction files, aligned with the fused scores.
true_labels = []
pred_scores = []

for video_id, score in final_combined_scores.items():
    label = video_frame_ground_truth[video_id]
    true_labels.append(label)
    pred_scores.append(score)

# Video-level AUC of the fused scores.
auc = roc_auc_score(true_labels, pred_scores)
print(f"AUC: {auc}")



In [None]:
from collections import defaultdict
import numpy as np
from sklearn.metrics import roc_auc_score
import ast
import re

# Per-video list of scores (one per input line) and per-video label.
video_predictions = defaultdict(list)

labels_dict = {}
pattern = r'\d{2}_\d{2}'
# Compiled once; it is applied to every video id below.
label_regex = re.compile(pattern)

# Spatial-detector predictions: "<video id> <field> <prediction literal>".
file_path = '/home/jovyan/temporaldeepfake/auc_results/spatial/only_train_without45_base_08_05_01_40_56_weights_22_0.7602_val.tar_DFD_fake_predict.txt'
with open(file_path, 'r') as file:
    lines = file.readlines()

for line in lines:
    # Split off only the first two fields so a list literal containing spaces stays intact.
    parts = line.split(maxsplit=2)
    if len(parts) < 3:
        continue  # skip blank/short lines instead of raising IndexError
    video_id = parts[0]
    probabilities = ast.literal_eval(parts[2])
    # A list of predictions is reduced to its maximum; a scalar is used as is.
    if isinstance(probabilities, list):
        max_prediction = max(probabilities)
    else:
        max_prediction = probabilities
    video_predictions[video_id].append(max_prediction)
    # A video id containing two digits, an underscore and two digits is labelled 1, else 0.
    # (The per-line debug print of the regex match was removed.)
    if video_id not in labels_dict:
        labels_dict[video_id] = 1 if label_regex.search(video_id) is not None else 0

# Descending scores per video; computed once because the order does not depend on k.
sorted_video_predictions = {
    video_id: sorted(predictions, reverse=True)
    for video_id, predictions in video_predictions.items()
}

# For each k: average the k highest scores per video, write "<id> <score> <label>"
# lines to top{k}_predictions.txt and print the resulting AUC.
for k in [10 , 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130]:
    topk_averages = {}
    for video_id, sorted_predictions in sorted_video_predictions.items():
        topk_averages[video_id] = np.mean(sorted_predictions[:k])

    labels = []
    scores = []
    with open(f'top{k}_predictions.txt', 'w') as output_file:
        for video_id, avg_score in topk_averages.items():
            label = labels_dict[video_id]
            labels.append(label)
            scores.append(avg_score)
            output_file.write(f'{video_id} {avg_score} {label}\n')

    # roc_auc_score raises when only one class is present, so guard it like the
    # other cells do; the prediction file is still written either way.
    if len(set(labels)) > 1:
        auc = roc_auc_score(labels, scores)
        print(f"K={k}, AUC: {auc}")
    else:
        print(f"K={k}: AUC cannot be computed (insufficient label variance)")


## Robustness

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score
from collections import defaultdict

# Robustness: fuse the temporal top-k average (k = 1..25) with the
# spatial score and print the AUC for every k.
# Both prediction files are independent of k, so they are parsed once up front
# instead of once per k.

# --- Temporal detector: each usable line is "<face path> <score> <label>" ---
detector1_file_path = "/home/jovyan/temporaldeepfake/auc_results/robustness/temporal/robustness_JPEG_5.txt"
video_frame_scores = defaultdict(list)
with open(detector1_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) == 3:
            full_path = parts[0]
            score = float(parts[1])

            # Video id and frame id are the 3rd- and 2nd-to-last path components
            # (e.g. video 000_003, frame 008).
            path_parts = full_path.split('/')
            video_id = path_parts[-3]
            frame_id = path_parts[-2]

            video_frame_scores[(video_id, frame_id)].append(score)

# Frame score = first score recorded for the frame; gather frame scores per video.
video_scores_detector1 = defaultdict(list)
for (video_id, frame_id), scores in video_frame_scores.items():
    video_scores_detector1[video_id].append(scores[0])

# Descending frame scores per video; sorted once because the order does not depend on k.
sorted_video_scores = {
    video_id: sorted(scores, reverse=True)
    for video_id, scores in video_scores_detector1.items()
}

# --- Spatial detector: the first two columns are "<video id> <score>" ---
detector2_file_path = '/home/jovyan/temporaldeepfake/auc_results/robustness/spatial/JPEG_5_top128_predictions.txt'
video_scores_detector2 = {}

with open(detector2_file_path, 'r') as file:
    for line in file:
        parts = line.strip().split()
        if len(parts) < 2:
            continue  # skip blank/short lines instead of raising IndexError
        video_scores_detector2[parts[0]] = float(parts[1])

for k in range(1, 26):
    # Temporal video score = mean of its k highest frame scores
    # (or of all of them when the video has fewer than k frames).
    final_video_scores_detector1 = {}
    for video_id, ranked_scores in sorted_video_scores.items():
        top_k_scores = ranked_scores[:k]
        average_top_k = sum(top_k_scores) / len(top_k_scores)
        final_video_scores_detector1[video_id] = average_top_k

    # Collect, per video, the scores of whichever detectors reported it.
    combined_video_scores = defaultdict(list)

    for video_id, score in final_video_scores_detector1.items():
        combined_video_scores[video_id].append(score)

    for video_id, score in video_scores_detector2.items():
        combined_video_scores[video_id].append(score)

    # Fused score = mean of the available detector scores for the video.
    final_combined_scores = {video_id: np.mean(scores) for video_id, scores in combined_video_scores.items()}

    # A video id containing an underscore is labelled 1, anything else 0.
    true_labels = []
    pred_scores = []

    for video_id, score in final_combined_scores.items():
        label = 1 if '_' in video_id else 0
        true_labels.append(label)
        pred_scores.append(score)
    # AUC is only defined when both classes are present.
    if len(set(true_labels)) > 1:
        auc = roc_auc_score(true_labels, pred_scores)
        print(f"{auc}")
    else:
        print(f"top{k}: Not enough different labels to calculate AUC")


## only spatial

In [None]:
from collections import defaultdict
import numpy as np
from sklearn.metrics import roc_auc_score
import ast

# Per-video list of scores (one per input line) and per-video label.
video_predictions = defaultdict(list)

labels_dict = {}

# Spatial-detector predictions: "<video id> <field> <prediction literal>".
file_path = '/home/jovyan/temporaldeepfake/auc_results/robustness/spatial/robustness_base_08_15_17_48_26_weights_172_0.9855_val.tar_FaceForensics++_JPEG_3_predict.txt'
with open(file_path, 'r') as file:
    lines = file.readlines()

for line in lines:
    # Split off only the first two fields so a list literal containing spaces stays intact.
    columns = line.split(maxsplit=2)
    video_id = columns[0]
    probabilities = ast.literal_eval(columns[2])
    # A list of predictions is reduced to its maximum; a scalar is used as is.
    max_prediction = max(probabilities) if isinstance(probabilities, list) else probabilities
    video_predictions[video_id].append(max_prediction)
    # A video id containing an underscore is labelled 1, anything else 0.
    labels_dict.setdefault(video_id, 1 if '_' in video_id else 0)
print(len(video_predictions))
print(video_predictions)

# Scores of each video in descending order; the ranking does not depend on k.
ranked_predictions = {vid: sorted(preds, reverse=True) for vid, preds in video_predictions.items()}

# For each k: average the k highest scores per video, write "<id> <score> <label>"
# lines to a per-k file and print the resulting AUC.
for k in [32, 64, 128 , 256]:
    topk_averages = {vid: np.mean(ranked[:k]) for vid, ranked in ranked_predictions.items()}

    rows = [(vid, avg_score, labels_dict[vid]) for vid, avg_score in topk_averages.items()]
    labels = [row[2] for row in rows]
    scores = [row[1] for row in rows]
    with open(f'/home/jovyan/temporaldeepfake/auc_results/robustness/spatial/JPEG_3_top{k}_predictions.txt', 'w') as output_file:
        output_file.writelines(f'{vid} {avg_score} {label}\n' for vid, avg_score, label in rows)

    auc = roc_auc_score(labels, scores)
    print(f"K={k}, AUC: {auc}")
