In [1]:
!pip install opencv-python-headless scikit-image scikit-learn matplotlib Pillow seaborn --quiet

In [None]:
import numpy as np
import cv2
import os
import scipy.stats
import matplotlib.pyplot as plt
import seaborn as sns

from io import BytesIO
from PIL import Image
from skimage.restoration import denoise_wavelet
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, confusion_matrix

# ------------------------
# PARAMETERS
# ------------------------
# Root folder of the dataset. load_dataset() expects one sub-folder per class
# inside it, named 'scanner', 'cg' and 'camera'.
DATA_DIR = "/content/data"  # <-- update this to your dataset path
# Number of random train/test splits (used as n_splits of StratifiedShuffleSplit).
N_REPEATS = 10
# Passed to load_dataset(): when True every image is re-encoded as JPEG
# (quality 90) before feature extraction.
USE_COMPRESSION = False  # Set to True for JPEG compression test
# Target size handed to cv2.resize() as its dsize argument for every image.
IMAGE_SIZE = (768, 1024)  # Width x Height
# Integer class id -> human-readable name. The ids match the label values
# assigned in load_dataset(); the names are used as confusion-matrix tick
# labels and as the return value of classify_image().
label_map = {0: "Scanner", 1: "Computer Generated", 2: "Camera"}

# ------------------------
# RESIDUAL NOISE FEATURE EXTRACTION
# ------------------------
def extract_features(img):
    """Compute 15 statistical features of an image's residual noise.

    The image (its green channel, if it has three dimensions) is wavelet
    denoised; the residual ``image - denoised`` is treated as sensor noise.
    The residual is averaged along each axis and every row / column of the
    residual is correlated with the matching average.

    Parameters
    ----------
    img : numpy.ndarray
        2-D grayscale image or 3-D image whose channel index 1 is used.
        Integer images are rescaled to [0, 1] by dividing by the dtype's
        maximum; other dtypes are only cast to float32.

    Returns
    -------
    list of float
        15 finite values, in this order:
        0-3   mean, std, skewness, kurtosis of the per-row correlations
        4-7   mean, std, skewness, kurtosis of the per-column correlations
        8-10  std, skewness, kurtosis of the axis-0 average of the residual
        11-13 std, skewness, kurtosis of the axis-1 average of the residual
        14    (1 - mean column correlation / mean row correlation) * 100
        Quantities that are undefined for degenerate input (constant data,
        zero mean row correlation) are reported as 0.0 so the vector can
        always be fed to a scaler / classifier.
    """
    if img.ndim == 3:
        img = img[:, :, 1]  # green channel

    # denoise_wavelet converts integer images to floats in [0, 1] before
    # denoising. The image must be on that same scale when the residual is
    # taken, otherwise the "noise" is just the 0..255 image minus a 0..1 one.
    if np.issubdtype(img.dtype, np.integer):
        img_float = img.astype(np.float32) / np.float32(np.iinfo(img.dtype).max)
    else:
        img_float = img.astype(np.float32)

    denoised = denoise_wavelet(img_float, channel_axis=None, rescale_sigma=True)
    noise = img_float - np.asarray(denoised, dtype=np.float32)

    M, N = noise.shape
    r_avg = np.mean(noise, axis=0)  # length N: average over all rows
    c_avg = np.mean(noise, axis=1)  # length M: average over all columns

    def normalized_corr(a, b):
        # Pearson correlation is undefined for constant vectors; use 0.
        if np.std(a) == 0 or np.std(b) == 0:
            return 0.0
        return np.corrcoef(a, b)[0, 1]

    def skew_kurt(values):
        # Skewness and kurtosis are undefined for constant data (scipy warns
        # and the result depends on its version); report neutral zeros.
        if np.std(values) == 0:
            return 0.0, 0.0
        return scipy.stats.skew(values), scipy.stats.kurtosis(values)

    rho_row = [normalized_corr(r_avg, noise[i, :]) for i in range(M)]
    rho_col = [normalized_corr(c_avg, noise[:, j]) for j in range(N)]

    mean_rho_row = np.mean(rho_row)
    mean_rho_col = np.mean(rho_col)
    skew_row, kurt_row = skew_kurt(rho_row)
    skew_col, kurt_col = skew_kurt(rho_col)
    skew_r_avg, kurt_r_avg = skew_kurt(r_avg)
    skew_c_avg, kurt_c_avg = skew_kurt(c_avg)

    # Guard the ratio: a zero mean row correlation would give inf / NaN.
    if mean_rho_row != 0:
        ratio = (1 - mean_rho_col / mean_rho_row) * 100
    else:
        ratio = 0.0

    features = [
        mean_rho_row, np.std(rho_row), skew_row, kurt_row,
        mean_rho_col, np.std(rho_col), skew_col, kurt_col,
        np.std(r_avg), skew_r_avg, kurt_r_avg,
        np.std(c_avg), skew_c_avg, kurt_c_avg,
        ratio,
    ]

    # A single NaN / inf makes StandardScaler and SVC reject the whole
    # dataset, so replace any remaining non-finite value with 0.0.
    return [float(v) if np.isfinite(v) else 0.0 for v in features]

