Importing the necessary libraries

In [None]:
import os

import cv2
import matplotlib as plot
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.signal import wiener
from skimage import exposure, restoration
from skimage.restoration import denoise_nl_means, estimate_sigma
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from sklearn.metrics import ConfusionMatrixDisplay, auc, confusion_matrix, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.preprocessing import label_binarize
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.models import Model
from tqdm import tqdm

Preprocessing data with the filtering techniques
1. Median Filter
2. TopHat Filter
3. BlackHat Filter
4. ImAdjust Filter
5. Wiener Filter

In [None]:
# Target side length in pixels for the square, resized images (224 matches the
# input_shape=(224, 224, 3) given to ResNet50 later in this notebook).
IMG_SIZE = 224

# ------------------- FILTER FUNCTIONS -------------------

def apply_median_filter(image, kernel_size=3):
    """Return ``image`` smoothed with OpenCV's median blur.

    Args:
        image: Image array passed straight to ``cv2.medianBlur``.
        kernel_size: Aperture size forwarded to ``cv2.medianBlur``.

    Returns:
        The blurred image produced by OpenCV.
    """
    blurred = cv2.medianBlur(image, kernel_size)
    return blurred

def apply_tophat_filter(image, kernel_size=15):
    """Apply OpenCV's top-hat morphological transform (``cv2.MORPH_TOPHAT``).

    Args:
        image: Image array handed to ``cv2.morphologyEx``.
        kernel_size: Side length of the square, rectangular structuring element.

    Returns:
        The transformed image returned by ``cv2.morphologyEx``.
    """
    window = (kernel_size, kernel_size)
    structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, window)
    return cv2.morphologyEx(image, cv2.MORPH_TOPHAT, structuring_element)

def apply_blackhat_filter(image, kernel_size=15):
    """Apply OpenCV's black-hat morphological transform (``cv2.MORPH_BLACKHAT``).

    Args:
        image: Image array handed to ``cv2.morphologyEx``.
        kernel_size: Side length of the square, rectangular structuring element.

    Returns:
        The transformed image returned by ``cv2.morphologyEx``.
    """
    window = (kernel_size, kernel_size)
    structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, window)
    return cv2.morphologyEx(image, cv2.MORPH_BLACKHAT, structuring_element)

def apply_imadjust_filter(image, low_percent=1, high_percent=99):
    """Stretch the contrast of ``image`` between two intensity percentiles.

    Args:
        image: Image array whose intensities are rescaled.
        low_percent: Percentile used as the lower bound of the input range.
        high_percent: Percentile used as the upper bound of the input range.

    Returns:
        The result of ``skimage.exposure.rescale_intensity`` with
        ``in_range`` set to the two computed percentile values.
    """
    percentile_bounds = np.percentile(image, (low_percent, high_percent))
    lower, upper = percentile_bounds
    return exposure.rescale_intensity(image, in_range=(lower, upper))

def apply_wiener_filter(image):
    """Wiener-filter an RGB image on its grayscale channel and return it as RGB.

    The image is converted to grayscale, scaled to [0, 1], passed through
    ``skimage.restoration.wiener`` with a uniform 3x3 point-spread function,
    cleaned of NaN/Inf values, rescaled to uint8 and expanded back to three
    channels.

    Args:
        image: RGB image array (it is converted with ``cv2.COLOR_RGB2GRAY``).

    Returns:
        The filtered image as a 3-channel uint8 array, or the unmodified
        ``image`` if any step raises (the error is printed, not re-raised).
    """
    try:
        # Single-channel copy scaled to the [0, 1] float range.
        normalized = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0

        # Uniform 3x3 averaging kernel used as the point-spread function.
        box_psf = np.ones((3, 3)) / 9
        restored = restoration.wiener(normalized, psf=box_psf, balance=0.2)

        # Replace non-finite values before converting back to 8-bit.
        restored = np.nan_to_num(restored, nan=0.0, posinf=1.0, neginf=0.0)
        restored_u8 = np.clip(restored * 255, 0, 255).astype(np.uint8)

        # Three identical channels so the result can be fed to ResNet.
        return cv2.cvtColor(restored_u8, cv2.COLOR_GRAY2RGB)

    except Exception as e:
        # Best effort: report the problem and fall back to the input image.
        print(f"[Wiener Filter Error] {str(e)}")
        return image


