In [2]:
"""pip install gradio scikit-image scikit-learn cupy matplotlib"""

'pip install gradio scikit-image scikit-learn cupy matplotlib'

# Imports, Model Loading, and Preprocessing Pipeline Functions

In [3]:
# Cell 1: Imports, model loading, and pipeline functions

import gradio as gr
import numpy as np
import cupy as cp
import cv2
from skimage.util import img_as_ubyte, img_as_float
from skimage import morphology, filters, segmentation, exposure, feature, transform
from scipy.ndimage import convolve, gaussian_gradient_magnitude, distance_transform_edt
from scipy.ndimage import label as nd_label
from skimage.color import rgb2gray
from sklearn.preprocessing import MinMaxScaler
from joblib import load

# ---------------------------
# Load trained models
# ---------------------------
models = {
    "Random Forest": r"pkl_files\best_random_forest_model.pkl",
    "KNN": r"pkl_files\best_KNN_model.pkl",
    "Decision Tree": r"pkl_files\Decision_tree_model.pkl",
    "Gaussian NB": r"pkl_files\GaussianNB_model.pkl",
    "Logistic Regression": r"pkl_files\logistic_regression_model.pkl",
    "SVM": r"pkl_files\svm_model.pkl",
}
loaded_models = {model_name: load(model_path) for model_name, model_path in models.items()}

# ---------------------------
# Preprocessing Pipeline Functions
# ---------------------------
def anisotropic_diffusion_filter(image, alpha=0.1, beta=0.1, iterations=3):
    image = img_as_float(image.copy())
    for _ in range(iterations):
        gradient_magnitude = gaussian_gradient_magnitude(image, sigma=1)
        c = 1.0 / (1.0 + (gradient_magnitude / beta) ** 2)
        diff_north = convolve(image, np.array([[0, 1, 0],
                                               [0, -1, 0],
                                               [0, 0, 0]]))
        diff_south = convolve(image, np.array([[0, 0, 0],
                                               [0, -1, 0],
                                               [0, 1, 0]]))
        diff_east = convolve(image, np.array([[0, 0, 0],
                                              [0, -1, 1],
                                              [0, 0, 0]]))
        diff_west = convolve(image, np.array([[0, 0, 0],
                                              [1, -1, 0],
                                              [0, 0, 0]]))
        image += alpha * (c * diff_north + c * diff_south + c * diff_east + c * diff_west)
    return img_as_ubyte(image)

def skull_stripping(image, se_closing=None, se_erosion=None, skull_remove_area=2000):
    if se_closing is None:
        se_closing = morphology.disk(15)
    if se_erosion is None:
        se_erosion = morphology.disk(30)
    threshold_value = filters.threshold_otsu(image)
    binary_image = image > threshold_value
    filled_image = morphology.closing(binary_image, se_closing)
    filled_image = morphology.remove_small_holes(filled_image, area_threshold=skull_remove_area)
    eroded_image = morphology.erosion(filled_image, se_erosion)
    return np.where(eroded_image, image, 0)

def top_hat_filtering(image, disk_size=30):
    footprint = morphology.disk(disk_size)
    return morphology.white_tophat(image, footprint)

def contrast_enhancement(image):
    contrasted_image = exposure.equalize_hist(image)
    contrasted_image = (contrasted_image - np.min(contrasted_image)) / (np.max(contrasted_image) - np.min(contrasted_image))
    return img_as_ubyte(contrasted_image)

def get_binary(image, thresh=None):
    if thresh is None:
        thresh = filters.threshold_otsu(image)
    binary_image = image > thresh
    return binary_image, thresh

def watershed_segmentation(image, se_peak_local=np.ones((3, 3))):
    distance = distance_transform_edt(image)
    local_maxi = feature.peak_local_max(distance, labels=image, footprint=se_peak_local, exclude_border=False)
    local_maxi_mask = np.zeros_like(image, dtype=bool)
    local_maxi_mask[tuple(local_maxi.T)] = True
    markers = nd_label(local_maxi_mask)[0]
    segmented_image = segmentation.watershed(-distance, markers, mask=image)
    nbr_labels = np.max(segmented_image)
    return segmented_image, nbr_labels

def apply_morphological_operations(segmented_image):
    if not isinstance(segmented_image, np.ndarray) or segmented_image.ndim != 2:
        return None, None
    closed_image = morphology.closing(segmented_image, morphology.disk(6))
    opened_image = morphology.opening(closed_image, morphology.disk(5))
    return closed_image, opened_image

