In [None]:
# Debug library, very useful
from icecream import ic

In [None]:
import os

dir_fake = ["../dataset/fake"]
dir_real = ["../dataset/real"]


def collect_files(root_dirs: list[str]) -> list[str]:
    """Recursively collect the paths of all regular files under ``root_dirs``.

    Args:
        root_dirs: Directories to walk. A directory that does not exist
            contributes no paths (``os.walk`` skips it silently by default).

    Returns:
        The collected paths, sorted so the sample order is the same on every
        run and machine (``os.walk`` itself yields entries in arbitrary order).
    """
    paths = []
    for root_dir in root_dirs:
        for subdir, _, filenames in os.walk(root_dir):
            for filename in filenames:
                path = os.path.join(subdir, filename)
                # os.walk reports every non-directory entry; isfile filters out
                # anything that is not a readable regular file (e.g. a broken symlink).
                if os.path.isfile(path):
                    paths.append(path)
    return sorted(paths)


# Collect all file paths and filter invalid files
fake_files = collect_files(dir_fake)
real_files = collect_files(dir_real)

In [None]:
import torch.multiprocessing
import torch.utils.data
from torchvision.io import decode_image


class BenfordDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs image file paths with per-sample labels.

    Item ``idx`` is the image stored at ``files[idx]``, decoded with
    ``mode="GRAY"``, together with ``label[idx]``.
    """

    def __init__(self, files, label):
        """Store the parallel sequences of paths (``files``) and labels (``label``)."""
        self.files, self.label = files, label

    def __len__(self):
        """Return the number of samples, which is the number of file paths."""
        sample_count = len(self.files)
        return sample_count

    def __getitem__(self, idx):
        """Decode the image at position ``idx`` and return it with its label."""
        pixels = decode_image(self.files[idx], mode="GRAY")
        target = self.label[idx]
        return pixels, target


# Keyword arguments shared by both DataLoaders
params = dict(
    batch_size=20,
    shuffle=False,
    num_workers=torch.multiprocessing.cpu_count(),
    pin_memory=True,
    prefetch_factor=2,
)

# Every fake sample is labelled 1
fake_data_loader = torch.utils.data.DataLoader(
    dataset=BenfordDataset(fake_files, [1] * len(fake_files)),
    **params,
)

# Every real sample is labelled 0
real_data_loader = torch.utils.data.DataLoader(
    dataset=BenfordDataset(real_files, [0] * len(real_files)),
    **params,
)

In [None]:
from tqdm import tqdm
from ImageForensics import FeatureExtraction

N = 300
extract = FeatureExtraction(features=N)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def extract_psd1d(data_loader, extractor, device):
    """Run ``extractor.fft`` on every image served by ``data_loader``.

    Each batch is moved to ``device`` and its images are processed one by one.
    ``None`` results are discarded *before* stacking: ``torch.stack`` cannot
    handle ``None`` entries, so filtering afterwards would never take effect.

    Args:
        data_loader: Iterable yielding ``(image_batch, labels)`` pairs; the
            labels are ignored here.
        extractor: Object exposing ``fft(image)``.
            NOTE(review): assumed to return equally-shaped tensors (required by
            ``torch.stack``) or ``None`` on failure -- confirm against ImageForensics.
        device: Device each batch is moved to before extraction.

    Returns:
        A CPU tensor with one entry along dimension 0 per kept result.

    Raises:
        RuntimeError: From ``torch.stack`` when no results remain (for example
            when the loader is empty).
    """
    results = []
    for batch, _ in tqdm(data_loader):
        batch = batch.to(device)
        results.extend(extractor.fft(image) for image in batch)

    # Remove None results if any files failed to process
    kept = [result for result in results if result is not None]

    # Convert the list to a single tensor and send it back to the CPU
    return torch.stack(kept).to("cpu")


psd1D_total_fake = extract_psd1d(fake_data_loader, extract, device)
psd1D_total_real = extract_psd1d(real_data_loader, extract, device)

In [None]:
import numpy as np

# Boolean ground truth: True marks a fake sample, False a real one
label_total_fake = np.ones(len(psd1D_total_fake), dtype=bool)
label_total_real = np.zeros(len(psd1D_total_real), dtype=bool)

# Fake samples come first in both arrays, so rows and labels stay aligned
features = np.concatenate([psd1D_total_fake, psd1D_total_real])
labels = np.concatenate([label_total_fake, label_total_real])

In [None]:
# First-digit probabilities under Benford's Law: P(d) = log10(1 + 1/d) for d = 1..9
DIGITS = np.arange(1, 10, 1)
BENFORD = np.log10(1.0 + 1.0 / DIGITS)

In [None]:
def get_first_digit(value: float) -> int:
    """Return the leading significant (first non-zero) decimal digit of ``value``.

    The digit is read from ``str(value)``, skipping any sign, leading zeros and
    the decimal point, so ``0.00123`` yields 1 and ``-42.0`` yields 4. Taking
    ``str(value)[0]`` instead would report 0 for every magnitude below 1,
    which is not the digit Benford's Law refers to.

    Args:
        value: A finite real number.

    Returns:
        The leading significant digit (1-9), or 0 when ``value`` is zero.

    Raises:
        ValueError: If no digit can be read before a non-numeric character is
            met, e.g. for ``nan`` or ``inf``.
    """
    for char in str(value):
        if char in "123456789":
            return int(char)
        # Only sign, zero and decimal-point characters may precede the digit.
        if char not in "+-0.":
            raise ValueError(f"cannot determine the first digit of {value!r}")
    # Only zeros were found: the value is zero and has no significant digit.
    return 0


def get_digit_counts(array: list[int]) -> list[int]:
    """Count how often each digit in ``DIGITS`` occurs in ``array``.

    Returns a list whose i-th entry is the number of occurrences of
    ``DIGITS[i]`` in ``array``; values not listed in ``DIGITS`` are ignored.
    """
    occurrences = []
    for digit in DIGITS:
        occurrences.append(array.count(digit))
    return occurrences


# Leading digit of every feature value, one inner list per sample
first_digits = [list(map(get_first_digit, array)) for array in features]

# Per-sample tally of how often each digit in DIGITS appears as the first digit
first_digits_counts = list(map(get_digit_counts, first_digits))

In [None]:
import scipy.stats as stats
from sklearn import metrics


def test_results(
    alpha: float,
    first_digits_counts: "list[list[int]] | None" = None,
) -> dict:
    """Classify each sample by how well its digit counts correlate with Benford.

    A sample is predicted fake (``True``) when the Pearson correlation between
    its first-digit counts and ``BENFORD`` does not exceed ``1 - alpha``.
    Predictions are scored against the module-level ``labels`` array.

    Args:
        alpha: Tolerance; the decision threshold is ``1 - alpha``.
        first_digits_counts: One list of digit counts per sample. When omitted,
            the module-level ``first_digits_counts`` is looked up at call time,
            so re-running the cell that builds it is picked up without having
            to redefine this function.

    Returns:
        A dict with the confusion-matrix cells (``TP``, ``FP``, ``TN``, ``FN``)
        and the ``precision``, ``recall``, ``f1`` and ``accuracy`` scores, all
        with ``True`` (fake) as the positive class.
    """
    if first_digits_counts is None:
        # The parameter shadows the global of the same name, hence globals().
        first_digits_counts = globals()["first_digits_counts"]

    # Test the goodness of fit for each feature
    goodness_of_fit = [
        stats.pearsonr(first_digits_count, BENFORD)
        for first_digits_count in first_digits_counts
    ]

    # pearsonr returns (statistic, pvalue): the first element is the correlation
    # coefficient, not a p-value. A NaN coefficient (constant counts) compares
    # False and is therefore predicted real.
    # NOTE(review): confirm that thresholding the coefficient (rather than the
    # p-value) is the intended decision rule.
    results = [1 - alpha >= correlation for correlation, _ in goodness_of_fit]

    # True (1) = fake, False (0) = real, matching how `labels` is built.
    # Passing both classes explicitly keeps the matrix 2x2 even when only one
    # class occurs, so the four-way unpacking below cannot fail.
    TN, FP, FN, TP = metrics.confusion_matrix(
        labels, results, labels=[False, True]
    ).ravel()

    return {
        "TP": TP,
        "FP": FP,
        "TN": TN,
        "FN": FN,
        "precision": metrics.precision_score(labels, results),
        "recall": metrics.recall_score(labels, results),
        "f1": metrics.f1_score(labels, results),
        "accuracy": metrics.accuracy_score(labels, results),
    }

In [None]:
# Tolerance levels to evaluate; test_results compares against the threshold 1 - alpha
ALPHA = [0.01, 0.05, 0.1]

for alpha in ALPHA:
    # ic (icecream) logs each argument expression together with its value
    ic(alpha, test_results(alpha))