def ensure_rgb(img):
    """Return ``img`` with three channels.

    * 2-D arrays are expanded with ``cv2.COLOR_GRAY2RGB``.
    * 3-D arrays whose last axis has length 1 have that channel repeated 3 times.
    * Anything else is returned unchanged (same object).
    """
    if img.ndim == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)

    single_channel = img.ndim == 3 and img.shape[2] == 1
    return np.repeat(img, 3, axis=2) if single_channel else img

# ------------------- PROCESSING FUNCTION -------------------

def show_all_filters_comparison(image_paths, labels, filter_type="median"):
    """
    Applies specified filter to a list of images.

    Args:
        image_paths (list): List of image file paths.
        labels (list): Corresponding image labels.
        filter_type (str): One of ['median', 'tophat', 'blackhat', 'imadjust', 'wiener'].

    Returns:
        filtered_images (np.array): Array of filtered images.
        filtered_labels (np.array): Array of labels.
    """
    filtered_images = []
    filtered_labels = []

    for path, label in tqdm(zip(image_paths, labels), total=len(image_paths), desc=f"Applying {filter_type} filter"):
        original = cv2.imread(path)
        if original is None:
            print(f"[Warning] Could not read: {path}")
            continue

        original = cv2.resize(original, (224, 224))
        original = cv2.cvtColor(original, cv2.COLOR_BGR2RGB)

        match filter_type.lower():
            case "median":
                filtered = apply_median_filter(original)
            case "tophat":
                gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
                filtered = apply_tophat_filter(gray)
                filtered = ensure_rgb(filtered)
            case "blackhat":
                gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
                filtered = apply_blackhat_filter(gray)
                filtered = ensure_rgb(filtered)
            case "imadjust":
                gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
                filtered = apply_imadjust_filter(gray)
                filtered = ensure_rgb(filtered)
            case "wiener":
                filtered = apply_wiener_filter(original)  # already returns RGB
            case _:
                filtered = original

        filtered_images.append(filtered)
        filtered_labels.append(label)

    return np.array(filtered_images), np.array(filtered_labels)




Preparing 6 datasets: one for each of the five filters mentioned above, plus one with no filter applied

In [None]:
# Build one copy of the dataset per filter type.
# NOTE(review): `image_paths` and `labels` are not defined in the visible cells -
# confirm they are created before this cell runs.
def _load_filtered(filter_name):
    """Run show_all_filters_comparison on the full dataset for one filter name."""
    return show_all_filters_comparison(image_paths, labels, filter_type=filter_name)


images_original, labels_original = _load_filtered("original")
images_median, labels_median = _load_filtered("median")
images_top, labels_top = _load_filtered("tophat")
images_blackhat, labels_blackhat = _load_filtered("blackhat")
images_imadjust, labels_imadjust = _load_filtered("imadjust")
images_wiener, labels_wiener = _load_filtered("wiener")

Loading the pre-trained ResNet-50 with ImageNet weights to extract cutting-edge features from the X-rays

In [None]:
# ImageNet-pretrained ResNet50 without its classification head; `model` exposes
# the backbone's final output and is what the feature extraction below calls.
base_model = ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)
model = Model(inputs=base_model.input, outputs=base_model.output)

print("ResNet50 loaded.")

Defining the feature-extraction method
1. Extracting the features in batches since the dataset is large, and stacking the per-batch features at the end

In [None]:
from tensorflow.keras.applications.resnet50 import preprocess_input
import numpy as np

def extract_features_in_batches(images, batch_size=32):
    """Run images through the module-level ``model`` in batches and flatten the outputs.

    Args:
        images: Sequence or array of images. Grayscale batches (2-D images or a
            trailing channel axis of length 1) are expanded to three channels.
        batch_size: Number of images sent to ``model.predict`` at a time.

    Returns:
        2-D array with one row per image holding the flattened model output.

    Raises:
        ValueError: If ``images`` is empty (there is nothing to stack).
    """
    num_samples = len(images)
    if num_samples == 0:
        # np.vstack([]) would fail with a much less helpful message.
        raise ValueError("extract_features_in_batches received no images")

    feature_chunks = []
    for start in range(0, num_samples, batch_size):
        # np.asarray lets plain Python lists of images work too (.astype below
        # needs an ndarray); it is a no-op for ndarray input.
        batch = np.asarray(images[start:start + batch_size])

        # If grayscale, convert to 3-channel
        sample = batch[0]
        if sample.ndim == 2 or sample.shape[-1] == 1:
            batch = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in batch])

        # Keras ResNet50 preprocessing, then a forward pass through the backbone.
        batch_prep = preprocess_input(batch.astype(np.float32))
        features = model.predict(batch_prep, verbose=0)

        # One flat feature vector per image.
        feature_chunks.append(features.reshape(features.shape[0], -1))

    return np.vstack(feature_chunks)


