In [2]:
import torch 
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

from concept_discovery.kmeans import kmeans_explore, kmeans_predict, plot_concepts
from imputer.load import load_imputer

from utils.utils import compute_significance, plot_effects, get_global, get_channel_wise

import subprocess
import json

### Data loading

In [4]:
x_train = np.load('data/x_train.npy')
x_val = np.load('data/x_val.npy')
x_test = np.load('data/x_test.npy')

y_train = np.load('data/y_train.npy')
y_val = np.load('data/y_val.npy')
y_test = np.load('data/y_test.npy')

### Concepts loading or discovery

If you have already defined concepts, you should load here them as 
'train_concepts', 'val_concepts', and 'test_concets' and skip the next concept discovery step.
Each of these should be of the shape (N_concepts, N_samples, Lengh, Channels) and should be binary masks. e.g. 0's where the concept is present, and 1's where don't. (as per diffusion model requierement).

In [None]:
# ks=number of clusters to investigate, it will return a plot to make a final decision

kmeans_explore(x_train,x_val,x_test,ks=10) 

In [None]:
# k=desired number of concepts

train_concepts,val_concepts,test_concepts = kmeans_predict(x_train,x_val,x_test,k=6)

np.save('data/train_concepts.npy', train_concepts)
np.save('data/val_concepts.npy', val_concepts)
np.save('data/test_concepts.npy', test_concepts)

In [None]:
# plot your concepts

max_lenght = x_test.shape[1] # length of the ts for plotting (can be smaller than the total)
leadsn = ["I","II","III","aVR","aVL","aVF","V1","V2","V3","V4","V5","V6"] # name of your channels
index = np.where(y_test==1)[0][0] # index of a class sample
data = x_test # data to use (we used test for the index, so update accordly)
concepts = test_concepts # concepts of the desired split (test again)
save_fig = True 
savename = 'concepts.pdf'

plot_concepts(max_length, leadsn, index, data, concepts, savefig=savefig, savename=savename)

### Classifier model loading or training

In this section, you can either load the model of your choice and assign it in a variable named 'classifier' or train an S4 model as in our main experiments paper

In [None]:
# you data should be in 'data/'

# Construct the command to run the script
command = [
    'python', 'classifier/main.py',
    '--input-size', max_length,
    '--architecture', 's4',
    '--precision', 32,
    '--s4-n', 8,
    '--s4-h', 512,
    '--batch-size', 32,
    '--epochs', 20,
    '--input-channels', len(leadsn)
]

# it will save your model as first version (if there isn't any already)
subprocess.run(command, capture_output=True, text=True)

In [None]:
# load the model configuration
with open('version_0/hparams.yaml') as f:
    hparams = yaml.load(f, Loader=SafeLoader)

namespace = Namespace(**hparams)

In [None]:
# load the model and keep it in evaluation mode
model = Main(hparams=namespace).cuda()
model.load_weights_from_checkpoint('version_0/best_model.ckpt')
classifier = model.eval()

### Diffusion model training

In [None]:
# create configuration files, one for each model

config_paths = []

for setting in ['class','norm']:

    config = {
        "diffusion_config": {
            "T": 200,
            "beta_0": 0.0001,
            "beta_T": 0.02
        },
        "wavenet_config": {
            "in_channels": len(leadsn), 
            "out_channels": len(leadsn),
            "num_res_layers": 36,
            "res_channels": 256, 
            "skip_channels": 256,
            "diffusion_step_embed_dim_in": 128,
            "diffusion_step_embed_dim_mid": 512,
            "diffusion_step_embed_dim_out": 512,
            "s4_lmax": x_train.shape[1],
            "s4_d_state": 64,
            "s4_dropout": 0.0,
            "s4_bidirectional": 1,
            "s4_layernorm": 1
        },
        "train_config": {
            "output_directory": f"/imputer/results/{setting}",
            "ckpt_iter": "max",
            "iters_per_logging": 100,
            "n_iters": 10000,
            "learning_rate": 2e-4,   
            "target_class": setting
        },
        "trainset_config": {
            "segment_length": x_train.shape[1]
        },
        "gen_config": {
            "output_directory": f"/imputer/results/{setting}",
            "ckpt_path": f"/imputer/results/{setting}/"
        }
    }

    # Save the configuration to a JSON file
    config_path = f'imputer/config_{setting}.json'
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=4)
    
    config_paths.append(config_path)

In [None]:
config_paths

In [None]:
# train each of the models

for config_path in config_paths:
    
    command = [
    'python', 'imputer/train.py',
    '--config', config_path]

    result = subprocess.run(command, capture_output=True, text=True)

### Diffusion model generation

In [None]:
for config_path in config_paths:
    
    imputer = load_imputer(path_json=config_path,
                           ckpt_iter=10000)
    
    target_class = 'class'                                    
    target_generations = 40

    for mask_s, mask in zip(['A','B','C','D','E','F'], [test_concetps[0],
                                                             test_concepts[1],
                                                             test_concepts[2],
                                                             test_concepts[3],
                                                             test_concepts[4],
                                                             test_concepts[5]]):
        for g in range(1,target_generations+1):
            generation =  sampling(net,
                                  (len(x_test), x_test.shape[1], x_test.shape[2]),                                    
                                  diffusion_hyperparams, 
                                  cond = torch.from_numpy(samples).permute(0,2,1).cuda().float(),
                                  mask = torch.from_numpy(mask).permute(0,2,1).cuda().float())                                            

            outfile = f'{target_disease}_{mask_s}_{g}.npy'                  
            new_out = os.path.join(ckpt_path, outfile)
            np.save(new_out, generation.detach().cpu().numpy())
            print(outfile)

### Causal Concept Time Series Explainer (CusalConceptTS)

In [None]:
global_sig_dou_s, global_sig_doc_s = get_global(concepts=['A','B','C','D','E','F'], 
                                                target_generations=target_generations, 
                                                l_channels=x_test.shape[2])

channel_sig_dou_s, channel_sig_doc_s = get_channel_wise(concepts=['A','B','C','D','E','F'],
                                                        target_generations=target_generations, 
                                                        l_channels=x_test.shape[2])

In [None]:
plot_effects(global_sig_doc_g, 
         global_sig_doc_s, 
         global_do0_c, 
         channel_sig_doc_g, 
         channel_sig_doc_s, 
         channel_do0_c, 
         channels=leadsn,
        concepts=['A','B','C','D','E','F'],
         'do(causal).pdf')