---

# EchoFlow-Net: Automated Ejection Fraction Estimation from Echocardiographic Segmentation using Deep Learning and Calibration
**Author:** Elrefaey, K. M. E. (MBBS)  
**Journal Target:** Frontiers in Cardiovascular Medicine  
**Date:** November 2025

---

---

# Task 1 : Set up dependencies

---



In [None]:
!pip install -r requirements.txt

---

# Task 2 : Imports And Seed everything

---

In [114]:
import sys
import random
import os
import torch
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import helper
from pathlib import Path
import albumentations as A
import nibabel as nib
from torch.utils.data import Dataset,DataLoader
import segmentation_models_pytorch as smp
from torch import nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time
import re
from math import pi
from scipy.stats import pearsonr
from sklearn.model_selection import KFold
from scipy.ndimage import label
print("Torch:", torch.__version__)
print("Albumentations:", A.__version__)
print("SMP:", smp.__version__)

Torch: 2.8.0+cu126
Albumentations: 2.0.8
SMP: 0.5.0


In [None]:
SEED=42
def seed_everything(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"Random seed fixed at {seed}")
seed_everything(SEED)

---

# Task 3 : Prepare Camus Dataset In Desired Directory


---


In [115]:
CAMUS_DIRECTORY=Path('<Your Directory>')

---

# Task 4 : Setup Configurations

---

In [None]:
# ====== DATA SETTINGS ======
DATA_ROOT = Path(CAMUS_DIRECTORY)
USE_VIEWS  = ('2CH', '4CH')        # choose ('2CH',) or ('4CH',) if you prefer one
USE_PHASES = ('ED', 'ES')          # for single frames
USE_SEQUENCES = False              # True = use *_half_sequence.nii.gz volumes

# ====== TRAINING ======
DEVICE = 'cuda'
EPOCHS = 50
LR = 7e-3
IMAGE_SIZE = 256
BATCH_SIZE = 8

# ====== MODEL ======
ENCODER = 'timm-efficientnet-b4'
ENCODER_WEIGHTS = 'imagenet'
IN_CHANNELS = 1            # grayscale ultrasound
NUM_CLASSES = 3            # background, endo, epi

In [None]:
# split the dataset into training and validation

def list_patients(root: Path):
    return sorted([p for p in root.glob('patient*') if p.is_dir()])

def build_index(root: Path, use_views=('2CH', '4CH'), use_phases=('ED','ES'), use_sequences=False):
    rows = []
    for pdir in list_patients(root):
        pid = pdir.name
        for view in use_views:
            if use_sequences:
                img = pdir / f'{pid}_{view}_half_sequence.nii.gz'
                msk = pdir / f'{pid}_{view}_half_sequence_gt.nii.gz'
                if img.exists() and msk.exists():
                    rows.append(dict(patient=pid, view=view, phase='SEQ',
                                     image=str(img), mask=str(msk), is_sequence=True))
            else:
                for ph in use_phases:
                    img = pdir / f'{pid}_{view}_{ph}.nii.gz'
                    msk = pdir / f'{pid}_{view}_{ph}_gt.nii.gz'
                    if img.exists() and msk.exists():
                        rows.append(dict(patient=pid, view=view, phase=ph,
                                         image=str(img), mask=str(msk), is_sequence=False))
    return pd.DataFrame(rows)

df = build_index(DATA_ROOT, USE_VIEWS, USE_PHASES, USE_SEQUENCES)
print('Found samples:', len(df))
df = df.rename(columns={'image': 'image_path', 'mask': 'mask_path'})
df.head()

In [None]:
patients = sorted(df['patient'].unique())
train_p, val_p = train_test_split(patients, test_size=0.2, random_state=42)

train_df = df[df['patient'].isin(train_p)].reset_index(drop=True)
valid_df = df[df['patient'].isin(val_p)].reset_index(drop=True)

len(train_df), len(valid_df), len(patients)

---

# Task 5 : Augmentation Functions

---

