In [1]:
import cv2
import numpy as np
import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union
from dataclasses import dataclass

In [2]:
# Setup logging
# Single source of truth for the log location: the directory that gets created
# and the file the handler opens are both derived from this one path, so they
# cannot drift apart.
LOG_FILE = os.path.join("logs", "preprocessing.log")

try:
    # Ensure log directory exists
    os.makedirs(os.path.dirname(os.path.abspath(LOG_FILE)), exist_ok=True)

    # Configure logging only once. basicConfig() is a no-op when the root
    # logger already has handlers, but constructing FileHandler(mode='w')
    # would still truncate the live log file and leak the new handle, which
    # is exactly what happens when this cell is re-run in the same kernel.
    if not logging.getLogger().handlers:
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(LOG_FILE, mode='w'),
                logging.StreamHandler()
            ]
        )
    # Force output of initial log to confirm logging system is working
    logging.info("Logging system initialized")
except Exception as e:
    # Best effort: the notebook can still run without file logging.
    print(f"Error setting up logging system: {str(e)}")

logger = logging.getLogger("PreprocessingModule")
# Confirm logger is working properly
logger.info("Preprocessing module initialized")

2025-05-19 12:56:58,839 - root - INFO - Logging system initialized
2025-05-19 12:56:58,841 - PreprocessingModule - INFO - Preprocessing module initialized


In [3]:
@dataclass
class PreprocessingConfig:
    """Settings that control each step of the image preprocessing module.

    Attributes:
        target_size: Target size as (height, width).
        normalize: Whether to normalize pixel values.
        clahe_clip_limit: CLAHE contrast limit.
        clahe_grid_size: CLAHE grid size.
        denoise_h: Denoising strength.
        gamma_correction: Whether to perform gamma correction.
        gamma_value: Gamma value used when gamma correction is enabled.
        sharpen: Whether to sharpen.
        adaptive_brightness: Whether to adjust brightness adaptively.
        brightness_percentile: Brightness adjustment percentile, as a
            fraction (0.95 means the 95th percentile).
        crop_borders: Whether to crop borders.
        border_fraction: Border crop ratio.
    """

    # --- geometry -----------------------------------------------------
    target_size: Tuple[int, int] = (1024, 1024)

    # --- value scaling ------------------------------------------------
    normalize: bool = True

    # --- contrast (CLAHE) ---------------------------------------------
    clahe_clip_limit: float = 2.0
    clahe_grid_size: Tuple[int, int] = (8, 8)

    # --- denoising ----------------------------------------------------
    denoise_h: int = 10

    # --- gamma --------------------------------------------------------
    gamma_correction: bool = True
    gamma_value: float = 1.2

    # --- sharpening ---------------------------------------------------
    sharpen: bool = True

    # --- brightness ---------------------------------------------------
    adaptive_brightness: bool = True
    brightness_percentile: float = 0.95

    # --- border cropping ----------------------------------------------
    crop_borders: bool = False
    border_fraction: float = 0.05

