# In [None]:
import os
import random

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab import drive
from skimage import exposure, filters
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

# Mount Google Drive at /content/drive; both paths below live under this
# mount point, so the mount has to happen before anything reads them.
drive.mount('/content/drive')

# Paths
# Root folder that is searched (recursively, via os.walk) for image files.
image_folder = '/content/drive/My Drive/IMAGE CLASSIFICATION PNEUMONIA/'
# CSV read with pandas; the script uses its 'Image Index', 'Label' and
# 'Finding Labels' columns.
csv_file_path = '/content/drive/My Drive/IMAGE CLASSIFICATION PNEUMONIA/Images Label.csv'

# Load and preprocess images
def preprocess_image(image_path, target_size=(224, 224), gamma=1.2,
                     clip_limit=2.0, tile_grid_size=(8, 8)):
    """Load an image as grayscale and return a binarized, contrast-enhanced copy.

    The pipeline is: read as grayscale -> resize -> CLAHE -> gamma
    correction -> Otsu threshold.

    Args:
        image_path: Path of the image file to read.
        target_size: Size passed to ``cv2.resize``.
        gamma: Exponent passed to ``exposure.adjust_gamma``.
        clip_limit: ``clipLimit`` passed to ``cv2.createCLAHE``.
        tile_grid_size: ``tileGridSize`` passed to ``cv2.createCLAHE``.

    Returns:
        A ``uint8`` array whose pixels are either 0 or 255, or ``None`` when
        ``cv2.imread`` could not load the file.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        # Unreadable or missing file: let the caller decide how to skip it.
        return None

    img = cv2.resize(img, target_size)

    # Local contrast equalization (CLAHE).
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    img = clahe.apply(img)

    # Global contrast adjustment (gamma correction).
    img = exposure.adjust_gamma(img, gamma)

    # Binarize: pixels above Otsu's threshold become 255, the rest 0.
    threshold = filters.threshold_otsu(img)
    img = (img > threshold).astype(np.uint8) * 255

    return img

# Load dataset
def load_images_from_csv(csv_df, image_folder):
    """Load and preprocess every image listed in ``csv_df`` that exists on disk.

    Args:
        csv_df: DataFrame with an 'Image Index' column (image file names)
            and a 'Label' column.
        image_folder: Directory searched recursively for ``.png``, ``.jpg``
            and ``.jpeg`` files.

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays. Rows whose file is not
        found under ``image_folder``, or whose image fails preprocessing,
        are skipped, so both arrays stay aligned.
    """
    # Index the files by base name once so each CSV row is a dictionary
    # lookup instead of a scan over every file. setdefault keeps the first
    # path os.walk yields for a duplicated file name.
    path_by_name = {}
    for root, _, files in os.walk(image_folder):
        for file in files:
            if file.endswith(('.png', '.jpg', '.jpeg')):
                path_by_name.setdefault(file, os.path.join(root, file))

    images = []
    labels = []
    for _, row in csv_df.iterrows():
        image_name = row['Image Index']
        label = row['Label']

        matching_path = path_by_name.get(image_name)
        if matching_path is None:
            continue

        preprocessed_img = preprocess_image(matching_path)
        if preprocessed_img is None:
            continue

        images.append(preprocessed_img)
        labels.append(label)

    return np.array(images), np.array(labels)

# Load images and labels
labels_df = pd.read_csv(csv_file_path)
# Keep only rows whose 'Finding Labels' text contains "Pneumonia"
# (case-insensitive); rows where that column is missing are dropped (na=False).
# NOTE(review): after this filter every remaining row mentions pneumonia, yet
# the classifier target is the separate 'Label' column -- confirm that column
# still contains both classes after filtering.
labels_df = labels_df[labels_df['Finding Labels'].str.contains('Pneumonia', case=False, na=False)]
# Returns the preprocessed images and their labels as aligned NumPy arrays.
images, labels = load_images_from_csv(labels_df, image_folder)

# Feature Extraction using PCA
def extract_features(images, n_components=100, random_state=42):
    """Flatten each image and project it onto its leading principal components.

    Args:
        images: Array whose first axis indexes samples; all remaining axes
            are flattened into one feature vector per sample.
        n_components: Number of principal components to keep.
        random_state: Seed forwarded to ``PCA`` so that its randomized
            solvers give the same projection on every run, consistent with
            the fixed seeds used for KMeans and the train/test split.

    Returns:
        Array of shape ``(n_samples, n_components)``.
    """
    images_flattened = images.reshape(images.shape[0], -1)  # one row per image
    pca = PCA(n_components=n_components, random_state=random_state)
    return pca.fit_transform(images_flattened)

# Reduce every preprocessed image to a 100-dimensional PCA feature vector;
# create_model's default input shape (100,) matches this width.
features = extract_features(images, n_components=100)
labels = labels.ravel()  # Ensure labels are 1D

# Sanity check: both arrays should have the same number of rows.
print("New feature shape:", features.shape)
print("Labels shape:", labels.shape)

# Step 1: Clustering to create Local Regions
# Each sample is assigned to one of n_clusters groups; a separate classifier
# is trained per group further down.
n_clusters = 3  # Choose based on data complexity
kmeans = KMeans(n_clusters=n_clusters, random_state=42)  # fixed seed for repeatable clusters
# cluster_labels[i] is the cluster index assigned to features[i].
cluster_labels = kmeans.fit_predict(features)

# Step 2: Train a model per cluster
def create_model(input_shape=(100,), dropout_rate=0.3, learning_rate=1e-3):
    """Build and compile a fully connected binary classifier.

    The network has three hidden stages of 512, 256 and 128 units. Each
    stage is Dense -> LeakyReLU -> BatchNormalization -> Dropout, and the
    output is a single sigmoid unit.

    Args:
        input_shape: Shape of one input feature vector.
        dropout_rate: Dropout rate applied after every hidden stage.
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        A compiled ``tf.keras.Model`` using binary cross-entropy loss and
        tracking accuracy, precision and recall.
    """
    layers = tf.keras.layers
    inputs = tf.keras.Input(shape=input_shape)

    # Stack the three identical hidden stages, widest first.
    hidden = inputs
    for units in (512, 256, 128):
        hidden = layers.Dense(units)(hidden)
        hidden = layers.LeakyReLU(alpha=0.01)(hidden)
        hidden = layers.BatchNormalization()(hidden)
        hidden = layers.Dropout(dropout_rate)(hidden)

    output = layers.Dense(1, activation='sigmoid')(hidden)

    model = tf.keras.Model(inputs=inputs, outputs=output)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    tracked_metrics = ["accuracy", tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    model.compile(loss="binary_crossentropy",
                  optimizer=optimizer,
                  metrics=tracked_metrics)
    return model

# Train separate models for each cluster
# local_models maps a KMeans cluster index to the classifier trained on the
# samples assigned to that cluster.
local_models = {}

for cluster_id in range(n_clusters):
    print(f"Training model for cluster {cluster_id}...")

    # Extract the samples that KMeans assigned to this cluster.
    cluster_indices = np.where(cluster_labels == cluster_id)[0]
    X_cluster = features[cluster_indices]
    y_cluster = labels[cluster_indices]

    # Hold out 20% of the cluster for evaluation (fixed seed for repeatability).
    X_train, X_test, y_train, y_test = train_test_split(X_cluster, y_cluster, test_size=0.2, random_state=42)

    # Size the input layer from the data instead of assuming 100 features.
    model = create_model(input_shape=(X_train.shape[1],))
    model.fit(X_train, y_train, epochs=30, batch_size=32, verbose=0)

    # Score the model on the held-out split, which it never saw during
    # fitting; without this the split above would be computed and discarded.
    held_out_scores = model.evaluate(X_test, y_test, verbose=0, return_dict=True)
    score_summary = ", ".join(f"{name}={value:.4f}" for name, value in held_out_scores.items())
    print(f"Cluster {cluster_id} held-out metrics: {score_summary}")

    # Store trained model
    local_models[cluster_id] = model

print("Training for all clusters complete.")

# Step 3: Inference - Dynamic Model Selection
def predict_with_ensemble(test_features):
    """Predict a binary label for each sample with its cluster's local model.

    Every sample is routed to the model stored in ``local_models`` under the
    cluster index that ``kmeans`` assigns to it. Samples that share a cluster
    are predicted in one batched call rather than one call per sample.

    Args:
        test_features: 2-D NumPy array of feature vectors, one row per sample.

    Returns:
        Integer array of shape ``(n_samples, 1)`` holding 0/1 predictions
        (sigmoid output thresholded at 0.5).

    Raises:
        KeyError: If a sample is assigned to a cluster with no trained model.
    """
    cluster_assignments = kmeans.predict(test_features)
    predictions = np.zeros((len(test_features), 1), dtype=int)

    for cluster_id in np.unique(cluster_assignments):
        member_mask = cluster_assignments == cluster_id
        model = local_models[int(cluster_id)]
        # verbose=0 keeps Keras from printing a progress bar per call.
        y_pred_prob = model.predict(test_features[member_mask], verbose=0)
        predictions[member_mask] = (y_pred_prob > 0.5).astype(int)

    return predictions

# Evaluate final ensemble model
# precision_score, recall_score and f1_score come from sklearn.metrics and
# must be imported at the top of the file.
# NOTE(review): `features` includes the samples each local model was fitted
# on, so these scores are not a clean held-out estimate.
# Flatten the (n_samples, 1) predictions so they line up with the 1-D labels.
y_pred = predict_with_ensemble(features).ravel()
precision = precision_score(labels, y_pred, zero_division=1)
recall = recall_score(labels, y_pred, zero_division=1)
f1 = f1_score(labels, y_pred, zero_division=1)

print(f"Final Model Precision: {precision:.4f}")
print(f"Final Model Recall: {recall:.4f}")
print(f"Final Model F1-Score: {f1:.4f}")
