In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import os
from torch import nn
from torchvision import models
import copy

# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA (same fallback the LRP class uses for its default).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# %env CUBLAS_WORKSPACE_CONFIG=:4096:8
# TORCH_HOME is the root of the torch hub cache; point it at the current directory.
os.environ["TORCH_HOME"] = "."
import sys

# torch.cuda.is_available()

from torchvision import models

############################################
# parameters to be set for the environment
# NO NEED to modify them if your setup already matches these settings

data_root_path = "/opt/local/torch/data/tcr-mat-form"  # root directory of the data sets (one sub-directory per split)
util_path = "../../.."  # directory into which the `util` package was cloned
############################################

# Make the local `util` package importable.
# The guard keeps this cell idempotent: re-running it does not stack
# duplicate entries at the front of sys.path.
if not sys.path or sys.path[0] != util_path:
    sys.path.insert(0, util_path)

In [2]:
from util.model.surf.modified_cnn_model import (
    ModifiedPretrainedNet,
    SurfNet256,
)
from typing import Any, Dict, Union
from util.model.surf.dateset import SurfDatasetFromMat
from util.model.surf.pretrained_model import PretrainedModelDb
import torch.optim as optim


def define_surf_model(
    model_name: str,
    model_type: Union[int, str],
    suffix: str,
    pretrain: bool = True,
    set_type: str = "train",
    kwd: Union[Dict[str, Any], None] = None,
):
    """
    Build a surface model together with its data set, optimizer and loss.

    Parameters:
    - model_name: name of the backbone, looked up in PretrainedModelDb.
    - model_type: variant of the backbone, either an int or a string.
    - suffix: text appended after model name and type to tell different
      configurations apart; it becomes part of the returned prefix.
    - pretrain: whether to pass the pretrained weights to the backbone
      (True by default); when False, ``weights=None`` is used.
    - set_type: name of the sub-directory below ``data_root_path`` that holds
      the CSV file and the ``Surf`` directory (e.g. "train" or "test").
    - kwd: settings for the model and the training setup. Recognised keys are
      "dropout", "cnn_feature_ratio", "data_root_path", "data_csv_filename",
      "lr", "num_params" and "num_output". Missing keys (or ``kwd=None``)
      fall back to the defaults listed in the function body.

    Returns:
    A dict with the keys "prefix" (str), "train_model" (the SurfNet256
    instance, already moved to the module-level ``device``), "dset" (the
    SurfDatasetFromMat instance), "optimizer" (Adam) and "loss_func" (MSELoss).
    """
    # Defaults live inside the function instead of in a mutable default
    # argument, so no dict object is shared between calls.
    defaults: Dict[str, Any] = {
        "dropout": 0.2,
        "cnn_feature_ratio": 0.5,
        "data_root_path": "/hy-tmp/",
        "data_csv_filename": "DataNormilized.csv",
        "lr": 0.001,
        "num_params": 3,
        "num_output": 2,
    }
    # Caller-supplied values win; anything not supplied uses the default.
    cfg = {**defaults, **(kwd or {})}

    # Look up the backbone constructor, its weights and the names of the
    # layers that have to be replaced.
    model_info_db = PretrainedModelDb()
    train_model, model_weights, name_first_conv, name_fc = model_info_db.get_info(
        model_name, model_type
    )

    # Prefix used later for logging / checkpoint file names.
    prefix = f"{model_name}{model_type}_{suffix}"

    # Wrap the backbone; weights are only passed on when pretraining is requested.
    pnet = ModifiedPretrainedNet(
        pretrained_net=train_model,
        weights=model_weights if pretrain else None,
        name_first_conv=name_first_conv,
        name_fc=name_fc,
    )

    # Assemble the final surface model around the modified backbone.
    surf_model = SurfNet256(
        modified_net=pnet,
        num_params=cfg["num_params"],
        num_output=cfg["num_output"],
        dropout=cfg["dropout"],
        cnn_feature_ratio=cfg["cnn_feature_ratio"],
    )

    # Move the model to the notebook-level device.
    surf_model.to(device)

    # Data set for the requested split.
    # NOTE(review): the column range 3..6 and num_targets=2 are hard-coded;
    # presumably they correspond to num_params=3 / num_output=2 — confirm
    # before changing those settings.
    split_dir = os.path.join(cfg["data_root_path"], set_type)
    dset = SurfDatasetFromMat(
        data_csv_filename=os.path.join(split_dir, cfg["data_csv_filename"]),
        surf_data_dir=os.path.join(split_dir, "Surf"),
        param_start_idx=3,
        param_end_idx=6,
        num_targets=2,
    )

    # Optimizer over all model parameters.
    optimizer = optim.Adam(surf_model.parameters(), lr=cfg["lr"])

    # Loss measuring the gap between predictions and targets.
    loss_func = nn.MSELoss()

    return {
        "prefix": prefix,
        "train_model": surf_model,
        "dset": dset,
        "optimizer": optimizer,
        "loss_func": loss_func,
    }