In [4]:
class ImagePreprocessor:
    """Part image preprocessing class for enhancing image quality and standardization.

    :meth:`preprocess` runs, in order: optional border crop, resize to the
    configured target size, optional brightness adjustment, per-channel CLAHE,
    non-local-means denoising, optional gamma correction, optional sharpening
    and optional normalization to float32.
    """

    # Value (0-255) that the configured percentile of the V channel is raised to.
    _TARGET_BRIGHTNESS = 220
    # Upper bound on the brightness gain, so very dark images are not blown out.
    _MAX_BRIGHTNESS_GAIN = 3.0
    # 3x3 sharpening kernel; the weights sum to 1 so flat regions are unchanged.
    _SHARPEN_KERNEL = np.array([[-1, -1, -1],
                                [-1,  9, -1],
                                [-1, -1, -1]])

    def __init__(self, config: Optional["PreprocessingConfig"] = None):
        """
        Initialize preprocessor

        Args:
            config: Preprocessing configuration, uses default if None
        """
        self.config = config if config is not None else PreprocessingConfig()
        # The CLAHE object is created once and reused for every image/channel.
        self.clahe = cv2.createCLAHE(
            clipLimit=self.config.clahe_clip_limit,
            tileGridSize=self.config.clahe_grid_size
        )
        logger.info(f"Initialized preprocessor with config: {self.config}")

    def preprocess(self, image: np.ndarray) -> Dict[str, np.ndarray]:
        """
        Perform a series of preprocessing operations on the input image

        Args:
            image: Input BGR format image. Single-channel and BGRA images are
                converted to BGR before processing.

        Returns:
            Dictionary with the keys:
                "original": untouched copy of the input,
                "preprocessed": final image (float32 scaled by 1/255 when
                    normalization is enabled, otherwise the last uint8 stage),
                "gray": CLAHE-enhanced grayscale version of the resized image,
                "intermediate": dict of per-step snapshots keyed by step name.

        Raises:
            ValueError: If the image is None/empty, has an unsupported channel
                layout, or the configured border crop is invalid.
        """
        if image is None or image.size == 0:
            logger.error("Received empty image")
            raise ValueError("Input image is empty")

        # perf_counter is monotonic, so the reported duration cannot be
        # distorted by wall-clock adjustments.
        start_time = time.perf_counter()

        logger.info(f"Starting image processing, shape: {image.shape}")
        result = {"original": image.copy()}

        # Every later step indexes three BGR channels.
        image = self._ensure_bgr(image)

        # Record processing steps for debugging and analysis
        intermediate_results = {}

        # 1. Crop borders (if enabled)
        if self.config.crop_borders:
            image = self._crop_borders(image)
            intermediate_results["after_crop"] = image.copy()

        # 2. Resize to standard dimensions
        image = self._resize_to_target(image)
        intermediate_results["resized"] = image.copy()

        # 3. Convert to different color spaces for experimentation
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        intermediate_results["gray"] = gray.copy()
        intermediate_results["hsv"] = hsv.copy()

        # 4. Adaptive brightness adjustment
        if self.config.adaptive_brightness:
            image = self._adjust_brightness(image)
            intermediate_results["brightness_adjusted"] = image.copy()

        # 5. Contrast enhancement (CLAHE), each BGR channel separately
        image = cv2.merge([self.clahe.apply(image[:, :, i]) for i in range(3)])
        intermediate_results["contrast_enhanced"] = image.copy()

        # Apply CLAHE to grayscale image as well
        gray_enhanced = self.clahe.apply(gray)
        intermediate_results["gray_enhanced"] = gray_enhanced.copy()

        # 6. Denoising
        image = cv2.fastNlMeansDenoisingColored(
            image, None, h=self.config.denoise_h,
            hColor=self.config.denoise_h, templateWindowSize=7, searchWindowSize=21
        )
        intermediate_results["denoised"] = image.copy()

        # 7. Gamma correction (if enabled)
        if self.config.gamma_correction:
            image = cv2.LUT(image, self._create_gamma_lut(self.config.gamma_value))
            intermediate_results["gamma_corrected"] = image.copy()

        # 8. Sharpening (if enabled)
        if self.config.sharpen:
            # ddepth=-1 keeps the output depth equal to the input depth.
            image = cv2.filter2D(image, -1, self._SHARPEN_KERNEL)
            intermediate_results["sharpened"] = image.copy()

        # 9. Normalization (if enabled)
        if self.config.normalize:
            image = image.astype(np.float32) / 255.0
            intermediate_results["normalized"] = image.copy()

        # Add all intermediate results to output
        result["preprocessed"] = image
        result["gray"] = gray_enhanced
        result["intermediate"] = intermediate_results

        process_time = time.perf_counter() - start_time
        logger.info(f"Preprocessing completed, time: {process_time:.4f} seconds")

        return result

    @staticmethod
    def _ensure_bgr(image: np.ndarray) -> np.ndarray:
        """Return a 3-channel BGR version of a gray, BGR or BGRA image."""
        if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
            return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        if image.ndim == 3 and image.shape[2] == 4:
            return cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
        if image.ndim == 3 and image.shape[2] == 3:
            return image
        raise ValueError(f"Unsupported image shape: {image.shape}")

    def _resize_to_target(self, image: np.ndarray) -> np.ndarray:
        """Resize to config.target_size, choosing interpolation by scale direction."""
        target_h, target_w = self.config.target_size
        src_h, src_w = image.shape[:2]
        # INTER_AREA is meant for shrinking; when enlarging it degrades to
        # nearest-neighbour-like output, so use bilinear for upscaling.
        if target_h * target_w < src_h * src_w:
            interpolation = cv2.INTER_AREA
        else:
            interpolation = cv2.INTER_LINEAR
        # cv2.resize takes the size as (width, height).
        return cv2.resize(image, (target_w, target_h), interpolation=interpolation)

    def _adjust_brightness(self, image: np.ndarray) -> np.ndarray:
        """
        Adaptively adjust image brightness

        The configured percentile of the HSV value channel is compared with
        the target brightness; if it is lower, the value channel is scaled up
        by a capped gain.

        Args:
            image: Input BGR image

        Returns:
            Brightness-adjusted image, or the input itself when it is already
            bright enough
        """
        # Calculate image brightness
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        v_channel = hsv[:, :, 2]

        # brightness_percentile is a fraction; np.percentile expects 0-100.
        current_brightness = np.percentile(
            v_channel, self.config.brightness_percentile * 100
        )

        # Only ever brighten; images at or above the target are left untouched.
        if current_brightness >= self._TARGET_BRIGHTNESS:
            return image

        # max(..., 1) avoids division by zero for fully black images.
        alpha = self._TARGET_BRIGHTNESS / max(current_brightness, 1)
        alpha = min(alpha, self._MAX_BRIGHTNESS_GAIN)

        hsv[:, :, 2] = np.clip(v_channel * alpha, 0, 255).astype(np.uint8)
        return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    def _create_gamma_lut(self, gamma: float) -> np.ndarray:
        """
        Create gamma correction lookup table

        Entry i holds round(255 * (i / 255) ** gamma). Values are rounded
        rather than truncated so that gamma == 1.0 is an exact identity map.

        Args:
            gamma: Gamma value, must be positive

        Returns:
            uint8 lookup table of shape (1, 256)

        Raises:
            ValueError: If gamma is not positive
        """
        if gamma <= 0:
            raise ValueError(f"Gamma must be positive, got {gamma}")

        levels = np.arange(256, dtype=np.float64) / 255.0
        table = np.rint(np.power(levels, gamma) * 255.0)
        return np.clip(table, 0, 255).astype(np.uint8).reshape(1, 256)

    def _crop_borders(self, image: np.ndarray) -> np.ndarray:
        """
        Crop image borders

        Removes config.border_fraction of the height from top and bottom and
        of the width from left and right.

        Args:
            image: Input image

        Returns:
            Cropped image (a view into the input array)

        Raises:
            ValueError: If border_fraction is negative or >= 0.5, which would
                leave nothing (or a nonsensical slice) to process
        """
        fraction = self.config.border_fraction
        if not 0 <= fraction < 0.5:
            raise ValueError(
                f"border_fraction must be in [0, 0.5), got {fraction}"
            )

        h, w = image.shape[:2]
        crop_h = int(h * fraction)
        crop_w = int(w * fraction)

        return image[crop_h:h-crop_h, crop_w:w-crop_w]

    def process_batch(self,
                     images: List[np.ndarray],
                     batch_id: Optional[str] = None) -> List[Dict[str, np.ndarray]]:
        """
        Process multiple images in batch

        A failure on one image does not abort the batch: the error is logged
        and a placeholder entry is stored at that position instead.

        Args:
            images: List of images
            batch_id: Batch ID for logging, generated from the current time if None

        Returns:
            List of processing result dictionaries, one per input image and in
            the same order. Failed images yield {"original": img, "error": msg}.
        """
        batch_id = batch_id or f"batch_{int(time.time())}"
        total = len(images)
        logger.info(f"Starting batch processing, batch: {batch_id}, image count: {total}")

        results = []
        for i, img in enumerate(images):
            try:
                logger.info(f"Processing image {i+1}/{total} batch: {batch_id}")
                results.append(self.preprocess(img))
            except Exception as e:
                logger.error(f"Failed to process image {i+1}: {str(e)}")
                # Add a placeholder result to maintain index consistency
                results.append({"original": img, "error": str(e)})

        successful = sum(1 for r in results if "error" not in r)
        logger.info(f"Batch {batch_id} processing complete, successful: {successful}/{total}")
        return results

