Imports



In [None]:
from google.colab import drive
drive.mount('/content/drive')

import math
import cv2
import time
import os
import pandas as pd
import progressbar
import numpy as np
from matplotlib import pyplot
from PIL import Image, ImageFilter
from scipy.fftpack import dct  # For DCT
from skimage.feature import hog  # For HOG

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def visualize_result(rgb, gt, pred, figsize=(21, 7), title=None):
    """Plot the input image, the ground truth and the prediction side by side.

    Args:
        rgb: Image drawn in the left panel.
        gt: Ground-truth mask drawn in the middle panel.
        pred: Predicted mask drawn in the right panel.
        figsize: Figure size handed to ``pyplot.figure``.
        title: Optional figure-level title; nothing is added when ``None``.
    """
    # (subplot position, image, caption) for each of the three panels.
    panels = (
        (131, rgb, 'input image'),
        (132, gt, 'ground truth'),
        (133, pred, 'pred'),
    )

    pyplot.figure(figsize=figsize)
    for position, image, caption in panels:
        pyplot.subplot(position)
        pyplot.imshow(image)
        pyplot.title(caption)

    if title is not None:
        pyplot.suptitle(title)


def acc_result(gt, pred, display=True):
    """
    Calculate precision, recall, F1-score, balanced accuracy and accuracy.

    A pixel contributes to the confusion matrix only when both masks hold
    exactly 0 or 1 at that position; any other value (for example a grey
    ground-truth pixel) is counted in none of the four cells.

    Args:
        gt (numpy.ndarray): 2-D ground-truth mask.
        pred (numpy.ndarray): Predicted mask with the same shape as ``gt``.
        display (bool): When True, print the metrics and plot the
            false-positive, false-negative and true-positive maps.

    Returns:
        dict: Keys "TP", "FP", "TN", "FN" (pixel counts) and "Precision",
        "Recall", "F1-Score", "Balanced Accuracy", "Accuracy". Every ratio
        falls back to 0 when its denominator is 0.

    Raises:
        ValueError: If ``gt`` is not two-dimensional.
    """
    gt = np.asarray(gt)
    pred = np.asarray(pred)
    if gt.ndim != 2:
        raise ValueError(f"gt must be a 2-D mask, got shape {gt.shape}")

    # Boolean masks replace the per-pixel Python loop; the counting rule
    # (exact comparison against 0 and 1) is unchanged.
    gt_pos, gt_neg = gt == 1, gt == 0
    pred_pos, pred_neg = pred == 1, pred == 0

    tpc = int(np.count_nonzero(gt_pos & pred_pos))
    tnc = int(np.count_nonzero(gt_neg & pred_neg))
    fnc = int(np.count_nonzero(gt_pos & pred_neg))
    fpc = int(np.count_nonzero(gt_neg & pred_pos))

    # Calculate additional metrics
    recall = tpc / (tpc + fnc) if (tpc + fnc) != 0 else 0
    precision = tpc / (tpc + fpc) if (tpc + fpc) != 0 else 0
    f1_score = 2 * recall * precision / (recall + precision) if (recall + precision) != 0 else 0
    tnr = tnc / (tnc + fpc) if (tnc + fpc) != 0 else 0
    bal_acc = (recall + tnr) / 2
    # Guarded like the other ratios: no countable pixel means no accuracy.
    counted = tpc + tnc + fnc + fpc
    acc = (tpc + tnc) / counted if counted != 0 else 0

    # Display results if required
    if display:
        # FP / FN / TP maps for visualization
        tp = gt * pred  # True Positives
        fp = pred - tp  # False Positives
        fn = gt - tp    # False Negatives

        print("Metrics:")
        print(f"True Positives (TP): {tpc}")
        print(f"False Positives (FP): {fpc}")
        print(f"True Negatives (TN): {tnc}")
        print(f"False Negatives (FN): {fnc}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1_score:.4f}")
        print(f"Balanced Accuracy: {bal_acc:.4f}")
        print(f"Accuracy: {acc:.4f}")

        # Visualize FP, FN, and TP
        pyplot.figure(figsize=(21, 7))
        pyplot.subplot(131)
        pyplot.imshow(fp, cmap='gray')
        pyplot.title('False Positive')
        pyplot.subplot(132)
        pyplot.imshow(fn, cmap='gray')
        pyplot.title('False Negative')
        pyplot.subplot(133)
        pyplot.imshow(tp, cmap='gray')
        pyplot.title('True Positive')
        title = f"Precision = {precision:.4f}, Recall = {recall:.4f}, F1-Score = {f1_score:.4f}, Balanced Accuracy = {bal_acc:.4f}"
        pyplot.suptitle(title)

    # Return all metrics
    return {
        "TP": tpc,
        "FP": fpc,
        "TN": tnc,
        "FN": fnc,
        "Precision": precision,
        "Recall": recall,
        "F1-Score": f1_score,
        "Balanced Accuracy": bal_acc,
        "Accuracy": acc
    }


