In [6]:
import os
import numpy as np
import pandas as pd
from glob import glob
from sklearn.model_selection import StratifiedKFold
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score, roc_curve, confusion_matrix
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
import os
import random
import numpy as np
import tensorflow as tf

os.environ['PYTHONHASHSEED'] = '42'
os.environ['TF_DETERMINISTIC_OPS'] = '1'
os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

def load_dataset(data_dir, target_size=(224, 224)):
    image_paths = []
    labels = []
    for label_name, label in [('NORMAL', 0), ('PNEUMONIA', 1)]:
        folder = os.path.join(data_dir, label_name)
        for img_path in glob(os.path.join(folder, '*.jpeg')):
            image_paths.append(img_path)
            labels.append(label)
    return np.array(image_paths), np.array(labels)

def inject_label_flip(labels, poison_fraction=0.1, seed=42):
    np.random.seed(seed)
    n = len(labels)
    n_poison = int(poison_fraction * n)
    poison_idx = np.random.choice(n, size=n_poison, replace=False)
    labels_poison = labels.copy()
    labels_poison[poison_idx] = 1 - labels_poison[poison_idx]
    poison_mask = np.zeros(n, dtype=bool)
    poison_mask[poison_idx] = True
    return labels_poison, poison_mask

def extract_features(image_paths, batch_size=32, target_size=(224, 224)):
    model = MobileNetV2(include_top=False, pooling='avg', weights='imagenet', input_shape=(*target_size, 3))
    features = []
    for i in range(0, len(image_paths), batch_size):
        batch_paths = image_paths[i:i + batch_size]
        batch_imgs = []
        for p in batch_paths:
            img = load_img(p, target_size=target_size)
            x = img_to_array(img)
            x = preprocess_input(x)
            batch_imgs.append(x)
        batch_imgs = np.array(batch_imgs)
        feats = model.predict(batch_imgs, verbose=0)
        features.append(feats)
    return np.vstack(features)

def plot_roc_curve(y_true, scores, title, filename):
    fpr, tpr, _ = roc_curve(y_true, scores)
    plt.figure()
    plt.plot(fpr, tpr, label=f'ROC (AUC = {roc_auc_score(y_true, scores):.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc='lower right')
    plt.savefig(filename)
    plt.close()

def plot_confusion_matrix(y_true, y_pred, title, filename):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure()
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()
    ticks = ['NORMAL', 'PNEUMONIA']
    plt.xticks([0, 1], ticks)
    plt.yticks([0, 1], ticks)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    for i in range(2):
        for j in range(2):
            plt.text(j, i, cm[i, j], ha='center', va='center')
    plt.savefig(filename)
    plt.close()

def detect_kmeans(train_feats, test_feats, poison_fraction=0.1, seed=42):
    kmeans = KMeans(n_clusters=2, random_state=seed)
    kmeans.fit(train_feats)
    train_dist = kmeans.transform(train_feats).min(axis=1)
    threshold = np.percentile(train_dist, 100 * (1 - poison_fraction))
    test_dist = kmeans.transform(test_feats).min(axis=1)
    preds = test_dist > threshold
    return preds, test_dist

def detect_isolation_forest(train_feats, test_feats, poison_fraction=0.1, seed=42):
    iso = IsolationForest(contamination=poison_fraction, random_state=seed)
    iso.fit(train_feats)
    scores = -iso.decision_function(test_feats)
    preds = iso.predict(test_feats) == -1
    return preds, scores

def detect_combined(train_feats, test_feats, poison_fraction=0.1, seed=42):
    preds_km, scores_km = detect_kmeans(train_feats, test_feats, poison_fraction, seed)
    preds_if, scores_if = detect_isolation_forest(train_feats, test_feats, poison_fraction, seed)
    scores_km = (scores_km - scores_km.min()) / (scores_km.max() - scores_km.min())
    scores_if = (scores_if - scores_if.min()) / (scores_if.max() - scores_if.min())
    combined_scores = (scores_km + scores_if) / 2
    threshold = np.percentile(combined_scores, 100 * (1 - poison_fraction))
    preds = combined_scores > threshold
    return preds, combined_scores

def build_cnn(input_shape=(224, 224, 3)):
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(2, activation='softmax')
    ])
    model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def cross_validate_detection(features, poison_mask, methods, n_splits=10, poison_fraction=0.1):
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    logs = []
    for fold, (train_idx, test_idx) in enumerate(skf.split(features, poison_mask)):
        train_feats, test_feats = features[train_idx], features[test_idx]
        y_test = poison_mask[test_idx].astype(int)
        for name, func in methods.items():
            preds, scores = func(train_feats, test_feats, poison_fraction, seed=fold)
            auc = roc_auc_score(y_test, scores)
            f1 = f1_score(y_test, preds)
            acc = accuracy_score(y_test, preds)
            logs.append({'fold': fold, 'method': name, 'roc_auc': auc, 'f1': f1, 'accuracy': acc})
            if fold == 0:
                plot_roc_curve(y_test, scores, f'{name} ROC', f'figures/{name}_roc.png')
    df = pd.DataFrame(logs)
    df.to_csv('logs_detection.csv', index=False)
    return df

