In [18]:
import os
import numpy as np
import cv2
import glob
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, log_loss


# ==========================
# Param√®tres
# ==========================
BLOCK_SIZE = 8
NUM_BLOCKS = 1000  # nombre de blocs √† retenir

# ==========================
# √âtape 1 : Extraction des Features de Bruit
# ==========================
def extract_noise_features(image_path, selected_channels=["Y", "Cb", "Cr"]):
    img = cv2.imread(image_path)
    img_ycc = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)  # OpenCV inverse Cb/Cr
    
    channel_map = {
        "Y": img_ycc[:, :, 0],
        "Cb": img_ycc[:, :, 2],  # Cb = channel 2
        "Cr": img_ycc[:, :, 1]   # Cr = channel 1
    }

    features = []
    # selected_channels = list(channels) if len(channels) <= 3 else [channels[i:i+2] for i in range(0, len(channels), 2)]

    for ch in selected_channels:
        if ch not in channel_map:
            continue

        Ic = channel_map[ch].astype(np.float32)
        L4 = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]], dtype=np.float32)
        Fc = cv2.filter2D(Ic, -1, L4)

        h, w = Fc.shape
        blocks = [Fc[i:i+BLOCK_SIZE, j:j+BLOCK_SIZE].flatten()
                  for i in range(0, h - BLOCK_SIZE + 1, BLOCK_SIZE)
                  for j in range(0, w - BLOCK_SIZE + 1, BLOCK_SIZE)]

        # S√©lection blocs √† faible variance et moyenne
        block_vars = np.array([np.var(b) for b in blocks])
        block_means = np.array([np.mean(b) for b in blocks])
        idx_var = np.argsort(block_vars)[:NUM_BLOCKS]
        idx_mean = np.argsort(block_means)[:NUM_BLOCKS]
        idx_selected = np.intersect1d(idx_var, idx_mean)
        selected_blocks = [blocks[i] for i in idx_selected]

        # Matrice de corr√©lation
        selected_blocks = np.stack(selected_blocks, axis=1)
        Rc = np.corrcoef(selected_blocks)
        tril_indices = np.tril_indices_from(Rc, k=-1)
        SRc = Rc[tril_indices]

        features.append(SRc)

    return np.concatenate(features)

# ==========================
# Chargement du Dataset
# ==========================
def load_dataset(real_dir, fake_dir, selected_channels=["Y", "Cb", "Cr"]):
    X, y, generator_labels = [], [], []

    # R√©elles
    for img_path in tqdm(glob.glob(os.path.join(real_dir, "*.jpg"))):
        X.append(extract_noise_features(img_path, selected_channels=selected_channels))
        y.append("real")
        generator_labels.append("real")

    # Fakes par g√©n√©rateur
    for gen_name in os.listdir(fake_dir):
        gen_path = os.path.join(fake_dir, gen_name)
        if not os.path.isdir(gen_path):
            continue
        for img_path in tqdm(glob.glob(os.path.join(gen_path, "*.jpg"))):
            X.append(extract_noise_features(img_path, selected_channels=selected_channels))
            y.append("fake")
            generator_labels.append(gen_name)

    return np.array(X), np.array(y), np.array(generator_labels)

# ==========================
# Entra√Ænement du Pipeline Complet
# ==========================