In [23]:
class PreprocessingPipeline:
    """
    Preprocessing pipeline including image reading, batch processing and result saving
    """

    def __init__(self,
                config: Optional["PreprocessingConfig"] = None,
                input_dir: Optional[str] = None,
                output_dir: Optional[str] = None):
        """
        Initialize preprocessing pipeline

        Args:
            config: Preprocessing configuration
            input_dir: Input image directory
            output_dir: Output directory for processed results; created if it
                does not exist. Nothing is saved when this is None.
        """
        self.preprocessor = ImagePreprocessor(config)
        self.input_dir = input_dir
        self.output_dir = output_dir

        if self.output_dir:
            os.makedirs(self.output_dir, exist_ok=True)
            logger.info(f"Created output directory: {self.output_dir}")

    def process_file(self, file_path: str, save_result: bool = True) -> Dict[str, np.ndarray]:
        """
        Process a single image file

        Args:
            file_path: Image file path
            save_result: Whether to save processing results (only takes
                effect when an output directory was configured)

        Returns:
            Processing result dictionary

        Raises:
            IOError: If the file cannot be read as an image
        """
        logger.info(f"Reading file: {file_path}")
        # str() lets callers pass a pathlib.Path as well as a plain string.
        image = cv2.imread(str(file_path))

        if image is None:
            logger.error(f"Cannot read image: {file_path}")
            raise IOError(f"Cannot read image file: {file_path}")

        result = self.preprocessor.preprocess(image)

        if save_result and self.output_dir:
            self._save_result(result, file_path)

        return result

    def process_directory(self,
                         directory: str = None,
                         batch_size: int = 10,
                         file_pattern: str = "*.png") -> Dict[str, List[Dict[str, np.ndarray]]]:
        """
        Process all images in a directory

        Args:
            directory: Input directory, uses initialized directory if None
            batch_size: Batch processing size, must be at least 1
            file_pattern: File matching pattern

        Returns:
            Mapping of batch IDs to result lists. Empty when no file matches
            the pattern. Files that cannot be read are skipped and therefore
            have no entry in the result lists.

        Raises:
            ValueError: If no directory is specified, the path is missing or
                not a directory, or batch_size is smaller than 1
        """
        directory = directory or self.input_dir
        if not directory:
            logger.error("Input directory not specified")
            raise ValueError("Input directory not specified")

        # range() rejects a zero step and silently yields nothing for a
        # negative one; fail early with a clear message instead.
        if batch_size < 1:
            error_msg = f"batch_size must be at least 1, got {batch_size}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        logger.info(f"Processing directory: {directory}, pattern: {file_pattern}")
        print(f"Scanning directory: {directory}, looking for files: {file_pattern}")

        # Check if directory exists
        if not os.path.exists(directory):
            error_msg = f"Directory does not exist: {directory}"
            logger.error(error_msg)
            print(error_msg)
            raise ValueError(error_msg)

        # An existing regular file would otherwise crash in os.listdir below.
        if not os.path.isdir(directory):
            error_msg = f"Path is not a directory: {directory}"
            logger.error(error_msg)
            print(error_msg)
            raise ValueError(error_msg)

        # Ensure directory is absolute path
        abs_directory = os.path.abspath(directory)
        logger.info(f"Absolute path: {abs_directory}")

        # List all files in directory
        all_files = os.listdir(abs_directory)
        logger.info(f"All files in directory: {all_files}")
        print(f"Total files in directory: {len(all_files)}")

        # glob() yields entries in arbitrary order; sort so that batch
        # composition is reproducible between runs and platforms.
        files = sorted(Path(abs_directory).glob(file_pattern))

        if not files:
            logger.warning(f"No files matching {file_pattern} found in directory {directory}")
            # Try other matching patterns to find file types
            image_extensions = ['.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff']
            for ext in image_extensions:
                test_files = list(Path(abs_directory).glob(f"*{ext}"))
                if test_files:
                    logger.info(f"Found {len(test_files)} {ext} files")
                    print(f"Found {len(test_files)} {ext} files, consider using --pattern '*{ext}'")
            return {}

        logger.info(f"Found {len(files)} matching files: {[f.name for f in files]}")

        # Process in batches
        results = {}
        for i in range(0, len(files), batch_size):
            batch_files = files[i:i+batch_size]
            batch_id = f"batch_{int(time.time())}_{i//batch_size}"

            logger.info(f"Processing batch {batch_id}, file count: {len(batch_files)}")

            # batch_images and file_paths are appended in lockstep, so they
            # stay index-aligned even when unreadable files are skipped.
            batch_images = []
            file_paths = []

            # Read all images in batch
            for file_path in batch_files:
                try:
                    image = cv2.imread(str(file_path))
                    if image is not None:
                        batch_images.append(image)
                        file_paths.append(file_path)
                    else:
                        logger.warning(f"Cannot read image: {file_path}")
                except Exception as e:
                    logger.error(f"Failed to read image {file_path}: {str(e)}")

            # Batch processing
            batch_results = self.preprocessor.process_batch(batch_images, batch_id)

            # Save results of the images that were processed successfully
            if self.output_dir:
                for result, file_path in zip(batch_results, file_paths):
                    if "error" not in result:
                        self._save_result(result, file_path)

            results[batch_id] = batch_results

        return results

    @staticmethod
    def _to_uint8(img: np.ndarray) -> np.ndarray:
        """Convert a float image scaled to [0, 1] into uint8; pass other dtypes through.

        Values are rounded (not truncated) and clipped, so a uint8 image that
        was divided by 255 converts back to exactly the same pixel values and
        out-of-range floats saturate instead of wrapping around.
        """
        if np.issubdtype(img.dtype, np.floating):
            return np.clip(np.rint(img * 255.0), 0, 255).astype(np.uint8)
        return img

    @staticmethod
    def _write_image(output_path: str, img: np.ndarray) -> None:
        """Write an image and log a warning if OpenCV reports that the write failed."""
        if not cv2.imwrite(output_path, img):
            logger.warning(f"Failed to write image: {output_path}")

    def _save_result(self, result: Dict[str, np.ndarray], original_path: Union[str, Path]) -> None:
        """
        Save processing results

        Files are written as PNG into <output_dir>/<stem>/, where <stem> is
        the original file name without its extension.

        Args:
            result: Processing result dictionary
            original_path: Original file path
        """
        filename = Path(original_path).stem
        output_subdir = os.path.join(self.output_dir, filename)
        os.makedirs(output_subdir, exist_ok=True)

        # Save preprocessed main image
        if "preprocessed" in result:
            output_path = os.path.join(output_subdir, f"{filename}_preprocessed.png")
            self._write_image(output_path, self._to_uint8(result["preprocessed"]))

        # Save grayscale image
        if "gray" in result:
            output_path = os.path.join(output_subdir, f"{filename}_gray.png")
            self._write_image(output_path, result["gray"])

        # Save intermediate results (optional, for debugging)
        # NOTE(review): the "hsv" snapshot is written as-is, so image viewers
        # will interpret its H/S/V channels as B/G/R.
        if "intermediate" in result:
            for step_name, img in result["intermediate"].items():
                output_path = os.path.join(output_subdir, f"{filename}_{step_name}.png")
                self._write_image(output_path, self._to_uint8(img))


