In [1]:

import numpy as np
import os
from sklearn.decomposition import PCA, TruncatedSVD
import cv2
from scipy.signal import convolve2d
from sklearn.impute import KNNImputer
from scipy.ndimage import median_filter
from tqdm import tqdm

In [2]:
# def preprocess_images(batch_size,base_dir, target_size=(64, 64)):
#     processed_images = []
#     img_dir = os.listdir(base_dir)

#     for i in tqdm(range(0, len(img_dir), batch_size)):
#         batch_files = img_dir[i:i + batch_size]
#         batch_images = []
#         for filename in batch_files:
#             filepath = os.path.join(base_dir, filename)
#             img = cv2.imread(filepath) # read image
#             img = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA) # resize image
#             batch_images.append(img)

#         if not batch_images:
#             # nothing to process in this batch
#             continue
#         processed_images.extend(batch_images)
#     return np.array(processed_images)


In [3]:
batch_size = 10  # Adjust this number based on your available memory

# Use the same absolute folder that was used to create A.
# The folder can be redirected without editing the notebook by setting the
# CFE_BASE_DIR environment variable; when it is unset the original path is used.
base_dir = os.environ.get("CFE_BASE_DIR", r'E:/@IIT_BBS/@Sem 1/ML/Final Project/P1')

print(f"base_dir is set to: {base_dir}")

base_dir is set to: E:/@IIT_BBS/@Sem 1/ML/Final Project/P1


In [4]:


# ----------------------------------------------------------
# 1. Custom convolution application with multiple filters
# ----------------------------------------------------------
def apply_conv(image, kernel_size=3):
    """Run a fixed filter bank over every input channel and stack the responses.

    For each input channel the bank yields, in this order: Sobel-x, Sobel-y,
    Laplacian, six Gabor responses (orientations 0, pi/6, ..., 5*pi/6) and one
    Difference-of-Gaussians map, i.e. 10 response maps per channel. All OpenCV
    calls operate on float32 data with CV_32F output so source and destination
    depths always match.

    Args:
        image: 2-D (H, W) array, or 3-D (H, W, C) array whose last axis holds
            the channels. Values are converted to float32.
        kernel_size: Aperture size forwarded to ``cv2.Sobel`` as ``ksize``.
            The other filters use fixed sizes.

    Returns:
        float32 array of shape (H, W, 10 * C). Every response map is min-max
        scaled to [0, 1] on its own; a map whose value range is below 1e-6 is
        returned as all zeros.
    """
    # Split the input into single-channel float32 planes (astype copies).
    if image.ndim == 2:
        channels = [image.astype(np.float32)]
    else:
        channels = [image[..., c].astype(np.float32) for c in range(image.shape[2])]

    # The Gabor kernels do not depend on the image content, so build them once
    # here instead of once per channel.
    gabor_kernels = [
        cv2.getGaborKernel((11, 11), 4.0, theta, 10.0, 0.5, 0, ktype=cv2.CV_32F)
        for theta in np.arange(0, np.pi, np.pi / 6)  # 6 orientations
    ]

    responses = []
    for ch in channels:
        # First-order gradients along x and y.
        responses.append(cv2.Sobel(ch, cv2.CV_32F, 1, 0, ksize=kernel_size))
        responses.append(cv2.Sobel(ch, cv2.CV_32F, 0, 1, ksize=kernel_size))

        # Second-order derivative.
        responses.append(cv2.Laplacian(ch, cv2.CV_32F))

        # Oriented Gabor responses.
        responses.extend(cv2.filter2D(ch, cv2.CV_32F, kernel) for kernel in gabor_kernels)

        # Difference of Gaussians (GaussianBlur keeps the float32 depth).
        responses.append(cv2.GaussianBlur(ch, (3, 3), 1) - cv2.GaussianBlur(ch, (5, 5), 2))

    # H x W x (10 * C), float32 throughout.
    stacked = np.stack([r.astype(np.float32) for r in responses], axis=-1)

    # Per-map min-max normalisation, done for all maps at once.
    lo = stacked.min(axis=(0, 1), keepdims=True)
    span = stacked.max(axis=(0, 1), keepdims=True) - lo
    flat = span < 1e-6  # (near-)constant maps carry no information -> zeros
    safe_span = np.where(flat, np.float32(1), span)  # avoid dividing by ~0
    normed = np.where(flat, np.float32(0), (stacked - lo) / safe_span)
    return normed.astype(np.float32, copy=False)