In [None]:
def get_train_augs(image_size=256):
    return A.Compose([
        # --- Mild geometric jitter (probe placement) ---
        A.OneOf([
            A.Affine(
                scale=(0.90, 1.10),
                rotate=(-12, 12),
                shear=(-8, 8),
                translate_percent=(-0.03, 0.03),
                fit_output=False
            ),
            A.Affine(
                scale=(0.95, 1.05),
                rotate=(-6, 6),
                shear=(-4, 4),
                translate_percent=(-0.02, 0.02),
                fit_output=False
            ),
        ], p=0.70),

        # --- Local deformations (breathing / coupling) ---
        A.OneOf([
            A.GridDistortion(num_steps=5, distort_limit=0.20,
                             border_mode=cv2.BORDER_REFLECT_101),
            A.ElasticTransform(alpha=20, sigma=7,
                               border_mode=cv2.BORDER_REFLECT_101),
        ], p=0.30),

        # --- Cropping/zoom (depth/focus changes) ---
        A.RandomResizedCrop(
            size=(image_size, image_size),
            scale=(0.90, 1.0),
            ratio=(0.95, 1.05),
            p=0.35
        ),

        # --- Photometric changes (gain, dynamic range) ---
        A.OneOf([
            A.RandomBrightnessContrast(brightness_limit=0.25, contrast_limit=0.20),
            A.RandomGamma(gamma_limit=(80, 120)),
            A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8)),
        ], p=0.60),

        # --- Ultrasound noise (speckle / sensor) ---
        A.OneOf([
            A.MultiplicativeNoise(multiplier=(0.85, 1.15)),
            A.GaussNoise(std_range=(0.01, 0.05)),
        ], p=0.50),

        # --- Blur (defocus, motion) ---
        A.OneOf([
            A.MotionBlur(blur_limit=3),
            A.MedianBlur(blur_limit=3),
            A.GaussianBlur(blur_limit=3),
        ], p=0.30),

        # --- Resolution / bandwidth changes ---
        A.Downscale(
            scale_range=(0.70, 0.95),
            p=0.15
        ),

        # --- Flip ---
        A.HorizontalFlip(p=0.5),
    ])

def get_valid_augs():
    return A.Compose([])

---

# Task 6 : Create Custom Dataset

---

In [None]:
class CamusDataset(Dataset):
    def __init__(self, df, augmentations=None, image_size=256, pick_sequence_frame='maxmask'):
        """
        pick_sequence_frame:
          - 'random': random frame if a sequence
          - 'maxmask': choose frame with largest label area (good default)
          - int: fixed index
        """
        self.df = df.reset_index(drop=True)
        self.augs = augmentations
        self.size = (image_size, image_size)
        self.pick = pick_sequence_frame

    def __len__(self):
        return len(self.df)

    def _load_nii(self, path):
        # returns np.ndarray (H, W) or (H, W, T)
        arr = nib.load(path).get_fdata()
        arr = np.asarray(arr)
        return arr

    def _choose_frame(self, img, msk):
        # img, msk: (H, W, T)
        T = img.shape[-1]
        if isinstance(self.pick, int):
            t = np.clip(self.pick, 0, T-1)
        elif self.pick == 'random':
            t = np.random.randint(0, T)
        else:
            # pick frame with max number of labeled pixels
            sums = msk.reshape(-1, T).sum(0)
            t = int(np.argmax(sums))
        return img[..., t], msk[..., t]

    def __getitem__(self, index):
        row = self.df.iloc[index]
        img = self._load_nii(str(row.image_path))      # (H, W) or (H, W, T)
        msk = self._load_nii(str(row.mask_path)).astype(np.int64)

        # if sequence, pick a single frame
        if row.is_sequence:
            if img.ndim == 2:   # safety
                img = img[..., None]
                msk = msk[..., None]
            img, msk = self._choose_frame(img, msk)

        # ensure 2D
        if img.ndim == 3 and img.shape[-1] == 1:
            img = img[..., 0]
        if msk.ndim == 3 and msk.shape[-1] == 1:
            msk = msk[..., 0]

        # resize (keep masks nearest-neighbor)
        img = cv2.resize(img, self.size, interpolation=cv2.INTER_AREA)
        msk = cv2.resize(msk, self.size, interpolation=cv2.INTER_NEAREST)

        # Map mask values >= NUM_CLASSES to 0 (background)
        msk[msk >= NUM_CLASSES] = 0

        # normalize to [0,1]
        img = img.astype(np.float32)
        if np.isfinite(img).all():
            mn, mx = np.percentile(img, 1), np.percentile(img, 99)
            if mx > mn:
                img = np.clip((img - mn) / (mx - mn), 0, 1)
            else:
                img = (img - img.min()) / (img.max() - img.min() + 1e-8)
        else:
            img = np.nan_to_num(img)
            img = (img - img.min()) / (img.max() - img.min() + 1e-8)

        if self.augs:
            out = self.augs(image=img, mask=msk)
            img, msk = out['image'], out['mask']

        # Assert mask values are within the expected range
        assert np.all((msk >= 0) & (msk < NUM_CLASSES)), f"Mask values out of range: {np.unique(msk)}"


        # to tensors
        img = torch.from_numpy(img[None, ...])            # (1, H, W)
        msk = torch.from_numpy(msk.astype(np.int64))      # (H, W) with class indices {0,1,2}

        return img, msk

