In [1]:
import os
import random
import json
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from skimage.restoration import denoise_tv_chambolle
from skimage import exposure
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

In [2]:
class XRayImageProcessor:
                
    def __init__(self, noise_weight=0.05, pca_components=0.98):
    
    #def __init__(self, noise_weight=0.2, pca_components=0.95):
        """
        Initialize X-ray image processing pipeline
        
        Parameters:
        - noise_weight: Weight for total variation noise reduction (default: 0.1)
        - pca_components: Variance ratio to retain in PCA (default: 95%)
        """
        self.noise_weight = noise_weight
        self.pca_components = pca_components
        self.pca = None
    
    def preprocess_image(self, image):
        """
        Preprocess X-ray image: noise reduction and contrast enhancement
        
        Parameters:
        - image: Input X-ray image (numpy array)
        
        Returns:
        - Preprocessed image
        """
        # Convert to float for processing
        image_float = image.astype(np.float64) / 255.0
        
        # Total Variation Noise Reduction
        denoised = denoise_tv_chambolle(image_float, weight=self.noise_weight)
        
        # Contrast Limited Adaptive Histogram Equalization (CLAHE)
        clahe = exposure.equalize_adapthist(denoised, clip_limit=0.03)
        
        return (clahe * 255).astype(np.uint8)
    
    def apply_pca(self, image):
        """
        Apply PCA for dimensionality reduction
        
        Parameters:
        - image: 2D numpy array of the image
        
        Returns:
        - Compressed image preserving key features
        """
        # Reshape image to 2D for PCA
        height, width = image.shape
        image_2d = image.reshape(-1, 1)
        
        # Fit PCA if not already done
        if self.pca is None:
            self.pca = PCA(n_components=self.pca_components)
            self.pca.fit(image_2d)
        
        # Transform and inverse transform to get compressed image
        compressed = self.pca.transform(image_2d)
        reconstructed = self.pca.inverse_transform(compressed)
        
        return reconstructed.reshape(height, width)
    
    def enhance_features(self, image):
        """
        Enhance important structures in X-ray
        
        Parameters:
        - image: Input image
        
        Returns:
        - Enhanced image highlighting key structures
        """
        # Unsharp masking for edge enhancement
        #blurred = cv2.GaussianBlur(image, (0, 0), 3)
        blurred  = cv2.GaussianBlur(image, (5, 5), 1.5)
        sharpened = cv2.addWeighted(image, 1.5, blurred, -0.5, 0)
        
        # Enhance bone-like structures using morphological operations
        kernel = np.ones((3,3), np.uint8)
        enhanced = cv2.morphologyEx(sharpened, cv2.MORPH_CLOSE, kernel)
        
        return enhanced
    
    def process_xray(self, image):
        """
        Complete X-ray image processing pipeline
        
        Parameters:
        - image: Input X-ray image
        
        Returns:
        - Fully processed image
        """
        # Preprocessing: Noise Reduction
        preprocessed = self.preprocess_image(image)
        
        # Dimensionality Reduction
        compressed = self.apply_pca(preprocessed)
        
        # Feature Enhancement
        enhanced = self.enhance_features(compressed.astype(np.uint8))
        
        return enhanced

In [3]:
def visualize_results(original_images, processed_images, output_path):
    """
    Create a visualization of original and processed images
    
    Parameters:
    - original_images: List of original images
    - processed_images: List of processed images
    - output_path: Path to save visualization
    """
    # Limit to first 10 images for visualization
    num_images = min(10, len(original_images))
    
    plt.figure(figsize=(15, 3 * num_images))
    for i in range(num_images):
        plt.subplot(num_images, 2, 2*i + 1)
        plt.title(f'Original Image {i+1}')
        plt.imshow(original_images[i], cmap='gray')
        plt.axis('off')
        
        plt.subplot(num_images, 2, 2*i + 2)
        plt.title(f'Processed Image {i+1}')
        plt.imshow(processed_images[i], cmap='gray')
        plt.axis('off')
    
    plt.tight_layout()
    plt.savefig(os.path.join(output_path, 'comparison.png'))
    plt.close()