Read Image

In [None]:
def read(name, size=256):
    """Load an image as a grayscale float array scaled to [0, 1].

    Args:
        name: Path (or file object) accepted by ``PIL.Image.open``.
        size: Unused; kept so existing callers that pass it keep working.

    Returns:
        numpy.ndarray: 2-D float array of the 8-bit grayscale values divided
        by 255.
    """
    # Image.open is lazy and keeps the underlying file open; the context
    # manager releases the handle once the grayscale copy has been made.
    with Image.open(name) as img:
        gray = img.convert('L')  # Convert to grayscale
    return np.asarray(gray).astype('float') / 255.0  # Normalize to [0, 1]

Overlapping Blocks


In [None]:
import numpy as np

def divideIntoBlocks(gray_image, blockSize=7, stride=1):
    """
    Slide a square window over a grayscale image and collect every block.

    Args:
        gray_image (numpy.ndarray): 2-D grayscale image.
        blockSize (int): Side length of the square window.
        stride (int): Step between window origins (1 gives fully overlapping
            blocks, ``blockSize`` gives non-overlapping blocks).

    Returns:
        numpy.ndarray: One entry per window, in row-major order of the window
        origin. Each entry has shape (3, blockSize, blockSize): a plane filled
        with the window's x (column) origin, a plane filled with its y (row)
        origin, and the pixel values themselves.
    """
    rows, cols = gray_image.shape
    tops = range(0, rows - blockSize + 1, stride)
    lefts = range(0, cols - blockSize + 1, stride)
    plane = (blockSize, blockSize)

    tiles = [
        np.array([
            np.full(plane, left, dtype=float),   # x origin, repeated
            np.full(plane, top, dtype=float),    # y origin, repeated
            gray_image[top:top + blockSize, left:left + blockSize],
        ])
        for top in tops
        for left in lefts
    ]
    return np.array(tiles)

Features

In [None]:
def zigzag_indices(n):
    """Return the (row, col) visiting order of an n x n zigzag scan.

    Anti-diagonals are walked one after another. Even-numbered diagonals are
    listed with the row index increasing; odd-numbered diagonals with the
    column index increasing.
    """
    order = []
    for diag in range(2 * n - 1):
        # Range of the running index that keeps both coordinates inside the grid.
        first = max(0, diag - n + 1)
        last = min(diag, n - 1)
        cells = [(k, diag - k) for k in range(first, last + 1)]
        if diag % 2:
            cells = [(col, row) for row, col in cells]
        order.extend(cells)
    return order
def dct_feature_extraction(block):
    """Return the first eight zigzag-ordered 2-D DCT coefficients of a block.

    The block is expected to be square: only its first dimension is used to
    size the zigzag scan.
    """
    # Separable 2-D DCT: transform along axis 0, then along axis 1.
    coeffs = dct(block, axis=0, norm='ortho')
    coeffs = dct(coeffs, axis=1, norm='ortho')

    # Only the leading eight positions of the scan are ever returned.
    leading = zigzag_indices(coeffs.shape[0])[:8]
    return np.array([coeffs[row, col] for row, col in leading])

