In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from skimage.metrics import structural_similarity
import pandas as pd
from IPython.display import display, HTML
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging

# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------
INPUT_ROOT = ''                          # root folder that is scanned for images
OUTPUT_ROOT = '_gaussian'                # root folder that receives filtered images
KERNEL_SIZES = list(range(3, 16, 2))     # odd kernel widths 3, 5, ..., 15
SSIM_THRESHOLD = 0.85                    # minimum SSIM a kernel must keep
MAX_PREVIEW = 20                         # number of rows shown in the HTML table
LOG_FILE = 'image_processing.log'        # log file written next to the console output

# Send every INFO-and-above record both to LOG_FILE and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)

def calculate_ssim(original, denoised):
    """Return the structural similarity index (SSIM) of two images.

    Both arrays are handed unchanged to ``structural_similarity`` with the
    data range fixed at 255. The callers in this file pass single-channel
    (grayscale) arrays.
    """
    score = structural_similarity(original, denoised, data_range=255)
    return score

def find_optimal_kernel_gaussian(image):
    """Pick the Gaussian kernel size to use for ``image``.

    The sizes in ``KERNEL_SIZES`` are tried in order. Each candidate blurs the
    BGR image, and the grayscale result is compared with the grayscale original
    via SSIM. The search stops at the first size whose SSIM is not at least
    ``SSIM_THRESHOLD``, and the last size that passed is returned. If no size
    passes, the fallback value 3 is returned.
    """
    reference_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    largest_ok = 3

    for size in KERNEL_SIZES:
        blurred = cv2.GaussianBlur(image, (size, size), 0)
        blurred_gray = cv2.cvtColor(blurred, cv2.COLOR_BGR2GRAY)
        passes = calculate_ssim(reference_gray, blurred_gray) >= SSIM_THRESHOLD
        if not passes:
            # First size that degrades the image too much: stop searching.
            return largest_ok
        largest_ok = size

    return largest_ok

def encode_image_for_html(img):
    """Build an inline ``<img>`` tag (100 px wide) holding ``img`` as a base64 JPEG."""
    jpeg_bytes = cv2.imencode('.jpg', img)[1]
    payload = base64.b64encode(jpeg_bytes).decode()
    return f'<img src="data:image/jpeg;base64,{payload}" width="100"/>'

def process_image(img_path):
    """Denoise one image with its optimal Gaussian kernel and save the result.

    The image is read from ``img_path``, blurred with the kernel size chosen by
    ``find_optimal_kernel_gaussian``, and written to
    ``OUTPUT_ROOT/<split>/<class_name>/<file name>``, where ``<split>`` and
    ``<class_name>`` are the two directory components directly above the file.

    Args:
        img_path: Path of the image file to process.

    Returns:
        ``(img_path, kernel, final_ssim, denoised)`` on success, where
        ``final_ssim`` is the SSIM between the grayscale original and the
        grayscale denoised image. If any step fails (unreadable image, bad
        path layout, failed write, ...) the error is logged and
        ``(img_path, None, None, None)`` is returned instead.
    """
    try:
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError(f"Image could not be read: {img_path}")

        # Determine optimal kernel size
        kernel = find_optimal_kernel_gaussian(image)
        denoised = cv2.GaussianBlur(image, (kernel, kernel), 0)

        # Compute SSIM
        gray_orig = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        gray_denoised = cv2.cvtColor(denoised, cv2.COLOR_BGR2GRAY)
        final_ssim = calculate_ssim(gray_orig, gray_denoised)

        # Save denoised image, mirroring the <split>/<class_name> layout.
        split, class_name = img_path.split(os.sep)[-3:-1]
        output_path = os.path.join(OUTPUT_ROOT, split, class_name, os.path.basename(img_path))
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        # cv2.imwrite signals a failed write through its boolean return value;
        # without this check a missing output file would be reported as success.
        if not cv2.imwrite(output_path, denoised):
            raise OSError(f"Image could not be written: {output_path}")

        return img_path, kernel, final_ssim, denoised
    except Exception as e:
        # Best-effort batch processing: log and report the failure to the caller
        # instead of aborting the whole run.
        logging.error(f"Error processing {img_path}: {str(e)}")
        return img_path, None, None, None

