## **CLIP Example**

In [None]:
import os
import cv2
import numpy as np
import torch
from torch import nn
from pytorch_grad_cam import GradCAM, GradCAMPlusPlus
from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from transformers import CLIPProcessor, CLIPModel


# 参数类，直接用于设置运行所需的参数
class Args:
    image_path = "../examples/both.png"  # 替换为你的图像路径
    device = "cuda" if torch.cuda.is_available() else "cpu"
    method = "gradcam"
    output_path = "../output/CLIP_output.jpg"


class ImageClassifier(nn.Module):
    def __init__(self, labels):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
        self.labels = labels

    def forward(self, x):
        text_inputs = self.processor(text=self.labels, return_tensors="pt", padding=True)
        outputs = self.clip(pixel_values=x, input_ids=text_inputs['input_ids'].to(self.clip.device),
                            attention_mask=text_inputs['attention_mask'].to(self.clip.device))
        logits_per_image = outputs.logits_per_image
        probs = logits_per_image.softmax(dim=1)
        for label, prob in zip(self.labels, probs[0]):
            print(f"{label}: {prob:.4f}")
        return probs


def reshape_transform(tensor, height=16, width=16):
    """
    Reshape the Vision Transformer output to match the necessary shape for Grad-CAM.
    """
    tensor = tensor[:, 1:, :].reshape(tensor.size(0), height, width, tensor.size(2))
    return tensor.permute(0, 3, 1, 2)  # Rearrange to (batch, channels, height, width)


def run_grad_cam(args):
    # Default labels
    labels = ["a cat", "a dog", "a car", "a person", "a shoe"]

    # Load model and set to evaluation mode
    model = ImageClassifier(labels).to(args.device).eval()

    # Select target layers (ViT's LayerNorm)
    target_layers = [model.clip.vision_model.encoder.layers[-1].layer_norm1]

    # Read and preprocess the image
    rgb_img = cv2.imread(args.image_path, 1)[:, :, ::-1]
    rgb_img = cv2.resize(rgb_img, (224, 224))
    rgb_img = np.float32(rgb_img) / 255
    input_tensor = preprocess_image(rgb_img, mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]).to(args.device)

    # Initialize Grad-CAM with reshape_transform
    cam_method = GradCAM if args.method == "gradcam" else GradCAMPlusPlus
    cam = cam_method(model=model, target_layers=target_layers, reshape_transform=reshape_transform)

    # Use the highest scoring category as default target
    targets = None
    targets = [ClassifierOutputTarget(1)]

    # Generate CAM
    grayscale_cam = cam(input_tensor=input_tensor, targets=targets)
    grayscale_cam = grayscale_cam[0, :]  # Take the first (and only) image in the batch

    # Overlay CAM on the image
    cam_image = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True)

    # Save and display the result
    os.makedirs(os.path.dirname(args.output_path), exist_ok=True)  # 确保输出目录存在
    cv2.imwrite(args.output_path, cam_image)
    print(f"Grad-CAM result saved to {args.output_path}")


if __name__ == '__main__':
    # 直接使用手动设置的参数
    args = Args()
    run_grad_cam(args)

## **MERL**

In [8]:
import os
import numpy as np
import torch
import yaml
from torch import nn
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from utils_builder import ECGCLIP
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from PIL import Image
from matplotlib.cm import get_cmap
from scipy.io import loadmat


class Args:
    ecg_path = "../zeroshot/10_LV/QMH1001803PECG20231200010936_20231205092900_0.mat"
    ecg_image_path = "../zeroshot/10_LV/10.jpeg"
    labels = ["LV Impaired"]
    config_path = "self_test.yaml"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    target_label = 0
    output_path = "../output/ecg_grad_cam_overlay.png"
    ecg_encoder_path = "../ckpt/Vit_69_best_encoder.pth"
    text_encoder_path = "../ckpt/Vit_69_best_ckpt.pth"


class MultiModalClassifier(nn.Module):
    def __init__(self, base_model, labels):
        super().__init__()
        self.model = base_model
        self.labels = labels
        self.use_cam_mode = False  # 添加标志，默认关闭 CAM 模式

    def forward(self, ecg, text=None):
        if self.use_cam_mode:  # 如果是 CAM 模式，调用 forward_for_cam
            return self.forward_for_cam(ecg)
        
        print(f"Text input: {text}")
        
        tokenizer_output = self.model._tokenize(text)
        input_ids = tokenizer_output['input_ids'].to(ecg.device)
        attention_mask = tokenizer_output['attention_mask'].to(ecg.device)
        outputs = self.model(ecg=ecg, input_ids=input_ids, attention_mask=attention_mask)
        proj_ecg_emb = outputs['proj_ecg_emb'][0]
        proj_text_emb = outputs['proj_text_emb'][0]
        logits_per_ecg = torch.matmul(proj_ecg_emb, proj_text_emb.T)
        probs = logits_per_ecg.softmax(dim=-1)
        return probs

    def forward_for_cam(self, ecg):
        if ecg.dim() == 4:  # 如果输入形状是 [batch_size, channels, 1, length]
            print(f"Input shape before squeeze: {ecg.shape}")
            ecg = ecg.squeeze(2)
            print(f"Input shape after squeeze: {ecg.shape}")

        default_text = [self.labels[0]]  # 假设 labels[0] 对应目标标签
        print(f"Default text input: {default_text}")
        tokenizer_output = self.model._tokenize(default_text)
        input_ids = tokenizer_output['input_ids'].to(ecg.device)
        attention_mask = tokenizer_output['attention_mask'].to(ecg.device)
        outputs = self.model(ecg=ecg, input_ids=input_ids, attention_mask=attention_mask)

        proj_ecg_emb = outputs['ecg_emb']
        if isinstance(proj_ecg_emb, list):
            proj_ecg_emb = proj_ecg_emb[0]

        if proj_ecg_emb.dim() == 2:  # 如果输出是 [B, L]
            proj_ecg_emb = proj_ecg_emb.unsqueeze(1)
        print(f"Output shape: {proj_ecg_emb.shape}")
        return proj_ecg_emb