Extracting features for all the 6 datasets

In [None]:
features_original = extract_features_in_batches(images_original)
features_median = extract_features_in_batches(images_median)
features_top = extract_features_in_batches(images_top)
features_black = extract_features_in_batches(images_blackhat)
features_imadjust = extract_features_in_batches(images_imadjust)
features_wiener = extract_features_in_batches(images_wiener)

Since the very high dimensionality of the feature vectors creates issues for KNN, applying PCA to the feature vectors

In [None]:
def apply_pca_to_reduce_feature(X_scaled, n_components=100):
    """Project a 2-D feature matrix onto its leading principal components.

    Args:
        X_scaled: Feature matrix of shape (n_samples, n_features).
        n_components: Requested number of components (default 100). It is
            capped at ``min(n_samples, n_features)``, the maximum PCA accepts,
            so small datasets no longer raise.

    Returns:
        Array of shape (n_samples, k) with ``k <= n_components``.

    NOTE(review): the PCA is fitted on whatever matrix is passed in. In this
    notebook that is the full feature matrix before the train/test split, so
    test samples influence the components - confirm this is acceptable.
    """
    n_samples, n_features = np.shape(X_scaled)
    effective_components = min(n_components, n_samples, n_features)

    pca = PCA(n_components=effective_components, svd_solver='randomized', random_state=42)
    return pca.fit_transform(X_scaled)

Creating test train split for the data

In [None]:
from sklearn.model_selection import train_test_split

def stratified_split(features, labels):
    """Split features/labels 70/30 with a fixed seed, stratified on ``labels``.

    Returns:
        The list produced by ``train_test_split``:
        ``[X_train, X_test, y_train, y_test]``.
    """
    split = train_test_split(
        features,
        labels,
        stratify=labels,
        test_size=0.3,
        random_state=42,
    )
    return split

# NOTE(review): `labels_enc` is not defined in the visible cells - confirm it is
# created earlier and has one entry per row of each feature matrix.
def _pca_then_split(features):
    """PCA-reduce one feature matrix, then split it against ``labels_enc``."""
    return stratified_split(apply_pca_to_reduce_feature(features), labels_enc)


X_train_orig, X_test_orig, y_train_orig, y_test_orig = _pca_then_split(features_original)
X_train_median, X_test_median, y_train_median, y_test_median = _pca_then_split(features_median)
X_train_top, X_test_top, y_train_top, y_test_top = _pca_then_split(features_top)
X_train_black, X_test_black, y_train_black, y_test_black = _pca_then_split(features_black)
X_train_wiener, X_test_wiener, y_train_wiener, y_test_wiener = _pca_then_split(features_wiener)
X_train_imadjust, X_test_imadjust, y_train_imadjust, y_test_imadjust = _pca_then_split(features_imadjust)


Creating a dictionary to maintain information about the different scaling techniques


In [None]:

# Display name -> scaler instance; the evaluation functions refit each scaler
# on the training split they are given.
scalers = dict(
    Standard=StandardScaler(),
    MinMax=MinMaxScaler(),
    Robust=RobustScaler(),
)


Defining KNN classifier for different distance metrics distances=["euclidean","manhattan","chebyshev","minkowski","cosine","correlation","canberra","braycurtis"] and [1,3,5,7] neighbors

