In [1]:
!wc -l /kaggle/input/deepfake/phase1/trainset_label.txt
!wc -l /kaggle/input/deepfake/phase1/valset_label.txt
!ls /kaggle/input/deepfake/phase1/trainset/ | wc -l 
!ls /kaggle/input/deepfake/phase1/valset/ | wc -l 

524430 /kaggle/input/deepfake/phase1/trainset_label.txt
147364 /kaggle/input/deepfake/phase1/valset_label.txt
524429
147363


In [2]:
!pip install timm


SyntaxError: invalid syntax (680583204.py, line 2)

In [None]:
import torch
torch.manual_seed(3407)
torch.cuda.manual_seed(3407)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import albumentations as A
from albumentations.pytorch import ToTensorV2
from albumentations import DualTransform, ImageOnlyTransform
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
import timm
import time
import os
import pandas as pd
import numpy as np
import cv2
from PIL import Image
from tqdm import tqdm_notebook

train_label = pd.read_csv('/kaggle/input/deepfake/phase1/trainset_label.txt')
val_label = pd.read_csv('/kaggle/input/deepfake/phase1/valset_label.txt')

train_label['path'] = '/kaggle/input/deepfake/phase1/trainset/' + train_label['img_name']
val_label['path'] = '/kaggle/input/deepfake/phase1/valset/' + val_label['img_name']

# 模型训练与验证

In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)