# =============== TRAINING PIPELINE ===============
def train_classifiers(X, y, gen_labels):
    print("üîß Initialisation de l'entra√Ænement...")

    label_enc = LabelEncoder()
    gen_indices = label_enc.fit_transform(gen_labels)
    N = len(np.unique(gen_indices))

    print(f"üì¶ Nombre de g√©n√©rateurs diff√©rents : {N}")
    print("üîÑ Split des donn√©es pour le mod√®le f (g√©n√©rateur)...")
    X_train, X_test, gen_train, gen_test = train_test_split(X, gen_indices, test_size=0.2, random_state=42)

    print("üèãÔ∏è‚Äç‚ôÇÔ∏è Entra√Ænement du mod√®le f (g√©n√©rateur)...")
    f_model = LogisticRegression(max_iter=1000)
    f_model.fit(X_train, gen_train)
    f_preds = f_model.predict(X_test)
    f_probs = f_model.predict_proba(X_test)

    print("\nüìä M√©triques du mod√®le f (multi-class g√©n√©rateur):")
    print(classification_report(gen_test, f_preds, target_names=label_enc.classes_))
    print(f"üéØ Accuracy f_model: {accuracy_score(gen_test, f_preds):.4f}")
    print(f"üî¢ Log loss f_model: {log_loss(gen_test, f_probs):.4f}")

    g_models = []
    g_preds_all = []
    print("\nüèó Entra√Ænement des mod√®les g (par g√©n√©rateur)...")
    for i in range(N):
        print(f"  üîπ Mod√®le g pour le g√©n√©rateur '{label_enc.classes_[i]}'")

        gi_labels = np.array([(g == i or y[idx] == "real") for idx, g in enumerate(gen_indices)], dtype=int)
        X_train_g, X_test_g, y_train_g, y_test_g = train_test_split(X, gi_labels, test_size=0.2, random_state=42)

        g_model = LogisticRegression(max_iter=1000)
        g_model.fit(X_train_g, y_train_g)
        preds = g_model.predict(X_test_g)
        probs = g_model.predict_proba(X_test_g)[:, 1]

        print(f"    üéØ Accuracy: {accuracy_score(y_test_g, preds):.4f}")
        print(f"    üî¢ Log loss: {log_loss(y_test_g, probs):.4f}")
        print(f"    üßæ Report:\n{classification_report(y_test_g, preds)}")

        g_models.append(g_model)
        g_preds_all.append(g_model.predict_proba(X_test)[:, 1])  # tous √©valu√©s sur m√™me X_test que f_model

    g_preds_all = np.stack(g_preds_all, axis=1)
    final_input = np.concatenate([f_probs, g_preds_all], axis=1)

    print("\nüéØ Entra√Ænement du mod√®le h (final)...")
    mask = gen_indices != N-1  # Exclure le dernier si c‚Äôest ‚Äòreal‚Äô uniquement
    h_labels = np.array([label != "real" for label in y])[mask]
    h_model = LogisticRegression(max_iter=1000)
    h_model.fit(final_input, h_labels[:final_input.shape[0]])
    h_preds = h_model.predict(final_input)
    h_probs = h_model.predict_proba(final_input)[:, 1]

    print("\nüìä M√©triques du mod√®le h (binaire FAKE vs REAL):")
    print(classification_report(h_labels[:final_input.shape[0]], h_preds))
    print(f"üéØ Accuracy h_model: {accuracy_score(h_labels[:final_input.shape[0]], h_preds):.4f}")
    print(f"üî¢ Log loss h_model: {log_loss(h_labels[:final_input.shape[0]], h_probs):.4f}")

    print("‚úÖ Tous les mod√®les ont √©t√© entra√Æn√©s avec succ√®s.")
    return f_model, g_models, h_model, label_enc


# ==========================
# Inference sur une image
# ==========================
def predict_image(img_path, f_model, g_models, h_model, label_enc, selected_channels=["Y", "Cb", "Cr"]):
    x = extract_noise_features(img_path, selected_channels=selected_channels).reshape(1, -1)
    f_out = f_model.predict_proba(x)
    g_out = np.stack([g.predict_proba(x)[:, 1] for g in g_models], axis=1)
    final_input = np.concatenate([f_out, g_out], axis=1)
    final_score = h_model.predict_proba(final_input)[:, 1]
    return final_score

# ==========================
# Exemple d'utilisation
# ==========================
if __name__ == "__main__":
    real_dir = "/path/to/real/images"
    fake_dir = "/path/to/fake/images"  # contient des sous-dossiers
    
    X, y, gen_labels = load_dataset(real_dir, fake_dir, channels="YCbCr")
    f_model, g_models, h_model, label_enc = train_classifiers(X, y, gen_labels)

    # Test sur une image
    test_img = "/path/to/test/image.jpg"
    score = predict_image(test_img, f_model, g_models, h_model, label_enc)
    print(f"Score final (probabilit√© d'√™tre FAKE): {score[0]:.4f}")


TypeError: load_dataset() got an unexpected keyword argument 'channels'

In [9]:
channels = "YCbCr"
selected_channels = list(channels) if len(channels) <= 3 else [channels[i:i+2] for i in range(0, len(channels), 2)]
print(selected_channels)
print(len(channels) <= 3)
print(list(channels))

['Y', 'C', 'r']
True
['Y', 'C', 'r']


In [38]:
import os
import numpy as np
import cv2
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from datasets.arrow_dataset import Dataset as ArrowDataset
from datasets import concatenate_datasets
from glob import glob

