In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import importlib

import cbig.osama2024.dataloader as dataloader
import cbig.osama2024.model as model
importlib.reload(model)

#from cbig.osama2024.model import osama

import cbig.osama2024.misc as misc
from cbig.osama2024.model import MODEL_DICT


# load data
import pickle
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split  

In [15]:
# import integrated gradients from captum
from captum.attr import IntegratedGradients

  from .autonotebook import tqdm as notebook_tqdm


## Loading MinRNN model

In [5]:
model_checkpoint = "examples/output/model.pt"
data_path = "output/val.pkl"
dict_path = "output/model_state_dict.pt"

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

Using device: cpu


### Data loading

In [7]:
# with open(data_path, 'rb') as f:
#     data = pickle.load(f, encoding='latin1')

with open(data_path, 'rb') as fhandler:
        data = pickle.load(fhandler)


In [8]:
nb_measures = len(data['train'].value_fields())

In [9]:
print(data['train'].value_fields())

['CDRSB' 'ADAS11' 'ADAS13' 'MMSE' 'RAVLT_immediate' 'RAVLT_learning'
 'RAVLT_forgetting' 'RAVLT_perc_forgetting' 'MOCA' 'FAQ' 'Entorhinal'
 'Fusiform' 'Hippocampus' 'ICV' 'MidTemp' 'Ventricles' 'WholeBrain' 'AV45'
 'FDG' 'ABETA_UPENNBIOMK9_04_19_17' 'TAU_UPENNBIOMK9_04_19_17'
 'PTAU_UPENNBIOMK9_04_19_17']


In [10]:
print(data.keys())

dict_keys(['baseline', 'pred_start', 'duration', 'mean', 'stds', 'VentICVstd', 'train', 'test'])


### Model param initialization

In [11]:
model_type = 'MinRNN'
nb_classes = 3
nb_layers = 1
h_size = 512
h_drop = 0.1
i_drop = 0.1
lr = 5e-4
weight_decay = 5e-7
verbose = True

In [12]:
model_class = MODEL_DICT[model_type]
model = model_class(
    nb_classes = nb_classes,
    nb_measures = nb_measures,
    nb_layers = nb_layers,
    h_size = h_size,
    h_drop = h_drop,
    i_drop = i_drop)

setattr(model, 'mean', data['mean'])
setattr(model, 'std', data['stds'])

