### Libraries

In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
from skimage import feature
from sklearn.feature_extraction.image import extract_patches_2d
from keras.preprocessing import image
import time
from skimage.feature import local_binary_pattern
from skimage.color import rgb2gray
import pandas as pd
from scipy.stats import entropy
import seaborn as sns
from skimage import data, color, exposure
from skimage.feature import graycomatrix, graycoprops
from skimage.feature import hog
from scipy.stats import entropy, skew, kurtosis
from scipy.stats import mstats
from tqdm import tqdm  # For progress tracking

In [None]:
def get_images(training_mode, directory_path):
    """
    Loads and processes images from a specified directory.

    Args:
        training_mode (bool): If True, the images will be shuffled randomly.
        directory_path (str): Path to the directory containing the images.

    Returns:
        list: A list of processed images in RGB format.
    """
    images = []  # List to store processed images

    # Retrieve and sort file names in the directory
    file_list = sorted(os.listdir(directory_path))

    for file_name in file_list:
        # Construct the full path for each file
        image_path = os.path.join(directory_path, file_name)

        # Read the image using OpenCV
        image = cv2.imread(image_path)

        if image is not None:  # Ensure the image was loaded successfully
            # Convert the image from BGR to RGB format
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            images.append(image_rgb)  # Add the processed image to the list

    if training_mode:
        # Shuffle images randomly if training mode is enabled
        np.random.shuffle(images)

    return images  # Return the list of processed images

In [None]:
path_images="path_to_the_train_images"
path_masks= "path_to_the_train_masks"
images_train=get_images(train=False, path=path_images)
masks_train=get_images(train=False, path=path_masks)

In [None]:
path_images_test="path_to_the_test_images"
path_mask_test= "path_to_the_test_masks"
images_test=get_images(train=False, path=path_images_test)
masks_test=get_images(train=False, path=path_mask_test)

In [None]:
images=images_train+images_test
masks=masks_train+masks_test

## Patch extraction

In [None]:
def get_patches_with_stride(image: np.ndarray, patch_size: tuple, stride: int) -> list[np.ndarray]:
    """
    Efficiently extracts image patches using strided operations.

    Args:
        image: Input image as a numpy array (H, W, C) or (H, W)
        patch_size: Tuple (patch_height, patch_width)
        stride: Step size between patch origins (same for vertical/horizontal)

    Returns:
        List of extracted patches as numpy arrays
    """
    patch_h, patch_w = patch_size
    img_h, img_w = image.shape[:2]

    # Create sliding window view using numpy's optimized strided operations
    patches = np.lib.stride_tricks.sliding_window_view(image, (patch_h, patch_w), axis=(0, 1))

    # Select patches with specified stride
    selected_patches = patches[::stride, ::stride]

    # Reshape to list of patches and return
    return [patch for patch in selected_patches.reshape(-1, patch_h, patch_w, *image.shape[2:])]

## Gaussian blur

In [None]:
def gaussian_blur(image, kernel_size, sigma):
    """
    Applies a Gaussian blur to an image using OpenCV's GaussianBlur function.

    Args:
        image (numpy.ndarray): The input image to be blurred.
        kernel_size (int): Size of the Gaussian kernel. Must be an odd number.
        sigma (float): Standard deviation of the Gaussian kernel. Determines the intensity of the blur.

    Returns:
        numpy.ndarray: The blurred image.
    """
    # Ensure kernel_size is odd, as required by the GaussianBlur function
    if kernel_size % 2 == 0:
        raise ValueError("Kernel size must be an odd number.")

    # Apply Gaussian blur to the input image
    blurred_image = cv2.GaussianBlur(image, (kernel_size, kernel_size), sigma)

    return blurred_image


## Color statistics

