In [1]:
import torch
import numpy as np

In [3]:
"""
Toy input for the scratch cells below, laid out as [batch, features, time].
Every one of the 4 batch entries is the same 4x4 grid of the values 1..16,
so a[b, f, t] == 4 * f + t + 1.
"""
# Build the 4x4 grid 1..16 (row-major) once and repeat it along a new leading
# axis, giving four identical batch entries.
a = np.tile(np.arange(1, 17).reshape(4, 4), (4, 1, 1))
a

array([[[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15, 16]],

       [[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15, 16]],

       [[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15, 16]],

       [[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15, 16]]])

### Testing FO and AFO (测试FO及AFO)

In [None]:
# Occlusion scratch test: overwrite feature 1 at time step 1 of every batch
# entry with an independent draw from U(-3, 3).
#
# `a` holds integers, so assigning the draws into a plain copy would silently
# truncate them to whole numbers. Work on a float copy instead (`astype`
# returns a new array, so `a` itself is left untouched).
c = a.astype(float)
c[:, 1, 1] = np.random.uniform(-3, +3, size=(len(a),))
c

### Testing TFS (测试TFS)

In [None]:
# Observed window: the first two time steps of every sample.
# Copy it, because a basic slice is only a view and the next cell edits
# `x_o` in place -- without the copy those edits would leak into `a`.
x_o = a[:, :, 0:2].copy()
# Replacement values: all features at one randomly chosen time step.
# The index selects along axis 2 (time), so its upper bound must be the
# length of that axis, not the number of features (axis 1).
z_o = a[:, :, np.random.randint(0, a.shape[2])].copy()
print(x_o.shape)
print(z_o.shape)
print(x_o)
print(z_o)

In [None]:
# Build the two perturbed inputs as independent copies. Binding both names to
# `x_o` and editing it in place would make them the same array (so both prints
# would show whatever was written last) and would also overwrite the observed
# window the perturbations start from.

# Features 3.. at the last time step replaced by the sampled values.
x_with_j = x_o.copy()
x_with_j[:, 3:, -1] = z_o[:, 3:]
print(x_with_j)

# Features 2.. at the last time step replaced by the sampled values.
x_no_j = x_o.copy()
x_no_j[:, 2:, -1] = z_o[:, 2:]
print(x_no_j)

In [None]:
# Dry run of the perturbation loop on the toy array `a`
# (laid out as [batch, features, time]).
n_features, t_len = a.shape[1:]
n_samples = 3
for t in range(1, t_len):
    # Observed history up to and including time t. This is only read below;
    # every perturbed input is built on its own copy.
    x_o = a[:, :, 0:t + 1]
    for i in range(n_features):
        for _ in range(n_samples):
            # All features at one random time step. The index runs along the
            # time axis, hence the bound a.shape[2].
            z_o = a[:, :, np.random.randint(0, t_len)]

            # Feature i keeps its observed value at time t; features after i
            # take the sampled values.
            x_with_j = x_o.copy()
            x_with_j[:, i + 1:, t] = z_o[:, i + 1:]

            # Same as above, but feature i is replaced as well.
            x_no_j = x_with_j.copy()
            x_no_j[:, i, t] = z_o[:, i]

In [None]:
class TFS:
    """
    Monte-Carlo importance scores for a time-series model.

    For every time step t >= 1 and feature i, the model is evaluated on two
    perturbed copies of the history x[:, :, :t+1] that differ only in whether
    feature i at time t keeps its observed value; the score is the mean
    absolute difference of the two (activated) model outputs.
    """

    def __init__(self, model, activation=torch.nn.Softmax(-1)):
        """
        :param model: model to explain; it is moved to CUDA when available, else CPU
        :param activation: callable applied to the raw model output before comparison
        """
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.base_model = model.to(self.device)
        self.activation = activation

    def attribute(self, x, y, n_samples=10):
        """
        Compute importance score for a sample x, over time and features
        :param x: Sample instance to evaluate score for. Shape:[batch, features, time]
        :param y: unused; kept so existing callers keep working
        :param n_samples: number of Monte-Carlo samples
        :return: Importance score matrix of shape:[batch, features, time]
                 (numpy array; the t = 0 column is never scored and stays 0)
        """
        x = x.to(self.device)
        _, n_features, t_len = x.shape
        score = np.zeros(list(x.shape))

        for t in range(1, t_len):
            # Observed history up to and including t. Only read from: each
            # perturbed input below is built on its own clone.
            x_o = x[:, :, 0:t + 1]

            for i in range(n_features):
                div_all = []
                for _ in range(n_samples):
                    # All features at one random time step; the index runs
                    # along the time axis, so it is bounded by t_len.
                    z_o = x[:, :, np.random.randint(0, t_len)]

                    # Feature i observed at time t, features after i replaced.
                    x_with_j = x_o.clone()
                    x_with_j[:, i + 1:, t] = z_o[:, i + 1:]

                    # Identical, except feature i is replaced as well.
                    x_no_j = x_with_j.clone()
                    x_no_j[:, i, t] = z_o[:, i]

                    y_with_j = self.activation(self.base_model(x_with_j))
                    y_no_j = self.activation(self.base_model(x_no_j))
                    div = torch.abs(y_with_j - y_no_j)
                    # Average over the last output axis.
                    # NOTE(review): assumes the model output is [batch, outputs] -- confirm.
                    div_all.append(np.mean(div.detach().cpu().numpy(), -1))
                # Expectation over the Monte-Carlo samples.
                score[:, i, t] = np.mean(np.array(div_all), axis=0)
        return score

