In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import numpy as np
import pandas as pd
import cv2
import seaborn as sns
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Dropout, LeakyReLU
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ReduceLROnPlateau

In [None]:
# Copy the dataset from the Kaggle input directory into the writable working
# directory so that later cells can create files and folders inside it.
# NOTE(review): if the destination already exists, `cp -r` presumably nests a
# second copy inside it, so re-running this cell is likely not idempotent.
!cp -r '/kaggle/input/deepfake/deepfake_database' '/kaggle/working/deepfake_database'

In [None]:
# Read the images of one dataset split as (image, label) pairs.
# Class sub-folder names; the position in this list is the integer label.
labels_name = ['df', 'real','ps']
# Every image is resized to img_size x img_size pixels.
img_size = 150

def get_data(data_dir):
    """Load every image under ``data_dir/<class>`` as a resized grayscale array.

    Args:
        data_dir: Directory containing one sub-folder per entry of
            ``labels_name``.

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays. ``images`` holds the
        ``img_size`` x ``img_size`` grayscale images and ``labels`` the
        matching indices into ``labels_name``. Files that cannot be read or
        resized are reported with their path and skipped.
    """
    images = []
    labels = []

    for class_num, label in enumerate(labels_name):
        path = os.path.join(data_dir, label)
        # Sort so the sample order does not depend on the file system.
        for img in sorted(os.listdir(path)):
            img_path = os.path.join(path, img)
            # cv2.imread signals an unreadable file by returning None rather
            # than raising, so check for it explicitly.
            img_arr = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img_arr is None:
                print(f"Skipping unreadable image: {img_path}")
                continue
            try:
                resized_arr = cv2.resize(img_arr, (img_size, img_size))
            except cv2.error as e:
                print(f"Skipping {img_path}: {e}")
                continue
            images.append(resized_arr)
            labels.append(class_num)

    return np.array(images), np.array(labels)

# Load the three pre-split subsets (train, test, validation) in that order.
(x_train, y_train), (x_test, y_test), (x_val, y_val) = [
    get_data(split_dir)
    for split_dir in (
        '/kaggle/working/deepfake_database/train',
        '/kaggle/working/deepfake_database/test',
        '/kaggle/working/deepfake_database/validation',
    )
]


In [None]:
# Scale pixel values by 1/255 and add a trailing channel axis so each image has
# shape (img_size, img_size, 1).
# NOTE: this cell rebinds x_* in place; re-running it without re-running the
# loading cell divides by 255 a second time.
x_train = (x_train / 255).reshape(-1, img_size, img_size, 1)
x_val = (x_val / 255).reshape(-1, img_size, img_size, 1)
x_test = (x_test / 255).reshape(-1, img_size, img_size, 1)

# Labels become column vectors of shape (n_samples, 1).
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)

In [None]:
# No augmentation options are set; the generator is used only to batch and
# shuffle the arrays.
datagen = ImageDataGenerator()
datagen.fit(x_train)

# One output unit per class in labels_name. The labels are integer class
# indices, so a single sigmoid unit with binary cross-entropy cannot
# represent them once there are more than two classes.
num_classes = len(labels_name)

model = tf.keras.Sequential()
model.add(Conv2D(8 , (3,3) ,padding = 'same' , activation = 'relu' , input_shape = (img_size,img_size,1) ))
model.add(BatchNormalization())
model.add(MaxPooling2D((2,2) ,padding = 'same') )

model.add(Conv2D(8 , (5,5) ,padding = 'same' , activation = 'relu') )
model.add(BatchNormalization())
model.add(MaxPooling2D((2,2) ,padding = 'same'))

model.add(Conv2D(16 , (5,5) ,padding = 'same' , activation = 'relu'))
model.add(BatchNormalization())
model.add(MaxPooling2D((2,2) ,padding = 'same'))

model.add(Conv2D(16 , (5,5) ,padding = 'same' , activation = 'relu' ))
model.add(BatchNormalization())
# NOTE(review): the (4, 24) pool size looks like a typo for (4, 4); left
# unchanged because altering it changes the architecture. Confirm intent.
model.add(MaxPooling2D((4,24) ,padding = 'same'))

