In [None]:
import os
import numpy as np
from PIL import Image
import cv2
import threading
import hashlib
from pathlib import Path

In [None]:
# --- Notebook configuration -------------------------------------------------
# Root folder of the dataset; the loading cell joins class sub-folder names
# onto this path.
DATA_DIR = "/kaggle/input/200k-real-vs-ai-visuals-by-mbilal/my_real_vs_ai_dataset/my_real_vs_ai_dataset/"
# Target size passed to PIL's resize when images are loaded.
IMG_SIZE = (128, 128)           # Must match the model's input_shape!
# Batch size handed to the train/validation generators.
BATCH_SIZE = 128
# JPEG quality used for the re-compression step of the ELA computation;
# it is also part of the cache key, so changing it invalidates cached entries.
ELA_QUALITY = 75
# Directory where computed ELA arrays are cached as .npy files.
CACHE_DIR = "/kaggle/working/ela_cache"  # Kaggle writable dir
# NOTE(review): not referenced in the visible cells — presumably the worker
# count used by the batch generator; confirm.
NUM_THREADS = 8
# NOTE(review): not referenced in the visible cells — presumably the number of
# output classes (the dataset cell defines two); confirm.
NUM_CLASSES = 2

# Create the cache directory up front; exist_ok makes re-running this cell safe.
os.makedirs(CACHE_DIR, exist_ok=True)

In [None]:
def compute_ela_cv2(img_array: np.ndarray, quality: int = 75, scale: int = 8) -> np.ndarray:
    """Compute an Error Level Analysis (ELA) map for an RGB image.

    The image is JPEG-encoded in memory at ``quality``, decoded again, and the
    per-pixel absolute difference to the input is amplified by ``scale`` and
    clipped to the uint8 range.

    Args:
        img_array: RGB image array. Non-uint8 input is cast to uint8 first.
        quality: JPEG quality used for the re-compression step.
        scale: Amplification factor applied to the difference (default 8,
            the value previously hard-coded).

    Returns:
        A uint8 array with the same shape as ``img_array``. If the JPEG
        round-trip fails (encode or decode), an all-zero array is returned.
    """
    if img_array.dtype != np.uint8:
        img_array = img_array.astype(np.uint8)

    # OpenCV works in BGR channel order.
    img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    success, enc_img = cv2.imencode('.jpg', img_bgr, encode_param)
    if not success:
        return np.zeros_like(img_array)

    dec_img = cv2.imdecode(enc_img, cv2.IMREAD_COLOR)
    if dec_img is None:
        # imdecode signals failure by returning None; treat it exactly like an
        # encode failure instead of crashing inside cvtColor.
        return np.zeros_like(img_array)
    dec_rgb = cv2.cvtColor(dec_img, cv2.COLOR_BGR2RGB)

    # Subtract in a signed type so the difference cannot wrap around; int32
    # leaves headroom for any reasonable scale before clipping.
    diff = np.abs(img_array.astype(np.int32) - dec_rgb.astype(np.int32))
    return np.clip(diff * scale, 0, 255).astype(np.uint8)

In [None]:
# Lock shared by worker threads; held while an ELA array is written to the cache.
cache_lock = threading.Lock()

def get_cache_path(filepath: str) -> str:
    """Return the cache file path for ``filepath`` under the current settings.

    The cache key combines the source path, the target image size and the ELA
    JPEG quality, so changing any of them maps to a different cache file.
    The key is MD5-hashed to obtain a fixed-length file name inside CACHE_DIR.
    """
    cache_key = "{}_{}x{}_q{}".format(filepath, IMG_SIZE[0], IMG_SIZE[1], ELA_QUALITY)
    digest = hashlib.md5(cache_key.encode()).hexdigest()
    return os.path.join(CACHE_DIR, digest + ".npy")