# =============== CONFIG ===============
FAKE_ARROW_DIR = "/medias/db/ImagingSecurity_misc/sitcharn/paper_reproduction/cache/datasets/nebula___df-arrow/default/0.0.0/93117d58649bcf660f80fecf2122fac1f59d0453"
REAL_DIR = "/medias/db/ImagingSecurity_misc/Collaborations/Hermes deepfake challenge/data/defacto/COCO/train2017"
BLOCK_SIZE = 8
NUM_BLOCKS = 100
CHANNELS = ["Y", "Cb", "Cr"]
MAX_REAL_IMAGES = 100
FEATURE_SIZE = int((NUM_BLOCKS * (NUM_BLOCKS - 1) / 2) * len(CHANNELS))  # Corr√©lation triangulaire

# =============== FEATURE EXTRACTION ===============
def extract_noise_features(image_bytes, selected_channels=CHANNELS):
    try:
        img_array = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        if img is None:
            raise ValueError("Image non d√©codable")

        img_ycc = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb)

        channel_map = {
            "Y": img_ycc[:, :, 0],
            "Cb": img_ycc[:, :, 2],
            "Cr": img_ycc[:, :, 1]
        }

        features = []
        for ch in selected_channels:
            Ic = channel_map[ch].astype(np.float32)
            L4 = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]], dtype=np.float32)
            Fc = cv2.filter2D(Ic, -1, L4)

            h, w = Fc.shape
            blocks = [Fc[i:i+BLOCK_SIZE, j:j+BLOCK_SIZE].flatten()
                      for i in range(0, h - BLOCK_SIZE + 1, BLOCK_SIZE)
                      for j in range(0, w - BLOCK_SIZE + 1, BLOCK_SIZE)]

            if len(blocks) == 0:
                continue

            while len(blocks) < NUM_BLOCKS:
                blocks += blocks  # r√©plication
            blocks = blocks[:NUM_BLOCKS]

            selected_blocks = np.stack(blocks, axis=1)
            Rc = np.corrcoef(selected_blocks)
            if np.isnan(Rc).any():
                continue

            tril_indices = np.tril_indices_from(Rc, k=-1)
            SRc = Rc[tril_indices]
            features.append(SRc)

        if not features:
            raise ValueError("Aucune feature extraite")

        full_feat = np.concatenate(features)

        if full_feat.shape[0] != FEATURE_SIZE:
            pad_width = FEATURE_SIZE - full_feat.shape[0]
            full_feat = np.pad(full_feat, (0, pad_width), mode='constant')

        return full_feat

    except Exception as e:
        raise ValueError(f"Erreur dans l'image : {e}")

# =============== TRAINING PIPELINE ===============
from sklearn.metrics import accuracy_score, classification_report, log_loss, roc_auc_score

# =============== TRAINING PIPELINE ===============
# =============== TRAINING PIPELINE ===============
from sklearn.metrics import classification_report, accuracy_score, log_loss, roc_auc_score