In [None]:
def calculate_channel_statistics(image_channel: np.ndarray) -> tuple[float, ...]:
    """
    Calculates comprehensive statistical features for an image channel.

    Args:
        image_channel: 2D numpy array representing a single image channel

    Returns:
        Tuple containing:
        - mean: Average pixel intensity
        - std_dev: Standard deviation of intensities
        - entropy: Information entropy of pixel distribution
        - kurt: Kurtosis (tailedness) of intensity distribution
        - energy: Total energy of the signal
        - skewness: Asymmetry of intensity distribution
    """
    # Flatten the 2D array to 1D for statistical calculations
    flat_channel = image_channel.ravel()

    # Intensity distribution analysis (32-bin histogram from 0-255)
    histogram_bins = np.arange(0, 256, 32)
    hist, _ = np.histogram(flat_channel, bins=histogram_bins)

    # Central tendency and dispersion
    mean = np.mean(flat_channel)
    std_dev = np.std(flat_channel)

    # Information theory measures
    hist_nonzero = hist[hist > 0]
    entropy_val = entropy(hist_nonzero) if hist_nonzero.size > 0 else np.nan

    # Shape characteristics
    kurt = kurtosis(flat_channel)
    skewness = skew(flat_channel)

    # Signal energy calculation
    energy = np.sum(flat_channel**2)

    return (mean, std_dev, entropy_val, kurt, energy, skewness)

In [None]:
def extract_color_statistics_rgb(image_rgb: np.ndarray) -> tuple[tuple[float, ...], ...]:
    """
    Extracts statistical features for each channel of an RGB image.

    Args:
        image_rgb: Input image in RGB format with shape (H, W, 3)

    Returns:
        Tuple containing three feature tuples (red, green, blue channels)
        Each channel tuple contains:
        (mean, std_dev, entropy, kurtosis, energy, skewness)

    Raises:
        ValueError: If input is not a 3-channel RGB image

    """
    # Validate input dimensions
    if image_rgb.ndim != 3 or image_rgb.shape[2] != 3:
        raise ValueError("Input must be a 3-channel RGB image (H, W, 3)")

    # Extract color channels using numpy slicing
    red_channel = image_rgb[..., 0]  # More efficient than [:, :, 0]
    green_channel = image_rgb[..., 1]
    blue_channel = image_rgb[..., 2]

    # Calculate features for each channel
    return (
        calculate_channel_statistics(red_channel),
        calculate_channel_statistics(green_channel),
        calculate_channel_statistics(blue_channel)
    )

In [None]:
def extract_color_statistics_hsv(image_rgb: np.ndarray) -> tuple[tuple[float, ...], ...]:
    """
    Extracts statistical features for each channel of an HSV image.

    Args:
        image_rgb: Input image in RGB format with shape (H, W, 3)

    Returns:
        Tuple containing three feature tuples (Hue, Saturation, Value)
        Each channel tuple contains:
        (mean, std_dev, entropy, kurtosis, energy, skewness)

    Raises:
        ValueError: If input is not a 3-channel RGB image


    Notes:
        - Hue range: 0-179 (OpenCV's HSV representation)
        - Saturation/Value range: 0-255
        - Requires calculate_channel_statistics() implementation
    """
    # Validate input dimensions
    if image_rgb.ndim != 3 or image_rgb.shape[2] != 3:
        raise ValueError("Input must be a 3-channel RGB image (H, W, 3)")

    # Convert to HSV color space (OpenCV expects BGR->HSV, ensure correct input)
    hsv_image = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2HSV)

    # Extract channels using efficient numpy slicing
    return (
        calculate_channel_statistics(hsv_image[..., 0]),  # Hue
        calculate_channel_statistics(hsv_image[..., 1]),  # Saturation
        calculate_channel_statistics(hsv_image[..., 2])   # Value
    )

## Features algorithms

### HOG

In [None]:
def compute_hog_histogram(features: np.ndarray, bin_width: float = 0.1) -> np.ndarray:
    """
    Computes a normalized histogram of HOG feature values.

    Args:
        features: HOG feature vector from skimage.feature.hog
        bin_width: Width of histogram bins (0-1 range)

    Returns:
        Normalized histogram of HOG features with specified binning.
    """
    # Create bins covering full [0, 1] range with specified width
    bin_edges = np.arange(0, 1.0 + bin_width, bin_width)

    # Compute histogram and normalize
    hist, _ = np.histogram(features, bins=bin_edges)
    return hist / hist.sum()  # Return probability distribution

