In [None]:
import os

# Determinism-related environment switches, set before TensorFlow is imported below.
# NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so assigning it here
# presumably only affects child processes, not this already-running kernel — confirm.
os.environ.update(
    {
        "PYTHONHASHSEED": "0",
        "TF_DETERMINISTIC_OPS": "1",    # deterministic GPU ops
        "TF_CUDNN_DETERMINISTIC": "1",  # deterministic cuDNN kernels
    }
)

import tensorflow as tf
from tensorflow import keras
from keras import layers
from pathlib import Path
import numpy as np
from tensorflow.keras.applications.resnet50 import preprocess_input

# Seed every random number source used by the notebook so runs are repeatable
import random

SEED = 2648509283

# Apply the same seed to each library, in this fixed order:
# Python's `random`, NumPy's legacy global RNG, TensorFlow, then the Keras helper.
for _seed_fn in (
    random.seed,
    np.random.seed,
    tf.random.set_seed,
    tf.keras.utils.set_random_seed,
):
    _seed_fn(SEED)

# Request deterministic op implementations from TensorFlow
tf.config.experimental.enable_op_determinism()

In [None]:
# --- Data locations (relative to the notebook's working directory) ---
DATASET_DIR = "../data/chest_xray"
DATASET_PATH = Path(DATASET_DIR)
TRAIN_PATH, TEST_PATH = (DATASET_PATH / split for split in ("train", "test"))

# --- Labels: list position is the integer class id ---
CLASSES = ["NORMAL", "PNEUMONIA"]
ID_TO_CLASS = dict(enumerate(CLASSES))

# --- Image geometry and batching ---
IMG_SIZE_SIDE = 224
IMG_SIZE = (IMG_SIZE_SIDE,) * 2  # square (height, width)
BATCH = 32
VAL_FRACTION = 0.15  # share of the training folder held out for validation
PAD_TO_ASPECT_RATIO = False

In [None]:
# Read the training folder once and split it into a training and a validation dataset.
# subset="both" makes the loader return the pair unpacked below; SEED is passed so the
# shuffle/split uses the notebook-wide seed.
_train_val_options = dict(
    labels="inferred",
    class_names=CLASSES,
    label_mode="binary",
    color_mode="grayscale",
    image_size=IMG_SIZE,
    crop_to_aspect_ratio=False,
    pad_to_aspect_ratio=PAD_TO_ASPECT_RATIO,
    batch_size=BATCH,
    validation_split=VAL_FRACTION,
    subset="both",
    shuffle=True,
    seed=SEED,
)
train_ds, val_ds = keras.preprocessing.image_dataset_from_directory(
    TRAIN_PATH, **_train_val_options
)

In [None]:
# Held-out test images, loaded with the same label/image settings as the training data.
# Shuffling is disabled for this dataset.
test_ds = keras.preprocessing.image_dataset_from_directory(
    TEST_PATH,
    # label handling
    labels="inferred",
    label_mode="binary",
    class_names=CLASSES,
    # image handling
    color_mode="grayscale",
    image_size=IMG_SIZE,
    pad_to_aspect_ratio=PAD_TO_ASPECT_RATIO,
    # batching / ordering
    batch_size=BATCH,
    shuffle=False,
)

In [None]:
def to_rgb_and_pp(x, y):
    """Prepare an (images, labels) pair for the ResNet50 backbone.

    If the static last dimension of ``x`` is 1, the images are expanded to three
    channels with ``tf.image.grayscale_to_rgb``. The result is then passed through
    the ``preprocess_input`` imported from ``tensorflow.keras.applications.resnet50``.

    Args:
        x: image tensor; its last axis is treated as the channel axis.
        y: labels, returned untouched.

    Returns:
        Tuple ``(preprocessed_images, y)``.
    """
    is_single_channel = x.shape[-1] == 1
    rgb = tf.image.grayscale_to_rgb(x) if is_single_channel else x
    return preprocess_input(rgb), y

# aug = keras.Sequential([])
# def augment(x, y): return aug(x, training=True), y
def augment(x, y):
    """Augmentation hook for the training pipeline; currently an identity pass-through.

    Args:
        x: images.
        y: labels.

    Returns:
        The same ``(x, y)`` pair, unchanged.
    """
    return (x, y)