def train_classifiers(X, y, gen_labels):
    print("üîß Initialisation de l'entra√Ænement...")

    # Affichage du ratio fake/real
    unique, counts = np.unique(y, return_counts=True)
    counts_dict = dict(zip(unique, counts))
    total = sum(counts)
    print("\nüìà R√©partition des classes :")
    for cls in sorted(counts_dict):
        pct = 100 * counts_dict[cls] / total
        print(f"  - {cls}: {counts_dict[cls]} ({pct:.2f}%)")

    label_enc = LabelEncoder()
    gen_indices = label_enc.fit_transform(gen_labels)
    N = len(np.unique(gen_indices))

    print(f"\nüì¶ Nombre de g√©n√©rateurs diff√©rents : {N}")
    print("üîÑ Split des donn√©es pour le mod√®le f (g√©n√©rateur)...")
    X_train, X_test, gen_train, gen_test, y_train, y_test, gen_labels_train, gen_labels_test = train_test_split(
        X, gen_indices, y, gen_labels, test_size=0.2, random_state=42
    )

    print("y_test: ", y_test)

    print("üèãÔ∏è‚Äç‚ôÇÔ∏è Entra√Ænement du mod√®le f (g√©n√©rateur)...")
    f_model = LogisticRegression(max_iter=1000)
    f_model.fit(X_train, gen_train)
    f_preds = f_model.predict(X_test)
    f_probs = f_model.predict_proba(X_test)

    print("\nüìä M√©triques du mod√®le f (multi-class g√©n√©rateur):")
    print(classification_report(gen_test, f_preds, target_names=label_enc.classes_))
    print(f"üéØ Accuracy f_model: {accuracy_score(gen_test, f_preds):.4f}")
    print(f"üî¢ Log loss f_model: {log_loss(gen_test, f_probs):.4f}")

    g_models = []
    g_preds_all = []
    print("\nüèó Entra√Ænement des mod√®les g (par g√©n√©rateur)...")
    for i in range(N):
        print(f"  üîπ Mod√®le g pour le g√©n√©rateur '{label_enc.classes_[i]}'")

        gi_labels = np.array([(g == i or y[idx] == "real") for idx, g in enumerate(gen_indices)], dtype=int)
        X_train_g, X_test_g, y_train_g, y_test_g = train_test_split(X, gi_labels, test_size=0.2, random_state=42)

        g_model = LogisticRegression(max_iter=1000)
        g_model.fit(X_train_g, y_train_g)
        preds = g_model.predict(X_test_g)
        probs = g_model.predict_proba(X_test_g)[:, 1]

        try:
            auc = roc_auc_score(y_test_g, probs)
        except ValueError:
            auc = float("nan")

        print(f"    üéØ Accuracy: {accuracy_score(y_test_g, preds):.4f}")
        print(f"    üî¢ Log loss: {log_loss(y_test_g, probs):.4f}")
        print(f"    üß† AUC: {auc:.4f}")
        print(f"    üßæ Report:\n{classification_report(y_test_g, preds)}")

        g_models.append(g_model)
        g_preds_all.append(g_model.predict_proba(X_test)[:, 1])  # tous √©valu√©s sur m√™me X_test que f_model

    g_preds_all = np.stack(g_preds_all, axis=1)
    final_input = np.concatenate([f_probs, g_preds_all], axis=1)

    print("\nüéØ Entra√Ænement du mod√®le h (final FAKE vs REAL)...")

    print("\nüéØ Entra√Ænement du mod√®le h (final FAKE vs REAL)...")

    # Labels binaires : 1 = fake, 0 = real
    h_labels = (np.array(y_test) == "fake").astype(int)

    # V√©rifier qu'on a bien les deux classes
    if len(np.unique(h_labels)) < 2:
        raise ValueError("‚ö†Ô∏è Pas assez de classes (fake/real) pour entra√Æner h_model.")

    # Entra√Ænement
    h_model = LogisticRegression(max_iter=1000)
    h_model.fit(final_input, h_labels)

    # Pr√©diction
    h_preds = h_model.predict(final_input)
    h_probs = h_model.predict_proba(final_input)[:, 1]

    # Affichage des m√©triques
    print("\nüìä M√©triques du mod√®le h (binaire FAKE vs REAL):")
    print(classification_report(h_labels, h_preds))
    print(f"üéØ Accuracy h_model: {accuracy_score(h_labels, h_preds):.4f}")
    print(f"üî¢ Log loss h_model: {log_loss(h_labels, h_probs):.4f}")
    print(f"üìà AUC h_model: {roc_auc_score(h_labels, h_probs):.4f}")

    print("‚úÖ Tous les mod√®les ont √©t√© entra√Æn√©s avec succ√®s.")

    return f_model, g_models, h_model, label_enc