In [3]:
# Settings forwarded to define_surf_model (model hyper-parameters and data location).
kwd = {
    "dropout": 0.2,
    "cnn_feature_ratio": 0.5,
    "data_root_path": data_root_path,
    "data_csv_filename": "DataNormilized.csv",
    "lr": 0.001,
    "num_params": 3,
    "num_output": 2,
}
suffix = "input254_cv5_train10000"
# pretrain=False: the weights are restored from the checkpoint loaded below.
# set_type="test": the returned data set reads the "test" split.
model_info = define_surf_model(
    model_name="densenet",
    model_type="121",
    pretrain=False,
    suffix=suffix,
    set_type="test",
    kwd=kwd,
)
model_name = model_info["prefix"]
root_path = "../../../thesis/surfTopo/checkpoints"

# map_location places the stored tensors on the notebook's device instead of
# whatever device they were saved from.
best_checkpoint = torch.load(
    os.path.join(root_path, model_name, f"surf_{model_name}_best.ckpt"),
    map_location=device,
)
# latest_checkpoint = torch.load(
#     os.path.join(root_path, model_name, f"surf_{model_name}_latest.ckpt")
# )

In [4]:
# Restore the trained weights and the optimizer state from the best checkpoint.
surf_model = model_info["train_model"]
surf_model.load_state_dict(best_checkpoint["model_state_dict"])
# The notebook only visualises the trained network, so switch dropout and
# batch-norm layers to inference behaviour (batches here have size 1).
surf_model.eval()
optimizer = model_info["optimizer"]
optimizer.load_state_dict(best_checkpoint["optimizer_state_dict"])
test_set = model_info["dset"]
loss_func = model_info["loss_func"]
# One sample per batch, in a fixed order, so every cell sees the same first sample.
test_loader = DataLoader(test_set, batch_size=1, shuffle=False)

In [5]:
# Switch every ReLU in the network to out-of-place mode.
# NOTE(review): presumably needed so the CAM helpers can read activations and
# gradients that have not been overwritten in place — confirm.
relu_layers = [layer for layer in surf_model.modules() if isinstance(layer, nn.ReLU)]
for relu in relu_layers:
    relu.inplace = False

In [None]:
from util.visual.torch.cnn.cam import ForwardModelBase