In [None]:
trainset = CamusDataset(train_df, augmentations=get_train_augs(IMAGE_SIZE), image_size=IMAGE_SIZE)
validset = CamusDataset(valid_df, augmentations=get_valid_augs(),  image_size=IMAGE_SIZE)

In [None]:
print(f"Size of Trainset : {len(trainset)}")
print(f"Size of Validset : {len(validset)}")

---

# Task 7 : Load dataset into batches

---

In [None]:
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True,  num_workers=2, pin_memory=True)
validloader = DataLoader(validset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=True)

In [None]:
print(f"Size of Trainloader : {len(trainloader)}")
print(f"Size of Validloader : {len(validloader)}")

---

# Task 8 : Create Segmentation Model

---

segmentation_models_pytorch documentation : https://smp.readthedocs.io/en/latest/

In [None]:
model = smp.Unet(
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
    in_channels=IN_CHANNELS,
    classes=NUM_CLASSES,
    activation=None
).to(DEVICE)

---

# Task 9 : Train Model

---

In [None]:
# CrossEntropy expects targets as (H, W) long with class indices
criterion = nn.CrossEntropyLoss()

# optional: add Dice loss for multiclass (from_logits=True)
try:
    dice_loss = smp.losses.DiceLoss(mode='multiclass', from_logits=True)
    def loss_fn(pred, target):
        return criterion(pred, target) + 0.5 * dice_loss(pred, target)
except:
    def loss_fn(pred, target):
        return criterion(pred, target)

optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)

In [None]:
def train_one_epoch(model, loader, optimizer):
    model.train()
    total = 0.0
    for imgs, masks in tqdm(loader):
        imgs  = imgs.to(DEVICE)            # (B, 1, H, W)
        masks = masks.to(DEVICE)           # (B, H, W) with values in {0,1,2}
        assert torch.all((masks >= 0) & (masks < NUM_CLASSES)), f"Mask values out of range before loss calculation: {torch.unique(masks)}"
        optimizer.zero_grad()
        logits = model(imgs)               # (B, 3, H, W)
        loss = loss_fn(logits, masks)
        loss.backward()
        optimizer.step()

        total += loss.item() * imgs.size(0)
    return total / len(loader.dataset)

In [None]:
@torch.no_grad()
def validate(model, loader):
    model.eval()
    total = 0.0
    for imgs, masks in tqdm(loader):
        imgs  = imgs.to(DEVICE)
        masks = masks.to(DEVICE)
        logits = model(imgs)
        loss = loss_fn(logits, masks)
        total += loss.item() * imgs.size(0)
    return total / len(loader.dataset)

In [None]:
best_val = 1e9
for epoch in range(1, EPOCHS+1):
    tr = train_one_epoch(model, trainloader, optimizer)
    vl = validate(model, validloader)
    print(f'Epoch {epoch:03d} | train {tr:.4f} | valid {vl:.4f}')
    if vl < best_val:
        best_val = vl
        torch.save(model.state_dict(), 'best_camus.pth')
        print("SAVED-MODEL")

---

