<center>
    <h1><a href="https://www.atecup.cn/deepfake">Global Deepfake Attack and Defense Challenge</a></h1>
</center>

<br/>

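# Project Background

With the rapid development of artificial intelligence, Deepfake technology has become a double-edged sword in the digital world. It opens new possibilities for creative content generation, but it also poses unprecedented challenges to digital security. Deepfake algorithms can generate highly realistic images, video, and audio that are nearly indistinguishable from genuine content. As a result, problems such as misinformation, fraud, and privacy violations are becoming more severe and more complex.

# Project Task

The task is to decide whether a face video clip is a Deepfake and to output the probability that it is one. Participants need to develop and optimize detection models that cope with diverse Deepfake generation techniques and complex application scenarios, improving both the accuracy and the robustness of Deepfake detection.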

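# Competition Dataset

The training and validation sets have been released. The training label file `train_label.txt` is used to train the model, while the validation label file `val_label.txt` is used only for model tuning. In both files, each line has two comma-separated fields: the file name (with the `.mp4` extension) and the ground-truth label. A target of 1 marks a Deepfake audio-video clip, and a target of 0 marks a genuine face audio-video clip.

Samples from `train_label.txt` and `val_label.txt`: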

`train_label.txt`

```
video_name,target
96b04c80704f02cb426076b3f624b69e.mp4,0
16fe4cf5ae8b3928c968a5d11e870360.mp4,1
…
```

`val_label.txt`

```
video_name,target
f859cb3510c69513d5c57c6934bc9968.mp4,0
50ae26b3f3ea85babb2f9dde840830e2.mp4,1
…
```
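
As a small illustration of the label format described above, the following sketch parses the two sample rows with Python's standard `csv` module and counts the classes. The helper name `read_labels` is hypothetical, and the in-memory string stands in for a real label file.

```python
import csv
import io

# Two rows copied from the train_label.txt sample above; a real file has many more.
SAMPLE = """video_name,target
96b04c80704f02cb426076b3f624b69e.mp4,0
16fe4cf5ae8b3928c968a5d11e870360.mp4,1
"""

def read_labels(handle):
    """Return a list of (video_name, target) pairs, with target as an int (0 or 1)."""
    reader = csv.DictReader(handle)
    return [(row["video_name"], int(row["target"])) for row in reader]

labels = read_labels(io.StringIO(SAMPLE))
num_fake = sum(target for _, target in labels)
print(f"{len(labels)} clips, {num_fake} fake, {len(labels) - num_fake} real")
```

For an actual file, pass an open file handle instead of the `io.StringIO` wrapper.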


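# Evaluation Metric

Submissions are ranked primarily by AUC (Area Under the ROC Curve). AUC typically falls between 0.5 (random guessing) and 1 (perfect ranking). If AUC cannot separate two entries, TPR@FPR=1E-3 is used as a secondary reference.

**Relevant formulas:**

> True positive rate (TPR):
>
> TPR = TP / (TP + FN)
>
> False positive rate (FPR):
>
> FPR = FP / (FP + TN)
>
> where:
> - TP: attack samples correctly identified as attacks;
> - TN: genuine samples correctly identified as genuine;
> - FP: genuine samples incorrectly identified as attacks;
> - FN: attack samples incorrectly identified as genuine.

Reference: [Aghajan, H., Augusto, J. C., & Delgado, R. L. C. (Eds.). (2009). Human-centric interfaces for ambient intelligence. Academic Press.](https://books.google.com/books?hl=zh-CN&lr=&id=64icBAAAQBAJ&oi=fnd&pg=PP1&dq=Human-centric+interfaces+for+ambient+intelligence&ots=mKNsJrymuK&sig=_ZrNLwqT9R6BDddTLy02FF1B3WE)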
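
To make the metric concrete, here is a minimal, dependency-free sketch that builds ROC points from the TPR and FPR definitions, integrates them with the trapezoidal rule to get AUC, and reads off the best TPR at or below an FPR limit. The function names and the four toy scores are illustrative assumptions, and the organizers' exact TPR@FPR procedure (for example, whether they interpolate) is not specified here.

```python
def roc_points(y_true, y_score):
    """Return (fpr, tpr) lists with one point per distinct score threshold."""
    pairs = sorted(zip(y_score, y_true), key=lambda p: -p[0])
    num_pos = sum(y_true)
    num_neg = len(y_true) - num_pos
    fpr, tpr = [0.0], [0.0]
    tp = fp = 0
    i = 0
    while i < len(pairs):
        score = pairs[i][0]
        # Samples that share a score cross the threshold together
        while i < len(pairs) and pairs[i][0] == score:
            if pairs[i][1] == 1:
                tp += 1  # attack sample flagged as attack
            else:
                fp += 1  # genuine sample flagged as attack
            i += 1
        fpr.append(fp / num_neg)  # FPR = FP / (FP + TN)
        tpr.append(tp / num_pos)  # TPR = TP / (TP + FN)
    return fpr, tpr

def auc(fpr, tpr):
    """Area under the ROC curve by the trapezoidal rule."""
    return sum((fpr[k] - fpr[k - 1]) * (tpr[k] + tpr[k - 1]) / 2
               for k in range(1, len(fpr)))

def tpr_at_fpr(fpr, tpr, max_fpr):
    """Highest TPR among ROC points whose FPR does not exceed max_fpr."""
    return max(t for f, t in zip(fpr, tpr) if f <= max_fpr)

y_true = [0, 0, 1, 1]             # toy labels: 1 = Deepfake, 0 = genuine
y_score = [0.1, 0.4, 0.35, 0.8]   # toy predicted Deepfake probabilities
fpr, tpr = roc_points(y_true, y_score)
print("AUC:", auc(fpr, tpr))
print("TPR@FPR=1E-3:", tpr_at_fpr(fpr, tpr, 1e-3))
```

On a validation set of real size, a library routine such as scikit-learn's `roc_auc_score` would be the usual choice; the loop above only spells out the definitions.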




# Code

## Step1 Inspect the label data and import the required packages

In [1]:
# Check the dataset label files (count their lines)
!wc -l /kaggle/input/alldeepfake/phase1/trainset_label.txt
!wc -l /kaggle/input/alldeepfake/phase1/valset_label.txt

241991 /kaggle/input/alldeepfake/phase1/trainset_label.txt
79509 /kaggle/input/alldeepfake/phase1/valset_label.txt


In [2]:
# Check that the file path is correct
from IPython.display import Video
Video("/kaggle/input/alldeepfake/phase1/trainset/00154f42886002f8a2a6e40343617510.mp4", embed=True)

In [3]:
# Install the required packages
!pip install moviepy librosa matplotlib numpy timm

Collecting moviepy
  Downloading moviepy-1.0.3.tar.gz (388 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 388.3/388.3 kB 10.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting decorator<5.0,>=4.0.2 (from moviepy)
  Downloading decorator-4.4.2-py2.py3-none-any.whl.metadata (4.2 kB)
Collecting proglog<=1.0.0 (from moviepy)
  Downloading proglog-0.1.10-py3-none-any.whl.metadata (639 bytes)
Collecting imageio_ffmpeg>=0.2.0 (from moviepy)
  Downloading imageio_ffmpeg-0.5.1-py3-none-manylinux2010_x86_64.whl.metadata (1.6 kB)
Downloading decorator-4.4.2-py2.py3-none-any.whl (9.2 kB)
Downloading imageio_ffmpeg-0.5.1-py3-none-manylinux2010_x86_64.whl (26.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 26.9/26.9 MB 65.2 MB/s eta 0:00:00
Downloading proglog-0.1.10-py3-none-any.whl (6.1 kB)
Building wheels for collected packages: moviepy
  Building wheel for mo

In [4]:
# Import the required packages
import torch
torch.manual_seed(0)
# Favor speed over bit-exact reproducibility: let cuDNN auto-tune its kernels
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.dataset import Dataset
import timm
import time

import pandas as pd
import numpy as np
import cv2, glob, os
from PIL import Image
import moviepy.editor as mp
import librosa

## Step2 Audio and video preprocessing

In [5]:

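# Convert a video's audio track into a mel-spectrogram image
def generate_mel_spectrogram(video_path, n_mels=128, fmax=8000, target_size=(256, 256)):
    # Extract the audio track to a temporary WAV file
    audio_path = 'extracted_audio.wav'
    video = mp.VideoFileClip(video_path)
    video.audio.write_audiofile(audio_path, verbose=False, logger=None)
    video.close()  # release the underlying ffmpeg readers

    # Load the audio file (librosa resamples to 22050 Hz by default)
    y, sr = librosa.load(audio_path)

    # Compute the mel spectrogram
    # NOTE: fmax is accepted but not forwarded, so librosa uses its default (sr / 2)
    S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)

    # Convert the power spectrogram to dB, relative to its maximum
    S_dB = librosa.power_to_db(S, ref=np.max)

    # Rescale linearly to the range 0-255
    S_dB_normalized = cv2.normalize(S_dB, None, 0, 255, cv2.NORM_MINMAX)

    # Convert from float to unsigned 8-bit integers
    S_dB_normalized = S_dB_normalized.astype(np.uint8)

    # Resize to the target size
    img_resized = cv2.resize(S_dB_normalized, target_size, interpolation=cv2.INTER_LINEAR)

    return img_resized

# Usage example
# video_path = '/kaggle/input/alldeepfake/phase1/trainset/00154f42886002f8a2a6e40343617510.mp4'  # replace with your own video path
# mel_spectrogram_image = generate_mel_spectrogram(video_path)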
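
The two numeric steps in the middle of `generate_mel_spectrogram` (power to dB relative to the maximum, then min-max scaling to 0-255) can be hard to picture. The sketch below re-implements them in plain Python on a toy 2x2 array. It follows the documented behavior of `librosa.power_to_db` (defaults `amin=1e-10`, `top_db=80`) and of `cv2.NORM_MINMAX` as an approximation, not the libraries' own code, and the helper names are hypothetical.

```python
import math

def power_to_db(power, amin=1e-10, top_db=80.0):
    """Convert a 2-D list of power values to dB relative to the largest value."""
    ref = max(max(row) for row in power)
    ref_db = 10.0 * math.log10(max(amin, ref))
    db = [[10.0 * math.log10(max(amin, v)) - ref_db for v in row] for row in power]
    # Clip everything that lies more than top_db below the peak
    floor = max(max(row) for row in db) - top_db
    return [[max(v, floor) for v in row] for row in db]

def minmax_to_uint8(values):
    """Rescale linearly so the minimum maps to 0 and the maximum to 255, then truncate."""
    lo = min(min(row) for row in values)
    hi = max(max(row) for row in values)
    span = (hi - lo) or 1.0  # avoid dividing by zero on a constant input
    return [[int(255.0 * ((v - lo) / span)) for v in row] for row in values]

power = [[1.0, 0.1], [0.01, 0.001]]  # toy "spectrogram", not real audio
db = power_to_db(power)
image = minmax_to_uint8(db)
print(db)
print(image)
```

The loudest bin always maps to 255 and the quietest to 0, so every clip uses the full 8-bit range regardless of its absolute volume.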

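The cells below convert the audio of every video in the dataset into a mel spectrogram, which takes a long time. The converted spectrograms have already been uploaded, so you can skip these cells. If you are curious, you can also run the conversion yourself.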

In [6]:

# !mkdir ffdv_phase1_sample
# !mkdir ffdv_phase1_sample/trainset
# !mkdir ffdv_phase1_sample/valset

In [7]:
# If this takes too long, change 241991 and 79509 to smaller numbers
# for video_path in glob.glob('/kaggle/input/alldeepfake/phase1/trainset/*.mp4')[:241991]:
#     mel_spectrogram_image = generate_mel_spectrogram(video_path)
#     cv2.imwrite('./ffdv_phase1_sample/trainset/' + video_path.split('/')[-1][:-4] + '.jpg', mel_spectrogram_image)
    
# for video_path in glob.glob('/kaggle/input/alldeepfake/phase1/valset/*.mp4')[:79509]:
#     mel_spectrogram_image = generate_mel_spectrogram(video_path)
#     cv2.imwrite('./ffdv_phase1_sample/valset/' + video_path.split('/')[-1][:-4] + '.jpg', mel_spectrogram_image)
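
The commented loops derive each output name with `video_path.split('/')[-1][:-4] + '.jpg'`, which assumes `/` separators and a four-character extension. A slightly more explicit variant, sketched here with the standard `pathlib` module, is shown below; `spectrogram_path` is a hypothetical helper, not part of the notebook.

```python
from pathlib import PurePosixPath

def spectrogram_path(video_path, out_dir):
    """Map .../<name>.mp4 to <out_dir>/<name>.jpg."""
    stem = PurePosixPath(video_path).stem  # file name without its extension
    return str(PurePosixPath(out_dir) / f"{stem}.jpg")

video = "/kaggle/input/alldeepfake/phase1/trainset/00154f42886002f8a2a6e40343617510.mp4"
print(spectrogram_path(video, "ffdv_phase1_sample/trainset"))
```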

## Step3 Define model training, validation, and prediction

In [8]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)

class ProgressMeter(object):
    def __init__(self, num_batches, *meters):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = ""


    def pr2int(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'
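
`AverageMeter.update(val, n)` weights each value by the batch size `n`, which matters when the last batch of an epoch is smaller than the rest. The toy numbers below are made up purely to show the difference between the weighted mean and a plain mean of batch means.

```python
def weighted_mean(batch_values, batch_sizes):
    """Mean of per-batch values, weighted by the number of samples in each batch."""
    total = sum(v * n for v, n in zip(batch_values, batch_sizes))
    return total / sum(batch_sizes)

batch_acc = [100.0, 50.0]  # per-batch accuracy in percent (toy values)
batch_sizes = [40, 10]     # a full batch followed by a short final batch

weighted = weighted_mean(batch_acc, batch_sizes)
unweighted = sum(batch_acc) / len(batch_acc)
print(weighted, unweighted)
```

The weighted figure equals the accuracy over all 50 samples, which is what `AverageMeter.avg` reports.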

In [9]:
def validate(val_loader, model, criterion):
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    progress = ProgressMeter(len(val_loader), batch_time, losses, top1)

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (input, target) in enumerate(val_loader):
            input = input.cuda()
            target = target.cuda()

            # compute output
            output = model(input)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
            losses.update(loss.item(), input.size(0))
            top1.update(acc, input.size(0))
            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

        # TODO: this should also be done with the ProgressMeter
        print(' * Acc@1 {top1.avg:.3f}'
              .format(top1=top1))
        return top1

def predict(test_loader, model, tta=10):
    # switch to evaluate mode
    model.eval()
    
    test_pred_tta = None
    for _ in range(tta):
        test_pred = []
        with torch.no_grad():
            end = time.time()
            for i, (input, target) in enumerate(test_loader):
                input = input.cuda()
                target = target.cuda()

                # compute output
                output = model(input)
                output = F.softmax(output, dim=1)
                output = output.data.cpu().numpy()

                test_pred.append(output)
        test_pred = np.vstack(test_pred)
    
        if test_pred_tta is None:
            test_pred_tta = test_pred
        else:
            test_pred_tta += test_pred
    
    return test_pred_tta

def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    progress = ProgressMeter(len(train_loader), batch_time, losses, top1)

    # switch to train mode
    model.train()

    end = time.time()
    for i, (input, target) in enumerate(train_loader):
        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        losses.update(loss.item(), input.size(0))

        acc = (output.argmax(1).view(-1) == target.float().view(-1)).float().mean() * 100
        top1.update(acc, input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % 100 == 0:
            progress.pr2int(i)
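
Note that `predict` adds up the softmax outputs of all `tta` passes and returns the sum, so for `tta > 1` the returned values are no longer probabilities in [0, 1]. Scaling by a constant does not change ranking metrics such as AUC, but dividing by the number of passes keeps the scores interpretable. A minimal plain-Python sketch of that averaging, with hypothetical helper names and toy logits, might look like this:

```python
import math

def softmax(logits):
    """Numerically stable softmax over one row of logits."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def average_passes(passes):
    """Average per-sample probability rows over several prediction passes."""
    n = len(passes)
    num_samples = len(passes[0])
    num_classes = len(passes[0][0])
    return [[sum(p[i][c] for p in passes) / n for c in range(num_classes)]
            for i in range(num_samples)]

# Toy logits for two samples from two passes (columns: real, fake)
logits_pass_1 = [[2.0, 0.0], [0.0, 1.0]]
logits_pass_2 = [[1.0, 0.0], [0.0, 3.0]]
passes = [[softmax(row) for row in logits] for logits in (logits_pass_1, logits_pass_2)]

mean_probs = average_passes(passes)
fake_scores = [row[1] for row in mean_probs]  # column 1 is the Deepfake class
print(fake_scores)
```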

## Step4 Load the data

In [10]:
train_label = pd.read_csv("/kaggle/input/alldeepfake/phase1/trainset_label.txt")
val_label = pd.read_csv("/kaggle/input/alldeepfake/phase1/valset_label.txt")

# Point each label row to its precomputed mel-spectrogram image
train_label['path'] = '/kaggle/input/deepfakeoutput/ffdv_phase1_sample/trainset/' + train_label['video_name'].apply(lambda x: x[:-4] + '.jpg')
val_label['path'] = '/kaggle/input/deepfakeoutput/ffdv_phase1_sample/valset/' + val_label['video_name'].apply(lambda x: x[:-4] + '.jpg')

train_label = train_label[train_label['path'].apply(os.path.exists)]
val_label = val_label[val_label['path'].apply(os.path.exists)]

## Step5 Define the image dataset

In [11]:
class FFDIDataset(Dataset):
    def __init__(self, img_path, img_label, transform=None):
        self.img_path = img_path
        self.img_label = img_label
        
        self.transform = transform
    
    def __getitem__(self, index):
        img = Image.open(self.img_path[index]).convert('RGB')
        
        if self.transform is not None:
            img = self.transform(img)
        
        return img, torch.from_numpy(np.array(self.img_label[index]))
    
    def __len__(self):
        return len(self.img_path)

## Step6 Load the model and start training

In [12]:
train_loader = torch.utils.data.DataLoader(
    FFDIDataset(train_label['path'].values, train_label['target'].values, 
            transforms.Compose([
                        transforms.Resize((256, 256)),
                        transforms.RandomHorizontalFlip(),
                        transforms.RandomVerticalFlip(),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ), batch_size=40, shuffle=True, num_workers=12, pin_memory=True
)

val_loader = torch.utils.data.DataLoader(
    FFDIDataset(val_label['path'].values, val_label['target'].values, 
            transforms.Compose([
                        transforms.Resize((256, 256)),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ), batch_size=40, shuffle=False, num_workers=10, pin_memory=True
)

model = timm.create_model('efficientnet_b1', pretrained=True, num_classes=2)
model.classifier
model = model.cuda()

# Start training
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.Adam(model.parameters(), 0.003)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.85)
best_acc = 0.0
for epoch in range(15):
    print('Epoch: ', epoch)

    train(train_loader, model, criterion, optimizer, epoch)
    val_acc = validate(val_loader, model, criterion)

    # Save a checkpoint whenever validation accuracy improves
    if val_acc.avg.item() > best_acc:
        best_acc = round(val_acc.avg.item(), 2)
        torch.save(model.state_dict(), f'./model_{best_acc}.pt')

    # Step the scheduler after the epoch's optimizer updates (required order since PyTorch 1.1)
    scheduler.step()

model.safetensors:   0%|          | 0.00/31.5M [00:00<?, ?B/s]

Epoch:  0
[   0/6050]	Time  5.705 ( 5.705)	Loss 4.0238e+00 (4.0238e+00)	Acc@1  42.50 ( 42.50)
[ 100/6050]	Time  0.220 ( 0.275)	Loss 1.5944e-02 (5.2271e-01)	Acc@1 100.00 ( 91.14)
[ 200/6050]	Time  0.232 ( 0.249)	Loss 1.1457e-02 (3.1014e-01)	Acc@1 100.00 ( 93.92)
[ 300/6050]	Time  0.204 ( 0.240)	Loss 1.3360e-01 (2.3626e-01)	Acc@1  97.50 ( 95.06)
[ 400/6050]	Time  0.223 ( 0.236)	Loss 3.0058e-03 (1.9761e-01)	Acc@1 100.00 ( 95.60)
[ 500/6050]	Time  0.225 ( 0.233)	Loss 7.7057e-02 (1.6857e-01)	Acc@1  97.50 ( 96.15)
[ 600/6050]	Time  0.222 ( 0.231)	Loss 2.0134e-02 (1.5213e-01)	Acc@1 100.00 ( 96.47)
[ 700/6050]	Time  0.223 ( 0.230)	Loss 2.5016e-02 (1.3816e-01)	Acc@1 100.00 ( 96.74)
[ 800/6050]	Time  0.221 ( 0.229)	Loss 7.4123e-02 (1.2750e-01)	Acc@1  97.50 ( 96.94)
[ 900/6050]	Time  0.224 ( 0.228)	Loss 3.3820e-03 (1.1947e-01)	Acc@1 100.00 ( 97.05)
[1000/6050]	Time  0.215 ( 0.228)	Loss 7.2873e-02 (1.1364e-01)	Acc@1  97.50 ( 97.15)
[1100/6050]	Time  0.232 ( 0.227)	Loss 1.2775e-02 (1.0819e-01)	Acc@
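
For reference, `StepLR(optimizer, step_size=4, gamma=0.85)` multiplies the learning rate by 0.85 once every four calls to `scheduler.step()`. The closed form can be sketched in plain Python as below; `step_lr` is a hypothetical helper that mirrors the documented formula rather than PyTorch's implementation.

```python
def step_lr(base_lr, gamma, step_size, steps_taken):
    """Learning rate after `steps_taken` calls to scheduler.step()."""
    return base_lr * gamma ** (steps_taken // step_size)

# Values match the Adam/StepLR settings used in the training cell
for steps_taken in range(0, 16, 4):
    print(steps_taken, step_lr(0.003, 0.85, 4, steps_taken))
```

Over 15 epochs the rate therefore drops only three times, so it stays within roughly 40% of its starting value.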

## Step7 Generate and save the results

In [13]:
val_pred = predict(val_loader, model, 1)[:, 1]
val_label["y_pred"] = val_pred

In [14]:
!\rm -rf /kaggle/working/ffdv_phase1_sample

In [15]:
submit = pd.read_csv("/kaggle/input/multi-ffdv/prediction.txt.csv")
merged_df = submit.merge(val_label[['video_name', 'y_pred']], on='video_name', suffixes=('', '_df2'), how='left', )
merged_df['y_pred'] = merged_df['y_pred_df2'].combine_first(merged_df['y_pred'])

merged_df[['video_name', 'y_pred']].to_csv('submit.csv', index=None)
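
The left merge followed by `combine_first` keeps every row of the submission template and overwrites its `y_pred` only where the model produced a score; videos without a spectrogram keep the template's value. The same logic, sketched with plain Python structures and made-up file names and scores (the real template's contents are not shown in this notebook):

```python
def fill_predictions(template, predictions):
    """Keep the template order; use the model score when one exists, else the default."""
    return [(name, predictions.get(name, default)) for name, default in template]

# Hypothetical template rows: (video_name, default y_pred)
template = [("a.mp4", 0.5), ("b.mp4", 0.5), ("c.mp4", 0.5)]
# Hypothetical model outputs; "b.mp4" is missing, e.g. because its spectrogram was not found
predictions = {"a.mp4": 0.91, "c.mp4": 0.07}

submission = fill_predictions(template, predictions)
print(submission)
```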