def cross_validate_cnn(image_paths, labels, n_splits=10, poison_mask=None, dataset_name='poisoned'):
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    logs = []
    for fold, (train_idx, test_idx) in enumerate(skf.split(image_paths, labels)):
        train_paths, test_paths = image_paths[train_idx], image_paths[test_idx]
        train_labels, test_labels = labels[train_idx], labels[test_idx]
        if poison_mask is not None:
            keep = ~poison_mask[train_idx]
            train_paths = train_paths[keep]
            train_labels = train_labels[keep]
        X_train = np.array([preprocess_input(img_to_array(load_img(p, target_size=(224, 224)))) for p in train_paths])
        y_train = to_categorical(train_labels, 2)
        X_test = np.array([preprocess_input(img_to_array(load_img(p, target_size=(224, 224)))) for p in test_paths])
        y_test_cat = to_categorical(test_labels, 2)
        model = build_cnn()
        model.fit(X_train, y_train, epochs=5, batch_size=32, verbose=0)
        probs = model.predict(X_test)
        preds = probs.argmax(axis=1)
        auc = roc_auc_score(test_labels, probs[:, 1])
        f1 = f1_score(test_labels, preds)
        acc = accuracy_score(test_labels, preds)
        logs.append({'fold': fold, 'dataset': dataset_name, 'roc_auc': auc, 'f1': f1, 'accuracy': acc})
        if fold == 0:
            plot_confusion_matrix(test_labels, preds, f'CNN {dataset_name} CM', f'figures/cnn_{dataset_name}_cm.png')
    df = pd.DataFrame(logs)
    df.to_csv(f'logs_cnn_{dataset_name}.csv', index=False)
    return df