In [6]:
def create_default_pipeline(input_dir: str = None, output_dir: str = None) -> PreprocessingPipeline:
    """
    Build a preprocessing pipeline that uses the standard settings

    Args:
        input_dir: Input directory
        output_dir: Output directory

    Returns:
        Preprocessing pipeline instance
    """
    # Settings are collected in a mapping first so they read as one table.
    default_settings = {
        "target_size": (1024, 1024),
        "normalize": True,
        "clahe_clip_limit": 2.5,
        "clahe_grid_size": (8, 8),
        "denoise_h": 8,
        "gamma_correction": True,
        "gamma_value": 1.2,
        "sharpen": True,
        "adaptive_brightness": True,
    }
    default_config = PreprocessingConfig(**default_settings)

    return PreprocessingPipeline(default_config, input_dir, output_dir)

In [7]:
def create_enhanced_pipeline(input_dir: str = None, output_dir: str = None) -> PreprocessingPipeline:
    """
    Build a preprocessing pipeline tuned for stronger contrast and sharpening

    Args:
        input_dir: Input directory
        output_dir: Output directory

    Returns:
        Preprocessing pipeline instance
    """
    enhanced_settings = {
        "target_size": (1280, 1280),     # Higher resolution
        "normalize": True,
        "clahe_clip_limit": 3.5,         # Stronger contrast
        "clahe_grid_size": (12, 12),     # Finer grid
        "denoise_h": 7,                  # Slightly weaker denoising to preserve details
        "gamma_correction": True,
        "gamma_value": 1.3,              # Slightly higher gamma value
        "sharpen": True,
        "adaptive_brightness": True,
        "brightness_percentile": 0.97,   # Higher brightness adjustment percentile
    }
    enhanced_config = PreprocessingConfig(**enhanced_settings)

    return PreprocessingPipeline(enhanced_config, input_dir, output_dir)