def preprocess_pipeline(image):
    try:
        # Convert to grayscale
        grayscale_image = rgb2gray(image)
        print(f"Grayscale Image Min: {grayscale_image.min()}, Max: {grayscale_image.max()}")

        adf_filtered_image = anisotropic_diffusion_filter(grayscale_image)
        print(f"ADF Filtered Min: {adf_filtered_image.min()}, Max: {adf_filtered_image.max()}")

        skull_stripped_image = skull_stripping(adf_filtered_image)
        print(f"Skull Stripped Min: {skull_stripped_image.min()}, Max: {skull_stripped_image.max()}")

        tophat_image = top_hat_filtering(skull_stripped_image)
        print(f"Tophat Filtered Min: {tophat_image.min()}, Max: {tophat_image.max()}")

        contrasted_image = contrast_enhancement(tophat_image)
        print(f"Contrast Enhanced Min: {contrasted_image.min()}, Max: {contrasted_image.max()}")

        # Here thresh is set to 230 (assumes an 8-bit image after contrast enhancement)
        binary_image, _ = get_binary(contrasted_image, thresh=230)
        print(f"Binary Image Min: {binary_image.min()}, Max: {binary_image.max()}")

        segmented_image, _ = watershed_segmentation(binary_image, morphology.disk(10))
        print(f"Segmented Image Min: {segmented_image.min()}, Max: {segmented_image.max()}")

        closed_image_post, opened_image_post = apply_morphological_operations(segmented_image)
        print(f"Opened Image Post Min: {opened_image_post.min()}, Max: {opened_image_post.max()}")

        # Resize to 256x256 for further processing/visualization,
        # preserving the original intensity range.
        resized_opened_image = transform.resize(opened_image_post, (256, 256),
                                                anti_aliasing=True, preserve_range=True)
        # Optionally, rescale intensities to use full 0-255 if needed:
        if resized_opened_image.max() > 0:
            resized_opened_image = (resized_opened_image / resized_opened_image.max()) * 255
        resized_opened_image = resized_opened_image.astype(np.uint8)
        print(f"Resized Opened Image Min: {resized_opened_image.min()}, Max: {resized_opened_image.max()}")

        # Flatten and scale features
        flattened_image = resized_opened_image.flatten().reshape(1, -1)
        scaler = MinMaxScaler()
        scaled_features = scaler.fit_transform(flattened_image)

        return scaled_features, resized_opened_image
    except Exception as e:
        print(f"Error in preprocessing pipeline: {e}")
        raise

# Additional helper functions for GUI usage later
def preprocess_image(image):
    """Helper function for the GUI Preprocess button."""
    _, processed_image = preprocess_pipeline(image)
    return processed_image

def classify_image(image, model_name):
    try:
        scaled_features, opened_image = preprocess_pipeline(image)
        model = loaded_models[model_name]
        # Adjust feature size if necessary
        expected_size = model.n_features_in_
        if scaled_features.shape[1] != expected_size:
            print(f"Resizing from {scaled_features.shape[1]} to {expected_size}")
            scaled_features = np.resize(scaled_features, (1, expected_size))
        prediction = model.predict(scaled_features)
        return opened_image, f"Predicted Label: {prediction[0]}"
    except Exception as e:
        print(f"Error in classification: {e}")
        return None, "Error"

print("Cell 1 executed: All imports, models, and preprocessing functions are loaded.")


Cell 1 executed: All imports, models, and preprocessing functions are loaded.


# Define the ImagePreprocessor Class and Load the Saved Preprocessing Pipeline

In [4]:
# Cell 2: Define ImagePreprocessor class and load the preprocessing pipeline

from sklearn.base import BaseEstimator, TransformerMixin
import joblib  # Ensure joblib is imported

class ImagePreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass  # No initialization parameters needed

    def fit(self, X, y=None):
        return self  # No fitting needed

    def transform(self, X):
        processed_images = []
        for image in X:
            try:
                # Apply the same pipeline as defined in Cell 1
                adf_filtered_image = anisotropic_diffusion_filter(image)
                skull_stripped_image = skull_stripping(adf_filtered_image)
                tophat_image = top_hat_filtering(skull_stripped_image)
                contrasted_image = contrast_enhancement(tophat_image)
                binary_image, _ = get_binary(contrasted_image, thresh=230)
                segmented_image, _ = watershed_segmentation(binary_image, morphology.disk(10))
                _, opened_image = apply_morphological_operations(segmented_image)
                # Resize to 256x256 for consistency, preserving range
                resized_opened_image = transform.resize(opened_image, (256, 256),
                                                        anti_aliasing=True, preserve_range=True)
                if resized_opened_image.max() > 0:
                    resized_opened_image = (resized_opened_image / resized_opened_image.max()) * 255
                resized_opened_image = resized_opened_image.astype(np.uint8)
                flattened_image = resized_opened_image.flatten().reshape(1, -1)
                processed_images.append(flattened_image)
            except Exception as e:
                print(f"Error processing image: {e}")
                processed_images.append(None)
        return np.vstack(processed_images)