# ----------------------------------------------------------
# 2. Max pooling utility
# ----------------------------------------------------------
def apply_maxpooling(image, target_pool_size=8):
    """Resample ``image`` to ``target_pool_size`` x ``target_pool_size`` pixels.

    Despite the name, no max operation is performed: the image is repeatedly
    halved with ``cv2.resize(..., interpolation=cv2.INTER_AREA)`` and then
    resized once more to the exact target size. Arrays with more than four
    channels are resized one channel at a time and re-stacked.

    NOTE(review): halving continues until BOTH sides are <= the target, so an
    input can drop below the target and then be enlarged by the final resize
    (e.g. 64 -> 32 -> 16 -> 8 -> 4 -> 7 for ``target_pool_size=7``). This
    behaviour is kept as-is so previously extracted features stay comparable.

    Args:
        image: 2-D (H, W) or 3-D (H, W, C) array; it is not modified.
        target_pool_size: Side length of the square output; must be >= 1.

    Returns:
        float32 array with spatial size (target_pool_size, target_pool_size).
        The number of dimensions of the input is preserved, including a
        trailing channel axis of length 1.

    Raises:
        ValueError: If ``target_pool_size`` is None or smaller than 1.
    """
    if target_pool_size is None:
        raise ValueError("target_pool_size must be provided.")
    if target_pool_size < 1:
        # Sizes are clamped to >= 1 while halving, so a non-positive target can
        # never be reached and the loop below would run forever.
        raise ValueError("target_pool_size must be a positive integer.")

    def safe_resize(arr, new_w, new_h):
        """Area-resize ``arr`` to (new_h, new_w) without changing its rank."""
        if arr.ndim == 2:
            return cv2.resize(arr, (new_w, new_h), interpolation=cv2.INTER_AREA)
        if arr.shape[2] <= 4:
            resized = cv2.resize(arr, (new_w, new_h), interpolation=cv2.INTER_AREA)
            if resized.ndim == 2:
                # cv2.resize drops a singleton channel axis; put it back so
                # per-channel statistics downstream still see a 3-D array.
                resized = resized[..., np.newaxis]
            return resized
        # More than four channels: resize plane by plane.
        return np.stack(
            [
                cv2.resize(arr[..., c], (new_w, new_h), interpolation=cv2.INTER_AREA)
                for c in range(arr.shape[2])
            ],
            axis=-1,
        )

    # astype returns a fresh array, so the caller's data is never touched.
    pooled = image.astype(np.float32)

    # Halve both sides until neither exceeds the target (never below 1 pixel).
    while pooled.shape[0] > target_pool_size or pooled.shape[1] > target_pool_size:
        h, w = pooled.shape[:2]
        pooled = safe_resize(pooled, max(1, w // 2), max(1, h // 2))

    # After the loop both sides are <= target; bring them to exactly the target.
    if pooled.shape[0] != target_pool_size or pooled.shape[1] != target_pool_size:
        pooled = safe_resize(pooled, target_pool_size, target_pool_size)

    return pooled

# ----------------------------------------------------------
# 3. Full feature extraction pipeline
# ----------------------------------------------------------
def feature_extraction(image, kernel_size=3, target_pool_size=7):
    """Turn one image into a flat handcrafted feature vector.

    The image goes through ``apply_conv`` (filter bank) and then
    ``apply_maxpooling`` (fixed-size spatial reduction).

    Returns:
        1-D array made of the flattened pooled map followed by the mean and
        then the standard deviation of every pooled channel.
    """
    pooled = apply_maxpooling(
        apply_conv(image, kernel_size=kernel_size),
        target_pool_size=target_pool_size,
    )

    # Summary statistics are taken over the two spatial axes, one per channel.
    per_channel_mean = pooled.mean(axis=(0, 1))
    per_channel_std = pooled.std(axis=(0, 1))

    return np.concatenate([pooled.flatten(), per_channel_mean, per_channel_std])


# feats = feature_extraction(processed_images[9], kernel_size=3,target_pool_size=8)
# print(feats.shape)


In [5]:
def batch_CFE(batch_size,base_dir, target_size=7):
    """Batch Custom Feature Extraction using the convolutional filter bank.

    Every entry of ``os.listdir(base_dir)`` is read with ``cv2.imread`` and
    passed through ``feature_extraction`` (``kernel_size=3``).

    Args:
        batch_size: Number of files handled per progress-bar step.
        base_dir: Directory containing the images.
        target_size: Forwarded to ``feature_extraction`` as ``target_pool_size``.

    Returns:
        Array with one feature vector per file. Rows follow the order returned
        by ``os.listdir(base_dir)``; the listing is intentionally not re-sorted
        so rows stay aligned with anything else built from the same listing.

    Raises:
        ValueError: If an entry of ``base_dir`` cannot be read as an image.
    """
    filenames = os.listdir(base_dir)
    feature_rows = []

    # One tqdm tick per chunk of `batch_size` files.
    for start in tqdm(range(0, len(filenames), batch_size)):
        for filename in filenames[start:start + batch_size]:
            filepath = os.path.join(base_dir, filename)
            img = cv2.imread(filepath)  # read image
            if img is None:
                # cv2.imread signals failure by returning None instead of
                # raising; fail here with the offending path rather than with
                # an opaque attribute error further down the pipeline.
                raise ValueError(f"cv2.imread could not load an image from: {filepath}")
            feature_rows.append(
                feature_extraction(img, kernel_size=3, target_pool_size=target_size)
            )

    return np.array(feature_rows)

In [6]:
# from tqdm import tqdm
# batch_features = batch_CFE(batch_size=20,base_dir=base_dir, target_size=5)
# batch_features.shape

In [7]:
class ConvolutionalFeatureExtractor:
    """Handcrafted image features from a fixed filter bank plus area pooling.

    Every processing step is a static method, so it can be called on the class
    (``ConvolutionalFeatureExtractor.batch_CFE(...)``) or on an instance. The
    steps call each other through the class, so the ``greyscale`` and
    ``kernel_size`` options are honoured all the way down the pipeline.

    The values stored by ``__init__`` are kept as plain attributes; the static
    methods take their settings as explicit arguments and do not read them.
    """

    def __init__(self, kernel_size=3, target_pool_size=8):
        self.kernel_size = kernel_size
        self.target_pool_size = target_pool_size

    # ----------------------------------------------------------
    # 1. Custom convolution application with multiple filters
    # ----------------------------------------------------------
    @staticmethod
    def apply_conv(image, grayscale=False, kernel_size=3):
        """Run a fixed filter bank over the image and stack the responses.

        Per processed channel the bank yields, in order: Sobel-x, Sobel-y,
        Laplacian, six Gabor responses (orientations 0, pi/6, ..., 5*pi/6) and
        one Difference-of-Gaussians map, i.e. 10 response maps. All OpenCV
        calls use float32 data with CV_32F output so depths always match.

        Args:
            image: 2-D (H, W) array or 3-D (H, W, C) array.
            grayscale: If True, a 3-D image is first converted with
                ``cv2.COLOR_BGR2GRAY`` and only that single plane is filtered.
                A 2-D image is always treated as one channel.
            kernel_size: Aperture size forwarded to ``cv2.Sobel`` as ``ksize``.

        Returns:
            float32 array of shape (H, W, 10 * n) where n is the number of
            processed channels. Each map is min-max scaled to [0, 1] on its
            own; a map whose value range is below 1e-6 becomes all zeros.
        """
        if image.ndim == 2:
            # Already single-channel: nothing to convert or split.
            channels = [image.astype(np.float32)]
        elif grayscale:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            channels = [gray.astype(np.float32)]
        else:
            channels = [image[..., c].astype(np.float32) for c in range(image.shape[2])]

        # The Gabor kernels are image-independent; build them once.
        gabor_kernels = [
            cv2.getGaborKernel((11, 11), 4.0, theta, 10.0, 0.5, 0, ktype=cv2.CV_32F)
            for theta in np.arange(0, np.pi, np.pi / 6)  # 6 orientations
        ]

        responses = []
        for ch in channels:
            # First-order gradients along x and y.
            responses.append(cv2.Sobel(ch, cv2.CV_32F, 1, 0, ksize=kernel_size))
            responses.append(cv2.Sobel(ch, cv2.CV_32F, 0, 1, ksize=kernel_size))

            # Second-order derivative.
            responses.append(cv2.Laplacian(ch, cv2.CV_32F))

            # Oriented Gabor responses.
            responses.extend(cv2.filter2D(ch, cv2.CV_32F, kernel) for kernel in gabor_kernels)

            # Difference of Gaussians (GaussianBlur keeps the float32 depth).
            responses.append(cv2.GaussianBlur(ch, (3, 3), 1) - cv2.GaussianBlur(ch, (5, 5), 2))

        # H x W x (10 * n), float32 throughout.
        stacked = np.stack([r.astype(np.float32) for r in responses], axis=-1)

        # Per-map min-max normalisation, done for all maps at once.
        lo = stacked.min(axis=(0, 1), keepdims=True)
        span = stacked.max(axis=(0, 1), keepdims=True) - lo
        flat = span < 1e-6  # (near-)constant maps -> zeros
        safe_span = np.where(flat, np.float32(1), span)  # avoid dividing by ~0
        normed = np.where(flat, np.float32(0), (stacked - lo) / safe_span)
        return normed.astype(np.float32, copy=False)

    # ----------------------------------------------------------
    # 2. Pooling utility (area resampling, kept under its original name)
    # ----------------------------------------------------------
    @staticmethod
    def apply_maxpooling(image, target_pool_size=8):
        """Resample ``image`` to ``target_pool_size`` x ``target_pool_size`` pixels.

        Despite the name, no max operation is performed: the image is halved
        repeatedly with ``cv2.resize(..., interpolation=cv2.INTER_AREA)`` and
        then resized once more to the exact target size. Arrays with more than
        four channels are resized one channel at a time and re-stacked.

        NOTE(review): halving continues until BOTH sides are <= the target, so
        an input can drop below the target and then be enlarged by the final
        resize (e.g. 64 -> 32 -> 16 -> 8 -> 4 -> 7 for a target of 7). This is
        kept as-is so previously extracted features stay comparable.

        Args:
            image: 2-D (H, W) or 3-D (H, W, C) array; it is not modified.
            target_pool_size: Side length of the square output; must be >= 1.

        Returns:
            float32 array with spatial size (target_pool_size,
            target_pool_size) and the same number of dimensions as the input.

        Raises:
            ValueError: If ``target_pool_size`` is None or smaller than 1.
        """
        if target_pool_size is None:
            raise ValueError("target_pool_size must be provided.")
        if target_pool_size < 1:
            # Sizes are clamped to >= 1 while halving, so a non-positive
            # target can never be reached and the loop would run forever.
            raise ValueError("target_pool_size must be a positive integer.")

        def safe_resize(arr, new_w, new_h):
            """Area-resize ``arr`` to (new_h, new_w) without changing its rank."""
            if arr.ndim == 2:
                return cv2.resize(arr, (new_w, new_h), interpolation=cv2.INTER_AREA)
            if arr.shape[2] <= 4:
                resized = cv2.resize(arr, (new_w, new_h), interpolation=cv2.INTER_AREA)
                if resized.ndim == 2:
                    # cv2.resize drops a singleton channel axis; restore it.
                    resized = resized[..., np.newaxis]
                return resized
            # More than four channels: resize plane by plane.
            return np.stack(
                [
                    cv2.resize(arr[..., c], (new_w, new_h), interpolation=cv2.INTER_AREA)
                    for c in range(arr.shape[2])
                ],
                axis=-1,
            )

        # astype returns a fresh array, so the caller's data is never touched.
        pooled = image.astype(np.float32)

        # Halve both sides until neither exceeds the target (never below 1).
        while pooled.shape[0] > target_pool_size or pooled.shape[1] > target_pool_size:
            h, w = pooled.shape[:2]
            pooled = safe_resize(pooled, max(1, w // 2), max(1, h // 2))

        # After the loop both sides are <= target; make them exactly the target.
        if pooled.shape[0] != target_pool_size or pooled.shape[1] != target_pool_size:
            pooled = safe_resize(pooled, target_pool_size, target_pool_size)

        return pooled

    # ----------------------------------------------------------
    # 3. Full feature extraction pipeline
    # ----------------------------------------------------------
    @staticmethod
    def feature_extraction(image, greyscale=False, kernel_size=3, target_pool_size=7):
        """Turn one image into a flat handcrafted feature vector.

        Args:
            image: Input image array.
            greyscale: Forwarded to ``apply_conv`` as ``grayscale``.
            kernel_size: Forwarded to ``apply_conv``.
            target_pool_size: Forwarded to ``apply_maxpooling``.

        Returns:
            1-D array made of the flattened pooled map followed by the mean
            and then the standard deviation of every pooled channel.
        """
        # Call through the class so this class's own steps are used rather
        # than any same-named function defined at module level.
        conv_output = ConvolutionalFeatureExtractor.apply_conv(
            image, grayscale=greyscale, kernel_size=kernel_size
        )
        pooled_output = ConvolutionalFeatureExtractor.apply_maxpooling(
            conv_output, target_pool_size=target_pool_size
        )

        # Summary statistics are taken over the two spatial axes.
        channel_means = pooled_output.mean(axis=(0, 1))
        channel_stds = pooled_output.std(axis=(0, 1))

        return np.concatenate([pooled_output.flatten(), channel_means, channel_stds])

    @staticmethod
    def batch_CFE(batch_size, base_dir, greyscale=False, kernel_size=3, target_size=7):
        """Batch Custom Feature Extraction using the convolutional filter bank.

        Args:
            batch_size: Number of files handled per progress-bar step.
            base_dir: Directory containing the images.
            greyscale: Forwarded to ``feature_extraction``.
            kernel_size: Forwarded to ``feature_extraction``.
            target_size: Forwarded to ``feature_extraction`` as
                ``target_pool_size``.

        Returns:
            Array with one feature vector per file. Rows follow the order
            returned by ``os.listdir(base_dir)``; the listing is intentionally
            not re-sorted so rows stay aligned with anything else built from
            the same listing.

        Raises:
            ValueError: If an entry of ``base_dir`` cannot be read as an image.
        """
        filenames = os.listdir(base_dir)
        feature_rows = []

        # One tqdm tick per chunk of `batch_size` files.
        for start in tqdm(range(0, len(filenames), batch_size)):
            for filename in filenames[start:start + batch_size]:
                filepath = os.path.join(base_dir, filename)
                img = cv2.imread(filepath)  # read image
                if img is None:
                    # cv2.imread returns None on failure instead of raising.
                    raise ValueError(f"cv2.imread could not load an image from: {filepath}")
                feature_rows.append(
                    ConvolutionalFeatureExtractor.feature_extraction(
                        img,
                        greyscale=greyscale,
                        kernel_size=kernel_size,
                        target_pool_size=target_size,
                    )
                )

        return np.array(feature_rows)


In [8]:

# Extract one feature vector per file found in base_dir.
# The recorded run of this cell took about 2.5 hours for 427 files.
batch_features = ConvolutionalFeatureExtractor.batch_CFE(
    batch_size=20,
    base_dir=base_dir,
    greyscale=False,
    kernel_size=3,
    target_size=5,
)
# (number of files, features per file)
batch_features.shape

100%|██████████| 22/22 [2:34:40<00:00, 421.85s/it]  


(427, 810)