In [None]:
def evaluate_knn(X_train, X_test, y_train, y_test, scalers, metrics, distances):
    """Grid-evaluate KNN accuracy over scalers, neighbour counts and distance metrics.

    Args:
        X_train, X_test: Feature matrices for the train and test splits.
        y_train, y_test: Labels for the train and test splits.
        scalers: Mapping of display name -> scaler; each is fitted on
            ``X_train`` and then applied to ``X_test``.
        metrics: Not used by this function.
        distances: Iterable of distance-metric names for ``KNeighborsClassifier``.

    Returns:
        List of dicts with keys "Scaler", "Distance", "Accuracy" and
        "Neighbors", one per (scaler, k, distance) combination with
        k in (1, 3, 5, 7).
    """
    neighbor_counts = (1, 3, 5, 7)
    records = []

    for scaler_name, scaler in scalers.items():
        # Scaling statistics come from the training split only.
        train_scaled = scaler.fit_transform(X_train)
        test_scaled = scaler.transform(X_test)

        for k in neighbor_counts:
            for metric in distances:
                classifier = KNeighborsClassifier(n_neighbors=k, n_jobs=1, metric=metric)
                classifier.fit(train_scaled, y_train)
                predictions = classifier.predict(test_scaled)

                records.append({
                    "Scaler": scaler_name,
                    "Distance": metric,
                    "Accuracy": accuracy_score(y_test, predictions),
                    "Neighbors": k,
                })

    return records


Computing 6 result lists, each corresponding to its respective filter

In [None]:
# NOTE(review): `metrics` and `distances` are not defined in the visible cells -
# confirm they exist before this cell runs.
_knn_grid = (scalers, metrics, distances)

results_orig = evaluate_knn(X_train_orig, X_test_orig, y_train_orig, y_test_orig, *_knn_grid)
results_median = evaluate_knn(X_train_median, X_test_median, y_train_median, y_test_median, *_knn_grid)
results_black = evaluate_knn(X_train_black, X_test_black, y_train_black, y_test_black, *_knn_grid)
results_top = evaluate_knn(X_train_top, X_test_top, y_train_top, y_test_top, *_knn_grid)
results_wiener = evaluate_knn(X_train_wiener, X_test_wiener, y_train_wiener, y_test_wiener, *_knn_grid)
results_imadjust = evaluate_knn(X_train_imadjust, X_test_imadjust, y_train_imadjust, y_test_imadjust, *_knn_grid)

Writing a function to compute evaluation metrics (accuracy and ROC-AUC) for the above configurations

In [None]:
def evaluate_knn_multiclass_roc(X_train, X_test, y_train, y_test, scalers, metrics, distances):
    """Grid-evaluate KNN and collect accuracy, one-vs-rest ROC data and predictions.

    For every scaler, every k in (1, 3, 5, 7) and every distance metric a
    ``KNeighborsClassifier`` is fitted on the scaled training split and scored
    on the scaled test split.

    Args:
        X_train, X_test: Feature matrices for the train and test splits.
        y_train, y_test: Labels for the train and test splits.
        scalers: Mapping of display name -> scaler; each is fitted on
            ``X_train`` and then applied to ``X_test``.
        metrics: Not used by this function.
        distances: Iterable of distance-metric names for ``KNeighborsClassifier``.

    Returns:
        results: List of dicts with "Scaler", "Distance", "Neighbors",
            "Accuracy", "AUC_micro" and one "AUC_class_<label>" per class.
        roc_data: Dict keyed by ``(scaler_name, metric, k)`` holding "fpr",
            "tpr" and "roc_auc" (each keyed by class index and "micro") plus
            "classes".
        all_preds: Dict with the same keys holding "y_test" and "y_pred".
    """
    classes = np.unique(y_train)
    y_test_bin = label_binarize(y_test, classes=classes)
    if len(classes) == 2:
        # label_binarize returns a single column for two classes; expand it to
        # one indicator column per class so it lines up with predict_proba.
        y_test_bin = np.hstack([1 - y_test_bin, y_test_bin])

    results = []
    roc_data = {}
    all_preds = {}

    for scaler_name, scaler in scalers.items():
        # Scaling statistics come from the training split only.
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        for ne in [1, 3, 5, 7]:
            for metric in distances:
                knn = KNeighborsClassifier(n_neighbors=ne, metric=metric, n_jobs=1)
                knn.fit(X_train_scaled, y_train)

                y_prob = knn.predict_proba(X_test_scaled)
                y_pred = knn.predict(X_test_scaled)

                acc = accuracy_score(y_test, y_pred)

                # One-vs-rest ROC curve per class, keyed by class index.
                fpr = {}
                tpr = {}
                roc_auc = {}
                for i, _cls in enumerate(classes):
                    fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_prob[:, i])
                    roc_auc[i] = auc(fpr[i], tpr[i])

                # Micro-average: pool every (sample, class) decision into one curve.
                fpr["micro"], tpr["micro"], _ = roc_curve(y_test_bin.ravel(), y_prob.ravel())
                roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

                results.append({
                    "Scaler": scaler_name,
                    "Distance": metric,
                    "Neighbors": ne,
                    "Accuracy": acc,
                    "AUC_micro": roc_auc["micro"],
                    **{f"AUC_class_{cls}": roc_auc[i] for i, cls in enumerate(classes)}
                })

                key = (scaler_name, metric, ne)
                roc_data[key] = {
                    "fpr": fpr,
                    "tpr": tpr,
                    "roc_auc": roc_auc,
                    "classes": classes
                }

                all_preds[key] = {
                    "y_test": y_test,
                    "y_pred": y_pred
                }

    return results, roc_data, all_preds