# ---------------------------
# Load the preprocessing pipeline from disk
# ---------------------------
preprocessing_pipeline_path = r"pkl_files\preprocessing_pipeline.pkl"
preprocessor = joblib.load(preprocessing_pipeline_path)
print("Cell 2 executed: Preprocessing pipeline loaded successfully!")


Cell 2 executed: Preprocessing pipeline loaded successfully!


# Gradio GUI for Preprocessing and Classification

In [5]:
# Cell 3: Gradio GUI Launch

import gradio as gr
import numpy as np
import joblib
from skimage.color import rgb2gray

# --- Helper function for visualization ---
def resize_for_visualization(flattened_array, target_size=(256, 256)):
    """Resizes a flattened array back to a 2D image for visualization."""
    reshaped_array = flattened_array[: np.prod(target_size)].reshape(target_size)
    # Normalize to [0, 255]
    reshaped_array = (reshaped_array - reshaped_array.min()) / (reshaped_array.max() - reshaped_array.min() + 1e-8)
    reshaped_array = (reshaped_array * 255).astype(np.uint8)
    return reshaped_array

# --- Functions for the GUI using the saved preprocessor ---
def preprocess_image_gui(image):
    """
    Uses the saved preprocessor to transform the image and then 
    resizes the output for visualization.
    """
    try:
        grayscale_image = rgb2gray(image)
        processed_features = preprocessor.transform([grayscale_image])
        visualization_image = resize_for_visualization(processed_features.flatten())
        return image, visualization_image
    except Exception as e:
        print(f"Error in preprocessing (GUI): {e}")
        return image, None

def classify_image_gui(image, model_name):
    """
    Applies the saved preprocessor and then uses the selected model for classification.
    Also resizes the processed features for preview.
    """
    try:
        grayscale_image = rgb2gray(image)
        processed_features = preprocessor.transform([grayscale_image])
        if processed_features is None:
            return image, None, "Error in preprocessing"
        model = loaded_models[model_name]
        # Resize features if needed
        expected_size = model.n_features_in_
        flattened_features = processed_features.flatten().reshape(1, -1)
        if flattened_features.shape[1] != expected_size:
            flattened_features = np.resize(flattened_features, (1, expected_size))
        prediction = model.predict(flattened_features)[0]
        # Map prediction to a label
        label_mapping = {0: "Tumor Detected", 1: "Healthy"}
        label = label_mapping.get(prediction, "Unknown")
        visualization_image = resize_for_visualization(processed_features.flatten())
        return image, visualization_image, f"Result: {label}"
    except Exception as e:
        print(f"Error in classification (GUI): {e}")
        return image, None, "Error"

# --- Gradio Interface ---
def gui():
    with gr.Blocks() as demo:
        gr.Markdown("## Image Classification Demo")
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(label="Upload a Photo", type="numpy", interactive=True)
                preprocess_button = gr.Button("Preprocess")
                preprocess_output = gr.Image(label="Preprocessed Image Preview")
            with gr.Column():
                model_dropdown = gr.Dropdown(choices=list(models.keys()), label="Select Model", value="Random Forest")
                classify_button = gr.Button("Classify")
                prediction_output = gr.Textbox(label="Predicted Label", interactive=False)
        
        # When the Preprocess button is clicked:
        preprocess_button.click(
            preprocess_image_gui, 
            inputs=image_input, 
            outputs=[image_input, preprocess_output]
        )
        # When the Classify button is clicked:
        classify_button.click(
            classify_image_gui, 
            inputs=[image_input, model_dropdown], 
            outputs=[image_input, preprocess_output, prediction_output]
        )
    demo.launch()

gui()


* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "c:\Users\Marwan Shamsan\anaconda3\Lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 403, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Marwan Shamsan\anaconda3\Lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Marwan Shamsan\anaconda3\Lib\site-packages\fastapi\applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "c:\Users\Marwan Shamsan\anaconda3\Lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "c:\Users\Marwan Shamsan\anaconda3\Lib\site-packages\starlette\middleware\errors.py", line 187, in __call__
    raise exc
  File "c:\Users\Mar