In [9]:
# Source images and the location the processed results are written to.
input_dir = "data/train"
output_dir = "data/train_preprocessed"

# Preprocessing settings for this run.
run_settings = {
    "target_size": (1024, 1024),
    "normalize": True,
    "clahe_clip_limit": 2.5,
    "clahe_grid_size": (8, 8),
    "denoise_h": 8,
    "gamma_correction": True,
    "gamma_value": 1.2,
    "sharpen": True,
    "adaptive_brightness": True,
}
config = PreprocessingConfig(**run_settings)

In [10]:
# Batch size and file pattern for the directory run.
batch, pattern = 10, "*.png"

# Build the pipeline from the run configuration and the input/output paths.
pipeline = PreprocessingPipeline(
    config=config,
    input_dir=input_dir,
    output_dir=output_dir,
)

2025-05-19 16:09:51,178 - PreprocessingModule - INFO - Initialized preprocessor with config: PreprocessingConfig(target_size=(1024, 1024), normalize=True, clahe_clip_limit=2.5, clahe_grid_size=(8, 8), denoise_h=8, gamma_correction=True, gamma_value=1.2, sharpen=True, adaptive_brightness=True, brightness_percentile=0.95, crop_borders=False, border_fraction=0.05)
2025-05-19 16:09:51,180 - PreprocessingModule - INFO - Created output directory: data/train_preprocessed