model.add(Flatten())
model.add(Dropout(0.5))
model.add(Dense(16))
model.add(LeakyReLU(alpha=0.1))
model.add(Dropout(0.5))
# Softmax over the classes, paired with the sparse (integer-label) loss.
model.add(Dense(num_classes, activation = 'softmax'))
model.compile(optimizer = 'Adam' , loss = 'sparse_categorical_crossentropy' , metrics = ['accuracy'])

batch_size = 4
epochs = 10

history = model.fit(datagen.flow(x_train,y_train,
         batch_size = batch_size),epochs=epochs,
         validation_data = datagen.flow(x_val, y_val),
         verbose=1)
model.summary()

In [None]:
import matplotlib.pyplot as plt

# Evaluate the trained model on the held-out test arrays.
model_name = "CNN"
result = model.evaluate(x_test, y_test)
loss, accuracy = result[0], result[1] * 100

# One-row summary table: model name, loss, accuracy in percent.
df = pd.DataFrame(
    [[model_name, loss, accuracy]],
    columns=['Model', 'Loss', 'Accuracy (%)'],
)

# Render the table as a matplotlib figure with the axes hidden.
fig, ax = plt.subplots()
for axis_mode in ('tight', 'off'):
    ax.axis(axis_mode)
ax.table(cellText=df.values, colLabels=df.columns, loc='center')

plt.show()

Torch

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Shared preprocessing: resize to 224x224, convert to a tensor, then normalise
# each channel with the listed mean and standard deviation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder dataset per split directory.
train_dataset, val_dataset, test_dataset = [
    datasets.ImageFolder(split_dir, transform=transform)
    for split_dir in (
        '/kaggle/working/deepfake_database/train',
        '/kaggle/working/deepfake_database/validation',
        '/kaggle/working/deepfake_database/test',
    )
]

# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader, test_loader = [
    DataLoader(eval_dataset, batch_size=32, shuffle=False)
    for eval_dataset in (val_dataset, test_dataset)
]

In [None]:
from transformers import DeiTForImageClassification

# Size the classification head from the folders ImageFolder actually found.
# With a fixed num_labels=2 and three class folders (df, ps, real), the label
# index 2 is out of range for CrossEntropyLoss.
model = DeiTForImageClassification.from_pretrained(
    "facebook/deit-base-distilled-patch16-224",
    num_labels=len(train_dataset.classes),
)

In [None]:
from torch.optim import AdamW
from tqdm import tqdm
import torch
from torch.nn import DataParallel
from sklearn.metrics import accuracy_score

# Number of fine-tuning epochs; also used in the progress message.
NUM_EPOCHS = 1

if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using", torch.cuda.device_count(), "GPUs")
    # Guard against wrapping twice when this cell is re-run.
    if not isinstance(model, DataParallel):
        model = DataParallel(model)
    model.to(device)
else:
    device = torch.device("cpu")
    print("Using CPU")

optimizer = AdamW(model.parameters(), lr=0.0001)
criterion = torch.nn.CrossEntropyLoss()

# Running totals across epochs.
total_accuracy = 0
total_loss = 0
num_batches = 0