def extract_hog_features(
    image: np.ndarray,
    orientations: int = 9,
    pixels_per_cell: tuple[int, int] = (8, 8),
    cells_per_block: tuple[int, int] = (2, 2),
    **hog_kwargs
) -> np.ndarray:
    """
    Extracts HOG features without resizing images smaller than 16x16.

    Args:
        image: Input image (grayscale or RGB).
        orientations: Number of gradient orientation bins.
        pixels_per_cell: Cell size in pixels.
        cells_per_block: Block size in cells.
        **hog_kwargs: Additional arguments for skimage.feature.hog.

    Returns:
        Normalized HOG feature histogram.

    Raises:
        ValueError: If the image dimensions are smaller than 16x16.
    """
    # Validate input dimensions
    if image.shape[0] < 16 or image.shape[1] < 16:
        raise ValueError("Image dimensions must be at least 16x16 pixels.")

    # Extract HOG features
    try:
        features = hog(
            image,
            orientations=orientations,
            pixels_per_cell=pixels_per_cell,
            cells_per_block=cells_per_block,
            block_norm='L2-Hys',
            channel_axis=-1 if image.ndim == 3 else None,
            **hog_kwargs
        )
    except Exception as e:
        raise RuntimeError(f"HOG feature extraction failed: {str(e)}") from e

    return compute_hog_histogram(features)


### LBP

In [None]:
def lbp_features_color(img: np.ndarray, radius: int = 1, sampling_pixels: int = 8) -> np.ndarray:
    """
    Computes Local Binary Pattern (LBP) features for a color image.

    Args:
        img: Input image in BGR/RGB format (3 channels).
        radius: Radius of the LBP pattern (default=1).
        sampling_pixels: Number of sampling points in the circular LBP pattern (default=8).

    Returns:
        A concatenated feature vector of LBP histograms for the 3 color channels.

    Raises:
        ValueError: If the input image is empty or does not have 3 color channels.
    """

    # Input validation
    if img.size == 0:
        raise ValueError("The input image is empty.")
    if len(img.shape) != 3 or img.shape[2] != 3:
        raise ValueError("The image must have exactly 3 color channels.")

    # LBP configuration
    METHOD = "uniform"
    BINS = sampling_pixels + 2  # Bin for non-uniform patterns

    # Process each color channel separately
    histograms = []
    for channel_index in range(3):
        # Extract the channel and convert it to the appropriate type
        channel = img[:, :, channel_index].astype(np.uint8)

        # Compute the LBP pattern
        lbp = feature.local_binary_pattern(
            channel,
            P=sampling_pixels,
            R=radius,
            method=METHOD
        )

        # Compute normalized histogram
        hist, _ = np.histogram(
            lbp.ravel(),
            bins=np.arange(0, BINS + 1),
            density=True  # Automatic normalization
        )

        histograms.append(hist)

    # Concatenate histograms and ensure float32 type
    return np.concatenate(histograms).astype(np.float32)

### GLCM

In [None]:
def extract_glcm_features(image: np.ndarray,
                         distances: list = [1],
                         angles: list = [0, np.pi/4, np.pi/2, 3*np.pi/4]) -> np.ndarray:
    """
    Extracts texture features using Gray-Level Co-occurrence Matrix (GLCM) from a grayscale image.

    Args:
        image: Input RGB image (will be converted to grayscale)
        distances: List of pixel distances for co-occurrence (default: [1])
        angles: List of angles in radians (default: [0°, 45°, 90°, 135°])

    Returns:
        Concatenated feature vector containing:
        [contrast, dissimilarity, homogeneity, energy, correlation]

    Raises:
        ValueError: For invalid image input
    """

    # Input validation
    if not isinstance(image, np.ndarray) or image.size == 0:
        raise ValueError("Input must be a non-empty numpy array")
    if len(image.shape) != 3 or image.shape[2] != 3:
        raise ValueError("Input must be a 3-channel RGB image")

    # Convert to grayscale using OpenCV for better performance
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    # Normalize to 0-255 range if needed
    if gray_image.dtype != np.uint8:
        gray_image = (255 * (gray_image - gray_image.min()) /
                     (gray_image.max() - gray_image.min() + 1e-6)).astype(np.uint8)

    # Calculate GLCM matrix
    glcm = graycomatrix(gray_image,
                       distances=distances,
                       angles=angles,
                       symmetric=True,
                       normed=True)

    # Extract texture properties
    features = []
    for prop in ['contrast', 'dissimilarity', 'homogeneity', 'energy', 'correlation']:
        feature_values = graycoprops(glcm, prop)
        features.append(feature_values.mean())  # Average across distances/angles

    return np.array(features, dtype=np.float32)

### Label

In [None]:
class CorrosionLevel(Enum):
    NONE = 'no_corrosion'
    MILD = 'mild_corrosion'
    MODERATE = 'moderate_corrosion'
    SEVERE = 'severe_corrosion'