# Task 10 : Evaluation (with and withoud post-processing) Before K-Fold Calibration

---

In [None]:
CAVITY_CLASS = 1  # LV cavity class id

def clean_cavity_mask(pred_labels, cavity_class=1, kernel_size=3, median_ks=3, connectivity=8):
    """Keep largest LV cavity component and smooth edges."""
    cav = (pred_labels == cavity_class).astype(np.uint8)

    # Connected components
    num_labels, labels_im = cv2.connectedComponents(cav, connectivity=connectivity)
    if num_labels > 1:
        sizes = [(labels_im == i).sum() for i in range(1, num_labels)]
        largest = 1 + int(np.argmax(sizes))
        cav = (labels_im == largest).astype(np.uint8)

    # Morphological closing (optional)
    if kernel_size and kernel_size > 1:
        kernel = np.ones((kernel_size, kernel_size), np.uint8)
        cav = cv2.morphologyEx(cav, cv2.MORPH_CLOSE, kernel)

    # Median smoothing (optional)
    if median_ks and median_ks > 1:
        cav = cv2.medianBlur(cav, median_ks)

    # Return labels again (0 bg, cavity_class for LV)
    out = np.zeros_like(pred_labels, dtype=np.uint8)
    out[cav.astype(bool)] = cavity_class
    return out

def _principal_axis_length(mask_bin: np.ndarray) -> float:
    ys, xs = np.nonzero(mask_bin.astype(bool))
    if xs.size < 10: return float('nan')
    X = np.stack([xs, ys], axis=1).astype(np.float64)
    Xc = X - X.mean(axis=0, keepdims=True)
    _, _, Vt = np.linalg.svd(Xc, full_matrices=False)
    axis1 = Vt[0]; proj = Xc @ axis1
    return float(proj.max() - proj.min())

def _area_pixels(mask_bin: np.ndarray) -> float:
    return float(mask_bin.astype(np.uint8).sum())

def _vol_biplane(A2, L2, A4, L4):
    vals = []
    if np.isfinite(A2) and np.isfinite(L2) and L2 > 0: vals.append(8.0/(3.0*pi) * (A2*A2)/L2)
    if np.isfinite(A4) and np.isfinite(L4) and L4 > 0: vals.append(8.0/(3.0*pi) * (A4*A4)/L4)
    if all(np.isfinite([A2,L2,A4,L4])) and L2>0 and L4>0:
        Lavg = 0.5*(L2 + L4); vals.append(8.0/(3.0*pi) * (A2*A4)/Lavg)
    return float(vals[-1] if len(vals)==3 else (np.mean(vals) if vals else float('nan')))

@torch.no_grad()
def _predict_one(model, ds, row, device='cuda', use_postproc=False,
                 kernel_size=3, median_ks=3, connectivity=8, return_img=False):
    img, _ = ds[row.name]                      # (1,H,W)
    img_np = img.squeeze(0).cpu().numpy()      # (H,W) for overlay
    logits = model(img[None].to(device))
    pred = torch.argmax(logits, dim=1)[0].cpu().numpy()  # labels

    if use_postproc:
        pred_use = clean_cavity_mask(pred, cavity_class=CAVITY_CLASS,
                                     kernel_size=kernel_size, median_ks=median_ks, connectivity=connectivity)
    else:
        pred_use = pred

    cav = (pred_use == CAVITY_CLASS).astype(np.uint8)
    A = _area_pixels(cav)
    L = _principal_axis_length(cav)
    return (A, L, pred_use, img_np) if return_img else (A, L, pred_use)

def parse_cfg_numeric_lines(path: Path):
    out = {}
    if not path.exists(): return out
    for line in path.read_text(errors='ignore').splitlines():
        line = line.strip()
        if not line or line.startswith('#'): continue
        if ':' in line: k, v = line.split(':', 1)
        elif '=' in line: k, v = line.split('=', 1)
        else: continue
        k = k.strip().lower(); v = v.strip()
        num = None
        for tok in v.replace(',', ' ').split():
            try: num = float(tok); break
            except: pass
        if num is not None: out[k] = num
    return out