Generating all the results, predictions and ROC-AUC data

In [None]:
# NOTE(review): `metrics` and `distances` are not defined in the visible cells -
# confirm they exist before this cell runs.
_roc_grid = (scalers, metrics, distances)

results_orig, roc_data_orig, all_preds_orig = evaluate_knn_multiclass_roc(
    X_train_orig, X_test_orig, y_train_orig, y_test_orig, *_roc_grid)
results_median, roc_data_median, all_preds_median = evaluate_knn_multiclass_roc(
    X_train_median, X_test_median, y_train_median, y_test_median, *_roc_grid)
results_top, roc_data_top, all_preds_top = evaluate_knn_multiclass_roc(
    X_train_top, X_test_top, y_train_top, y_test_top, *_roc_grid)
results_black, roc_data_black, all_preds_black = evaluate_knn_multiclass_roc(
    X_train_black, X_test_black, y_train_black, y_test_black, *_roc_grid)
results_imadjust, roc_data_imadjust, all_preds_imadjust = evaluate_knn_multiclass_roc(
    X_train_imadjust, X_test_imadjust, y_train_imadjust, y_test_imadjust, *_roc_grid)
results_wiener, roc_data_wiener, all_preds_wiener = evaluate_knn_multiclass_roc(
    X_train_wiener, X_test_wiener, y_train_wiener, y_test_wiener, *_roc_grid)


Generating a pandas DataFrame and CSV file for each scaler method, containing information about the filters, values of K, distance metrics and their AUC scores

In [None]:
# Filter display names, in the same order as the result lists returned by
# evaluate_knn_multiclass_roc.
filters = ['Original', 'Median', 'Tophat', 'Blackhat', 'Imadjust', 'Wiener']
results_all = [results_orig, results_median, results_top, results_black, results_imadjust, results_wiener]

# Table rows grouped by the scaler that produced them.
scaler_tables = {}

for filter_name, results in zip(filters, results_all):
    for row in results:
        scaler_name = row['Scaler']

        # Fixed columns first, rounded to 4 decimals for readability.
        row_data = {
            "Filter": filter_name,
            "Distance": row["Distance"],
            "K": row["Neighbors"],
            "Accuracy": round(row["Accuracy"], 4),
            "AUC_micro": round(row["AUC_micro"], 4),
        }

        # Then one "AUC_<class>" column per per-class AUC entry.
        row_data.update(
            (f"AUC_{key.replace('AUC_class_', '')}", round(value, 4))
            for key, value in row.items()
            if key.startswith("AUC_class_")
        )

        scaler_tables.setdefault(scaler_name, []).append(row_data)

# One CSV per scaler, sorted by filter and K.
for scaler_name, rows in scaler_tables.items():
    df = pd.DataFrame(rows)
    df_sorted = df.sort_values(by=["Filter", "K"])

    filename = f"knn_results_{scaler_name.lower()}.csv"
    df_sorted.to_csv(filename, index=False)
    print(f"[Saved] {filename}")


Generating confusion matrices for each filter type and each type of scaling

In [None]:
filters = ['Original', 'Median', 'Black hat', 'top hat', 'imadjust', 'wiener']
class_labels = ["COVID-19", "Normal", "Pneumonia"]