def preprocess_ecg(ecg_path):
    ecg = loadmat(ecg_path)['ecg_signals']
    ecg = ecg.astype(np.float32)

    ecg = ecg[:, :5000]
    
    # normalzie to 0-1
    ecg = (ecg - np.min(ecg))/(np.max(ecg) - np.min(ecg) + 1e-8)
    
    ecg = torch.from_numpy(ecg).float()

    ecg = ecg.unsqueeze(0)
    print(f"ECG signal shape: {ecg.shape}")
    return ecg


def reshape_transform_ecg(tensor):
    if tensor.dim() == 3:
        tensor = tensor.unsqueeze(2)
    print(f"Reshaped tensor shape: {tensor.shape}")
    return tensor


def load_yaml_config(config_path):
    try:
        with open(config_path, "r") as f:
            config = yaml.safe_load(f)
        return config
    except FileNotFoundError:
        raise FileNotFoundError(f"YAML config file not found: {config_path}")
    except yaml.YAMLError as e:
        raise ValueError(f"Error parsing YAML config file: {e}")


def load_pretrained_model(config_path, ecg_encoder_path, text_encoder_path, device):
    config = load_yaml_config(config_path)
    model = ECGCLIP(config['network'])
    print(config['network'])
    ecg_weights = torch.load(ecg_encoder_path, map_location=device)
    model.ecg_encoder.load_state_dict(ecg_weights, strict=False)
    text_weights = torch.load(text_encoder_path, map_location=device)
    model.lm_model.load_state_dict(text_weights, strict=False)
    return model.to(device).eval()


def overlay_grad_cam_on_image(ecg_image_path, grad_cams, output_path, alpha=0.5):
    """
    将 Grad-CAM 热图叠加到 ECG 图像上，每条导联对应特定的图像区域。
    """
    import numpy as np
    from PIL import Image
    from matplotlib.cm import get_cmap

    # 1. 加载原始 ECG 图像
    ecg_image = Image.open(ecg_image_path).convert("RGB")
    ecg_image_np = np.array(ecg_image)

    # 2. 定义每条导联的坐标区域（基于更正后的坐标）
    lead_coords = {
        "I": (680, 900, 270, 1660),
        "II": (930, 1080, 270, 1660),
        "III": (1125, 1316, 270, 1660),
        "aVR": (1317, 1585, 270, 1660),
        "aVL": (1600, 1824, 270, 1660),
        "aVF": (1840, 2082, 270, 1660),
        "V1": (680, 900, 1695, 3115),
        "V2": (930, 1080, 1695, 3115),
        "V3": (1125, 1316, 1695, 3115),
        "V4": (1317, 1585, 1695, 3115),
        "V5": (1600, 1824, 1695, 3115),
        "V6": (1840, 2082, 1695, 3115),
    }

    # 3. 确保坐标适配图片分辨率
    reference_height = 2480  # 假设参考高度
    reference_width = 3508   # 假设参考宽度
    image_height, image_width, _ = ecg_image_np.shape

    # 根据实际图片大小调整坐标比例
    height_ratio = image_height / reference_height
    width_ratio = image_width / reference_width

    lead_coords_scaled = {
        lead: (
            int(y_start * height_ratio),
            int(y_end * height_ratio),
            int(x_start * width_ratio),
            int(x_end * width_ratio)
        )
        for lead, (y_start, y_end, x_start, x_end) in lead_coords.items()
    }

    print(f"ECG image resolution: {image_height} x {image_width}")
    print(f"Scaled lead coordinates: {lead_coords_scaled}")

    # 4. 结果图像
    blended_image = ecg_image_np.copy()

    # 5. 遍历每条导联的 Grad-CAM
    lead_names = list(lead_coords.keys())  # 确保顺序与 grad_cams 一致
    for lead_idx, grad_cam in enumerate(grad_cams):
        lead_name = lead_names[lead_idx]

        print(f"Processing Lead: {lead_name}")
        print(f"Grad-CAM shape for {lead_name}: {grad_cam.shape}")

        y_start, y_end, x_start, x_end = lead_coords_scaled[lead_name]
        print(f"Lead {lead_name}: y=({y_start}, {y_end}), x=({x_start}, {x_end})")
        print(f"Grad-CAM shape: {grad_cam.shape}")

        # Grad-CAM 归一化到 [0, 1]
        grad_cam_normalized = (grad_cam - grad_cam.min()) / (grad_cam.max() - grad_cam.min())
        grad_cam_resized = Image.fromarray((grad_cam_normalized * 255).astype(np.uint8))

        grad_cam_resized.save(f"grad_cam_{lead_name}.png")
        print(f"Saved Grad-CAM for {lead_name}")

        grad_cam_resized = grad_cam_resized.resize((x_end - x_start, y_end - y_start), resample=Image.BICUBIC)
        grad_cam_resized_np = np.array(grad_cam_resized)

        # 转换为彩色热图
        cmap = get_cmap("jet")
        grad_cam_colored = cmap(grad_cam_resized_np / 255.0)[:, :, :3]  # 去掉 alpha 通道
        grad_cam_colored = (grad_cam_colored * 255).astype(np.uint8)

        # 叠加到对应导联区域
        blended_image[y_start:y_end, x_start:x_end, :] = (
            (1 - alpha) * blended_image[y_start:y_end, x_start:x_end, :]
            + alpha * grad_cam_colored
        )

    # 6. 保存并显示结果
    blended_image_pil = Image.fromarray(blended_image.astype(np.uint8))
    blended_image_pil.save(output_path)
    blended_image_pil.show()

    print(f"Grad-CAM overlay image saved to {output_path}")