def read_gt_ef_from_patient_dir(pdir: Path, prefer='4ch'):
    order = ['4ch','2ch'] if prefer.lower()=='4ch' else ['2ch','4ch']
    files = {lab: pdir / f'Info_{lab.upper()}.cfg' for lab in ['2ch','4ch']}
    infos = {lab: parse_cfg_numeric_lines(files[lab]) for lab in files}
    for lab in order:
        ef = infos.get(lab, {}).get('ef', None)
        if ef is not None: return float(ef)
    for lab in order:
        edv = infos.get(lab, {}).get('edv', None)
        esv = infos.get(lab, {}).get('esv', None)
        if edv and esv and edv > 0: return (edv - esv)/edv*100.0
    return float('nan')

@torch.no_grad()
def ef_from_predictions_for_patient(model, ds, patient_id: str, device='cuda',
                                    use_postproc=False, kernel_size=3, median_ks=3, connectivity=8,
                                    return_thumbs=False):
    sub = ds.df[ds.df['patient'] == patient_id]
    if sub.empty:
        return {"patient": patient_id, "EF_pred_%": float('nan'), "EF_gt_%": float('nan')}

    pdir = Path(sub.iloc[0]['image_path']).parent
    EF_gt = read_gt_ef_from_patient_dir(pdir)

    A2_ED=L2_ED=A4_ED=L4_ED = np.nan
    A2_ES=L2_ES=A4_ES=L4_ES = np.nan
    thumbs = []

    for view, ph, lab in [('2CH','ED','2CH ED'), ('2CH','ES','2CH ES'),
                          ('4CH','ED','4CH ED'), ('4CH','ES','4CH ES')]:
        r = sub[(sub['view']==view) & (sub['phase']==ph)]
        if len(r):
            vals = _predict_one(model, ds, r.iloc[0], device,
                                use_postproc=use_postproc,
                                kernel_size=kernel_size, median_ks=median_ks, connectivity=connectivity,
                                return_img=return_thumbs)
            if return_thumbs:
                A, L, pred_use, img_np = vals
                thumbs.append((lab, img_np, pred_use))
            else:
                A, L, pred_use = vals

            if ph=='ED':
                if view=='2CH': A2_ED, L2_ED = A, L
                else:           A4_ED, L4_ED = A, L
            else:
                if view=='2CH': A2_ES, L2_ES = A, L
                else:           A4_ES, L4_ES = A, L

    EDV = _vol_biplane(A2_ED, L2_ED, A4_ED, L4_ED)
    ESV = _vol_biplane(A2_ES, L2_ES, A4_ES, L4_ES)
    EF_pred = (EDV - ESV)/EDV*100.0 if (np.isfinite(EDV) and np.isfinite(ESV) and EDV>0) else float('nan')

    out = {
        "patient": patient_id,
        "EF_pred_%": EF_pred,
        "EF_gt_%": EF_gt,
        "EF_error_%": (EF_pred - EF_gt) if (np.isfinite(EF_pred) and np.isfinite(EF_gt)) else float('nan'),
        "EDV_px3": EDV, "ESV_px3": ESV,
    }
    if return_thumbs: out['thumbs'] = thumbs
    return out