class ProgressMeter(object):
    def __init__(self, num_batches, *meters):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = ""


    def pr2int(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'

In [None]:
def validate(val_loader, model, criterion):
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    progress = ProgressMeter(len(val_loader), batch_time, losses, top1)

    model.eval()
    with torch.no_grad():
        end = time.time()
        for i, (input, target) in tqdm_notebook(enumerate(val_loader), total=len(val_loader)):
            input = input.cuda()
            target = target.cuda().float().unsqueeze(1)

            output = model(input)
            loss = criterion(output, target)

            predicted = (output > 0.5).float()
            acc = (predicted == target).float().mean() * 100
            losses.update(loss.item(), input.size(0))
            top1.update(acc, input.size(0))

            batch_time.update(time.time() - end)
            end = time.time()

        print(' * Acc@1 {top1.avg:.3f}'.format(top1=top1))
        return top1.avg




def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    progress = ProgressMeter(len(train_loader), batch_time, losses, top1)

    # switch to train mode
    model.train()

    end = time.time()
    for i, (input, target) in enumerate(train_loader):
        input = input.cuda()
        target = target.cuda().float().unsqueeze(1)  # 调整目标形状

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        losses.update(loss.item(), input.size(0))

        predicted = (output > 0.5).float()
        acc = (predicted == target).float().mean() * 100
        top1.update(acc, input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % 100 == 0:
            progress.pr2int(i)

In [None]:
class FFDIDataset(Dataset):
    def __init__(self, img_path, img_label, transform=None):
        self.img_path = img_path
        self.img_label = img_label
        
        if transform is not None:
            self.transform = transform
        else:
            self.transform = None
    
    def __getitem__(self, index):
        img = Image.open(self.img_path[index]).convert('RGB')
        
        if self.transform is not None:
            img = self.transform(img)
        
        return img, torch.from_numpy(np.array(self.img_label[index]))
    
    def __len__(self):
        return len(self.img_path)
    
# class deepfake_dataset(Dataset):
#     def __init__(self, df, transform=None):
#         self.df = df.reset_index(drop=True)
#         self.transform = transform
    
#     def __getitem__(self, index):
#         row = self.df.iloc[index]
#         image = cv2.imread(row['path'])
#         target = row['target']
#         if self.transform is not None:
#             data = self.transform(image=image)
#             image = data['image'].to(torch.float32)

#         target = torch.tensor(target, dtype=torch.int64)
#         return image, target
    
#     def __len__(self):
#         return len(self.df)
    
    
# train_transform = A.Compose([
#     A.Resize(height=256, width=256),
#     A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#     ToTensorV2()
# ])

# valid_transform = A.Compose([
#     A.Resize(height=256, width=256),
#     A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#     ToTensorV2()
# ])

# train_loader = deepfake_dataset(train_label, train_transform)
# val_loader = deepfake_dataset(val_label, train_transform)

# 加载模型

In [None]:
import timm
base_model = timm.create_model('tf_efficientnet_b3.ns_jft_in1k', pretrained=True, num_classes=0)
base_model = base_model.cuda()

epoch_num = 1
bs_value = 32

In [None]:
class GELU_PyTorch_Tanh(nn.Module):
    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh((2 / torch.pi) ** 0.5 * (x + 0.044715 * x ** 3)))

# 定义CustomModel类
class CustomModel(nn.Module):
    def __init__(self, base_model):
        super(CustomModel, self).__init__()
        self.base_model = base_model
        
        # 定义新的分类层
        self.fc1 = nn.Linear(1536, 2048)  # EfficientNet-B1的输出维度是1536
        self.bn1 = nn.BatchNorm1d(2048)
        self.dropout1 = nn.Dropout(p=0.5)        
        self.fc2 = nn.Linear(2048, 2048)
        self.bn2 = nn.BatchNorm1d(2048)
        self.gelu = GELU_PyTorch_Tanh()
        self.dropout2 = nn.Dropout(p=0.5)
        
        self.final_fc = nn.Linear(2048, 1)  # 二分类问题，输出维度为1
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.base_model(x)
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.gelu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = self.gelu(x)
        x = self.dropout2(x)
        x = self.final_fc(x)
        x = self.sigmoid(x)
        return x
custom_model = CustomModel(base_model)
custom_model = custom_model.cuda()
model_path = "/kaggle/input/3112141515515/pytorch/default/1/custom_model_96.64.pt"
if os.path.exists(model_path):
    custom_model.load_state_dict(torch.load(model_path))

In [None]:
train_loader = torch.utils.data.DataLoader(
    FFDIDataset(train_label['path'], train_label['target'],             
            transforms.Compose([
                        transforms.Resize((300, 300)),
#                         transforms.ColorJitter(brightness=.5, hue=.3),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ), batch_size=bs_value, shuffle=True, num_workers=4, pin_memory=True
)

val_loader = torch.utils.data.DataLoader(
#     FFDIDataset(val_label['path'].head(1000), val_label['target'].head(1000), 
    FFDIDataset(val_label['path'], val_label['target'], 
            transforms.Compose([
                        transforms.Resize((300,300)),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ), batch_size=bs_value, shuffle=False, num_workers=4, pin_memory=True
)

criterion = nn.BCELoss().cuda()
optimizer = torch.optim.Adam(custom_model.parameters(), 0.0001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.85)
best_acc = 0.0
for epoch in range(epoch_num):
    scheduler.step()
    print('Epoch: ', epoch)

    train(train_loader, custom_model, criterion, optimizer, epoch)
    val_acc = validate(val_loader, custom_model, criterion)
    val_acc = val_acc.item()  # 将 val_acc 转换为浮点数
    if val_acc > best_acc:
        best_acc = round(val_acc, 2)
        torch.save(custom_model.state_dict(), f'./custom_model_{best_acc}.pt')

print(f'Best validation accuracy: {best_acc}')


In [None]:
def predict(test_loader, model, tta=10):
    # switch to evaluate mode
    model.eval()
    
    test_pred_tta = None
    for _ in range(tta):
        test_pred = []
        with torch.no_grad():
            for i, (input, target) in tqdm_notebook(enumerate(test_loader), total=len(test_loader)):
                input = input.cuda()
                target = target.cuda()

                # compute output
                output = model(input)
                output = output.data.cpu().numpy()

                test_pred.append(output)
        test_pred = np.vstack(test_pred)
    
        if test_pred_tta is None:
            test_pred_tta = test_pred
        else:
            test_pred_tta += test_pred
    
    # Average the predictions if TTA is used
    test_pred_tta /= tta
    
    # Apply threshold to get binary predictions
    #test_pred_tta = (test_pred_tta > 0.5).astype(int)
    
    return test_pred_tta

In [None]:
test_loader = torch.utils.data.DataLoader(
    FFDIDataset(val_label['path'], val_label['target'], 
            transforms.Compose([
                        transforms.Resize((300, 300)),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    ), batch_size=bs_value, shuffle=False, num_workers=4, pin_memory=True
)

val_label['y_pred'] = predict(test_loader, custom_model, tta=1)[:, 0]  
val_label[['img_name', 'y_pred']].to_csv('submission.csv', index=None)