def run_ecg_grad_cam(args):

    # load model
    model = load_pretrained_model(args.config_path, args.ecg_encoder_path, args.text_encoder_path, args.device)

    classifier = MultiModalClassifier(model, args.labels).to(args.device).eval()
    last_block = getattr(model.ecg_encoder, f'block{model.ecg_encoder.depth - 1}')
    target_layers = [last_block.attn]
    ecg_signal = preprocess_ecg(args.ecg_path).to(args.device)

    ecg_signal = ecg_signal.unsqueeze(2)
    classifier.use_cam_mode = True

    print(f"ECG signal shape before Grad-CAM: {ecg_signal.shape}")
    grad_cams = []  # 存储每个导联的 Grad-CAM 热图
    for lead_idx in range(12):  # 针对每个导联单独计算
        print(f"Processing Lead {lead_idx + 1}")

        # 提取单导联信号
        single_lead_signal = ecg_signal[:, lead_idx:lead_idx+1, :]  # [1, 1, 5000]

        # 创建一个形状为 [1, 12, 5000] 的输入信号，其中其他导联填充为 0
        input_signal = torch.zeros_like(ecg_signal)  # [1, 12, 5000]
        input_signal[:, lead_idx, :] = single_lead_signal[:, 0, :]  # 替换目标导联的信号

        print(f"Input signal shape for Lead {lead_idx + 1}: {input_signal.shape}")

        # Initialize Grad-CAM for this lead
        cam = GradCAM(model=classifier, target_layers=target_layers, reshape_transform=reshape_transform_ecg)
        targets = [ClassifierOutputTarget(args.target_label)]

        # Compute Grad-CAM for this lead
        grayscale_cam = cam(input_tensor=input_signal, targets=targets)[0]  # 输出形状为 [5000]
        grad_cams.append(grayscale_cam)

    print(f"Generated Grad-CAM for {len(grad_cams)} leads.")

    # 将每个 Grad-CAM 热图叠加到 ECG 图像
    overlay_grad_cam_on_image(args.ecg_image_path, grad_cams, args.output_path, alpha=0.5)


if __name__ == '__main__':
    args = Args()
    run_ecg_grad_cam(args)

{'ecg_model': 'vit_tiny', 'num_leads': 12, 'text_model': 'ncbi/MedCPT-Query-Encoder', 'free_layers': 6, 'feature_dim': 768, 'projection_head': {'mlp_hidden_size': 256, 'projection_size': 256}}
ECG signal shape: torch.Size([1, 12, 5000])
ECG signal shape before Grad-CAM: torch.Size([1, 12, 1, 5000])
Processing Lead 1
Input signal shape for Lead 1: torch.Size([1, 12, 1, 5000])
Input shape before squeeze: torch.Size([1, 12, 1, 5000])
Input shape after squeeze: torch.Size([1, 12, 5000])
Default text input: ['LV Impaired']
Reshaped tensor shape: torch.Size([1, 100, 1, 192])
Output shape: torch.Size([1, 1, 256])
Reshaped tensor shape: torch.Size([1, 100, 1, 192])
Processing Lead 2
Input signal shape for Lead 2: torch.Size([1, 12, 1, 5000])
Input shape before squeeze: torch.Size([1, 12, 1, 5000])
Input shape after squeeze: torch.Size([1, 12, 5000])
Default text input: ['LV Impaired']
Reshaped tensor shape: torch.Size([1, 100, 1, 192])
Output shape: torch.Size([1, 1, 256])
Reshaped tensor shap

  cmap = get_cmap("jet")


Grad-CAM overlay image saved to ../output/ecg_grad_cam_overlay.png


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