def eval_val_split(use_postproc=False, kernel_size=3, median_ks=3, connectivity=8):
    val_patients = sorted(valid_df['patient'].unique())
    ef_rows = []
    for pid in tqdm(val_patients):
        out = ef_from_predictions_for_patient(
            model, validset, pid, device=DEVICE,
            use_postproc=use_postproc,
            kernel_size=kernel_size, median_ks=median_ks, connectivity=connectivity,
            return_thumbs=False  # set True only if you want overlays back
        )
        ef_rows.append(out)

    ef_df = pd.DataFrame(ef_rows)
    pred = ef_df['EF_pred_%'].to_numpy()
    gt   = ef_df['EF_gt_%'].to_numpy()
    err  = pred - gt

    # Summary
    mae  = np.mean(np.abs(err))
    bias = np.mean(err)
    sd   = np.std(err, ddof=1)
    loa1, loa2 = bias - 1.96*sd, bias + 1.96*sd
    print(f"[postproc={use_postproc}]  MAE={mae:.2f} | Bias={bias:.2f} | SD={sd:.2f} | LoA=[{loa1:.2f}, {loa2:.2f}]")

    # Correlation
    r, p = pearsonr(gt, pred); R2 = r**2
    print(f"Pearson r = {r:.3f} (p={p:.2e}), R² = {R2:.3f}")

    # Bootstrap 95% CI for MAE & Bias
    rng = np.random.default_rng(42)
    B = 2000
    idx = rng.integers(0, len(err), (B, len(err)))
    mae_ci  = np.percentile(np.mean(np.abs(err[idx]), axis=1), [2.5, 97.5])
    bias_ci = np.percentile(np.mean(err[idx], axis=1), [2.5, 97.5])
    print(f"MAE 95% CI: [{mae_ci[0]:.2f}%, {mae_ci[1]:.2f}%]")
    print(f"Bias 95% CI: [{bias_ci[0]:.2f}%, {bias_ci[1]:.2f}%]")

    # Scatter
    plt.figure(figsize=(5,5))
    lims = [min(gt.min(), pred.min())-2, max(gt.max(), pred.max())+2]
    plt.scatter(gt, pred, s=16)
    plt.plot(lims, lims, '--')
    plt.xlabel('GT EF (%)'); plt.ylabel('Pred EF (%)')
    plt.title(f'Pred vs GT (R²={R2:.2f})')
    plt.xlim(lims); plt.ylim(lims); plt.show()

    # Bland–Altman
    plt.figure(figsize=(6,4))
    mean_ef = (pred + gt) / 2
    plt.scatter(mean_ef, err, s=16)
    plt.axhline(bias, linestyle='--', label=f"bias={bias:.2f}%")
    plt.axhline(loa1, color='r', linestyle=':', label=f"LoA={loa1:.2f}%")
    plt.axhline(loa2, color='r', linestyle=':')
    plt.xlabel('Mean of Pred & GT EF (%)'); plt.ylabel('Pred − GT EF (%)')
    plt.title('Bland–Altman: EF')
    plt.legend(); plt.show()

    return ef_df

In [None]:
# Run raw (no post-processing)
ef_df_raw = eval_val_split(use_postproc=False)

# Run with post-processing (largest component + morphology)
ef_df_pp  = eval_val_split(use_postproc=True, kernel_size=3, median_ks=3)

---

# Task 11 : Calibration And Reevaluation

---

In [None]:
def clip_ef(arr, lo=5.0, hi=85.0):
    return np.clip(arr, lo, hi)

def kfold_calibration_df(df_in, K=5, seed=42):
    df = df_in.copy()
    df['EF_pred_cal_%'] = np.nan
    kf = KFold(n_splits=K, shuffle=True, random_state=seed)
    for tr, te in kf.split(df):
        x = df.iloc[tr]['EF_pred_%'].to_numpy()
        y = df.iloc[tr]['EF_gt_%'].to_numpy()
        a, b = np.polyfit(x, y, 1)
        df.loc[df.index[te], 'EF_pred_cal_%'] = clip_ef(a*df.iloc[te]['EF_pred_%'].to_numpy() + b)
    return df

def summarize(pred, gt):
    e = pred - gt
    mae = np.mean(np.abs(e))
    bias = np.mean(e)
    sd = np.std(e, ddof=1)
    loa1, loa2 = bias - 1.96*sd, bias + 1.96*sd
    r, p = pearsonr(gt, pred)
    R2 = r**2
    # bootstrap 95% CI for MAE & bias
    B = 2000
    rng = np.random.default_rng(42)
    idx = rng.integers(0, len(e), (B, len(e)))
    mae_ci  = np.percentile(np.mean(np.abs(e[idx]), axis=1), [2.5, 97.5])
    bias_ci = np.percentile(np.mean(e[idx], axis=1), [2.5, 97.5])
    return dict(MAE=mae, Bias=bias, SD=sd, LoA=(loa1, loa2), r=r, R2=R2,
                MAE_CI=mae_ci, Bias_CI=bias_ci)

# K-fold calibration
df_cal = kfold_calibration_df(ef_df_pp, K=5, seed=SEED)