In [None]:
import abc
import logging
class BaseExplainer(abc.ABC):
    """
    A base class for explainer.

    Subclasses must implement ``attribute`` and ``get_name``. The generator
    hooks (``train_generators``, ``test_generators``, ``load_generators``) are
    optional and do nothing by default.
    """

    def __init__(self, device=None):
        """
        Constructor.

        Args:
            device:
               The torch device.
        """
        # Project types are written as string annotations so that defining this
        # class does not require those names to be importable.
        self.base_model: "TorchModel | None" = None
        # NOTE(review): `resolve_device` is not defined in this notebook; it has
        # to be provided by the surrounding project before instantiation.
        self.device = resolve_device(device)

    @abc.abstractmethod
    def attribute(self, x):
        """
        The attribution method that the explainer will give.
        Args:
            x:
                The input tensor.

        Returns:
            The attribution with respect to x. The shape should be the same as x, or it could
            be one dimension greater than x if there is aggregation needed.

        """

    def train_generators(
        self, train_loader, valid_loader, num_epochs=300
    ) -> "GeneratorTrainingResults | None":
        """
        If the explainer or attribution method needs a generator, this will train the generator.

        Args:
            train_loader:
                The dataloader for training
            valid_loader:
                The dataloader for validation.
            num_epochs:
                The number of epochs.

        Returns:
            The training results for the generator, if applicable. This includes the
            training curves. The base implementation returns None.

        """
        return None

    def test_generators(self, test_loader) -> "float | None":
        """
        If the explainer or attribution method needs a generator, this will return the performance
        of the generator on the test set.

        Args:
            test_loader:
                The dataloader for testing.

        Returns:
            The test result (MSE) for the generator, if applicable. The base implementation
            returns None.

        """
        return None

    def load_generators(self) -> None:
        """
        If the explainer or attribution method needs a generator, this will load the generator from
        the disk. The base implementation does nothing.
        """

    def set_model(self, model, set_eval=True) -> None:
        """
        Set the base model the explainer wish to explain.

        Args:
            model:
                The base model.
            set_eval:
                Indicating whether we set to eval mode for the explainer. Note that in some cases
                like Dynamask or FIT, they do not set the model to eval mode.
        """
        self.base_model = model
        if set_eval:
            self.base_model.eval()
        # Keep the model on the same device as the explainer.
        self.base_model.to(self.device)

    @abc.abstractmethod
    def get_name(self):
        """
        Return the name of the explainer.
        """

### Testing TFS on the MIMIC data (测试MIMIC数据上的TFS)

In [None]:
class TFSExplainer(BaseExplainer):
    """
    Monte-Carlo importance scores for a time-series model.

    For every time step t >= 1 and feature i, the base model is evaluated on
    two perturbed copies of the history x[:, :, :t+1] that differ only in
    whether feature i at time t keeps its observed value; the score is the
    mean absolute difference of the two predictions.
    """

    def __init__(self, device, n_samples=10, **kwargs):
        """
        :param device: the torch device, forwarded to BaseExplainer
        :param n_samples: number of Monte-Carlo samples drawn per (feature, time) pair
        :param kwargs: ignored; a warning is logged when any are given
        """
        super().__init__(device)
        self.n_samples = n_samples
        if len(kwargs) > 0:
            log = logging.getLogger(TFSExplainer.__name__)
            log.warning(f"kwargs is not empty. Unused kwargs={kwargs}")

    def attribute(self, x):
        """
        Compute importance score for a sample x, over time and features.
        The number of Monte-Carlo samples is taken from ``self.n_samples``.
        :param x: Sample instance to evaluate score for. Shape:[batch, features, time]
        :return: Importance score matrix of shape:[batch, features, time]
                 (numpy array; the t = 0 column is never scored and stays 0)
        """
        self.base_model.eval()
        self.base_model.zero_grad()

        x = x.to(self.device)
        _, n_features, t_len = x.shape
        score = np.zeros(list(x.shape))

        for t in range(1, t_len):
            # Observed history up to and including t. Only read from: each
            # perturbed input below is built on its own clone, so one sample
            # can never contaminate the next.
            x_o = x[:, :, 0:t + 1]

            for i in range(n_features):
                div_all = []
                for _ in range(self.n_samples):
                    # All features at one random time step; the index runs
                    # along the time axis, so it is bounded by t_len.
                    z_o = x[:, :, np.random.randint(0, t_len)]

                    # Feature i observed at time t, features after i replaced.
                    x_with_j = x_o.clone()
                    x_with_j[:, i + 1:, t] = z_o[:, i + 1:]

                    # Identical, except feature i is replaced as well. This has
                    # to be a separate tensor: if both names referred to the
                    # same storage the difference below would always be 0.
                    x_no_j = x_with_j.clone()
                    x_no_j[:, i, t] = z_o[:, i]

                    # NOTE(review): `predict` receives the output of the model's
                    # forward call, not the input -- confirm this matches the
                    # base model's `predict` contract.
                    y_with_j = self.base_model.predict(self.base_model(x_with_j))
                    y_no_j = self.base_model.predict(self.base_model(x_no_j))
                    div = torch.abs(y_with_j - y_no_j)
                    # Average over the last output axis.
                    # NOTE(review): assumes predictions are [batch, outputs] -- confirm.
                    div_all.append(np.mean(div.detach().cpu().numpy(), -1))
                # Expectation over the Monte-Carlo samples.
                score[:, i, t] = np.mean(np.array(div_all), axis=0)
        return score

    def get_name(self):
        """Return "tfs", suffixed with the sample count when it is not the default 10."""
        if self.n_samples != 10:
            return f"tfs_sample_{self.n_samples}"
        return "tfs"