def main():
    os.makedirs('figures', exist_ok=True)
    data_dir = 'xray_dataset_covid19/train'
    image_paths, labels = load_dataset(data_dir)
    labels_poison, poison_mask = inject_label_flip(labels, poison_fraction=0.1)
    print('Extracting features...')
    features = extract_features(image_paths)
    methods = {
        'KMeans': detect_kmeans,
        'IsolationForest': detect_isolation_forest,
        'Combined': detect_combined
    }
    print('Running detection CV...')
    det_logs = cross_validate_detection(features, poison_mask, methods)
    print(det_logs.groupby('method')[['roc_auc', 'f1', 'accuracy']].mean())
    print('CNN on poisoned data...')
    cnn_p = cross_validate_cnn(image_paths, labels_poison, poison_mask=None, dataset_name='poisoned')
    print(cnn_p[['roc_auc', 'f1', 'accuracy']].mean())
    print('CNN on cleaned data...')
    cnn_c = cross_validate_cnn(image_paths, labels_poison, poison_mask=poison_mask, dataset_name='cleaned')
    print(cnn_c[['roc_auc', 'f1', 'accuracy']].mean())
    cnn_p['type'] = 'Poisoned'
    cnn_c['type'] = 'Cleaned'
    combined = pd.concat([cnn_p, cnn_c])

    X_full = np.array([preprocess_input(img_to_array(load_img(p, target_size=(224, 224)))) for p in image_paths])
    y_full_poison = to_categorical(labels_poison, 2)
    model_p = build_cnn()
    model_p.fit(X_full, y_full_poison, epochs=5, batch_size=32, verbose=0)
    model_p.save('cnn_covid_poisoned.h5')
    print('Saved poisoned model to cnn_poisoned.h5')
    keep_idx = ~poison_mask
    X_full_clean = X_full[keep_idx]
    y_full_clean = to_categorical(labels_poison[keep_idx], 2)
    model_c = build_cnn()
    model_c.fit(X_full_clean, y_full_clean, epochs=5, batch_size=32, verbose=0)
    model_c.save('cnn_covid_cleaned.h5')
    print('Saved cleaned model to cnn_cleaned.h5')

    plt.figure(figsize=(10, 5))
    sns.boxplot(x='type', y='f1', data=combined)
    plt.title('F1 Score: CNN on Poisoned vs Cleaned Data')
    plt.savefig('figures/cnn_f1_comparison.png')
    plt.close()

    plt.figure(figsize=(10, 5))
    sns.boxplot(x='type', y='accuracy', data=combined)
    plt.title('Accuracy: CNN on Poisoned vs Cleaned Data')
    plt.savefig('figures/cnn_accuracy_comparison.png')
    plt.close()

    plt.figure()
    for df, label in [(cnn_p, 'Poisoned'), (cnn_c, 'Cleaned')]:
        mean_fpr = np.linspace(0, 1, 100)
        tprs = []
        for fold in df['fold'].unique():
            row = df[df['fold'] == fold].iloc[0]
            auc = row['roc_auc']
            tpr = np.linspace(0, 1, 100) ** (1 - auc)
            tprs.append(tpr)
        mean_tpr = np.mean(tprs, axis=0)
        plt.plot(mean_fpr, mean_tpr, label=f'{label} Mean ROC (AUC ~ {df["roc_auc"].mean():.2f})')

    plt.plot([0, 1], [0, 1], 'k--')
    plt.title('Mean ROC Curve for CNN on Poisoned vs Cleaned Data')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend()
    plt.savefig('figures/cnn_roc_comparison.png')
    plt.close()
    mean_metrics = combined.groupby('type')[['f1', 'accuracy', 'roc_auc']].mean().reset_index()

    plt.figure(figsize=(10, 6))
    width = 0.25
    x = np.arange(len(mean_metrics['type']))
    plt.bar(x - width, mean_metrics['f1'], width, label='F1 Score')
    plt.bar(x, mean_metrics['accuracy'], width, label='Accuracy')
    plt.bar(x + width, mean_metrics['roc_auc'], width, label='ROC AUC')

    plt.xticks(x, mean_metrics['type'])
    plt.title('Mean CNN Metrics: Poisoned vs Cleaned')
    plt.ylabel('Score')
    plt.ylim(0, 1.05)
    plt.legend()
    plt.tight_layout()
    plt.savefig('figures/cnn_bar_comparison.png')
    plt.close()

    print('Generating confusion matrices for visual comparison...')
    from tensorflow.keras.models import load_model
    def get_conf_matrix(image_paths, labels, poison_mask, dataset_name):
        skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
        for fold, (train_idx, test_idx) in enumerate(skf.split(image_paths, labels)):
            if fold != 0:
                continue
            train_paths, test_paths = image_paths[train_idx], image_paths[test_idx]
            train_labels, test_labels = labels[train_idx], labels[test_idx]
            if poison_mask is not None:
                keep = ~poison_mask[train_idx]
                train_paths = train_paths[keep]
                train_labels = train_labels[keep]
            X_train = np.array([preprocess_input(img_to_array(load_img(p, target_size=(224, 224)))) for p in train_paths])
            y_train = to_categorical(train_labels, 2)
            X_test = np.array([preprocess_input(img_to_array(load_img(p, target_size=(224, 224)))) for p in test_paths])
            y_test = test_labels
            model = build_cnn()
            model.fit(X_train, y_train, epochs=5, batch_size=32, verbose=0)
            probs = model.predict(X_test)
            preds = probs.argmax(axis=1)
            plot_confusion_matrix(y_test, preds, f'CNN Confusion Matrix ({dataset_name})', f'figures/cnn_cm_{dataset_name}.png')

    get_conf_matrix(image_paths, labels_poison, poison_mask=None, dataset_name='poisoned')
    get_conf_matrix(image_paths, labels_poison, poison_mask=poison_mask, dataset_name='cleaned')

    
if __name__ == '__main__':
    main()


Extracting features...
Running detection CV...
                  roc_auc        f1  accuracy
method                                       
Combined         0.359545  0.066667  0.792424
IsolationForest  0.351364  0.050000  0.773485
KMeans           0.392727  0.066667  0.806818
CNN on poisoned data...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 302ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 108ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 142ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 269ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 122ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 112ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 161ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 106ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 108ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m



Saved poisoned model to cnn_poisoned.h5




Saved cleaned model to cnn_cleaned.h5
Generating confusion matrices for visual comparison...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 155ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 152ms/step


In [7]:
from tensorflow.keras.models import load_model
import numpy as np
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score

model_p = load_model('cnn_covid_poisoned.h5')
model_c = load_model('cnn_covid_cleaned.h5')

test_dir = 'xray_dataset_covid19/test'
test_paths, y_test = load_dataset(test_dir)   
X_test = np.array([
    preprocess_input(img_to_array(load_img(p, target_size=(224, 224))))
    for p in test_paths
])

for name, model in [('Poisoned', model_p), ('Cleaned', model_c)]:
    probs = model.predict(X_test, verbose=0)
    preds = probs.argmax(axis=1)
    auc = roc_auc_score(y_test, probs[:,1])
    f1  = f1_score(y_test, preds)
    acc = accuracy_score(y_test, preds)
    print(f"[{name}] Test AUC: {auc:.3f}, F1: {f1:.3f}, Acc: {acc:.3f}")




[Poisoned] Test AUC: 1.000, F1: 0.966, Acc: 0.971
[Cleaned] Test AUC: 1.000, F1: 1.000, Acc: 1.000