In [24]:
# Announce the run, process every matching file, then report how many
# result entries were produced across all batches.
logger.info(f"Processing directory: {input_dir}, batch size: {batch}")
results = pipeline.process_directory(
    str(input_dir),
    batch_size=batch,
    file_pattern=pattern,
)
logger.info(f"Processing completed, total {sum(map(len, results.values()))} files")

2025-05-19 16:49:15,119 - PreprocessingModule - INFO - Processing directory: data/train, batch size: 10
2025-05-19 16:49:15,120 - PreprocessingModule - INFO - Processing directory: data/train, pattern: *.png
2025-05-19 16:49:15,121 - PreprocessingModule - INFO - Absolute path: d:\aiml\defect_detector\data\train
2025-05-19 16:49:15,121 - PreprocessingModule - INFO - All files in directory: ['Picture1.png', 'Picture2.png', 'Picture3.png', 'Picture4.png', 'Picture5.png', 'Picture6.png', 'Picture7.png', 'Picture8.png']
2025-05-19 16:49:15,121 - PreprocessingModule - INFO - Found 8 matching files: ['Picture1.png', 'Picture2.png', 'Picture3.png', 'Picture4.png', 'Picture5.png', 'Picture6.png', 'Picture7.png', 'Picture8.png']
2025-05-19 16:49:15,121 - PreprocessingModule - INFO - Processing batch batch_1747644555_0, file count: 8
2025-05-19 16:49:15,149 - PreprocessingModule - INFO - Starting batch processing, batch: batch_1747644555_0, image count: 8
2025-05-19 16:49:15,149 - PreprocessingMo

Scanning directory: data/train, looking for files: *.png
Total files in directory: 8


2025-05-19 16:49:17,914 - PreprocessingModule - INFO - Preprocessing completed, time: 2.7647 seconds
2025-05-19 16:49:17,929 - PreprocessingModule - INFO - Processing image 2/8 batch: batch_1747644555_0
2025-05-19 16:49:17,931 - PreprocessingModule - INFO - Starting image processing, shape: (248, 273, 3)
2025-05-19 16:49:20,884 - PreprocessingModule - INFO - Preprocessing completed, time: 2.9534 seconds
2025-05-19 16:49:20,884 - PreprocessingModule - INFO - Processing image 3/8 batch: batch_1747644555_0
2025-05-19 16:49:20,884 - PreprocessingModule - INFO - Starting image processing, shape: (280, 304, 3)
2025-05-19 16:49:23,871 - PreprocessingModule - INFO - Preprocessing completed, time: 2.9869 seconds
2025-05-19 16:49:23,875 - PreprocessingModule - INFO - Processing image 4/8 batch: batch_1747644555_0
2025-05-19 16:49:23,881 - PreprocessingModule - INFO - Starting image processing, shape: (328, 341, 3)
2025-05-19 16:49:26,800 - PreprocessingModule - INFO - Preprocessing completed, ti

TypeError: 'module' object is not callable