def load_or_compute_ela(filepath: str):
    """Return the ELA map for ``filepath``, using the on-disk cache when possible.

    Steps:
      1. If a cached ``.npy`` exists for this file/settings, load and return it.
      2. Otherwise open the image, convert to RGB, resize to IMG_SIZE and
         compute the ELA map with ``compute_ela_cv2``.
      3. Best-effort write of the result into the cache.

    Args:
        filepath: Path of the source image.

    Returns:
        The ELA array. If the image cannot be loaded, an error is printed and
        an all-zero uint8 array matching the resized image shape is returned
        (nothing is cached in that case).
    """
    cache_path = get_cache_path(filepath)

    if os.path.exists(cache_path):
        try:
            return np.load(cache_path)
        except Exception:
            # Corrupted / unreadable cache entry -> fall through and recompute.
            pass

    try:
        # The context manager closes the underlying file handle promptly,
        # which matters when many files are processed from worker threads.
        with Image.open(filepath) as src:
            img = src.convert('RGB').resize(IMG_SIZE, Image.Resampling.LANCZOS)
        img_array = np.array(img)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        # PIL sizes are (width, height) while arrays are (height, width, C),
        # so the placeholder must swap the two to match a real resized image.
        return np.zeros((IMG_SIZE[1], IMG_SIZE[0], 3), dtype=np.uint8)

    ela = compute_ela_cv2(img_array, ELA_QUALITY)

    # Write to a temporary file and rename it into place: readers do not take
    # the lock, and os.replace guarantees they never observe a partial file.
    tmp_path = f"{cache_path}.{threading.get_ident()}.tmp"
    with cache_lock:
        try:
            with open(tmp_path, "wb") as fh:
                np.save(fh, ela)
            os.replace(tmp_path, cache_path)
        except Exception as e:
            # Caching is best-effort: report, clean up, and still return the result.
            print(f"Warning: could not cache ELA for {filepath}: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                pass
    return ela


In [None]:
# Class sub-folder names; the position in this list is the integer label.
class_names = ["ai_images", "real"]
# Derived from class_names so the two can never drift apart
# (yields {"ai_images": 0, "real": 1}).
class_to_idx = {name: idx for idx, name in enumerate(class_names)}
image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

filepaths = []
labels = []

for class_name in class_names:
    class_dir = os.path.join(DATA_DIR, class_name)
    if not os.path.isdir(class_dir):
        print(f"Warning: {class_dir} not found!")
        continue
    # os.listdir returns entries in arbitrary order; sorting makes the file
    # list (and therefore the seeded split below) reproducible across runs.
    for fname in sorted(os.listdir(class_dir)):
        if fname.lower().endswith(image_extensions):
            filepaths.append(os.path.join(class_dir, fname))
            labels.append(class_to_idx[class_name])

filepaths = np.array(filepaths)
labels = np.array(labels)

print(f"Total images: {len(filepaths)}")
print(f"AI: {np.sum(labels==0)}, Real: {np.sum(labels==1)}")

# Fail early with a clear message instead of an opaque error from the splitter.
if len(filepaths) == 0:
    raise FileNotFoundError(f"No images found under {DATA_DIR}")

# Stratified 80/20 train/validation split with a fixed seed.
# NOTE(review): train_test_split is not imported in the visible import cell —
# presumably `from sklearn.model_selection import train_test_split`; confirm it
# is imported before this cell runs on a fresh kernel.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    filepaths, labels,
    test_size=0.2,
    random_state=42,
    stratify=labels
)

print(f"Train: {len(train_paths)}, Val: {len(val_paths)}")

In [None]:
# Build the batch generators for the train and validation splits; only the
# training generator is asked to shuffle.
# NOTE(review): FastELASequence is not defined in the visible cells — presumably
# a batch-generator class that loads ELA maps via load_or_compute_ela; confirm
# it is defined before this cell on a fresh kernel.
train_gen = FastELASequence(train_paths, train_labels, batch_size=BATCH_SIZE, shuffle=True)
val_gen = FastELASequence(val_paths, val_labels, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# NOTE(review): Sequential / Conv2D / BatchNormalization / MaxPool2D / Dropout /
# Flatten / Dense are not imported in the visible import cell — presumably from
# tensorflow.keras; confirm they are imported before this cell runs.

def _conv_block(filters, kernel_size, dropout_rate, **conv_kwargs):
    """Return one conv stage: Conv2D(relu, same) -> BatchNorm -> 2x2 MaxPool -> Dropout."""
    return [
        Conv2D(filters, kernel_size, activation='relu', padding='same', **conv_kwargs),
        BatchNormalization(),
        MaxPool2D(2, 2),
        Dropout(dropout_rate),
    ]


# CNN classifier over ELA maps: four conv stages with growing filter counts,
# followed by a two-layer dense head and a softmax output.
model = Sequential([
    # Input shape is tied to the config so it cannot drift from the loader.
    # PIL sizes are (width, height); image arrays are (height, width, channels).
    *_conv_block(32, (5, 5), 0.25, input_shape=(IMG_SIZE[1], IMG_SIZE[0], 3)),
    *_conv_block(64, (5, 5), 0.25),
    *_conv_block(128, (3, 3), 0.3),
    *_conv_block(256, (3, 3), 0.4),

    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(256, activation='relu'),
    Dropout(0.5),
    # One output unit per class, taken from the config instead of a literal 2.
    Dense(NUM_CLASSES, activation='softmax')
])

In [None]:
# Configure training: Adam optimizer, categorical cross-entropy loss, and
# accuracy as the reported metric.
# NOTE(review): 'categorical_crossentropy' expects one-hot encoded targets, while
# the labels built in the dataset cell are integer class indices. Confirm that
# the batch generator one-hot encodes them; otherwise
# 'sparse_categorical_crossentropy' would be the matching loss.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Print the layer-by-layer architecture and parameter counts.
model.summary()