# Predictions from evaluate_knn_multiclass_roc, keyed by filter display name.
preds_by_filter = {
    'Original': all_preds_orig,
    'Median': all_preds_median,
    'Black hat': all_preds_black,
    'top hat': all_preds_top,
    'imadjust': all_preds_imadjust,
    'wiener': all_preds_wiener,
}

# NOTE(review): confusion_matrix(labels=class_labels) needs y_test / y_pred to
# contain these class-name strings; the splits were made with `labels_enc` -
# confirm it holds class names rather than integer codes.
for fil in filters:
    preds = preds_by_filter.get(fil)
    if preds is None:
        continue  # Unknown filter, skip

    for scaler_name in scalers:
        for dist in distances:
            # Only the k=5 configuration is plotted.
            key = (scaler_name, dist.lower(), 5)
            y_test = preds[key]['y_test']
            y_pred = preds[key]['y_pred']

            # Compute and display confusion matrix
            cm = confusion_matrix(y_test, y_pred, labels=class_labels)
            disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)

            fig, ax = plt.subplots(figsize=(6, 6))
            disp.plot(ax=ax, cmap='Blues', values_format='d')
            ax.set_title(f'Confusion Matrix: {fil} - {scaler_name} - {dist} - k=5')
            ax.grid(False)

            # To keep a copy on disk, call
            # fig.savefig(<path>, dpi=300, bbox_inches='tight') before plt.show().
            plt.show()
            # Close each figure once shown: this loop creates one figure per
            # (filter, scaler, distance) and would otherwise keep them all open.
            plt.close(fig)


Plot function to display the ROC-AUC curve for the results

In [None]:

def plot_comparative_roc(roc_data_list, labels, scaler='StandardScaler', distance='euclidean', neighbors=7, class_to_plot=0,
                         output_dir="/kaggle/working/output_plots"):
    """
    Plot, save and show one class's ROC curve for several datasets on shared axes.

    roc_data_list: list of roc_data dictionaries from evaluate_knn_multiclass_roc
    labels: list of labels for the datasets (e.g. ['Original', 'Median Filtered', 'NLM Filtered'])
    scaler: scaler name to filter plots by; must match the first element of the
        roc_data keys (NOTE(review): the default 'StandardScaler' does not match
        the 'Standard' key used by this notebook's scalers dict - confirm)
    distance: distance metric to filter plots by
    neighbors: number of neighbors to filter plots by
    class_to_plot: index of class to plot ROC for (default: first class)
    output_dir: directory the PNG is written to; created if it does not exist

    Datasets whose roc_data has no entry for (scaler, distance, neighbors) are
    skipped with a printed warning.
    """
    # Look up the configuration that was actually requested, so the curve
    # matches the k written into the file name and title.
    key = (scaler, distance, neighbors)

    fig, ax = plt.subplots(figsize=(8, 6))

    for roc_data, label in zip(roc_data_list, labels):
        if key not in roc_data:
            print(f"Warning: Key {key} not found in roc_data")
            continue

        data = roc_data[key]
        fpr = data['fpr']
        tpr = data['tpr']
        roc_auc = data['roc_auc']

        # Plot ROC curve for the selected class
        ax.plot(fpr[class_to_plot], tpr[class_to_plot], lw=2,
                label=f'{label} (AUC = {roc_auc[class_to_plot]:.3f})')

    # Diagonal reference line (chance level).
    ax.plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(f'ROC Curve Comparison (Class {class_to_plot}, {scaler}, {distance}, k={neighbors})')
    ax.legend(loc='lower right')

    # savefig does not create missing directories.
    os.makedirs(output_dir, exist_ok=True)
    filename = f'class{class_to_plot}_{scaler}_{distance}_k{neighbors}.png'
    save_path = os.path.join(output_dir, filename)
    fig.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"[Saved] {save_path}")
    plt.show()

    plt.close(fig)  # Close the plot to free memory



Creating ROC-AUC curves for each type of scaling and each distance metric used