# ------------------------
# JPEG COMPRESSION FUNCTION
# ------------------------
def jpeg_compress_image(cv2_img, quality=90):
    """Round-trip a BGR image through in-memory JPEG encoding.

    The image is converted from BGR to RGB, saved as JPEG with the given
    ``quality`` into a memory buffer via PIL, read back, and converted to
    BGR again.

    Parameters
    ----------
    cv2_img : numpy.ndarray
        Image in OpenCV's BGR channel order.
    quality : int, optional
        JPEG quality passed to PIL's ``save`` (default 90).

    Returns
    -------
    numpy.ndarray
        The decoded JPEG image, in BGR channel order.
    """
    encoded = BytesIO()
    Image.fromarray(cv2.cvtColor(cv2_img, cv2.COLOR_BGR2RGB)).save(
        encoded, format='JPEG', quality=quality
    )
    # Rewind so the decoder reads the stream from its first byte.
    encoded.seek(0)
    decoded_rgb = np.array(Image.open(encoded))
    return cv2.cvtColor(decoded_rgb, cv2.COLOR_RGB2BGR)

# ------------------------
# LOAD DATASET
# ------------------------
def load_dataset(data_dir, compress=False):
    """Load every image of the three class folders and extract its features.

    ``data_dir`` must contain the sub-folders ``scanner``, ``cg`` and
    ``camera`` (labels 0, 1 and 2). Each image file is read with OpenCV,
    resized to ``IMAGE_SIZE``, optionally JPEG re-compressed, and turned into
    a feature vector by ``extract_features``.

    Parameters
    ----------
    data_dir : str
        Dataset root folder.
    compress : bool, optional
        When True, images go through ``jpeg_compress_image(quality=90)``
        before feature extraction.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Feature matrix (one row per successfully loaded image) and the
        matching integer label vector.

    Raises
    ------
    FileNotFoundError
        If one of the class sub-folders does not exist (from ``os.listdir``).
    """
    X, y = [], []
    labels = {'scanner': 0, 'cg': 1, 'camera': 2}
    extensions = ('.jpg', '.jpeg', '.png', '.tif', '.tiff')
    for label_name, label_value in labels.items():
        folder = os.path.join(data_dir, label_name)
        # os.listdir returns names in arbitrary order; sorting keeps the
        # sample order, and therefore the seeded splits, reproducible.
        for fname in sorted(os.listdir(folder)):
            if not fname.lower().endswith(extensions):
                continue
            path = os.path.join(folder, fname)
            img = cv2.imread(path, cv2.IMREAD_COLOR)
            if img is None:
                # cv2.imread signals an unreadable / corrupt file with None
                # instead of raising; skip it rather than crash in resize.
                print("Warning: could not read image, skipping: {}".format(path))
                continue
            img = cv2.resize(img, IMAGE_SIZE)
            if compress:
                img = jpeg_compress_image(img, quality=90)
            X.append(extract_features(img))
            y.append(label_value)
    return np.array(X), np.array(y)

# ------------------------
# MAIN TRAINING LOOP
# ------------------------
# Build the feature matrix X (one row per image) and the label vector y from
# the class folders under DATA_DIR.
print("Loading dataset (compression: {})...".format(USE_COMPRESSION))
X, y = load_dataset(DATA_DIR, compress=USE_COMPRESSION)

# Standardize the features. This scaler is fit on ALL samples; the same object
# is later passed to classify_image() to scale features of new images.
# NOTE(review): because it has seen every sample, any held-out evaluation done
# on X_scaled uses scaling statistics that include the test samples.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Hyper-parameter candidates for the RBF SVC, searched with GridSearchCV.
param_grid = {
    'C': [0.1, 1, 10],
    'gamma': ['scale', 0.001, 0.01, 0.1]
}

# N_REPEATS stratified random splits with 20% of the samples held out for
# testing; random_state is fixed so the splits are repeatable.
splitter = StratifiedShuffleSplit(n_splits=N_REPEATS, test_size=0.2, random_state=42)
# Per-repetition results, appended to inside the loop below.
train_accuracies = []
test_accuracies = []
conf_matrices = []

print("Running {} repetitions with GridSearchCV...".format(N_REPEATS))

# Fixed label order so every confusion matrix has the same shape (one row and
# column per class in label_map), even if a split's test set lacks a class.
class_labels = sorted(label_map)