In [None]:
def hog_feature_extraction(block):
    """Compute the HOG descriptor of a block using 8 orientation bins.

    Cells are 7 x 7 pixels and each normalisation block holds a single cell.
    """
    hog_settings = {
        'orientations': 8,           # number of histogram bins
        'pixels_per_cell': (7, 7),
        'cells_per_block': (1, 1),
        'visualize': False,
    }
    return hog(block, **hog_settings)

In [None]:
def combined_feature_extraction(block):
    """Concatenate DCT and HOG descriptors of a block.

    At most eight values are taken from each descriptor, DCT first, so the
    result has 16 entries when both extractors yield at least eight.
    """
    halves = [
        dct_feature_extraction(block)[:8],
        hog_feature_extraction(block)[:8],
    ]
    return np.concatenate(halves)

In [None]:
import numpy as np
import random
from copy import deepcopy

def print_first_5_blocks_features(features):
    """Print the feature row (element 0) of up to the first five entries."""
    print("\nFirst 5 blocks' feature values before shuffling:")
    # zip stops after five entries or when ``features`` runs out.
    for number, entry in zip(range(1, 6), features):
        print(f"Block {number} features:", entry[0])

In [None]:
# Single fixed permutation applied to the 16 feature values of every block.
FIXED_SHUFFLE_ORDER = [2, 8, 4, 12, 14, 5, 7, 9, 3, 13, 15, 10, 6, 11, 0, 1]
print("Fixed Shuffle Order:", FIXED_SHUFFLE_ORDER)
def shuffle_features(features):
    """Return a deep copy of ``features`` with each feature row permuted.

    Only element 0 of every entry (the feature values) is reordered according
    to ``FIXED_SHUFFLE_ORDER``; the remaining elements and the input itself
    are left untouched.
    """
    permuted = deepcopy(features)
    for position in range(len(permuted)):
        entry = permuted[position]
        entry[0] = entry[0][FIXED_SHUFFLE_ORDER]
    return permuted

Fixed Shuffle Order: [2, 8, 4, 12, 14, 5, 7, 9, 3, 13, 15, 10, 6, 11, 0, 1]


Average

In [None]:
def calculate_and_store_averages(csv_file_path):
    """Append a row of column means to a CSV file, rewriting it in place.

    The first column is treated as a label column and receives the text
    'Average' in the new row; every other column receives its mean.
    """
    table = pd.read_csv(csv_file_path)
    label_column = table.columns[0]
    value_columns = table.columns[1:]

    # One-row frame holding the per-column means, with the label in front.
    summary = pd.DataFrame(table[value_columns].mean()).T
    summary.insert(0, label_column, 'Average')

    combined = pd.concat([table, summary], ignore_index=True)
    combined.to_csv(csv_file_path, index=False)

Encode Features

In [None]:
def encode_features(blocks, gray_image, feature_extraction_func):
    """
    Encode image blocks as feature vectors tagged with their block origin.

    Args:
        blocks (numpy.ndarray): Blocks as produced by ``divideIntoBlocks``;
            each block holds an x-origin plane, a y-origin plane and the
            pixel values.
        gray_image (numpy.ndarray): Unused; kept for interface compatibility.
        feature_extraction_func (function): Maps a block's pixel array to a
            1-D feature vector (e.g. ``combined_feature_extraction``).

    Returns:
        numpy.ndarray: One (3, F) entry per block, where row 0 is the feature
        vector truncated to at most 16 values, and rows 1 and 2 repeat the
        block's x and y origin F times.
    """
    bar = progressbar.ProgressBar(max_value=len(blocks)).start()  # Initialize progress bar
    features = []  # List to store encoded features

    for count, block in enumerate(blocks):
        bar.update(count)  # Update progress bar

        # The coordinate planes are constant, so any element gives the origin.
        blockX = block[0][0][0]  # X-coordinate of the block
        blockY = block[1][0][0]  # Y-coordinate of the block

        # Work on a copy so the extractor cannot modify the stored block.
        block_data = np.copy(block[2])  # Grayscale pixel values

        # Limit to the first 16 features
        feature = np.asarray(feature_extraction_func(block_data))[:16]

        # Size the coordinate rows from the feature vector itself: a fixed
        # length of 16 made the stacked array ragged whenever the extractor
        # returned fewer than 16 values.
        fX = np.full(feature.shape[0], blockX, dtype=float)
        fY = np.full(feature.shape[0], blockY, dtype=float)

        # Append the feature and coordinates as a single (3, F) entry
        features.append(np.array([feature, fX, fY]))

    bar.finish()  # Finish progress bar
    return np.array(features)  # Convert to NumPy array

