In [4]:
import cv2
import numpy as np
import os
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

In [5]:
base_dir = "Images"

In [33]:
def extract_sift_features(image_dir):
    # Create the SIFT object
    sift = cv2.SIFT_create()

    all_keypoints = []  # To store keypoints for each image
    all_descriptors = []  # To store descriptors for all images
    image_names = []  # To store the names of images

    # Iterate through each image in the input directory, convert to grayscale, and get the feature descriptors
    for img_file in os.listdir(image_dir):
        img_path = os.path.join(image_dir, img_file)
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            continue
            
        keypoints, descriptors = sift.detectAndCompute(image, None)
        
        if descriptors is not None and len(descriptors) > 0:
            all_keypoints.append(keypoints)  # Save keypoints with their corresponding file
            all_descriptors.append(descriptors)
            image_names.append(img_file)  # Store the image name separately

    # Combine all descriptors into one array
    combined_descriptors = np.vstack(all_descriptors) if all_descriptors else None

    return image_names, all_keypoints, combined_descriptors

In [34]:
def detect_anomalies(test_image_dir, dbscan_model, scaler):
    anomalies = []

    # Extract keypoints and descriptors from all test images
    image_names, all_keypoints, combined_descriptors = extract_sift_features(test_image_dir)

    # Ensure that keypoints and descriptors are aligned
    for image_name, keypoints, descriptors in zip(image_names, all_keypoints, combined_descriptors):
        image_path = os.path.join(test_image_dir, image_name)
        image = cv2.imread(image_path)

        # Ensure descriptors are available for this image
        if descriptors is not None and len(descriptors) > 0:
            # Reshape descriptors if it's a single descriptor to ensure it's 2D
            if descriptors.ndim == 1:
                descriptors = descriptors.reshape(1, -1)

            # Scale descriptors before applying DBSCAN
            descriptors_scaled = scaler.transform(descriptors)

            # Predict with the pre-trained DBSCAN model (on the "good" training images)
            cluster_labels = dbscan_model.fit_predict(descriptors_scaled)

            # If any descriptors are labeled as -1, they are considered anomalies
            if -1 in cluster_labels:
                anomalies.append(image_name)

            # Visualize detected anomalies
            for idx, kp in enumerate(keypoints):
                if cluster_labels[idx] == -1:  # Only visualize anomalies
                    x, y = int(kp.pt[0]), int(kp.pt[1])
                    cv2.circle(image, (x, y), radius=5, color=(0, 0, 255), thickness=-1)  # Red circle for anomalies

            # Save the image with "_anomalies" appended to the name
            file_name, file_ext = os.path.splitext(image_name)  # Split the name and extension
            output_image_name = f"{file_name}_anomalies{file_ext}"
            output_image_path = os.path.join(test_image_dir, output_image_name)
            cv2.imwrite(output_image_path, image)

    return anomalies


In [8]:
def evaluate_anomaly_detection(image_path, ground_truth_path, anomaly_keypoints):
    # Load ground truth mask where white indicates a region with a defect
    ground_truth_mask = cv2.imread(ground_truth_path, cv2.IMREAD_GRAYSCALE)

    # Create an anomaly map based on keypoints
    anomaly_map = np.zeros_like(ground_truth_mask)
    for kp in anomaly_keypoints:
        x, y = int(kp.pt[0]), int(kp.pt[1])
        cv2.circle(anomaly_map, (x, y), radius=5, color=1, thickness=-1)  # Mark anomalies

    # Compute evaluation metrics
    intersection = np.logical_and(ground_truth_mask, anomaly_map)
    union = np.logical_or(ground_truth_mask, anomaly_map)
    iou = np.sum(intersection) / np.sum(union) if np.sum(union) > 0 else 0

    return {"IoU": iou}

In [36]:
# First approach is to use SIFT feature extraction and the DBSCAN clustering algorithm to detect anomalies. Will use the
# images in the "train/good" directory to establish a baseline of features for defect-free images.

items = [f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))]
# Iterate through the directories of images
for item in items:
    item_path = os.path.join(base_dir, item)
    train_path = os.path.join(item_path, 'train/good')
    test_path = os.path.join(item_path, 'test')
    ground_truth_path = os.path.join(item_path, 'ground_truth')

    print(f"Processing: {item}")

    # Extract features from defect-free training images
    names, keypoints, train_descriptors = extract_sift_features(train_path)
    print("Extracted SIFT features from:", item)

    # Scale descriptors before clustering
    if train_descriptors.size == 0:
        print("No valid descriptors found in the training images.")
    else:
        scaler = StandardScaler()
        train_features_scaled = scaler.fit_transform(train_descriptors)
        print("Train descriptors scaled successfully.")
    
    # Fit DBSCAN
    dbscan = DBSCAN(eps=0.5, min_samples=5, metric='euclidean')
    dbscan.fit(train_features_scaled)
    print("Fit DBSCAN to scaled train image features")

    # Detect anomalies in test images. Test imageset also contains a "good" subdirectory, which can be used for detecting false-positives.
    for defect_type in os.listdir(test_path):
        defect_path = os.path.join(test_path, defect_type)

        if defect_type == 'good':
            print("Processing 'good' images for false positives.")
            anomalies = detect_anomalies(defect_path, dbscan, scaler)
            print(f"False Positives in 'good' images: {anomalies}\n")
        else:
            # Process defect directories
            print(f"Processing defect type: {defect_type}")
            anomalies = detect_anomalies(defect_path, dbscan, scaler)
            print(f"Detected anomalies: {anomalies}\n")

            # Evaluate detected anomalies against ground truth masks
            # ground_truth_defect_path = os.path.join(ground_truth_path, defect_type)
            # for anomaly in anomalies:
            #     test_image_path = os.path.join(defect_path, anomaly)
                
            #     base_name, ext = os.path.splitext(anomaly)
            #     gt_image_name = f"{base_name}_mask{ext}"
            #     gt_image_path = os.path.join(ground_truth_defect_path, gt_image_name)

            #     if os.path.exists(gt_image_path):
            #         results = evaluate_anomaly_detection(test_image_path, gt_image_path, anomaly_keypoints=[])
            #         print(f"Evaluation for {anomaly}: {results}")
        
        break
    break

Processing: capsule
Extracted SIFT features from: capsule
Train descriptors scaled successfully.
Fit DBSCAN to scaled train image features
Processing defect type: crack


IndexError: index 1 is out of bounds for axis 0 with size 1