In [None]:
# Use dedicated names here: rebinding `scalers` (a dict of scaler objects in the
# evaluation cells) or `labels` would break those cells if they are re-run.
roc_scaler_names = ['Standard', 'Robust', 'MinMax']
roc_data_list = [roc_data_orig, roc_data_median, roc_data_top, roc_data_black, roc_data_imadjust, roc_data_wiener]
roc_labels = ['Original', 'Median Filtered', 'Top Filtered', 'Black', 'Imadjust', 'Wiener']

# One comparison figure per (scaler, distance) pair for class index 0.
for roc_scaler_name in roc_scaler_names:
    for distance in distances:
        plot_comparative_roc(roc_data_list, roc_labels, scaler=roc_scaler_name, distance=distance,
                             neighbors=5, class_to_plot=0)

Gradio (UI design)

In [None]:
import os
import numpy as np
import torch
import cv2
from PIL import Image
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import torchvision.models as models
import torchvision.transforms as transforms

# ===== 1. Load Pretrained ResNet and Remove FC Layer =====
# Keep every child module except the last one (the final classification layer)
# and switch the result to evaluation mode.
resnet = models.resnet50(pretrained=True)
feature_layers = list(resnet.children())[:-1]
resnet = torch.nn.Sequential(*feature_layers)
resnet.eval()

# ===== 2. Define Image Transform Pipeline for ResNet =====
# NOTE(review): no mean/std normalisation step is included - confirm this is intended.
preprocessing_steps = [
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

# ===== 3. Define imadjust-style Contrast Enhancement =====
def imadjust(img):
    """Min-max normalise ``img`` to the 0-255 range with OpenCV and return it as uint8."""
    stretched = cv2.normalize(img, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX)
    return stretched.astype(np.uint8)

# ===== 4. Feature Extraction Function =====
def extract_features(image_path):
    """Load one image and return its feature vector from the module-level ``resnet``.

    The image is read as grayscale, contrast-stretched with ``imadjust``,
    stacked into three identical channels, passed through ``transform`` and
    then through ``resnet`` with gradients disabled.

    Args:
        image_path: Path of the image file to read.

    Returns:
        NumPy array with the squeezed network output for this image.
    """
    # The context manager closes the file handle as soon as the pixels have
    # been copied out, instead of waiting for garbage collection.
    with Image.open(image_path) as pil_img:
        img = np.array(pil_img.convert('L'))  # grayscale
    img = imadjust(img)

    # Convert grayscale to 3 channels by stacking
    img = np.stack([img] * 3, axis=-1)  # shape: (H, W, 3)

    img = transform(img).unsqueeze(0)  # shape: (1, 3, 224, 224)

    with torch.no_grad():
        features = resnet(img).squeeze().numpy()  # shape: (2048,)
    return features

# ===== 5. Build Dataset from Folders =====
data_dir = "/kaggle/input/chest-xray-covid19-pneumonia/Data/train"  # update if your dataset path is different
labels, features = [], []

# Every sub-directory of data_dir is treated as one class; its name is the label.
for label in os.listdir(data_dir):
    class_dir = os.path.join(data_dir, label)
    if os.path.isdir(class_dir):
        for file in tqdm(os.listdir(class_dir), desc=f"Processing {label}"):
            path = os.path.join(class_dir, file)
            try:
                feat = extract_features(path)
                features.append(feat)
                labels.append(label)
            except Exception as e:
                # Best effort: report the file and carry on with the rest.
                print(f"❌ Error processing {path}: {e}")

features = np.array(features)
labels = np.array(labels)

# ===== 6. PCA and Scaling =====
pca = PCA(n_components=100, svd_solver='randomized', random_state=42)
features_pca = pca.fit_transform(features)

scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_pca)

# ===== 7. Save Dataset and Transformers =====
np.savez("xray_knn_dataset.npz", X=features_scaled, y=labels)
np.save("pca_components.npy", pca.components_)
# PCA.transform subtracts the fitted mean before projecting onto the
# components, so the mean is needed to re-apply the same projection later.
np.save("pca_mean.npy", pca.mean_)
# Row 0: per-feature mean, row 1: per-feature scale of the StandardScaler.
np.save("scaler_mean_std.npy", np.vstack([scaler.mean_, scaler.scale_]))

print("✅ Saved features to 'xray_knn_dataset.npz'")


Link for our kaggle notebook

https://www.kaggle.com/code/imdevml/effect-of-noise-filtering-feature-scaling