In [None]:
def lexiSort(feaMap):
    """Return the entries of ``feaMap`` ordered lexicographically by features.

    Row 0 of every entry is the sort key, compared value by value starting
    with the first feature. An empty input is returned unchanged.
    """
    if feaMap.size == 0:
        return feaMap

    # np.lexsort treats the LAST key as primary; rotating the (N, F) feature
    # matrix puts feature 0 in the last row, making it the primary key.
    keys = np.rot90(feaMap[:, 0])
    order = np.lexsort(keys)
    return feaMap[order]

In [None]:
def connectedComp(predout, thresY=0, thresX=0):
    """Close small gaps in a binary mask and thicken clustered detections.

    Works inside the bounding box of the pixels equal to 1:

    1. Column pass: in every column, a run of non-1 pixels between two 1
       pixels is filled when it is shorter than ``(thresY + box height) // 8``.
    2. Row pass: the same along rows, using ``(thresX + box width) // 8``.
    3. The two results are merged, and wherever a 4 x 4 window of the merged
       mask contains more than one set pixel, a 4 x 4 patch is set in the
       output.

    Args:
        predout (numpy.ndarray): 2-D mask whose detections are marked with 1.
        thresY (int): Extra allowance added to the box height before the
            gap limit is derived.
        thresX (int): Extra allowance added to the box width before the
            gap limit is derived.

    Returns:
        numpy.ndarray: Mask of the same shape as ``predout``; all zeros when
        ``predout`` contains no pixel equal to 1.
    """
    w = np.where(predout == 1)
    # Nothing detected: the min/max reductions below would raise on empty input.
    if w[0].size == 0:
        return np.zeros_like(predout)

    begX = np.min(w[1])
    endX = np.max(w[1])
    begY = np.min(w[0])
    endY = np.max(w[0])
    thresX += (endX - begX)
    thresY += (endY - begY)

    # Column pass: ``lo`` is the first row after the most recent set pixel.
    predx = np.copy(predout)
    for i in range(begX, endX):
        flag = False
        lo = begY
        for j in range(begY, endY):
            if predx[j][i] == 1:
                if flag and (j - lo) < thresY // 8:
                    predx[lo:j, i] = 1
                flag = True
                lo = j + 1

    # Row pass: same gap filling along each row.
    predy = np.copy(predout)
    for i in range(begY, endY):
        flag = False
        lo = begX
        for j in range(begX, endX):
            if predy[i][j] == 1:
                if flag and (j - lo) < thresX // 8:
                    predy[i, lo:j] = 1
                flag = True
                lo = j + 1

    # Union of both passes, clamped back to {0, 1}.
    z = np.clip(predx + predy, 0, 1)

    predg = z * 0
    for i in range(begY, endY):
        for j in range(begX, endX):
            # NOTE(review): for i == 0 or j == 0 the slice start is -1, which
            # yields an empty window (sum 0) - confirm this is intended.
            if np.sum(z[i - 1:i + 3, j - 1:j + 3]) > 1:
                predg[i:i + 4, j:j + 4] = 1
    return predg