def _list_dataset_images(root):
    """Return the paths of all image files laid out as ``root/<split>/<class>/<file>``."""
    extensions = ('.png', '.jpg', '.jpeg')
    image_paths = []
    for split in os.listdir(root):
        split_path = os.path.join(root, split)
        if not os.path.isdir(split_path):
            continue

        for class_name in os.listdir(split_path):
            class_path = os.path.join(split_path, class_name)
            if not os.path.isdir(class_path):
                continue

            image_paths.extend(
                os.path.join(class_path, filename)
                for filename in os.listdir(class_path)
                if filename.lower().endswith(extensions)
            )
    return image_paths


def process_dataset_gaussian():
    """Filter every image under ``INPUT_ROOT`` in parallel and build a report.

    Images are expected at ``INPUT_ROOT/<split>/<class>/<file>`` with a
    ``.png``, ``.jpg`` or ``.jpeg`` extension (case-insensitive). Each one is
    submitted to ``process_image`` on a thread pool, and results are consumed
    in completion order.

    Returns:
        A ``(report, avg_ssim)`` tuple. ``report`` is a list with one dict per
        successfully processed image (keys ``'Image'``, ``'Path'``,
        ``'Kernel'``, ``'SSIM'``); only the first ``MAX_PREVIEW`` entries carry
        an inline HTML preview in ``'Image'``. ``avg_ssim`` is the mean SSIM
        over those images, or 0 when none succeeded.
    """
    report = []
    total_ssim = 0
    processed_count = 0

    # Enumerate once so the progress total is exactly the number of submitted
    # tasks (the previous os.walk-based count also included non-image files
    # and files at other directory depths).
    image_paths = _list_dataset_images(INPUT_ROOT)

    # The context manager closes the bar even if an exception escapes.
    with tqdm(total=len(image_paths), desc="🔍 Gaussian Filtering", unit="img") as progress:
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_image, path) for path in image_paths]

            for future in as_completed(futures):
                img_path, kernel, final_ssim, denoised = future.result()

                # Count every finished task, including failures, so the bar
                # reaches its total.
                progress.update(1)

                if kernel is None:  # Image processing failed (already logged)
                    continue

                # Only the first MAX_PREVIEW successes get an inline preview.
                if processed_count < MAX_PREVIEW:
                    preview = encode_image_for_html(denoised)
                else:
                    preview = ""

                report.append({
                    'Image': preview,
                    'Path': img_path,
                    'Kernel': f"({kernel}, {kernel})",
                    'SSIM': f"{final_ssim:.4f}"
                })

                total_ssim += final_ssim
                processed_count += 1

    avg_ssim = total_ssim / processed_count if processed_count > 0 else 0
    return report, avg_ssim

def show_html_table(report, avg_ssim):
    """Log the average SSIM and render the first ``MAX_PREVIEW`` report rows as HTML.

    ``report`` is a list of row dicts and ``avg_ssim`` is logged as a
    percentage together with the total number of rows. The table is rendered
    without HTML escaping so that cell contents containing markup are shown
    as markup.
    """
    preview_rows = report[:MAX_PREVIEW]
    table = pd.DataFrame(preview_rows)
    logging.info(f"\n✅ Average SSIM: {avg_ssim:.2%} across {len(report)} images.")
    table_html = table.to_html(escape=False, index=False)
    display(HTML(table_html))

if __name__ == "__main__":
    # Run the full pipeline, show the preview table, and log the wall-clock time.
    start_time = time.time()

    report, avg_ssim = process_dataset_gaussian()
    show_html_table(report, avg_ssim)

    end_time = time.time()
    elapsed = end_time - start_time
    logging.info(f"Total processing time: {elapsed:.2f} seconds.")


🔍 Gaussian Filtering:   0%|          | 12/2853 [00:10<13:48,  3.43img/s] 

: 