# Split on the raw features: StratifiedShuffleSplit only uses the number of
# samples and y, so the indices are the same as when splitting X_scaled.
for i, (train_idx, test_idx) in enumerate(splitter.split(X, y)):
    # Fit the scaler on the training samples of THIS split only, so that no
    # statistic of the held-out test samples leaks into training.
    fold_scaler = StandardScaler().fit(X[train_idx])
    X_train = fold_scaler.transform(X[train_idx])
    X_test = fold_scaler.transform(X[test_idx])
    y_train, y_test = y[train_idx], y[test_idx]

    # Pick C / gamma by 3-fold cross-validation on the training samples.
    grid = GridSearchCV(SVC(kernel='rbf'), param_grid, cv=3, n_jobs=-1)
    grid.fit(X_train, y_train)

    model = grid.best_estimator_
    y_train_pred = model.predict(X_train)
    y_test_pred = model.predict(X_test)

    acc_train = accuracy_score(y_train, y_train_pred)
    acc_test = accuracy_score(y_test, y_test_pred)
    cm = confusion_matrix(y_test, y_test_pred, labels=class_labels)

    train_accuracies.append(acc_train)
    test_accuracies.append(acc_test)
    conf_matrices.append(cm)

    print("Run {}: Train = {:.2f}%, Test = {:.2f}% | Best Params: {}".format(
        i + 1, acc_train * 100, acc_test * 100, grid.best_params_
    ))

# Aggregate the per-repetition accuracies (as percentages) before reporting.
mean_train_pct = np.mean(train_accuracies) * 100
mean_test_pct = np.mean(test_accuracies) * 100
std_test_pct = np.std(test_accuracies) * 100

print("\n===== Summary =====")
print("Average Training Accuracy: {:.2f}%".format(mean_train_pct))
print("Average Testing Accuracy: {:.2f}%".format(mean_test_pct))
print("Standard Deviation (Test): {:.2f}%".format(std_test_pct))

# Element-wise mean of the confusion matrices collected over all repetitions.
avg_cm = np.mean(conf_matrices, axis=0)
class_names = list(label_map.values())
# sns.heatmap draws on the current axes and returns them; label via that handle.
ax = sns.heatmap(
    avg_cm,
    annot=True,
    fmt=".1f",
    cmap="Blues",
    xticklabels=class_names,
    yticklabels=class_names,
)
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")
ax.set_title("Average Confusion Matrix ({} runs)".format(N_REPEATS))
plt.show()

# ------------------------
# TRAIN FINAL MODEL ON FULL DATA
# ------------------------
# Repeat the hyper-parameter search using every sample (no held-out split),
# on the features standardized by the full-data `scaler`.
grid_final = GridSearchCV(SVC(kernel='rbf'), param_grid, cv=3, n_jobs=-1)
grid_final.fit(X_scaled, y)
# Best estimator found by the search; this is the model handed to
# classify_image() for new images.
final_model = grid_final.best_estimator_

# ------------------------
# CLASSIFY NEW IMAGE FUNCTION
# ------------------------
def classify_image(image_path, model, scaler, compress=False):
    """Predict the source class of a single image file.

    The image is read with OpenCV, resized to ``IMAGE_SIZE``, optionally JPEG
    re-compressed, converted to a feature vector by ``extract_features``,
    scaled with ``scaler`` and classified with ``model``.

    Parameters
    ----------
    image_path : str
        Path of the image to classify.
    model : object
        Fitted classifier exposing ``predict``.
    scaler : object
        Fitted scaler exposing ``transform``.
    compress : bool, optional
        When True, apply ``jpeg_compress_image(quality=90)`` before feature
        extraction.

    Returns
    -------
    str
        The ``label_map`` entry for the predicted class id.

    Raises
    ------
    FileNotFoundError
        If ``image_path`` does not point to an existing file.
    ValueError
        If the file exists but OpenCV cannot decode it as an image.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread returns None instead of raising; report the actual cause
        # here rather than failing later inside cv2.resize.
        if not os.path.isfile(image_path):
            raise FileNotFoundError("Image file not found: {}".format(image_path))
        raise ValueError("Could not decode image file: {}".format(image_path))
    img = cv2.resize(img, IMAGE_SIZE)
    if compress:
        img = jpeg_compress_image(img, quality=90)
    features = extract_features(img)
    # The scaler expects a 2-D input: one row holding this image's features.
    features_scaled = scaler.transform([features])
    pred = model.predict(features_scaled)[0]
    return label_map[pred]

# ------------------------
# EXAMPLE CLASSIFICATION
# ------------------------
# Upload a test image or provide a path and run:
# image_path = "/content/sample_image.jpg"
# result = classify_image(image_path, final_model, scaler, compress=True)
# print("Predicted Source:", result)

In [None]:
# Classify one image with the model trained on the full dataset. `scaler` is
# the StandardScaler fitted on the training features in the cell above, and
# compress=False means the image is not JPEG re-encoded before extraction.
image_path = "/content/sample_image.jpg"
result = classify_image(image_path, final_model, scaler, compress=False)
print("Predicted Source:", result)