# =============== MAIN PIPELINE ===============
if __name__ == "__main__":
    arrow_files = sorted([
        os.path.join(FAKE_ARROW_DIR, f) for f in os.listdir(FAKE_ARROW_DIR)
        if f.startswith("df-arrow-test") and f.endswith(".arrow")
    ])
    fake_dataset = concatenate_datasets([ArrowDataset.from_file(f) for f in arrow_files])

    X, y, generator_labels = [], [], []
    success_count, fail_count = 0, 0

    for sample in tqdm(fake_dataset, desc="Extracting fake image features"):
        try:
            img_bytes = sample["image"]
            path = sample["image_path"]
            gen_name = path.split("/")[0]
            feat = extract_noise_features(img_bytes)
            X.append(feat)
            y.append("fake")
            generator_labels.append(gen_name)
            success_count += 1
        except Exception:
            fail_count += 1
        if success_count >= MAX_REAL_IMAGES:
            break
    
    real_image_paths = sorted(
        glob(os.path.join(REAL_DIR, "**", "*.jpg"), recursive=True)
        + glob(os.path.join(REAL_DIR, "**", "*.png"), recursive=True)
    )

    real_count = 0
    for path in tqdm(real_image_paths, desc="Extracting real image features"):
        try:
            with open(path, "rb") as f:
                img_bytes = f.read()
                feat = extract_noise_features(img_bytes)

                if feat is None or len(feat.shape) != 1 or (len(X) > 0 and feat.shape[0] != X[0].shape[0]):
                    raise ValueError("Vecteur de features invalide ou incoh√©rent")

                X.append(feat)
                y.append("real")
                generator_labels.append("real")
                success_count += 1
                real_count += 1
                if real_count >= MAX_REAL_IMAGES:
                    break
        except Exception as e:
            print(f"Erreur pour image r√©elle {os.path.basename(path)}: {e}")
            fail_count += 1

    print(f"\n‚úÖ Total features extraites: {success_count}")
    print(f"‚ùå Images ignor√©es: {fail_count}")
    print(f"üìä Total analys√©: {success_count + fail_count}")
    print(f"üì¶ G√©n√©rateurs d√©tect√©s: {set(generator_labels)}")

    X = np.array(X)
    y = np.array(y)
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")

    generator_labels = np.array(generator_labels)

    f_model, g_models, h_model, label_enc = train_classifiers(X, y, generator_labels)
    print("\n‚úÖ Entra√Ænement termin√©")


  c /= stddev[:, None]
  c /= stddev[None, :]
Extracting fake image features:   0%|          | 99/76000 [00:01<21:33, 58.67it/s]
Extracting real image features:   0%|          | 99/118287 [00:01<32:53, 59.89it/s]



‚úÖ Total features extraites: 200
‚ùå Images ignor√©es: 0
üìä Total analys√©: 200
üì¶ G√©n√©rateurs d√©tect√©s: {'ADM', 'VQDM', 'Midjourney', 'real', 'stable_diffusion_v_1_5', 'wukong', 'stable_diffusion_v_1_4'}
X shape: (200, 14850)
y shape: (200,)
üîß Initialisation de l'entra√Ænement...

üìà R√©partition des classes :
  - fake: 100 (50.00%)
  - real: 100 (50.00%)

üì¶ Nombre de g√©n√©rateurs diff√©rents : 7
üîÑ Split des donn√©es pour le mod√®le f (g√©n√©rateur)...
y_test:  ['fake' 'fake' 'fake' 'real' 'real' 'real' 'fake' 'real' 'real' 'fake'
 'fake' 'real' 'real' 'fake' 'real' 'real' 'fake' 'real' 'fake' 'fake'
 'real' 'fake' 'real' 'fake' 'fake' 'fake' 'fake' 'fake' 'real' 'real'
 'fake' 'fake' 'fake' 'fake' 'real' 'real' 'real' 'real' 'real' 'fake']
üèãÔ∏è‚Äç‚ôÇÔ∏è Entra√Ænement du mod√®le f (g√©n√©rateur)...

üìä M√©triques du mod√®le f (multi-class g√©n√©rateur):
                        precision    recall  f1-score   support

                   ADM       0.00      0.

  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])


    üéØ Accuracy: 0.7250
    üî¢ Log loss: 0.8543
    üß† AUC: 0.7050
    üßæ Report:
              precision    recall  f1-score   support

           0       0.80      0.60      0.69        20
           1       0.68      0.85      0.76        20

    accuracy                           0.72        40
   macro avg       0.74      0.72      0.72        40
weighted avg       0.74      0.72      0.72        40

  üîπ Mod√®le g pour le g√©n√©rateur 'Midjourney'
    üéØ Accuracy: 0.7000
    üî¢ Log loss: 0.7884
    üß† AUC: 0.7214
    üßæ Report:
              precision    recall  f1-score   support

           0       0.75      0.38      0.50        16
           1       0.69      0.92      0.79        24

    accuracy                           0.70        40
   macro avg       0.72      0.65      0.64        40
weighted avg       0.71      0.70      0.67        40

  üîπ Mod√®le g pour le g√©n√©rateur 'VQDM'
    üéØ Accuracy: 0.6250
    üî¢ Log loss: 0.9634
    üß† AUC: 0.68