In [None]:
def analyzeBlocks(blockA, blockB, simThres=0.02, shuffle_idx=0):
    """
    Decide whether two encoded blocks form a candidate duplicate pair.

    Each block is indexed as (features, x-row, y-row). The pair matches when
    the absolute difference between the SUMS of their feature values is below
    ``simThres`` and their origins are at least 10 pixels apart.

    Args:
        blockA, blockB: Encoded blocks to compare.
        simThres (float): Upper bound (exclusive) on the feature-sum difference.
        shuffle_idx (int): Unused; kept for interface compatibility.

    Returns:
        numpy.ndarray | None: ``[xa, ya, xb, yb, |dx|, |dy|]`` for a match,
        otherwise ``None``.
    """
    min_dist = 10  # pairs closer than this are treated as self-similarity

    xa, ya = blockA[1][0], blockA[2][0]
    xb, yb = blockB[1][0], blockB[2][0]
    dx, dy = xa - xb, ya - yb
    separation = math.sqrt(dx ** 2 + dy ** 2)

    # Similarity measure: difference of the summed feature values.
    feature_gap = abs(np.sum(blockA[0]) - np.sum(blockB[0]))

    if not (feature_gap < simThres * 1.0 and separation >= min_dist):
        return None

    return np.array([xa, ya, xb, yb, abs(dx), abs(dy)])

In [None]:
def detectSimi_with_shuffles(gray_image, gt, blockSize=7, feature_extraction_func=combined_feature_extraction):
    """Run copy-move detection on one image and score it against ``gt``.

    Pipeline: split the image into overlapping blocks, encode each block,
    permute the features with the fixed shuffle order, sort the blocks
    lexicographically, compare neighbours in the sorted list with
    ``analyzeBlocks``, keep only the pairs whose offset occurs often enough,
    and post-process the resulting mask.

    Args:
        gray_image (numpy.ndarray): 2-D grayscale image.
        gt (numpy.ndarray): 2-D ground-truth mask.
        blockSize (int): Side length of the square blocks.
        feature_extraction_func (function): Per-block feature extractor.

    Returns:
        dict: The metrics dictionary returned by ``acc_result``.
    """
    shapeA, shapeB = gray_image.shape
    blocks = divideIntoBlocks(gray_image, blockSize)
    features = encode_features(blocks, gray_image, feature_extraction_func)

    # Diagnostic dump of the unshuffled features (prints the same header and
    # rows the inline loop used to print).
    print_first_5_blocks_features(features)

    shuffled_features = shuffle_features(features)
    lsorted = lexiSort(shuffled_features)

    # Diagnostic sample; skipped when the image produced no blocks at all.
    if len(lsorted) > 0:
        print("\nSample of first sorted block after shuffling:")
        print(lsorted[0][0][:5])  # First 5 features of first block

    vectors = []
    windowSize = 3  # each block is compared with its next windowSize - 1 neighbours

    # analyzeBlocks only reads its arguments, so no defensive copies are needed.
    for i in range(len(lsorted) - windowSize + 1):
        for j in range(1, windowSize):
            res = analyzeBlocks(lsorted[i], lsorted[i + j], simThres=0.002, shuffle_idx=2)
            if res is not None:
                vectors.append(res)

    print(f"\nNumber of similar vectors found: {len(vectors)}")

    # Histogram of absolute offsets, indexed as shift[|dy|][|dx|].
    shift = np.zeros((shapeA, shapeB))
    for vector in vectors:
        shift[int(vector[5])][int(vector[4])] += 1

    # Keep offsets seen strictly more often than 3/4 of the most frequent one
    # (and more than once).
    maxoff = np.max(shift)
    offsetThreshold = max(1, 3 * maxoff // 4)

    offset = np.copy(shift).astype(int)
    offset[offset <= offsetThreshold] = 0
    offset[offset > offsetThreshold] = 1

    duplicates = [vec for vec in vectors if offset[int(vec[5])][int(vec[4])] == 1]
    print(f"Number of detected duplicates: {len(duplicates)}")

    # Mark both block origins of every surviving pair.
    final = gray_image * 0
    for res in duplicates:
        final[int(res[1])][int(res[0])] = 1
        final[int(res[3])][int(res[2])] = 1

    # With no surviving pair the mask is empty; skip the bounding-box based
    # clean-up, which cannot handle a mask without any set pixel.
    thresY = 0
    thresX = 0
    if duplicates:
        predMask = connectedComp(final, thresY=thresY, thresX=thresX)
    else:
        predMask = np.zeros_like(final)

    kernel_size = 4
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size))
    predOut = cv2.dilate(predMask, kernel, iterations=2)

    # The eroded mask is used only to decide whether the detection is large
    # enough to keep.
    kernel0_size = 3
    kernel0 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel0_size, kernel0_size))
    predOut2 = cv2.erode(predMask, kernel0, iterations=1)

    min_size = 10
    if np.sum(np.round(np.clip(predOut2, 0, 1))) < min_size:
        predOut = predOut * 0

    # Visualization and metrics
    rgb_image = np.stack([gray_image]*3, axis=-1)
    visualize_result(rgb_image, gt, predOut, title="Shuffle 1 (Sum)")
    metrics = acc_result(gt, predOut, display=True)

    return metrics