for epoch in range(NUM_EPOCHS):
    # Re-enter training mode every epoch: the validation pass below switches
    # the model to eval mode.
    model.train()
    epoch_loss = 0.0
    epoch_batches = 0
    for batch in tqdm(train_loader):
        inputs, labels = batch

        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs).logits
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_batches += 1

    num_batches += epoch_batches
    # Average over this epoch's batches only, not the cumulative counter.
    avg_epoch_loss = epoch_loss / max(1, epoch_batches)
    total_loss += avg_epoch_loss

    # Validation pass: no gradients, argmax over the logits as prediction.
    model.eval()
    val_labels = []
    val_preds = []
    with torch.no_grad():
        for batch in val_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs).logits
            _, preds = torch.max(outputs, 1)
            val_labels.extend(labels.cpu().numpy())
            val_preds.extend(preds.cpu().numpy())

    val_accuracy = accuracy_score(val_labels, val_preds)
    total_accuracy += val_accuracy
    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {avg_epoch_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

D2l

In [None]:
!pip install d2l==1.0.3

In [None]:
import os
import shutil
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import pandas as pd
import torchvision
from d2l import torch as d2l

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow.keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout , BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix
from keras.callbacks import ReduceLROnPlateau
import cv2
import os
import numpy as np
import pandas as pd
from keras.models import Sequential
from sklearn.model_selection import train_test_split

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Reshape, Concatenate, LeakyReLU
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
import sklearn.preprocessing 
from sklearn.preprocessing import LabelEncoder

In [None]:
# Root of the dataset copy under /kaggle/working (the destination of the `cp`
# cell near the top of the notebook).
data_dir='/kaggle/working/deepfake_database'

In [None]:
# Generate (file stem, class index) pairs for the label CSV.
labels = ['df', 'real','ps']
img_size = 150
def get_data(data_dir, class_names=tuple(labels)):
    """List every file under ``data_dir/<class>`` with its class index.

    Args:
        data_dir: Directory containing one sub-folder per class.
        class_names: Class sub-folder names; the position is the class index.
            The default is a snapshot of ``labels`` taken when the function is
            defined, because the global name ``labels`` is rebound to a dict
            later in the notebook.

    Returns:
        A list of ``[stem, class_index]`` pairs, where ``stem`` is the file
        name up to its first dot. The first-dot rule matches the
        ``split('.')[0]`` lookup done by ``reorg_train_valid``.
    """
    data = []
    for class_num, label in enumerate(class_names):
        path = os.path.join(data_dir, label)
        for img in os.listdir(path):
            data.append([img.split(".")[0], class_num])
    return data

In [None]:
# Collect the (name, class index) pairs of the test and train splits.
data_test = get_data('/kaggle/working/deepfake_database/test')
data_train = get_data('/kaggle/working/deepfake_database/train')

# Append the test entries after the train entries, in place.
data_train.extend(data_test)
data_label = data_train

# Split the pairs into two parallel columns.
x = [pair[0] for pair in data_label]
y = [pair[1] for pair in data_label]

label = pd.DataFrame({'name':x,'label':y})

In [None]:
# Write the name/label table as a two-column CSV with a header row.
# to_csv replaces the earlier to_string() + whitespace-split round trip, which
# produced wrong rows for names containing spaces and for an empty table.
label.to_csv('/kaggle/working/deepfake_database/train/labels.csv', index=False)

In [None]:
def copyfile(filename, target_dir):
    """Copy a file into a target directory, creating the directory if needed."""
    os.makedirs(target_dir, exist_ok=True)
    # Spell out the destination path: the file keeps its base name.
    destination = os.path.join(target_dir, os.path.basename(filename))
    shutil.copy(filename, destination)
    
def read_csv_labels(fname):
    """Read a ``name,label`` CSV file into a dictionary.

    Args:
        fname: Path of a CSV file whose first line is a header.

    Returns:
        A dict mapping each name to its label (both as strings). Blank lines
        are ignored.

    Raises:
        ValueError: If a non-blank data line contains no comma.
    """
    with open(fname, 'r') as f:
        # Skip the header line (column names).
        next(f, None)
        rows = (line.rstrip() for line in f)
        # Split on the last comma so the label is always the final field.
        return dict(row.rsplit(',', 1) for row in rows if row)
# Load the name -> label mapping from labels.csv in the train directory.
# NOTE: this rebinds the global name `labels`, which was a list of class names
# earlier in the notebook, to a dict.
labels = read_csv_labels(os.path.join("/kaggle/working/deepfake_database/train", 'labels.csv'))
# Print the number of entries in the mapping ("training samples") ...
print('# 训练样本 :', len(labels))
# ... and the number of distinct label values ("classes").
print('# 类别 :', len(set(labels.values())))

In [None]:
def reorg_train_valid(data_dir, labels, valid_ratio):
    """Split a validation set out of the original training set.

    Every file in ``data_dir/train`` is copied to
    ``data_dir/train_valid_test/train_valid/<label>`` and additionally to
    either ``.../valid/<label>`` (until that label holds
    ``n_valid_per_label`` files) or ``.../train/<label>``.

    Args:
        data_dir: Directory that contains the flat ``train`` folder.
        labels: Dict mapping a file stem (name up to the first dot) to a label.
        valid_ratio: Fraction of the smallest class used to size the
            per-class validation quota.

    Returns:
        The number of validation samples taken per label.
    """
    # Imported locally because the notebook does not import these modules.
    import collections
    import math

    # Number of samples in the least frequent class of the training set.
    n = collections.Counter(labels.values()).most_common()[-1][1]
    # Number of validation samples per class (at least one).
    n_valid_per_label = max(1, math.floor(n * valid_ratio))
    label_count = {}
    # Walk over every image of the training set.
    for train_file in os.listdir(os.path.join(data_dir, 'train')):
        # Look up the label by file stem.
        label = labels[train_file.split('.')[0]]
        # Full path of the image.
        fname = os.path.join(data_dir, 'train', train_file)
        # Always copy into the combined train+valid folder of its label.
        copyfile(fname, os.path.join(data_dir, 'train_valid_test',
                                     'train_valid', label))
        # While the validation quota of this label is not filled, the image
        # goes to the validation folder ...
        if label not in label_count or label_count[label] < n_valid_per_label:
            copyfile(fname, os.path.join(data_dir, 'train_valid_test',
                                         'valid', label))
            label_count[label] = label_count.get(label, 0) + 1
        # ... otherwise it goes to the training folder.
        else:
            copyfile(fname, os.path.join(data_dir, 'train_valid_test',
                                         'train', label))
    return n_valid_per_label

In [None]:
# Rearrange the training and test sets: copy the images into one flat folder.
# This cell handles the 'df' class of the training split.
df_train_dir = os.path.join("/kaggle/working/deepfake_database/train", 'df')
for train_file in os.listdir(df_train_dir):
    fname = os.path.join(df_train_dir, train_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/train")

In [None]:
# Copy the 'real' class of the training split into the flat train folder.
real_train_dir = os.path.join("/kaggle/working/deepfake_database/train", 'real')
for train_file in os.listdir(real_train_dir):
    fname = os.path.join(real_train_dir, train_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/train")

In [None]:
# Copy the 'ps' class of the training split into the flat train folder.
ps_train_dir = os.path.join("/kaggle/working/deepfake_database/train", 'ps')
for train_file in os.listdir(ps_train_dir):
    fname = os.path.join(ps_train_dir, train_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/train")

In [None]:
# Copy the 'df' class of the test split into the flat test folder.
df_test_dir = os.path.join("/kaggle/working/deepfake_database/test", 'df')
for test_file in os.listdir(df_test_dir):
    fname = os.path.join(df_test_dir, test_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/test")

In [None]:
# Copy the 'real' class of the test split into the flat test folder.
real_test_dir = os.path.join("/kaggle/working/deepfake_database/test", 'real')
for test_file in os.listdir(real_test_dir):
    fname = os.path.join(real_test_dir, test_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/test")

In [None]:
# Copy the 'ps' class of the test split into the flat test folder.
ps_test_dir = os.path.join("/kaggle/working/deepfake_database/test", 'ps')
for test_file in os.listdir(ps_test_dir):
    fname = os.path.join(ps_test_dir, test_file)
    copyfile(fname, "/kaggle/working/deepfake_database/train/test")

In [None]:
# Split the data into training, validation and test folders.

def reorg_dog_data(data_dir, valid_ratio):
    """Reorganise ``data_dir`` into the train/valid/test folder layout.

    Reads ``labels.csv`` from ``data_dir`` and delegates the copying to the
    d2l helpers ``reorg_train_valid`` and ``reorg_test``.

    Args:
        data_dir: Directory holding ``labels.csv`` and the image folders.
        valid_ratio: Passed through to ``d2l.reorg_train_valid``.
    """
    # Take the label file from data_dir instead of a hard-coded path, so the
    # labels always belong to the directory being reorganised.
    labels = d2l.read_csv_labels(os.path.join(data_dir, 'labels.csv'))
    d2l.reorg_train_valid(data_dir, labels, valid_ratio)
    d2l.reorg_test(data_dir)
    
# Batch size used by the DataLoaders built further down; this rebinds the
# `batch_size` assigned earlier for the Keras model.
batch_size = 128
# Validation ratio handed to reorg_dog_data.
valid_ratio = 0.1
# Build the train_valid_test folder layout inside the train directory.
reorg_dog_data("/kaggle/working/deepfake_database/train", valid_ratio)

In [None]:
# Build the image transforms (presumably "feature" was meant in the original
# heading).
transform_train = transforms.Compose([
    # Random crop covering 8%-100% of the image area with an aspect ratio
    # between 3/4 and 4/3, rescaled to 224 x 224.
    transforms.RandomResizedCrop(
        224, scale=(0.08, 1.0), ratio=(3.0/4.0, 4.0/3.0)),
    transforms.RandomHorizontalFlip(),
    # Random brightness, contrast and saturation changes.
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    # Convert the image to a tensor.
    transforms.ToTensor(),
    # Standardise each channel.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

transform_test = transforms.Compose([
    transforms.Resize(256),
    # Take the central 224 x 224 square.
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [None]:
# Directory that contains the train_valid_test folder read by the dataset
# cell below.
new_data_dir="/kaggle/working/deepfake_database/train"

In [None]:
# Training-time datasets use the augmenting transform.
train_ds = torchvision.datasets.ImageFolder(
    os.path.join(new_data_dir, 'train_valid_test', 'train'),
    transform=transform_train)
train_valid_ds = torchvision.datasets.ImageFolder(
    os.path.join(new_data_dir, 'train_valid_test', 'train_valid'),
    transform=transform_train)

# Evaluation-time datasets use the deterministic transform.
valid_ds = torchvision.datasets.ImageFolder(
    os.path.join(new_data_dir, 'train_valid_test', 'valid'),
    transform=transform_test)
test_ds = torchvision.datasets.ImageFolder(
    os.path.join(new_data_dir, 'train_valid_test', 'test'),
    transform=transform_test)

# Training loaders shuffle and drop the last incomplete batch.
train_iter = torch.utils.data.DataLoader(
    train_ds, batch_size=batch_size, shuffle=True, drop_last=True)
train_valid_iter = torch.utils.data.DataLoader(
    train_valid_ds, batch_size=batch_size, shuffle=True, drop_last=True)

# The validation loader keeps order and drops the last incomplete batch.
valid_iter = torch.utils.data.DataLoader(
    valid_ds, batch_size=batch_size, shuffle=False, drop_last=True)

# The test loader keeps order and keeps every sample.
test_iter = torch.utils.data.DataLoader(
    test_ds, batch_size=batch_size, shuffle=False, drop_last=False)

In [None]:
# Build the model.
def get_net(devices, num_classes=3):
    """Create a ViT-L/16 feature extractor with a small trainable head.

    Args:
        devices: Sequence of devices; the network is moved to ``devices[0]``.
        num_classes: Number of output categories of the new head. Defaults to
            3, the number of class folders in this notebook (df, real, ps).
            The previous hard-coded 120 outputs did not match that.

    Returns:
        An ``nn.Sequential`` whose ``features`` sub-module (the pretrained
        ViT) is frozen and whose ``output_new`` sub-module is trainable.
    """
    finetune_net = nn.Sequential()
    finetune_net.features = torchvision.models.vit_l_16(pretrained=True)
    # New output network: takes the 1000 outputs of the feature sub-module and
    # maps them to num_classes outputs.
    finetune_net.output_new = nn.Sequential(nn.Linear(1000, 256),
                                            nn.ReLU(),
                                            nn.Linear(256, num_classes))
    # Move the model to the first device.
    finetune_net = finetune_net.to(devices[0])
    # Freeze the parameters of the feature layers.
    for param in finetune_net.features.parameters():
        param.requires_grad = False
    return finetune_net

In [None]:
# Per-sample cross-entropy; callers sum or average it themselves.
loss = nn.CrossEntropyLoss(reduction='none')

def evaluate_loss(data_iter, net, devices):
    """Return the mean per-sample loss of ``net`` over ``data_iter``.

    The pass runs in eval mode and without gradient tracking, so no autograd
    graph is kept across batches. The training/eval mode that ``net`` had on
    entry is restored before returning.

    Args:
        data_iter: Iterable of ``(features, labels)`` batches.
        net: The module to evaluate.
        devices: Sequence of devices; batches are moved to ``devices[0]``.

    Returns:
        Sum of all per-sample losses divided by the number of labels.

    Raises:
        ZeroDivisionError: If ``data_iter`` yields no samples.
    """
    l_sum, n = 0.0, 0
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for features, labels in data_iter:
                features, labels = features.to(devices[0]), labels.to(devices[0])
                outputs = net(features)
                l = loss(outputs, labels)
                l_sum += l.sum()
                n += labels.numel()
    finally:
        # Hand the module back in the mode the caller left it in.
        net.train(was_training)
    return l_sum / n

In [None]:
def train(net, train_iter, valid_iter, num_epochs, lr, wd, devices, lr_period,
          lr_decay):
    """Train the parameters of ``net`` that require gradients with SGD.

    Args:
        net: The network to train.
        train_iter: Iterable of ``(features, labels)`` training batches.
        valid_iter: Validation batches, or ``None`` to skip validation.
        num_epochs: Number of passes over ``train_iter``.
        lr: Learning rate.
        wd: Weight decay.
        devices: Devices for ``nn.DataParallel``; batches go to ``devices[0]``.
        lr_period: Step size of the ``StepLR`` scheduler, in epochs.
        lr_decay: Multiplicative factor of the ``StepLR`` scheduler.
    """
    # Only parameters with requires_grad=True are handed to the optimizer.
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    trainer = torch.optim.SGD((param for param in net.parameters()
                               if param.requires_grad), lr=lr,
                              momentum=0.9, weight_decay=wd)
    scheduler = torch.optim.lr_scheduler.StepLR(trainer, lr_period, lr_decay)
    num_batches, timer = len(train_iter), d2l.Timer()
    # Plot about five points per epoch. With fewer than five batches
    # `num_batches // 5` is 0 and would make the modulo below raise
    # ZeroDivisionError, hence the floor of 1.
    plot_every = max(1, num_batches // 5)
    legend = ['train loss']
    if valid_iter is not None:
        legend.append('valid loss')
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=legend)
    for epoch in range(num_epochs):
        # Make sure the net is in training mode at the start of every epoch.
        net.train()
        # metric[0]: summed training loss, metric[1]: number of samples.
        metric = d2l.Accumulator(2)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            features, labels = features.to(devices[0]), labels.to(devices[0])
            trainer.zero_grad()
            output = net(features)
            l = loss(output, labels).sum()
            l.backward()
            trainer.step()
            metric.add(l, labels.shape[0])
            timer.stop()
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[1], None))
        measures = f'train loss {metric[0] / metric[1]:.3f}'
        if valid_iter is not None:
            valid_loss = evaluate_loss(valid_iter, net, devices)
            animator.add(epoch + 1, (None, valid_loss.detach().cpu()))
        scheduler.step()
    # Final report uses the values of the last epoch.
    if valid_iter is not None:
        measures += f', valid loss {valid_loss:.3f}'
    print(measures + f'\n{metric[1] * num_epochs / timer.sum():.1f}'
          f' examples/sec on {str(devices)}')

In [None]:
# Training run: pick the devices, set the hyper-parameters, build the net and
# start training.
devices = d2l.try_all_gpus()
num_epochs = 5
lr = 1e-4
wd = 1e-4
lr_period = 2
lr_decay = 0.9
net = get_net(devices)
train(net, train_iter, valid_iter, num_epochs, lr, wd, devices,
      lr_period, lr_decay)