In [None]:
import os
import numpy as np
from PIL import Image
import cv2
import threading
import hashlib
from pathlib import Path
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import concurrent.futures
from tensorflow.keras.utils import Sequence, to_categorical


In [None]:
# --- Locations (Kaggle layout) ---
DATA_DIR = "/kaggle/input/200k-real-vs-ai-visuals-by-mbilal/my_real_vs_ai_dataset/my_real_vs_ai_dataset/"
CACHE_DIR = "/kaggle/working/ela_cache"  # ELA arrays are cached here; Kaggle writable dir
os.makedirs(CACHE_DIR, exist_ok=True)

# --- Preprocessing ---
IMG_SIZE = (128, 128)  # must match the model input
ELA_QUALITY = 75       # JPEG quality handed to compute_ela_cv2 when building ELA maps

# --- Batching / parallelism ---
BATCH_SIZE = 128
NUM_THREADS = 8        # max worker threads used when loading one batch
NUM_CLASSES = 2

In [None]:
def compute_ela_cv2(img_array: np.ndarray, quality: int = 75) -> np.ndarray:
    """Compute an Error Level Analysis (ELA) map for an RGB image.

    The image is JPEG-encoded in memory at ``quality``, decoded again, and the
    absolute per-pixel difference to the input is amplified by 8 and clipped
    to the uint8 range.

    Args:
        img_array: RGB image array; cast to ``uint8`` if it has another dtype.
        quality: JPEG quality used for the recompression step.

    Returns:
        ``uint8`` array with the same shape as the input. All zeros if the
        JPEG encoding step reports failure.
    """
    pixels = img_array if img_array.dtype == np.uint8 else img_array.astype(np.uint8)

    # OpenCV's codecs work in BGR channel order, so convert on the way in and out.
    ok, jpeg_buf = cv2.imencode(
        '.jpg',
        cv2.cvtColor(pixels, cv2.COLOR_RGB2BGR),
        [int(cv2.IMWRITE_JPEG_QUALITY), quality],
    )
    if not ok:
        return np.zeros_like(pixels)

    recompressed = cv2.cvtColor(
        cv2.imdecode(jpeg_buf, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB
    )

    # Subtract in int16 so the difference of two uint8 values cannot wrap around.
    residual = np.abs(pixels.astype(np.int16) - recompressed.astype(np.int16))
    return np.clip(residual * 8, 0, 255).astype(np.uint8)

In [None]:
# Serializes cache writes: load_or_compute_ela holds this lock around np.save.
cache_lock = threading.Lock()

def get_cache_path(filepath: str) -> str:
    """Return the ``.npy`` cache location for ``filepath``.

    The file name is the MD5 hex digest of the source path combined with the
    current ``IMG_SIZE`` and ``ELA_QUALITY``, so changing either setting maps
    the same image to a different cache entry.
    """
    digest = hashlib.md5(
        f"{filepath}_{IMG_SIZE[0]}x{IMG_SIZE[1]}_q{ELA_QUALITY}".encode()
    ).hexdigest()
    return os.path.join(CACHE_DIR, f"{digest}.npy")

def load_or_compute_ela(filepath: str):
    """Return the ELA map for ``filepath``, using the on-disk cache when possible.

    On a cache miss (or an unreadable / wrongly shaped cache entry) the image
    is opened, converted to RGB, resized to ``IMG_SIZE`` with Lanczos
    resampling, run through ``compute_ela_cv2`` at ``ELA_QUALITY``, and the
    result is saved to the cache on a best-effort basis.

    Args:
        filepath: Path of the source image.

    Returns:
        ``uint8`` array of shape ``(IMG_SIZE[1], IMG_SIZE[0], 3)``. If the
        image cannot be loaded, an all-zero array of that shape is returned
        (and not cached) after printing the error.
    """
    cache_path = get_cache_path(filepath)
    # PIL sizes are (width, height) while image arrays are (height, width, channels).
    expected_shape = (IMG_SIZE[1], IMG_SIZE[0], 3)

    if os.path.exists(cache_path):
        try:
            cached = np.load(cache_path)
            if cached.shape == expected_shape:
                return cached
        except Exception:
            # Corrupted or partially written entry → recompute below.
            # `Exception` (not bare `except`) so Ctrl-C / kernel interrupt still works.
            pass

    try:
        # Context manager closes the underlying file handle deterministically.
        with Image.open(filepath) as src:
            img = src.convert('RGB').resize(IMG_SIZE, Image.Resampling.LANCZOS)
        img_array = np.array(img)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return np.zeros(expected_shape, dtype=np.uint8)

    ela = compute_ela_cv2(img_array, ELA_QUALITY)

    with cache_lock:
        try:
            np.save(cache_path, ela)
        except Exception:
            # Caching is best-effort: a failed write must not abort training.
            pass
    return ela

In [None]:
class FastELASequence(Sequence):
    """Keras ``Sequence`` yielding batches of ELA maps with one-hot labels.

    Each batch is loaded through ``load_or_compute_ela`` (thread pool for
    batches of 8 or more items), scaled to ``[0, 1]`` as ``float32``, and
    paired with ``to_categorical`` labels over ``NUM_CLASSES`` classes.

    Args:
        filepaths: Iterable of image paths.
        labels: Integer class labels aligned with ``filepaths``.
        batch_size: Number of samples per batch (last batch may be smaller).
        img_size: Stored on the instance only; the actual resize uses the
            module-level ``IMG_SIZE`` inside ``load_or_compute_ela``.
        shuffle: Reshuffle the sample order at construction and after each epoch.
        **kwargs: Forwarded to the ``Sequence`` base constructor.

    Raises:
        ValueError: If ``filepaths`` and ``labels`` differ in length.
    """

    def __init__(self, filepaths, labels, batch_size=BATCH_SIZE, img_size=IMG_SIZE,
                 shuffle=True, **kwargs):
        # Newer Keras versions expect the base constructor to be called.
        super().__init__(**kwargs)
        self.filepaths = list(filepaths)
        self.labels = np.array(labels, dtype=np.int32)
        if len(self.filepaths) != len(self.labels):
            raise ValueError(
                f"filepaths ({len(self.filepaths)}) and labels ({len(self.labels)}) "
                "must have the same length"
            )
        self.batch_size = batch_size
        self.img_size = img_size
        self.shuffle = shuffle
        self.indices = np.arange(len(self.filepaths))
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.filepaths) / self.batch_size))

    def on_epoch_end(self):
        """Reshuffle the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, index):
        """Return ``(X, y)`` for batch ``index``.

        ``X`` is ``float32`` in ``[0, 1]``; ``y`` is one-hot encoded.
        """
        start = index * self.batch_size
        end = min(start + self.batch_size, len(self.filepaths))
        batch_indices = self.indices[start:end]
        batch_paths = [self.filepaths[i] for i in batch_indices]

        # Images are collected from the *ordered* results of executor.map rather
        # than appended from worker threads: appending x and y separately from
        # several threads can interleave and pair an image with the wrong label.
        if len(batch_paths) >= 8:
            with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
                batch_x = list(executor.map(load_or_compute_ela, batch_paths))
        else:
            batch_x = [load_or_compute_ela(path) for path in batch_paths]

        X = np.array(batch_x, dtype=np.float32) / 255.0
        # Labels are taken by the same index array, so row k of y matches row k of X.
        y = to_categorical(self.labels[batch_indices], num_classes=NUM_CLASSES)
        return X, y

In [None]:
print("Loading filepaths and labels...")
filepaths = []
labels = []
class_names = ["ai_images", "real"]
class_to_idx = {"ai_images": 0, "real": 1}

for class_name in class_names:
    class_dir = os.path.join(DATA_DIR, class_name)
    if not os.path.exists(class_dir):
        print(f"Warning: {class_dir} not found!")
        continue
    # os.listdir returns entries in arbitrary order; sorting makes the file list
    # (and therefore the seeded train/val split below) reproducible across runs.
    for fname in sorted(os.listdir(class_dir)):
        if fname.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
            filepaths.append(os.path.join(class_dir, fname))
            labels.append(class_to_idx[class_name])

# Fail early with a clear message instead of an obscure error inside the split.
if not filepaths:
    raise FileNotFoundError(f"No images found under {DATA_DIR}")

filepaths = np.array(filepaths)
labels = np.array(labels)

print(f"Total images: {len(filepaths)}")
print(f"AI: {np.sum(labels==0)}, Real: {np.sum(labels==1)}")

# Stratified 80/20 split so both classes keep their proportions in train and val.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    filepaths, labels,
    test_size=0.2,
    random_state=42,
    stratify=labels
)

print(f"Train: {len(train_paths)}, Val: {len(val_paths)}")


In [None]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_gen = FastELASequence(
    filepaths=train_paths,
    labels=train_labels,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_gen = FastELASequence(
    filepaths=val_paths,
    labels=val_labels,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [None]:
# CNN over ELA maps: four Conv → BatchNorm → MaxPool → Dropout stages followed
# by a dense classifier head with a softmax over NUM_CLASSES outputs.
# Input shape and output width are derived from the config constants so the
# model cannot drift out of sync with the preprocessing.
model = Sequential([
    # IMG_SIZE is (width, height) as passed to PIL; arrays are (height, width, channels).
    Conv2D(32, (5,5), activation='relu',
           input_shape=(IMG_SIZE[1], IMG_SIZE[0], 3), padding='same'),
    BatchNormalization(),
    MaxPool2D(2,2),
    Dropout(0.25),

    Conv2D(64, (5,5), activation='relu', padding='same'),
    BatchNormalization(),
    MaxPool2D(2,2),
    Dropout(0.25),

    Conv2D(128, (3,3), activation='relu', padding='same'),
    BatchNormalization(),
    MaxPool2D(2,2),
    Dropout(0.3),

    Conv2D(256, (3,3), activation='relu', padding='same'),
    BatchNormalization(),
    MaxPool2D(2,2),
    Dropout(0.4),

    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(NUM_CLASSES, activation='softmax')
])

In [None]:
# One-hot labels + softmax output → categorical cross-entropy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

model.summary()

In [None]:
# All three callbacks watch validation loss.

# Keep only the best-scoring weights on disk.
checkpoint = ModelCheckpoint(
    "/kaggle/working/best_model.h5",
    monitor="val_loss", mode="min",
    save_best_only=True, verbose=1,
)

# Stop after 7 epochs without improvement and roll back to the best weights.
early_stop = EarlyStopping(
    monitor="val_loss", patience=7,
    restore_best_weights=True, verbose=1,
)

# Multiply the learning rate by 0.2 after 3 stagnant epochs, down to 1e-7.
reduce_lr = ReduceLROnPlateau(
    monitor="val_loss", factor=0.2, patience=3,
    min_lr=1e-7, verbose=1,
)
   

In [None]:
print("Starting training... (First epoch will build ELA cache — be patient!)")
history = model.fit(
    train_gen,
    epochs=70,
    validation_data=val_gen,
    callbacks=[checkpoint, early_stop, reduce_lr],
    verbose=1,
)

# Persist the model as it stands after training (HDF5 format).
# A native-format alternative would be a path ending in ".keras".
model.save("/kaggle/working/final_real_vs_ai_ela_model.h5")
print("Training complete! Model saved.")

In [None]:

import matplotlib.pyplot as plt

# Training vs. validation loss per epoch.
fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(history.history['loss'], label='train')
ax.plot(history.history['val_loss'], label='val')
ax.set_title('model loss')
ax.set_ylabel('loss')
ax.set_xlabel('epoch')
ax.legend(loc='upper left')
plt.show()

In [None]:
# Training vs. validation accuracy per epoch.
# Both series must be accuracy metrics to match the title and the train/val legend.
plt.figure(figsize=(20,8))
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
from keras.models import load_model
import matplotlib.pyplot as plt

def predict_image(image_path, model_path=model):
    """Classify one image as real or AI-generated and plot the result.

    The image goes through the same preprocessing as the training data:
    RGB conversion, Lanczos resize to ``IMG_SIZE``, ELA via
    ``compute_ela_cv2`` at ``ELA_QUALITY``, then scaling to ``[0, 1]``.

    Args:
        image_path: Path of the image to classify.
        model_path: Either a model object exposing ``predict`` or a path to a
            saved model, which is loaded with ``load_model``. The default is
            the notebook's ``model`` as it was when this function was defined.

    Returns:
        None. Shows a three-panel figure (original, ELA map, class
        probabilities) and prints the predicted label and confidences.
    """
    # Accept a saved-model path as well as an already loaded model.
    if isinstance(model_path, (str, os.PathLike)):
        classifier = load_model(model_path)
    else:
        classifier = model_path

    # Context manager closes the file; convert() returns an independent image.
    with Image.open(image_path) as src:
        original = src.convert('RGB')

    # Resize first, then ELA — the order used when the training cache is built.
    resized = original.resize(IMG_SIZE, Image.Resampling.LANCZOS)
    ela = compute_ela_cv2(np.array(resized), ELA_QUALITY)
    ela_batch = np.expand_dims(ela.astype(np.float32) / 255.0, axis=0)

    pred = classifier.predict(ela_batch)[0]
    # Output index follows class_to_idx: "ai_images" is the fake class, "real" the real one.
    confidence_fake = pred[class_to_idx["ai_images"]] * 100
    confidence_real = pred[class_to_idx["real"]] * 100

    plt.figure(figsize=(15,5))
    plt.subplot(1,3,1)
    plt.title("Original Image")
    plt.imshow(original)
    plt.axis('off')

    plt.subplot(1,3,2)
    plt.title(f"ELA (Quality={ELA_QUALITY})")
    plt.imshow(ela)
    plt.axis('off')

    plt.subplot(1,3,3)
    plt.title(f"Prediction\nReal: {confidence_real:.2f}%\nFake: {confidence_fake:.2f}%")
    plt.bar(['Real', 'Fake'], [confidence_real, confidence_fake], color=['green', 'red'])
    plt.ylim(0, 100)

    plt.show()

    print(f"➜ Prediction: {'Fake' if confidence_fake > 50 else 'Real'}")
    print(f"➜ Confidence: {max(confidence_fake, confidence_real):.2f}%")
    print(f"Raw probabilities → Real: {confidence_real:.2f}% | Fake: {confidence_fake:.2f}%")

In [None]:
# Set this to the image you want to classify. While it is empty the call is
# skipped, so "Run All" does not fail on a placeholder path.
image_path = ""
if image_path:
    predict_image(image_path)
else:
    print("Set image_path to an image file to run a prediction.")