In [4]:
class XRayImageEvaluator:
    def __init__(self, processor):
        """
        Initialize evaluator with the image processor
        
        Parameters:
        - processor: XRayImageProcessor instance for consistent processing
        """
        self.processor = processor
    
    def calculate_quality_metrics(self, original_images, processed_images):
        """
        Compute quantitative image quality metrics
        
        Parameters:
        - original_images: List of original X-ray images
        - processed_images: List of processed X-ray images
        
        Returns:
        - Dictionary of performance metrics
        """
        metrics = {
            'psnr_values': [],
            'ssim_values': [],
            'compression_ratios': []
        }
        
        for original, processed in zip(original_images, processed_images):
            # Ensure images are the same size
            min_height = min(original.shape[0], processed.shape[0])
            min_width = min(original.shape[1], processed.shape[1])
            
            original = original[:min_height, :min_width]
            processed = processed[:min_height, :min_width]
            
            # Peak Signal-to-Noise Ratio (PSNR)
            try:
                psnr_value = psnr(original, processed)
                metrics['psnr_values'].append(psnr_value)
                
                # Structural Similarity Index (SSIM)
                ssim_value = ssim(original, processed)
                metrics['ssim_values'].append(ssim_value)
                
                # Compression Ratio via PCA
                original_size = original.nbytes
                processed_size = processed.nbytes
                compression_ratio = 1 - (processed_size / original_size)
                metrics['compression_ratios'].append(compression_ratio)
            except Exception as e:
                print(f"Metrics calculation error: {e}")
        
        # Aggregate metrics with error handling
        metrics['avg_psnr'] = np.mean(metrics['psnr_values']) if metrics['psnr_values'] else None
        metrics['avg_ssim'] = np.mean(metrics['ssim_values']) if metrics['ssim_values'] else None
        metrics['avg_compression_ratio'] = np.mean(metrics['compression_ratios']) if metrics['compression_ratios'] else None
        
        return metrics
    
    def feature_consistency_analysis(self, processed_images):
        """
        Analyze feature extraction consistency
        
        Parameters:
        - processed_images: List of processed X-ray images
        
        Returns:
        - Consistency metrics and potential anomalies
        """
        # Compute feature variability using standard deviation of pixel intensities
        feature_variability = [
            np.std(image[image > np.mean(image)]) 
            for image in processed_images
        ]
        
        return {
            'feature_variability': feature_variability,
            'avg_feature_stability': np.mean(feature_variability) if feature_variability else None,
            'max_feature_variation': np.max(feature_variability) if feature_variability else None
        }
    
    def comprehensive_evaluation(self, original_images, processed_images):
        """
        Perform a comprehensive evaluation of the image processing pipeline
        
        Parameters:
        - original_images: List of original X-ray images
        - processed_images: List of processed X-ray images
        
        Returns:
        - Detailed evaluation report
        """
        # Quality Metrics
        quality_metrics = self.calculate_quality_metrics(original_images, processed_images)
        
        # Feature Consistency
        consistency_metrics = self.feature_consistency_analysis(processed_images)
        
        # Combine metrics into a comprehensive report
        evaluation_report = {
            'Quality Metrics': quality_metrics,
            'Feature Consistency': consistency_metrics,
            'Recommendations': self._generate_recommendations(quality_metrics, consistency_metrics)
        }
        
        return evaluation_report
    
    def _generate_recommendations(self, quality_metrics, consistency_metrics):
        """
        Generate processing recommendations based on evaluation results
        
        Parameters:
        - quality_metrics: Dictionary of quality evaluation results
        - consistency_metrics: Dictionary of feature consistency results
        
        Returns:
        - List of processing recommendations
        """
        recommendations = []
        
        # PSNR Recommendations
        if quality_metrics['avg_psnr'] is not None and quality_metrics['avg_psnr'] < 30:
            recommendations.append(
                "Consider adjusting noise reduction parameters to improve image quality."
            )
        
        # Compression Ratio Recommendations
        if (quality_metrics['avg_compression_ratio'] is not None and 
            quality_metrics['avg_compression_ratio'] < 0.5):
            recommendations.append(
                "PCA compression might be losing too much information. Increase retained variance."
            )
        
        # Feature Consistency Recommendations
        if (consistency_metrics['max_feature_variation'] is not None and 
            consistency_metrics['avg_feature_stability'] is not None and
            consistency_metrics['max_feature_variation'] > 2 * consistency_metrics['avg_feature_stability']):
            recommendations.append(
                "Detected significant variability in feature extraction. Review enhancement techniques."
            )
        
        return recommendations

