<a href="https://colab.research.google.com/github/2303A52397/EXPLAINABLE-AI-2397/blob/main/EAI_ASS_3_2397.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# air_quality_lime.py
import numpy as np
import pandas as pd
import json
from pathlib import Path
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler


# ---------- Step 1: Make synthetic Air Quality dataset ----------
def make_dataset(n=2500, seed=7):
    rng = np.random.default_rng(seed)

    def sample_truncated_normal(mean, sd, low, high, size):
        x = rng.normal(mean, sd, size)
        return np.clip(x, low, high)

    data = pd.DataFrame({
        "PM2_5": sample_truncated_normal(55, 35, 5, 250, n),
        "PM10":  sample_truncated_normal(85, 45, 10, 400, n),
        "NO2":   sample_truncated_normal(45, 25, 5, 200, n),
        "SO2":   sample_truncated_normal(25, 20, 2, 200, n),
        "CO":    sample_truncated_normal(1.2, 0.8, 0.1, 10, n),
        "O3":    sample_truncated_normal(70, 35, 5, 300, n)
    })

    thresholds = {"PM2_5": 60, "PM10": 100, "NO2": 80,
                  "SO2": 80, "CO": 2, "O3": 100}

    def label_row(row):
        for k, t in thresholds.items():
            if row[k] > t:
                return 1
        return 0

    data["unsafe"] = data.apply(label_row, axis=1)
    return data


# ---------- Step 2: LIME-like local explanation ----------
def lime_like_explain(model, x0, X_train, feature_names,
                      num_samples=2000, kernel_width=None,
                      ridge_alpha=1.0, random_state=42):
    rng_local = np.random.default_rng(random_state)
    x0 = np.array(x0).reshape(1, -1)
    d = x0.shape[1]

    scaler = StandardScaler().fit(X_train)
    x0_scaled = scaler.transform(x0)

    if kernel_width is None:
        kernel_width = np.sqrt(d)

    # perturbations in scaled space
    Z_scaled = x0_scaled + rng_local.normal(0, 1, size=(num_samples, d))
    Z = scaler.inverse_transform(Z_scaled)

    # RBF kernel weights
    distances = np.linalg.norm(Z_scaled - x0_scaled, axis=1)
    weights = np.exp(-(distances*2) / (kernel_width*2))

    # Model predictions
    p = model.predict_proba(Z)[:, 1]

    # Weighted Ridge surrogate
    ridge = Ridge(alpha=ridge_alpha, fit_intercept=True)
    ridge.fit(Z, p, sample_weight=weights)

    coefs = ridge.coef_
    baseline = Z.mean(axis=0)
    contributions = coefs * (x0.flatten() - baseline)

    results = pd.DataFrame({
        "feature": feature_names,
        "value": x0.flatten(),
        "baseline": baseline,
        "coef": coefs,
        "contribution": contributions
    }).sort_values("contribution", key=np.abs,
                   ascending=False).reset_index(drop=True)

    pred_prob = float(model.predict_proba(x0)[0, 1])
    pred_label = int(pred_prob >= 0.5)
    return results, pred_prob, pred_label


# ---------- Step 3: Train model & evaluate ----------
def main():
    OUTPUT_DIR = Path("./outputs")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    data = make_dataset()
    X = data[["PM2_5", "PM10", "NO2", "SO2", "CO", "O3"]].values
    y = data["unsafe"].values

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=42, stratify=y)

    model = RandomForestClassifier(
        n_estimators=300, min_samples_split=4, min_samples_leaf=2,
        random_state=42, n_jobs=-1)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    acc = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_prob)
    cm = confusion_matrix(y_test, y_pred)

    # Save metrics
    with open(OUTPUT_DIR / "metrics.json", "w") as f:
        json.dump({"accuracy": acc, "roc_auc": auc,
                   "confusion_matrix": cm.tolist()}, f, indent=2)

    # ---------- Step 4: Example explanations ----------
    feature_names = ["PM2_5", "PM10", "NO2", "SO2", "CO", "O3"]

    def plot_contributions(df, p, title, outpath):
        top = df.head(6)
        plt.figure(figsize=(8, 5))
        plt.barh(top["feature"], top["contribution"])
        plt.gca().invert_yaxis()
        plt.xlabel("Contribution to Unsafe Probability")
        plt.title(title + f"\nPred Unsafe Probability: {p:.2f}")
        plt.tight_layout()
        plt.savefig(outpath)
        plt.close()

    for idx in [5, 27, 111]:
        if idx >= len(X_test):
            continue
        res, p, lbl = lime_like_explain(
            model, X_test[idx], X_train, feature_names,
            num_samples=2500, ridge_alpha=0.5, random_state=idx+1)
        plot_contributions(res, p, f"LIME-like Explanation #{idx}",
                           OUTPUT_DIR / f"lime_explanation_{idx}.png")

    print("Done. Check ./outputs folder for dataset, metrics, plots, and report.")