# TODO
# ! For our model this is considerably hard to implement and needs careful thought.
# ! An existing "LRP for DenseNet" implementation may be a useful reference.
# ! This part is left unfinished for now.
class LRP(ForwardModelBase):
    """
    Layer-wise relevance propagation with gamma+epsilon rule

    This code is largely based on the code shared in: https://git.tu-berlin.de/gmontavon/lrp-tutorial
    Some stuff is removed, some stuff is cleaned, and some stuff is re-organized compared to that repository.

    NOTE(review): unfinished draft. `generate` reads the names `input_image`
    and `target_class`, which are not defined anywhere in this class, and it
    expects the wrapped model to expose `features` and `classifier`
    sub-modules plus a 1000-way output. Do not call it as is.
    """

    def __init__(
        self,
        model: nn.Module,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        verbose: bool = False,
    ):
        """Forward `model`, `device` and `verbose` unchanged to ForwardModelBase."""
        super(LRP, self).__init__(model, device, verbose)

    def LRP_forward(self, layer, input_tensor, gamma=None, epsilon=None):
        """
        Run `input_tensor` through a modified copy of `layer`.

        Parameters:
        - layer: module to evaluate; it is deep-copied, the original is untouched.
        - input_tensor: input passed to the copied layer.
        - gamma: callable applied to the copy's weight and bias; defaults to
          adding 0.05 times the positive part of the value.
        - epsilon: callable applied to the layer output; defaults to adding 1e-9.

        Returns:
        `epsilon(layer_copy(input_tensor))`.
        """
        # This implementation uses both gamma and epsilon rule for all layers
        # The original paper argues that it might be beneficial to sometimes use
        # or not use gamma/epsilon rule depending on the layer location
        # Have a look at the paper and adjust the code according to your needs

        # LRP-Gamma rule
        if gamma is None:
            gamma = lambda value: value + 0.05 * copy.deepcopy(
                value.data.detach()
            ).clamp(min=0)
        # LRP-Epsilon rule
        if epsilon is None:
            eps = 1e-9
            epsilon = lambda value: value + eps

        # Copy the layer to prevent breaking the graph
        layer = copy.deepcopy(layer)

        # Modify weight and bias with the gamma rule
        # AttributeError covers layers without a `weight` attribute as well as
        # a `weight` of None (the default gamma reads `value.data`).
        try:
            layer.weight = nn.Parameter(gamma(layer.weight))
        except AttributeError:
            pass
            # print('This layer has no weight')
        # Same handling for a missing or None bias.
        try:
            layer.bias = nn.Parameter(gamma(layer.bias))
        except AttributeError:
            pass
            # print('This layer has no bias')
        # Forward with gamma + epsilon rule
        return epsilon(layer(input_tensor))

    def LRP_step(self, forward_output, layer, LRP_next_layer):
        """
        Propagate relevance one layer backwards.

        Parameters:
        - forward_output: tensor that was fed into `layer` during the forward
          pass; gradient tracking is switched on for it in place.
        - layer: the layer to propagate through.
        - LRP_next_layer: relevance at the output of `layer`.

        Returns:
        `forward_output * forward_output.grad`, detached via `.data`.
        """
        # Enable the gradient flow
        forward_output = forward_output.requires_grad_(True)
        # Get LRP forward out based on the LRP rules
        lrp_rule_forward_out = self.LRP_forward(layer, forward_output, None, None)
        # Perform element-wise division
        # `.data` detaches the ratio so it acts as a constant in the backward pass.
        ele_div = (LRP_next_layer / lrp_rule_forward_out).data
        # Propagate
        (lrp_rule_forward_out * ele_div).sum().backward()
        # Get the visualization
        LRP_this_layer = (forward_output * forward_output.grad).data

        return LRP_this_layer

    def generate(self, batch):
        """
        Compute per-layer relevance maps.

        NOTE(review): `batch` is never used. The body reads `input_image` and
        `target_class`, neither of which is defined in this method or class,
        so calling it raises NameError unless such globals happen to exist.

        Returns:
        A list with one entry more than there are layers. The last entry is
        the final forward output multiplied by the one-hot target; index 0
        stays None because the backward loop stops at layer index 1.
        """
        # NOTE(review): assumes the wrapped model has `features` and
        # `classifier` sub-modules that can be iterated layer by layer —
        # confirm this holds for the model used in this notebook.
        layers_in_model = list(self.model._modules["features"]) + list(
            self.model._modules["classifier"]
        )
        number_of_layers = len(layers_in_model)
        # Needed to know where flattening happens
        features_to_classifier_loc = len(self.model._modules["features"])

        # Forward outputs start with the input image
        # NOTE(review): `input_image` is undefined here.
        forward_output = [input_image]
        # Then we do forward pass with each layer
        for conv_layer in list(self.model._modules["features"]):
            forward_output.append(conv_layer.forward(forward_output[-1].detach()))

        # To know the change in the dimensions between features and classifier
        feature_to_class_shape = forward_output[-1].shape
        # Flatten so we can continue doing forward passes at classifier layers
        forward_output[-1] = torch.flatten(forward_output[-1], 1)
        for index, classifier_layer in enumerate(
            list(self.model._modules["classifier"])
        ):
            forward_output.append(classifier_layer.forward(forward_output[-1].detach()))

        # Target for backprop
        # NOTE(review): the size 1000 is hard-coded and `target_class` is undefined.
        target_class_one_hot = torch.FloatTensor(1, 1000).zero_()
        target_class_one_hot[0][target_class] = 1

        # This is where we accumulate the LRP results
        LRP_per_layer = [None] * number_of_layers + [
            (forward_output[-1] * target_class_one_hot).data
        ]

        # Walk the layers from the last one down to index 1.
        for layer_index in range(1, number_of_layers)[::-1]:
            # This is where features to classifier change happens
            # Have to flatten the lrp of the next layer to match the dimensions
            if layer_index == features_to_classifier_loc - 1:
                LRP_per_layer[layer_index + 1] = LRP_per_layer[layer_index + 1].reshape(
                    feature_to_class_shape
                )

            if isinstance(
                layers_in_model[layer_index],
                (torch.nn.Linear, torch.nn.Conv2d, torch.nn.MaxPool2d),
            ):
                # In the paper implementation, they replace maxpool with avgpool because of certain properties
                # I didn't want to modify the model like the original implementation but
                # feel free to modify this part according to your need(s)
                lrp_this_layer = self.LRP_step(
                    forward_output[layer_index],
                    layers_in_model[layer_index],
                    LRP_per_layer[layer_index + 1],
                )
                LRP_per_layer[layer_index] = lrp_this_layer
            else:
                # Every other layer type passes the relevance through unchanged.
                LRP_per_layer[layer_index] = LRP_per_layer[layer_index + 1]
        return LRP_per_layer