AUTOTUNE = tf.data.AUTOTUNE

# NOTE: train_ds / val_ds are rebound in place, so re-running this cell without first
# re-running the loading cell applies the mapping functions a second time.
train_ds = (
    train_ds
    .map(to_rgb_and_pp, num_parallel_calls=AUTOTUNE)
    .map(augment, num_parallel_calls=AUTOTUNE)  # augmentation only on the training split
    .prefetch(AUTOTUNE)
)
val_ds = (
    val_ds
    .map(to_rgb_and_pp, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)

In [None]:
# Build the classifier: a ResNet50 backbone (ImageNet weights, no classification top)
# followed by a small dense head ending in a single sigmoid unit.
base = keras.applications.ResNet50(
    include_top=False, weights="imagenet", input_shape=IMG_SIZE + (3,)
)
# Freeze the backbone for the first training phase (only the head is trained)
base.trainable = False

# Model input: IMG_SIZE images with 3 channels
inputs = keras.Input(shape=IMG_SIZE + (3,))
x = inputs
# The backbone is called with training=False.
# NOTE(review): per Keras transfer-learning guidance this keeps layers such as
# BatchNormalization in inference mode even after unfreezing — confirm for this version.
x = base(x, training=False)
# Collapse the spatial feature map to one vector per image, then regularize with dropout
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.3)(x)
# Small head: 256-unit ReLU layer with L2 weight penalty, batch norm and dropout
x = layers.Dense(256, activation="relu", kernel_regularizer=keras.regularizers.l2(1e-4))(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
# Single sigmoid output, matching the binary labels and binary cross-entropy loss used below
outputs = layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs, outputs)

In [None]:
# Phase 1: train with the backbone frozen (set in the model-building cell).
TARGET_METRIC = "val_aucpr"  # validation PR-AUC drives checkpointing and early stopping
class_weight = None

head_optimizer = keras.optimizers.Adam(learning_rate=1e-3)
head_metrics = [
    keras.metrics.AUC(curve="PR", name="aucpr"),
    keras.metrics.AUC(curve="ROC", name="auc"),
    keras.metrics.Precision(name="precision"),
    keras.metrics.Recall(name="recall"),
]
model.compile(optimizer=head_optimizer, loss="binary_crossentropy", metrics=head_metrics)

# Keep only the best checkpoint (highest TARGET_METRIC) on disk
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    "../model/resnet50_head.keras",
    monitor=TARGET_METRIC,
    save_best_only=True,
    mode="max",
)
# Stop after 6 epochs without improvement and roll back to the best weights
early_stop_cb = keras.callbacks.EarlyStopping(
    monitor=TARGET_METRIC,
    mode="max",
    patience=6,
    restore_best_weights=True,
)
callbacks = [checkpoint_cb, early_stop_cb]

model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    class_weight=class_weight,
    callbacks=callbacks,
)

In [None]:
# Unfreeze only the last ResNet50 stage (layers whose name contains "conv5_block").
#
# The backbone container was frozen earlier with `base.trainable = False`. Flipping
# flags on its sub-layers alone is not reliable: in tf.keras 2.x a non-trainable
# container reports no trainable weights regardless of its children, so nothing in the
# backbone would actually be fine-tuned. Mark the container trainable first, then set
# each layer explicitly so every layer outside the last stage stays frozen.
base.trainable = True
for layer in base.layers:
    layer.trainable = "conv5_block" in layer.name

# Phase 2: recompile for fine-tuning with a much lower learning rate
# (2e-5 here versus 1e-3 for the head-only phase); same loss and metrics.
finetune_metrics = [
    keras.metrics.AUC(curve="PR", name="aucpr"),
    keras.metrics.AUC(curve="ROC", name="auc"),
    keras.metrics.Precision(name="precision"),
    keras.metrics.Recall(name="recall"),
]
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=2e-5),
    loss="binary_crossentropy",
    metrics=finetune_metrics,
)