Process Images from Drive


In [None]:
def process_images_from_drive(folder_path, output_csv='dct+hog_7_shuffled.csv', blockSize=7, feature_extraction_func=combined_feature_extraction):
    """Run detection on every forged / ground-truth pair in a folder.

    Forged images are the files whose name contains ``'_F.'`` and ground-truth
    masks those containing ``'_B.'``; both lists are sorted and paired by
    position. After each image the accumulated results are written to
    ``output_csv``, and a final row of averages is appended at the end.

    Args:
        folder_path (str): Folder holding the image files.
        output_csv (str): Path of the results CSV.
        blockSize (int): Block side length forwarded to the detector.
        feature_extraction_func (function): Per-block feature extractor.

    Returns:
        None. Returns early (after printing a message) when the two file
        lists differ in length or when no forged image is found.
    """
    results = []
    files = os.listdir(folder_path)

    forged_files = sorted(f for f in files if '_F.' in f)
    gt_files = sorted(f for f in files if '_B.' in f)

    if len(forged_files) != len(gt_files):
        print("Error: Forged and ground truth files do not match.")
        return

    # Without any pair the CSV is never written in this run, so averaging
    # would read a missing or stale file.
    if not forged_files:
        print("Error: No forged images found.")
        return

    for forged_file, gt_file in zip(forged_files, gt_files):
        forged_path = os.path.join(folder_path, forged_file)
        gt_path = os.path.join(folder_path, gt_file)

        gray_image = read(forged_path)
        gt = read(gt_path)

        if len(gt.shape) == 3:
            gt = gt[:, :, 0]  # Ensure single channel

        print(f"\n\n{'='*80}")
        print(f"Processing {forged_file} and {gt_file}...")
        print(f"{'='*80}")

        metrics = detectSimi_with_shuffles(
            gray_image, gt,
            blockSize=blockSize,
            feature_extraction_func=feature_extraction_func
        )

        results.append({
            "File Name": forged_file,
            "True Positives (TP)": metrics["TP"],
            "False Positives (FP)": metrics["FP"],
            "True Negatives (TN)": metrics["TN"],
            "False Negatives (FN)": metrics["FN"],
            "Precision": metrics["Precision"],
            "Recall": metrics["Recall"],
            "F1-Score": metrics["F1-Score"],
            "Accuracy": metrics["Accuracy"],
            "Balanced Accuracy": metrics["Balanced Accuracy"]
        })

        # Rewritten after every image so partial results survive an interruption.
        pd.DataFrame(results).to_csv(output_csv, index=False)
        print(f"Results saved to {output_csv}")

    calculate_and_store_averages(output_csv)
    print(f"Final results with averages saved to {output_csv}")

In [None]:
# Make sure Google Drive is available under /content/drive
drive.mount('/content/drive')

# Input folder and the name of the results file
folder_path = '/content/drive/MyDrive/Project/COMOFOD-40'  # Update this path
results_csv = 'dct+hog_7_shuffled.csv'

# Run the detector over the whole folder and store the metrics
process_images_from_drive(
    folder_path,
    output_csv=results_csv,
    blockSize=7,
    feature_extraction_func=combined_feature_extraction,
)

# Offer the results file as a browser download
from google.colab import files
files.download(results_csv)

Output hidden; open in https://colab.research.google.com to view.