In [5]:
def process_and_evaluate_images(input_path, output_path, num_images=50):
    """
    Comprehensive image processing and evaluation function
    
    Parameters:
    - input_path: Directory containing input images
    - output_path: Directory to save processed images
    - num_images: Number of images to process
    
    Returns:
    - Processed images
    - Original images
    - Evaluation report
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)
    
    # Create processor and evaluator
    processor = XRayImageProcessor()
    evaluator = XRayImageEvaluator(processor)
    
    # Get image files
    image_files = [f for f in os.listdir(input_path) 
                   if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff'))]
    
    # Randomly select images
    selected_files = random.sample(image_files, min(num_images, len(image_files)))
    
    original_images = []
    processed_images = []
    
    for filename in selected_files:
        input_image_path = os.path.join(input_path, filename)
        
        try:
            # Read image in grayscale
            image = cv2.imread(input_image_path, cv2.IMREAD_GRAYSCALE)
            
            if image is None:
                print(f"Could not read image: {filename}")
                continue
            
            # Process the image ONCE
            processed_image = processor.process_xray(image)
            
            # Store images
            original_images.append(image)
            processed_images.append(processed_image)
            
            # Save processed image
            output_image_path = os.path.join(output_path, f'processed_{filename}')
            cv2.imwrite(output_image_path, processed_image)
        
        except Exception as e:
            print(f"Error processing {filename}: {e}")
    
    # Visualization
    visualize_results(original_images, processed_images, output_path)
    
    # Evaluation
    evaluation_report = evaluator.comprehensive_evaluation(original_images, processed_images)
    
    return processed_images, original_images, evaluation_report

In [6]:
def main():
    # Input and output paths
    input_path = r'C:\Users\nadon\computational perception\Finala\data\Normal'
    output_path = os.path.join(input_path, 'processed_random_images')
    
    # Set random seed for reproducibility
    random.seed(42)
    
    # Process and evaluate images
    processed_images, original_images, evaluation_report = process_and_evaluate_images(
        input_path, output_path, num_images=50
    )
    
    # Print evaluation results
    try:
        print(json.dumps(evaluation_report, indent=2))
    except Exception as e:
        print(f"Error printing evaluation report: {e}")

if __name__ == '__main__':
    main()

{
  "Quality Metrics": {
    "psnr_values": [
      16.137772560456657,
      15.297499078894184,
      16.152064399750937,
      15.390450596280989,
      17.458250822166775,
      16.831736607524853,
      15.438157506619156,
      15.351175021009434,
      15.72596631521119,
      16.02902580413906,
      15.924040665734559,
      16.72645648067184,
      15.539732327012786,
      15.510766217826104,
      16.524201468715876,
      13.749175565982188,
      15.631108334439707,
      14.650459423975654,
      15.493529340986143,
      15.148608362757486,
      16.11542892807163,
      15.902021057090614,
      15.449138824484617,
      16.720973259824795,
      15.603375182485292,
      17.36906385497047,
      17.82361550248105,
      15.512323371781903,
      15.9948807995601,
      16.88480679695588,
      16.167869401790355,
      16.732652206673023,
      16.75256275832727,
      16.013944627492354,
      16.251769470180115,
      15.99826776681143,
      16.600112349355616,
   

In [7]:
# Hypothetical PSNR calculation method
def calculate_psnr(original_image, processed_image):
    """
    Compute Peak Signal-to-Noise Ratio between original and processed images
    
    Mathematical representation:
    PSNR = 10 * log10((MAX_PIXEL_VALUE)^2 / MSE)
    Where MSE is Mean Squared Error between images
    """
    mse = np.mean((original_image - processed_image) ** 2)
    max_pixel_value = 255.0  # For 8-bit images
    psnr = 10 * np.log10((max_pixel_value ** 2) / mse)
    return psnr

In [8]:
calculate_psnr

<function __main__.calculate_psnr(original_image, processed_image)>