In [13]:
log = print if verbose else lambda *x, **i: None
device = torch.device(
        'cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
log(model)

optimizer = torch.optim.Adam(
    model.parameters(), lr=lr, weight_decay=weight_decay)


MinimalRNN(
  (hid2category): Linear(in_features=512, out_features=3, bias=True)
  (hid2measures): Linear(in_features=512, out_features=22, bias=True)
  (cells): ModuleList(
    (0): MinimalRNNCell(
      (W): Linear(in_features=25, out_features=512, bias=True)
    )
  )
)


### Loading trained model states

In [14]:
# Load dictionary into model
model.load_state_dict(torch.load(dict_path))
model.to(device)

MinimalRNN(
  (hid2category): Linear(in_features=512, out_features=3, bias=True)
  (hid2measures): Linear(in_features=512, out_features=22, bias=True)
  (cells): ModuleList(
    (0): MinimalRNNCell(
      (W): Linear(in_features=25, out_features=512, bias=True)
    )
  )
)

In [17]:
ig = IntegratedGradients(model)

# Apply Integrated Gradients to the chosen data point
attributions, approximation_error = ig.attribute(data_point, target=target_class_index, return_convergence_delta=True)

# The attributions tensor contains the sensitivity of the model's output to each input feature
print("Attributions:", attributions)
print("Approximation Error (Delta):", approximation_error.item())

NameError: name 'data_point' is not defined

### Applying Integrated Gradient

In [16]:
def apply_integrated_gradients(model, data_point, target_class_index=0):
    # Create an instance of IntegratedGradients for the RNN model
    ig = IntegratedGradients(model)

    # Apply Integrated Gradients to the chosen data point
    attributions, approximation_error = ig.attribute(data_point, target=target_class_index, return_convergence_delta=True)

    # The attributions tensor contains the sensitivity of the model's output to each input feature
    print("Attributions:", attributions)
    print("Approximation Error (Delta):", approximation_error.item())

In [95]:
def apply_integrated_gradients(model, cat_seq, value_seq, time_seq, target_class_index=0):
    """
    Apply Integrated Gradients to the given model and data point.
    
    Args:
        model: Trained PyTorch model.
        cat_seq: Sequence of categorical features.
        value_seq: Sequence of numerical features.
        time_seq: Sequence of time points.
        target_class_index: Index of the target class for which to compute attributions.
        
    Returns:
        attributions: Integrated Gradients attributions for the input features.
        approximation_error: Approximation error computed during the attribution process.
    """
    # Create an instance of IntegratedGradients for the RNN model
    ig = IntegratedGradients(model)

    # Concatenate categorical and numerical features along the time dimension
    input_seq = torch.cat([cat_seq, value_seq], dim=-1)

    # Apply Integrated Gradients to the input sequence
    attributions, approximation_error = ig.attribute(input_seq, target=target_class_index, return_convergence_delta=True)

    # Return the computed attributions and approximation error
    return attributions, approximation_error


test_data = data['test']
    
attributions_list = []


# Iterate over individual data points in the test data
for i in range(len(test_data)):
    data_point = test_data.next()  # Get the next data point using the 'next' method of the Sorted object
    
    # Convert data point to appropriate format for applying Integrated Gradients
    cat_seq = data_point['cat']
    value_seq = data_point['val']
    time_seq = data_point['tp']

    # Convert data to tensors and move to device
    cat_seq = torch.tensor(cat_seq, dtype=torch.float32).to(device)
    value_seq = torch.tensor(value_seq, dtype=torch.float32).to(device)
    time_seq = torch.tensor(time_seq, dtype=torch.float32).to(device)

    # Apply Integrated Gradients to the current data point
    attributions = apply_integrated_gradients(model, cat_seq, value_seq, time_seq)

    # Append attributions to the list
    attributions_list.append(attributions)

TypeError: RnnModelInterp.forward() missing 1 required positional argument: '_val_seq'

In [17]:
# device = torch.device(
#         'cuda') if torch.cuda.is_available() else torch.device('cpu')
# model = torch.load(args.checkpoint)
# model.to(device)

# with open(args.data, 'rb') as fhandler:
#     data = pickle.load(fhandler)

# Assuming 'test' contains your test data

#import dataloader
import torch
from torch.utils.data import DataLoader, TensorDataset


test_loader = DataLoader(TensorDataset(*data['test']), batch_size=64, shuffle=False)

# Choose a data point from the test set for applying Integrated Gradients
data_point, _ = next(iter(test_loader))
data_point = data_point.to(device)

# Apply Integrated Gradients
apply_integrated_gradients(model, data_point)

AttributeError: 'list' object has no attribute 'size'

### Original Predict function of RNN model

In [None]:
def predict_subject(model, cat_seq, value_seq, time_seq):
    """
    Predict Alzheimer’s disease progression for a subject
    Args:
        model: trained pytorch model
        cat_seq: sequence of diagnosis [nb_input_timpoints, nb_classes]
        value_seq: sequence of other features [nb_input_timpoints, nb_features]
        time_seq: months from baseline [nb_output_timpoints, nb_features]
    nb_input_timpoints <= nb_output_timpoints
    Returns:
        out_cat: predicted diagnosis
        out_val: predicted features
    """
    in_val = np.full((len(time_seq), ) + value_seq.shape[1:], np.nan)
    in_val[:len(value_seq)] = value_seq

    in_cat = np.full((len(time_seq), ) + cat_seq.shape[1:], np.nan)
    in_cat[:len(cat_seq)] = cat_seq

    with torch.no_grad():
        out_cat, out_val = model(in_cat, in_val)
    out_cat = out_cat.cpu().numpy()
    out_val = out_val.cpu().numpy()

    assert out_cat.shape[1] == out_val.shape[1] == 1

    return out_cat, out_val


def predict(model, dataset, pred_start, duration, baseline):
    """
    Predict Alzheimer’s disease progression using a trained model
    Args:
        model: trained pytorch model
        dataset: test data
        pred_start (dictionary): the date at which prediction begins
        duration (dictionary): how many months into the future to predict
        baseline (dictionary): the baseline date
    Returns:
        dictionary which contains the following key/value pairs:
            subjects: list of subject IDs
            DX: list of diagnosis prediction for each subject
            ADAS13: list of ADAS13 prediction for each subject
            Ventricles: list of ventricular volume prediction for each subject
    """
    model.eval()
    ret = {'subjects': dataset.subjects}
    ret['DX'] = []  # 1. likelihood of NL, MCI, and Dementia
    ret['ADAS13'] = []  # 2. (best guess, upper and lower bounds on 50% CI)
    ret['Ventricles'] = []  # 3. (best guess, upper and lower bounds on 50% CI)
    ret['dates'] = misc.make_date_col(
        [pred_start[s] for s in dataset.subjects], duration)

    col = ['ADAS13', 'Ventricles', 'ICV']
    indices = misc.get_index(list(dataset.value_fields()), col)
    mean = model.mean[col].values.reshape(1, -1)
    stds = model.stds[col].values.reshape(1, -1)

    for data in dataset:
        rid = data['rid']
        all_tp = data['tp'].squeeze(axis=1)
        start = misc.month_between(pred_start[rid], baseline[rid])
        assert np.all(all_tp == np.arange(len(all_tp)))
        mask = all_tp < start
        itime = np.arange(start + duration)
        icat = np.asarray(
            [misc.to_categorical(c, 3) for c in data['cat'][mask]])
        ival = data['val'][:, None, :][mask]

        ocat, oval = predict_subject(model, icat, ival, itime)
        oval = oval[-duration:, 0, indices] * stds + mean

        ret['DX'].append(ocat[-duration:, 0, :])
        ret['ADAS13'].append(misc.add_ci_col(oval[:, 0], 1, 0, 85))
        ret['Ventricles'].append(
            misc.add_ci_col(oval[:, 1] / oval[:, 2], 5e-4, 0, 1))

    return ret

In [None]:
def predict_subject_with_gradients(model, cat_seq, value_seq, time_seq, target_class_index=0):
    """
    Predict Alzheimer’s disease progression for a subject and calculate Integrated Gradients
    Args:
        model: trained PyTorch model
        cat_seq: sequence of diagnosis [nb_input_timpoints, nb_classes]
        value_seq: sequence of other features [nb_input_timpoints, nb_features]
        time_seq: months from baseline [nb_output_timpoints, nb_features]
        target_class_index: index of the target class for Integrated Gradients
    nb_input_timpoints <= nb_output_timpoints
    Returns:
        out_cat: predicted diagnosis
        out_val: predicted features
        attributions: Integrated Gradients attributions
        approximation_error: Delta value for approximation error
    """
    in_val = np.full((len(time_seq), ) + value_seq.shape[1:], np.nan)
    in_val[:len(value_seq)] = value_seq

    in_cat = np.full((len(time_seq), ) + cat_seq.shape[1:], np.nan)
    in_cat[:len(cat_seq)] = cat_seq

    in_val_tensor = torch.tensor(in_val, dtype=torch.float32, device=device)
    in_cat_tensor = torch.tensor(in_cat, dtype=torch.float32, device=device)

    in_val_tensor.requires_grad_()
    in_cat_tensor.requires_grad_()

    # Forward pass to get predictions
    with torch.no_grad():
        out_cat, out_val = model(in_cat_tensor, in_val_tensor)

    # Backward pass to calculate Integrated Gradients
    ig = IntegratedGradients(model)
    attributions, approximation_error = ig.attribute((in_cat_tensor, in_val_tensor), target=target_class_index, return_convergence_delta=True)

    out_cat = out_cat.cpu().numpy()
    out_val = out_val.cpu().numpy()
    attributions = attributions.cpu().numpy()

    assert out_cat.shape[1] == out_val.shape[1] == 1

    return out_cat, out_val, attributions, approximation_error.item()



In [None]:
def get_input_sequences_for_subject(dataset, subject_id):
    # Assuming your dataset is a dictionary with keys 'cat', 'val', and 'time'
    # Each key corresponds to the diagnosis, values, and time sequences for a subject
    # Replace this with your actual data retrieval logic

    icat = dataset['cat'][subject_id]
    ival = dataset['val'][subject_id]
    itime = dataset['time'][subject_id]

    return icat, ival, itime

In [None]:
# Example usage:
rid = "6"  # Replace with an actual subject ID
icat, ival, itime = get_input_sequences_for_subject(data, rid)  # Replace with your actual function to get input sequences

out_cat, out_val, attributions, approximation_error = predict_subject_with_gradients(model, icat, ival, itime)