In [6]:
from util.visual.torch.cnn.cam import GradCam

# Wrap the loaded model in the project's Grad-CAM helper.
# NOTE(review): the list of available target layers shown in this cell's
# output is presumably printed because verbose=True — confirm.
grad_cam = GradCam(model=surf_model, verbose=True)

Following target layer is available:
['conv0', 'norm0', 'relu0', 'pool0', 'denseblock1', 'transition1', 'denseblock2', 'transition2', 'denseblock3', 'transition3', 'denseblock4', 'norm5']


In [10]:
import matplotlib.pyplot as plt

input_kwds = {"cmap": "gist_rainbow"}

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("./fig", exist_ok=True)

fdict = grad_cam.get_feature_layer_dict()
# squeeze=False keeps `axes` two-dimensional even for a single target layer,
# so the indexing below works for any number of layers.
fig, axes = plt.subplots(
    1, len(fdict), figsize=(5 * len(fdict), 5), squeeze=False
)

# Only the first sample of the test loader is visualised.
batch = next(iter(test_loader))
# Input of that sample as a numpy array, exposed at notebook level.
input_surf = batch[0][0].detach().numpy()

# One panel per target layer, in the order reported by the helper.
for ax, target_layer in zip(axes[0], fdict):
    print(target_layer)

    cam, _ = grad_cam.generate_cam(batch, target_layer)
    ax.imshow(cam, **input_kwds, alpha=0.4)
    ax.axis("off")

# Save and close this figure explicitly instead of relying on pyplot's
# notion of the "current" figure.
fig.savefig("./fig/grad_cam_layers.png")
plt.close(fig)

conv0
Target layer [conv0] found
norm0
Target layer [norm0] found
relu0
Target layer [relu0] found
pool0
Target layer [pool0] found
denseblock1
Target layer [denseblock1] found
transition1
Target layer [transition1] found
denseblock2
Target layer [denseblock2] found
transition2
Target layer [transition2] found
denseblock3
Target layer [denseblock3] found
transition3
Target layer [transition3] found
denseblock4
Target layer [denseblock4] found
norm5
Target layer [norm5] found


In [10]:
from util.visual.torch.cnn.cam import LayerCam

layer_cam = LayerCam(model=surf_model)

# Only the first sample of the test loader is visualised.
batch = next(iter(test_loader))
# Read the input from this batch so the cell does not depend on a variable
# left behind by an earlier cell.
input_surf = batch[0][0].detach().numpy()
cam, conv_out = layer_cam.generate_cam(batch, target_layer="conv0")

input_kwds = {"cmap": "gist_rainbow"}
fig, axes = plt.subplots(2, 2, figsize=(20, 20))
# Top row: the two input entries; bottom row: their sum and the CAM.
axes[0, 0].imshow(input_surf[0], **input_kwds)
axes[0, 1].imshow(input_surf[1], **input_kwds)
axes[1, 0].imshow(input_surf[0] + input_surf[1], **input_kwds)
axes[1, 1].imshow(cam, **input_kwds, alpha=0.4)

for ax in axes.ravel():
    ax.axis("off")

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("./fig", exist_ok=True)
fig.savefig("./fig/layer_cam.png")
plt.close(fig)

In [11]:
from util.visual.torch.cnn.cam import ScoreCam

score_cam = ScoreCam(model=surf_model)

# Only the first sample of the test loader is visualised.
batch = next(iter(test_loader))
# Read the input from this batch so the cell does not depend on a variable
# left behind by an earlier cell.
input_surf = batch[0][0].detach().numpy()
cam, conv_out = score_cam.generate_cam(batch, target_layer="conv0")

input_kwds = {"cmap": "gist_rainbow"}
fig, axes = plt.subplots(2, 2, figsize=(20, 20))
# Top row: the two input entries; bottom row: their sum and the CAM.
axes[0, 0].imshow(input_surf[0], **input_kwds)
axes[0, 1].imshow(input_surf[1], **input_kwds)
axes[1, 0].imshow(input_surf[0] + input_surf[1], **input_kwds)
axes[1, 1].imshow(cam, **input_kwds, alpha=0.4)

for ax in axes.ravel():
    ax.axis("off")

# savefig does not create missing directories, so make sure the target exists.
os.makedirs("./fig", exist_ok=True)
fig.savefig("./fig/score_cam.png")
plt.close(fig)