def classify_corrosion_binary(region: np.ndarray) -> str:
    """
    Classifies image region into corrosion (white) or no corrosion (black) using optimized vector operations.

    Args:
        region: Input image region (3-channel RGB array)

    Returns:
        Classification result ('corrosion' or 'no_corrosion')

    """
    # Validate input
    if not isinstance(region, np.ndarray) or region.size == 0:
        raise ValueError("Invalid input region")

    # Reshape and vectorize operations
    pixels = region.reshape(-1, 3)

    # Count non-black pixels (corrosion)
    corrosion_count = np.any(pixels != [0, 0, 0], axis=1).sum()
    total_pixels = pixels.shape[0]

    return 'corrosion' if corrosion_count / total_pixels > 0.5 else 'no_corrosion'

def classify_corrosion_multiclass(region: np.ndarray) -> Tuple[CorrosionLevel, np.ndarray]:
    """
    Classifies corrosion severity using color-coded pixels with vectorized operations.

    Args:
        region: Input image region (3-channel RGB array)

    Returns:
        Tuple containing:
        - CorrosionLevel enum
        - Normalized color distribution array
    """
    # Validate input
    if not isinstance(region, np.ndarray) or region.size == 0:
        raise ValueError("Invalid input region")

    # Define color thresholds (BGR format - adjust)
    COLOR_THRESHOLDS = {
        CorrosionLevel.NONE: ([0, 0, 0], 10),       # Black
        CorrosionLevel.MILD: ([0, 0, 200], 60),     # Red
        CorrosionLevel.MODERATE: ([0, 200, 0], 60), # Green
        CorrosionLevel.SEVERE: ([0, 200, 200], 60)  # Yellow
    }

    pixels = region.reshape(-1, 3)
    counts = np.zeros(len(COLOR_THRESHOLDS), dtype=int)

    # Vectorized color matching with tolerance
    for idx, (level, (color, tolerance)) in enumerate(COLOR_THRESHOLDS.items()):
        lower_bound = np.array(color) - tolerance
        upper_bound = np.array(color) + tolerance
        counts[idx] = np.sum(np.all((pixels >= lower_bound) & (pixels <= upper_bound), axis=1))

    # Get distribution and classification
    distribution = counts / counts.sum()
    max_idx = np.argmax(counts)

    return list(COLOR_THRESHOLDS.keys())[max_idx], distribution

## Dataframe

In [None]:
# Initialize DataFrame with proper columns
columns= ['ID',
    'red_mean', 'red_std', 'red_entropy','red_kurtosis', 'red_energy', 'red_skewness',
    'green_mean', 'green_std', 'green_entropy','green_kurtosis', 'green_energy', 'green_skewness',
    'blue_mean', 'blue_std', 'blue_entropy','blue_kurtosis', 'blue_energy', 'blue_skewness',
    'hue_mean', 'hue_std', 'hue_entropy','hue_kurtosis', 'hue_energy', 'hue_skewness',
    'saturation_mean', 'saturation_std', 'saturation_entropy','saturation_kurtosis', 'saturation_energy', 'saturation_skewness',
    'value_mean', 'value_std', 'value_entropy','value_kurtosis', 'value_energy', 'value_skewness',
    'glcm_contrast_0', 'glcm_dissimilarity_0', 'glcm_homogeneity_0', 'glcm_energy_0', 'glcm_correlation_0', 'glcm_ASM_0',
    'glcm_contrast_1', 'glcm_dissimilarity_1', 'glcm_homogeneity_1', 'glcm_energy_1', 'glcm_correlation_1', 'glcm_ASM_1',
    'glcm_contrast_2', 'glcm_dissimilarity_2', 'glcm_homogeneity_2', 'glcm_energy_2', 'glcm_correlation_2', 'glcm_ASM_2',
    'glcm_contrast_3', 'glcm_dissimilarity_3', 'glcm_homogeneity_3', 'glcm_energy_3', 'glcm_correlation_3', 'glcm_ASM_3',
    'hog_histogram_0', 'hog_histogram_0.1', 'hog_histogram_0.2', 'hog_histogram_0.3', 'hog_histogram_0.4', 'hog_histogram_0.5', 'hog_histogram_0.6', 'hog_histogram_0.7', 'hog_histogram_0.8',
    'lbp_features_1', 'lbp_features_2', 'lbp_features_3', 'lbp_features_4','lbp_features_5','lbp_features_6','lbp_features_7','lbp_features_8','lbp_features_9','lbp_features_10',
    'lbp_features_11', 'lbp_features_12', 'lbp_features_13', 'lbp_features_14', 'lbp_features_15', 'lbp_features_16', 'lbp_features_17', 'lbp_features_18', 'lbp_features_19', 'lbp_features_20',
    'lbp_features_21', 'lbp_features_22', 'lbp_features_23', 'lbp_features_24', 'lbp_features_25', 'lbp_features_26', 'lbp_features_27', 'lbp_features_28', 'lbp_features_29', 'lbp_features_30',
    'label_binary', 'label_multi', 'n_image'
                          ]