# Metrics BEFORE and AFTER
pre = summarize(ef_df_pp['EF_pred_%'].to_numpy(),      ef_df_pp['EF_gt_%'].to_numpy())
post= summarize(df_cal['EF_pred_cal_%'].to_numpy(), df_cal['EF_gt_%'].to_numpy())

print("Before calib : "
      f"MAE={pre['MAE']:.2f} [{pre['MAE_CI'][0]:.2f},{pre['MAE_CI'][1]:.2f}]  | "
      f"Bias={pre['Bias']:.2f} [{pre['Bias_CI'][0]:.2f},{pre['Bias_CI'][1]:.2f}]  | "
      f"SD={pre['SD']:.2f}  | LoA=[{pre['LoA'][0]:.2f},{pre['LoA'][1]:.2f}]  | "
      f"r={pre['r']:.3f}, R²={pre['R2']:.3f}")

print("After  calib : "
      f"MAE={post['MAE']:.2f} [{post['MAE_CI'][0]:.2f},{post['MAE_CI'][1]:.2f}] | "
      f"Bias={post['Bias']:.2f} [{post['Bias_CI'][0]:.2f},{post['Bias_CI'][1]:.2f}] | "
      f"SD={post['SD']:.2f} | LoA=[{post['LoA'][0]:.2f},{post['LoA'][1]:.2f}] | "
      f"r={post['r']:.3f}, R²={post['R2']:.3f}")

# Scatter (overlay before vs after)
gt    = ef_df_pp['EF_gt_%'].to_numpy()
pred0 = ef_df_pp['EF_pred_%'].to_numpy()
pred1 = df_cal['EF_pred_cal_%'].to_numpy()

plt.figure(figsize=(5.5,5.5))
lims = [min(gt.min(), pred0.min(), pred1.min())-2, max(gt.max(), pred0.max(), pred1.max())+2]
plt.scatter(gt, pred0, s=14, label='Before')
plt.scatter(gt, pred1, s=14, alpha=0.8, label='After (calibrated)')
plt.plot(lims, lims, '--', linewidth=1)
plt.xlabel('GT EF (%)'); plt.ylabel('Pred EF (%)')
plt.title(f'Pred vs GT (After: R²={post["R2"]:.2f}, MAE={post["MAE"]:.1f}%)')
plt.xlim(lims); plt.ylim(lims); plt.legend(); plt.show()

# Bland–Altman (after calibration)
diff = pred1 - gt
bias = diff.mean(); sd = diff.std(ddof=1)
loa1, loa2 = bias - 1.96*sd, bias + 1.96*sd
mean_ef = (pred1 + gt)/2

plt.figure(figsize=(6,4.2))
plt.scatter(mean_ef, diff, s=16)
plt.axhline(bias, linestyle='--', label=f"bias={bias:.2f}%")
plt.axhline(loa1, color='r', linestyle=':', label=f"LoA={loa1:.2f}%")
plt.axhline(loa2, color='r', linestyle=':')
plt.xlabel('Mean of Pred_cal & GT EF (%)'); plt.ylabel('Pred_cal − GT EF (%)')
plt.title('Bland–Altman: EF (calibrated)')
plt.legend(); plt.show()

---

# Task 12 : Visualization (Segmentation)

---

In [None]:
def _overlay_rgb(gray_hw, mask_hw, cavity_id=1, color=(1,0,0), alpha=0.35):
    g = gray_hw.astype(np.float32)
    g = (g - g.min()) / (g.max() - g.min() + 1e-8)
    rgb = np.stack([g, g, g], axis=-1)

    overlay = rgb.copy()
    overlay[mask_hw == cavity_id] = color
    out = (1 - alpha) * rgb + alpha * overlay
    return out

def _get_row_for(sub_df, view, phase):
    r = sub_df[(sub_df['view'] == view) & (sub_df['phase'] == phase)]
    return None if r.empty else r.iloc[0]