if __name__ == "__main__":
    main()

Done. Check ./outputs folder for dataset, metrics, plots, and report.


In [2]:
%pip install lime

Collecting lime
  Downloading lime-0.2.0.1.tar.gz (275 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/275.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━[0m [32m204.8/275.7 kB[0m [31m6.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m275.7/275.7 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: lime
  Building wheel for lime (setup.py) ... [?25l[?25hdone
  Created wheel for lime: filename=lime-0.2.0.1-py3-none-any.whl size=283834 sha256=cd99d00d7929777509b4339cde276c8c761cc21b31551f8b5c965053a6ac6117
  Stored in directory: /root/.cache/pip/wheels/e7/5d/0e/4b4fff9a47468fed5633211fb3b76d1db43fe806a17fb7486a
Successfully built lime
Installing collected packages: lime
Successfully installed lime-0.2.0.1


In [3]:
# mnist_lime.py
import numpy as np
import matplotlib.pyplot as plt
import json
from pathlib import Path
from sklearn.metrics import accuracy_score
from lime import lime_image
from skimage.segmentation import mark_boundaries

import tensorflow as tf
from tensorflow.keras import layers, models, datasets

# ---------------- Setup ----------------
OUTPUT_DIR = Path("./mnist_outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

# ---------------- 1. Load MNIST ----------------
(x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0

# add channel dimension (CNN expects 28x28x1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)

# ---------------- 2. Build CNN ----------------
model = models.Sequential([
    layers.Conv2D(32, (3,3), activation="relu", input_shape=(28,28,1)),
    layers.MaxPooling2D((2,2)),
    layers.Conv2D(64, (3,3), activation="relu"),
    layers.MaxPooling2D((2,2)),
    layers.Flatten(),
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax")
])

model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

# train briefly (keep epochs low for speed)
model.fit(x_train, y_train, epochs=2, batch_size=128, validation_split=0.1, verbose=2)

# ---------------- 3. Evaluate ----------------
y_pred = model.predict(x_test, verbose=0).argmax(axis=1)
acc = accuracy_score(y_test, y_pred)

metrics = {"accuracy": acc}
with open(OUTPUT_DIR / "metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)
print("Test accuracy:", acc)

# ---------------- 4. LIME Explanations ----------------
explainer = lime_image.LimeImageExplainer()

def predict_fn(images):
    # images shape: (n, 28, 28, 3) expected, but model expects (n,28,28,1)
    gray = np.mean(images, axis=-1, keepdims=True)  # convert back to 1-channel
    return model.predict(gray, verbose=0)

# pick some test samples
sample_indices = [0, 1, 2, 100, 555]
for idx in sample_indices:
    image = x_test[idx]
    label = y_test[idx]

    explanation = explainer.explain_instance(
        image.squeeze(),
        predict_fn,
        top_labels=1,
        hide_color=0,
        num_samples=1000
    )

    temp, mask = explanation.get_image_and_mask(
        explanation.top_labels[0],
        positive_only=True,
        num_features=10,
        hide_rest=False
    )

    # Save visualization
    fig, ax = plt.subplots(1, 2, figsize=(6,3))
    ax[0].imshow(image.squeeze(), cmap="gray")
    ax[0].set_title(f"True: {label}, Pred: {y_pred[idx]}")
    ax[0].axis("off")

    ax[1].imshow(mark_boundaries(temp, mask))
    ax[1].set_title("LIME Highlighted Pixels")
    ax[1].axis("off")

    plt.tight_layout()
    plt.savefig(OUTPUT_DIR / f"lime_explanation_{idx}.png")
    plt.close()

print("Explanations saved in:", OUTPUT_DIR)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/2
422/422 - 40s - 95ms/step - accuracy: 0.9225 - loss: 0.2578 - val_accuracy: 0.9785 - val_loss: 0.0741
Epoch 2/2
422/422 - 41s - 98ms/step - accuracy: 0.9792 - loss: 0.0691 - val_accuracy: 0.9855 - val_loss: 0.0525
Test accuracy: 0.9828


  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

  0%|          | 0/1000 [00:00<?, ?it/s]

Explanations saved in: mnist_outputs