data = pd.DataFrame(columns=columns)

# Configuration parameters
GLCM_ANGLES = [0, np.pi/4, np.pi/2, 3*np.pi/4]
DISTANCES = [1]
PATCH_SIZE = (32, 32)
STRIDE = 4
GAUSSIAN_KERNEL = 5
GAUSSIAN_SIGMA = 1

def process_image_pairs(images, masks):
    """Main processing function for image pairs"""
    feature_rows = []

    for img_idx, (image, mask) in enumerate(zip(images, masks)):
        # Preprocess image
        blurred_img = gaussian_blur(image, kernel_size=GAUSSIAN_KERNEL, sigma=GAUSSIAN_SIGMA)

        # Extract patches
        img_patches = get_patches_with_stride(blurred_img, PATCH_SIZE, STRIDE)
        mask_patches = get_patches_with_stride(mask, PATCH_SIZE, STRIDE)

        for patch_idx, (roi, mask_roi) in enumerate(zip(img_patches, mask_patches)):
            try:
                # Feature extraction
                features = extract_features(roi)
                labels = get_labels(mask_roi)

                # Create feature row
                row = {
                    **features,
                    **labels,
                    'ID': f"{img_idx}_{patch_idx}",
                    'n_image': img_idx
                }
                feature_rows.append(row)

            except Exception as e:
                print(f"Error processing patch {img_idx}_{patch_idx}: {str(e)}")

    return pd.DataFrame(feature_rows)

def extract_features(roi):
    """Unified feature extraction function"""
    features = {}

    # Color features
    rgb_stats = extract_color_statistics_rgb(roi)
    hsv_stats = extract_color_statistics_hsv(roi)

    # Add color features to dict
    color_stats = {
        'red': rgb_stats[0],
        'green': rgb_stats[1],
        'blue': rgb_stats[2],
        'hue': hsv_stats[0],
        'saturation': hsv_stats[1],
        'value': hsv_stats[2]
    }
    for stat_name, stats in color_stats.items():
        features.update({
            f'{stat_name}_mean': stats[0],
            f'{stat_name}_std': stats[1],
            f'{stat_name}_entropy': stats[2],
            f'{stat_name}_kurtosis': stats[3],
            f'{stat_name}_energy': stats[4],
            f'{stat_name}_skewness': stats[5]
        })

    # GLCM features
    for angle_idx, angle in enumerate(GLCM_ANGLES):
        glcm_features = extract_glcm_features(roi, DISTANCES, [angle])
        features.update({
            f'glcm_contrast_{angle_idx}': glcm_features[0],
            f'glcm_dissimilarity_{angle_idx}': glcm_features[1],
            f'glcm_homogeneity_{angle_idx}': glcm_features[2],
            f'glcm_energy_{angle_idx}': glcm_features[3],
            f'glcm_correlation_{angle_idx}': glcm_features[4],
            f'glcm_ASM_{angle_idx}': glcm_features[5]
        })

    # HOG and LBP features
    hog_features = features_hog(roi)
    lbp_features = lbp_features_color(roi)

    features.update({
        **{f'hog_histogram_{i/10:.1f}': val for i, val in enumerate(hog_features)},
        **{f'lbp_features_{i+1}': val for i, val in enumerate(lbp_features)}
    })

    return features

def get_labels(mask_roi):
    """Get classification labels"""
    return {
        'label_binary': classify_corrosion_binary(mask_roi),
        'label_multi': classify_corrosion_multiclass(mask_roi)
    }

# Main execution
start_time = time.time()
df = process_image_pairs(images_tot, masks_tot)
end_time = time.time()

print(f"Processing completed in {end_time - start_time:.2f} seconds")
print(f"Generated {len(df)} samples")
df.to_csv('features_dataset.csv', index=False)