# All three callbacks watch the same quantity and treat larger values as better
_monitor = dict(monitor=TARGET_METRIC, mode="max")
callbacks_ft = [
    # Save only the best-scoring model of this phase
    keras.callbacks.ModelCheckpoint(
        "../model/resnet50_finetune.keras", save_best_only=True, **_monitor
    ),
    # Give up after 6 stagnant epochs and restore the best weights
    keras.callbacks.EarlyStopping(patience=6, restore_best_weights=True, **_monitor),
    # Halve the learning rate after 2 stagnant epochs, never going below 1e-6
    keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=2, min_lr=1e-6, **_monitor),
]

model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=15,
    class_weight=class_weight,
    callbacks=callbacks_ft,
)

In [None]:
# Gather validation labels and model scores as flat 1-D arrays.
# The datasets are loaded with label_mode='binary', for which Keras documents labels of
# shape (batch, 1). Without flattening, comparing the (N, 1) labels against the (N,)
# predictions broadcasts to an (N, N) matrix and the tp/fp/fn counts below are wrong.
y_val = np.concatenate([y.numpy() for _, y in val_ds]).ravel()
y_val_pred = model.predict(val_ds).ravel()
assert y_val.shape == y_val_pred.shape, "validation labels and predictions are misaligned"

# Label masks do not depend on the threshold, so compute them once
is_pos = y_val == 1
is_neg = y_val == 0

# Sweep thresholds 0.05, 0.10, ..., 0.95 and keep the one with the highest F1
best_thr, best_f1 = 0.5, -1
for thr in np.linspace(0.05, 0.95, 19):
    y_hat = (y_val_pred >= thr).astype(int)
    tp = np.sum((y_hat == 1) & is_pos)
    fp = np.sum((y_hat == 1) & is_neg)
    fn = np.sum((y_hat == 0) & is_pos)
    # Small epsilon guards against division by zero when a count is empty
    prec = tp / (tp + fp + 1e-9)
    rec = tp / (tp + fn + 1e-9)
    f1 = 2 * prec * rec / (prec + rec + 1e-9)
    if f1 > best_f1:
        best_f1, best_thr = f1, thr

print("Best F1:", best_f1, "at threshold:", best_thr)

In [None]:
from sklearn.metrics import precision_recall_curve

# Pick the highest decision threshold whose validation recall still meets the target.
target_recall = 0.94

# Work on flat 1-D copies: labels loaded with label_mode='binary' are documented by Keras
# as (N, 1), and comparing them element-wise with (N,) predictions would broadcast
# to (N, N) and corrupt the counts below.
y_true = np.asarray(y_val).ravel()
y_score = np.asarray(y_val_pred).ravel()

prec, rec, thr = precision_recall_curve(y_true, y_score)
# precision_recall_curve returns one more precision/recall point than thresholds:
# the final point (precision=1, recall=0) has no threshold. Entry i of `thr` pairs with
# entry i of `prec`/`rec`, so the LAST point is dropped (dropping the first one shifts
# every pair by one and selects a lower threshold than necessary).
prec_ = prec[:-1]
rec_ = rec[:-1]
thr_ = thr

mask = rec_ >= target_recall
if np.any(mask):
    # Thresholds are increasing, so the largest qualifying index is the highest threshold
    sel = np.max(np.where(mask)[0])
    # sel = np.where(mask)[0][np.argmax(prec_[mask])]
else:
    # Fallback: pick the closest recall to the target (if target is unattainable)
    sel = int(np.argmin(np.abs(rec_ - target_recall)))

best_thr = float(thr_[sel])

# Compute and print metrics at this threshold
y_hat = (y_score >= best_thr).astype(int)
tp = np.sum((y_hat == 1) & (y_true == 1))
fp = np.sum((y_hat == 1) & (y_true == 0))
fn = np.sum((y_hat == 0) & (y_true == 1))
# Small epsilon guards against division by zero when a count is empty
precision_at = tp / (tp + fp + 1e-9)
recall_at = tp / (tp + fn + 1e-9)
f1_at = 2 * precision_at * recall_at / (precision_at + recall_at + 1e-9)

print(f"Target recall: {target_recall:.3f}")
print(f"Chosen threshold: {best_thr:.4f}")
print(f"Val Precision: {precision_at:.4f}  Recall: {recall_at:.4f}  F1: {f1_at:.4f}")