@torch.no_grad()
def plot_patient_ed_es_panel(model, ds, df, patient_id,
                             device='cuda',
                             use_postproc=True, kernel_size=3, median_ks=3, connectivity=8,
                             alpha_gt=0.35, alpha_pred=0.35,
                             cavity_class=CAVITY_CLASS,
                             figsize=(14, 7)):
    """
    Panel layout (2 rows x 4 cols):
      Row 1 (2CH):  ED GT | ED Pred | ES GT | ES Pred
      Row 2 (4CH):  ED GT | ED Pred | ES GT | ES Pred

    Uses your _predict_one() (no changes) and accesses GT masks via ds[idx].
    """
    sub = df[df['patient'] == patient_id]
    if sub.empty:
        raise ValueError(f"No samples for patient '{patient_id}' in dataframe.")

    # Prepare figure
    fig, axes = plt.subplots(2, 4, figsize=figsize)

    def _one(view, phase, row_idx, col_base):
        row = _get_row_for(sub, view, phase)
        if row is None:
            # blank panels if missing
            for k in range(4):
                axes[row_idx, col_base+k//2].axis('off')
            return

        idx = row.name
        img_t, gt_mask = ds[idx]          # (1,H,W), (H,W)
        img_hw = img_t.squeeze(0).cpu().numpy()

        # predict using your helper (returns (A, L, pred_labels, img_np) when return_img=True)
        A, L, pred_labels, img_np = _predict_one(
            model, ds, row, device=device,
            use_postproc=use_postproc,
            kernel_size=kernel_size, median_ks=median_ks, connectivity=connectivity,
            return_img=True
        )

        # overlays
        gt_overlay   = _overlay_rgb(img_hw, gt_mask, cavity_id=cavity_class,
                                    color=(1,0,0), alpha=alpha_gt)
        pred_overlay = _overlay_rgb(img_hw, pred_labels, cavity_id=cavity_class,
                                    color=(0,1,0), alpha=alpha_pred)

        # ED -> columns [0,1], ES -> columns [2,3]
        col_off = 0 if phase == 'ED' else 2

        axes[row_idx, col_off+0].imshow(gt_overlay);   axes[row_idx, col_off+0].set_title(f'{view} {phase} – GT')
        axes[row_idx, col_off+1].imshow(pred_overlay); axes[row_idx, col_off+1].set_title(f'{view} {phase} – Pred')

        axes[row_idx, col_off+0].axis('off'); axes[row_idx, col_off+1].axis('off')

    # Top row: 2CH, Bottom row: 4CH
    _one('2CH', 'ED', row_idx=0, col_base=0)
    _one('2CH', 'ES', row_idx=0, col_base=0)
    _one('4CH', 'ED', row_idx=1, col_base=0)
    _one('4CH', 'ES', row_idx=1, col_base=0)

    # Optional: include EF info using your ef_from_predictions_for_patient()
    try:
        out = ef_from_predictions_for_patient(model, ds, patient_id, device=device,
                                              use_postproc=use_postproc,
                                              kernel_size=kernel_size, median_ks=median_ks,
                                              connectivity=connectivity, return_thumbs=False)
        ef_pred = out.get("EF_pred_%", np.nan)
        ef_gt   = out.get("EF_gt_%",   np.nan)
        fig.suptitle(f"Patient {patient_id}  |  EF (GT)={ef_gt:.1f}%   EF (Pred)={ef_pred:.1f}%", fontsize=12)
    except Exception:
        fig.suptitle(f"Patient {patient_id}", fontsize=12)

    plt.tight_layout()
    return fig

@torch.no_grad()
def plot_random_patients_ed_es(model, ds, df, n=4, seed=42, **panel_kwargs):
    rng = np.random.default_rng(seed)
    patients = sorted(df['patient'].unique())
    if len(patients) == 0:
        raise ValueError("No patients found in df.")
    pick = rng.choice(patients, size=min(n, len(patients)), replace=False)

    figs = []
    for pid in pick:
        fig = plot_patient_ed_es_panel(model, ds, df, str(pid), **panel_kwargs)
        figs.append(fig)
        plt.show()   # show each panel
    return figs
_ = plot_random_patients_ed_es(
    model, validset, valid_df, n=6, seed=SEED,
    device=DEVICE, use_postproc=True, alpha_gt=0.40, alpha_pred=0.40
)



Linkedin : https://www.linkedin.com/in/khaled-elrefaey/