# Prepare Data

## Physionet Challenge Dataset

In [1]:
import pandas as pd
import numpy as np
from tcav import *
import torch

In [2]:
# Load the PhysioNet label table; later cells read its `filename`, `source`
# and string-numbered label columns ('0', '1', ...).
physionet_df = pd.read_csv("./physionet.csv")

In [None]:
# Show how many records come from each `source` value.
physionet_df.source.value_counts()

In [4]:
# Human-readable label names. Position i in this list names the label stored
# in column str(i) of `physionet_df` (later cells index columns that way).
# NOTE(review): entries ending in "..." look like truncated display names --
# confirm they are the intended labels.
label_list = [
    "atrial fibrillation",
    "atrial flutter",
    "bundle branch block",
    "bradycardia",
    "complete left bundle branch block, left bundle...", #
    "complete right bundle branch block, right bund...",
    "1st degree av block",
    "incomplete right bundle branch block",
    "left axis deviation", 
    "left anterior fascicular block",
    "prolonged pr interval",
    "low qrs voltages",
    "prolonged qt interval",
    "nonspecific intraventricular conduction disorder",
    "sinus rhythm", #
    "premature atrial contraction, supraventricular...",
    "pacing rhythm",
    "poor R wave Progression",
    "premature ventricular contractions, ventricula...",
    "qwave abnormal", #
    "right axis deviation",
    "sinus arrhythmia",
    "sinus bradycardia",
    "sinus tachycardia",
    "t wave abnormal", #
    "t wave inversion"
]



## add sublabel

In [5]:
# Derived labels: each new column is the element-wise OR of two existing label
# columns, and its display name is appended to `label_list` in the same order
# so that list position and column name stay aligned.
for merged_col, (first_col, second_col), merged_name in (
    ('26', ('0', '1'), "atrial fibrillation+atrial flutter"),
    ('27', ('24', '25'), "t wave abnormal + t wave inversion "),
):
    physionet_df[merged_col] = physionet_df[first_col] | physionet_df[second_col]
    label_list.append(merged_name)


In [None]:
# Convert to an ndarray so later cells can index it with index lists and
# boolean masks, then list every (index, name) pair for reference.
label_list = np.array(label_list)
for position, label_text in enumerate(label_list):
    print(position, label_text)

In [None]:
import matplotlib.pyplot as plt

# Number of positive records per label (column str(i) <-> label_list[i]).
hist_label_list = np.array(label_list[:])
hist_columns = [str(col_idx) for col_idx in range(len(hist_label_list))]
label_dist = np.array(physionet_df[hist_columns].sum().tolist())

# Plot the labels from least to most frequent.
order_idx_list = np.argsort(label_dist)
labels = hist_label_list[order_idx_list]
counts = label_dist[order_idx_list]
bar_positions = range(len(labels))

plt.figure(figsize=(10, 6))
plt.bar(bar_positions, counts, color='skyblue', edgecolor='black')
plt.xticks(bar_positions, labels, rotation=270)

plt.title('Physionet label distribution')
plt.xlabel('Labels')
plt.ylabel('Counts')

plt.show()


## Make Target and Random Concept Dataset
- Prioritize extracting data with a single clear label from the multilabel data
- Ensure the distribution of source_id is as uniform as possible within each label
- For random control, randomly extract data from the remaining data to match the source_id distribution


In [8]:
from random import shuffle
import random

In [None]:
# Label indices (positions in `label_list`) for which a concept set is built.
selected_idx_list = sorted([4, 26, 5, 8, 20, 12, 13, 19, 27, 21, 22, 23])

random_concept_n: int = 10  # number of random (control) concept sets to build
sample_n = 200  # number of records per concept set
random_seed = 777  # seed shared by the sampling steps

# Maps concept name -> DataFrame of the records sampled for that concept.
concept_file_dict = {}

# Files still available to target concepts; shrinks as concepts are sampled.
total_file_set = set(physionet_df.filename.tolist())
# Pool for the random control concepts; starts as an independent copy of all files.
control_file_set = set(total_file_set)

# Column names of every label; summed per record to count how many labels it carries.
all_label_cols = [str(i) for i in range(len(label_list))]

# Process the selected labels from the rarest to the most frequent so that
# scarce labels get first pick of the shared file pool.
for order_pos in np.argsort(label_dist[selected_idx_list]):
    select_idx = selected_idx_list[order_pos]
    name = label_list[select_idx]
    print(name)

    # Records positive for this label, annotated with their total label count.
    is_positive = physionet_df[str(select_idx)] == 1
    target_df = physionet_df[is_positive].copy()
    target_df['count'] = target_df[all_label_cols].sum(axis=1).tolist()

    # Keep only files not already assigned to a previously processed concept.
    exist_file_df = pd.DataFrame(total_file_set, columns=['filename'])
    target_df = target_df.merge(exist_file_df, on='filename', how='inner')

    # Seeded random rank: breaks ties between records with equal label count.
    random.seed(random_seed)
    random_number = list(range(len(target_df)))
    random.shuffle(random_number)
    target_df['random_seed'] = random_number

    # Sources ordered from fewest to most records; each gets an equal quota
    # and the last (largest) one absorbs whatever is still missing.
    source_list = target_df.source.value_counts(ascending=True).index.tolist()
    each_n = sample_n // len(source_list)
    remain_n = sample_n

    # Fewest labels first, then random rank. The rank is unique per row, so
    # sorting once and filtering per source matches sorting each source slice.
    ranked_df = target_df.sort_values(['count', 'random_seed'])
    source_sample_list = []
    last_source_pos = len(source_list) - 1

    for i, source in enumerate(source_list):
        from_source_df = ranked_df[ranked_df.source == source]
        if i == last_source_pos:
            target_sample_df = from_source_df.head(remain_n)
        else:
            target_sample_df = from_source_df.head(each_n)
            remain_n -= len(target_sample_df)
        print(name, source, target_sample_df.shape)
        source_sample_list.append(target_sample_df)

    target_sample_df = pd.concat(source_sample_list)
    file_list = target_sample_df.filename.tolist()
    concept_file_dict[name] = target_sample_df

    # Sampled files leave the target pool; every remaining candidate for this
    # label leaves the random-control pool.
    total_file_set = total_file_set.difference(file_list)
    control_file_set = control_file_set.difference(target_df.filename.tolist())

    if len(file_list) >= sample_n:
        print(f"[Success]{name} label is prepared, sample_n: {len(file_list)}")
    else:
        print(f"[Caution]{name} label is insufficient, file_n: {len(file_list)}")

# Build the control pool once. The filenames are sorted because set iteration
# order can differ between interpreter runs, which would make the seeded draws
# below non-reproducible.
control_pool_df = pd.DataFrame(sorted(control_file_set), columns=['filename'])
control_pool_df = pd.merge(control_pool_df, physionet_df, on='filename', how='inner')

# Equal share of `sample_n` for every source present in the pool.
each_n = int(sample_n / len(control_pool_df.source.unique()))

for random_idx in range(random_concept_n):
    # Offset the seed per concept: drawing from the same pool with the same
    # seed would make every "random" concept an identical set of records.
    random_sample_df = control_pool_df.groupby('source').sample(
        each_n, random_state=random_seed + random_idx
    )

    concept_file_dict[f"random_concept_{random_idx}"] = random_sample_df

    # Report the size of the set that was actually drawn for this concept.
    print(f"[Success] random{random_idx} label is prepared, sample_n: {len(random_sample_df)}")
    

In [None]:
#check random label list
# Rebuild the full random-control pool and print, for every selected label,
# how many positive records are still in it.
control_name_df = pd.DataFrame(list(control_file_set), columns=['filename'])
random_sample_df = control_name_df.merge(physionet_df, on='filename', how='inner')
print(random_sample_df.shape)
for label_idx in selected_idx_list:
    label_column = str(label_idx)
    print(random_sample_df[label_column].sum())

In [None]:
#check random label list
# Names of the labels that still have at least one positive record in the
# control pool. The column range follows `label_list` instead of a hard-coded
# count, so it stays correct if labels are added or removed.
control_label_cols = [str(i) for i in range(len(label_list))]
random_label_list = label_list[random_sample_df[control_label_cols].sum(axis=0) != 0]
random_label_list

## check concept dist

In [None]:
# For every concept set: collect a one-row table of per-source record counts
# and draw a histogram of its source distribution.
count_df_list = []
for name, target_df in concept_file_dict.items():
    # Name the counts Series explicitly so the row is labelled with the concept
    # name on every pandas version (renaming the 'source' row only worked while
    # value_counts() named its result after the column).
    count_df = target_df.source.value_counts().rename(name).to_frame().T
    count_df_list.append(count_df)
    target_df.source.hist(label=name)
    plt.legend()
    plt.show()

# Setting for TCAV analysis
- A target classifier and a target-label dataframe are required
- The USER must define the following code for their own research setting
- USER CUSTOM CODE: get_ecg_tensor, TCAV_dataset, get_model

In [14]:
"""
[Example]
ecg_tensor = get_ecg_tensor(filename)
classifier = get_model(model_path)
classifier(ecg_tensor)
"""


def get_ecg_tensor(filename,)->torch.Tensor:
    """USER CUSTOM FUNCTION: load one ECG recording as a model-ready tensor.

    This is a template placeholder; replace the body with project-specific
    loading code before running the cells below.

    Args:
        filename: identifier of the recording to load.

    Returns:
        The ECG tensor to feed to the classifier.

    Raises:
        NotImplementedError: always, until the user supplies an implementation.
    """
    # Fail with an explicit message instead of a NameError on an undefined name.
    raise NotImplementedError(
        "get_ecg_tensor is a user-defined hook: implement it to return the "
        "ECG tensor for the given filename"
    )

class TCAV_dataset(torch.utils.data.Dataset):
    """USER CUSTOM CLASS: dataset that yields one ECG tensor per row of a file table.

    Users are expected to adapt this class to their own data layout.

    Args:
        file_df: DataFrame with a `filename` column; its index is reset so
            items can be addressed by position 0..len-1.
        device: device every returned tensor is moved to.
    """
    def __init__(self, file_df, device):
        self.device = device
        self.file_df = file_df.reset_index(drop=True)

    def __len__(self):
        return self.file_df.shape[0]

    def __getitem__(self, index)->torch.Tensor:
        # Look up the row by its (reset) index label and load its recording.
        row = self.file_df.loc[index]
        ecg_tensor = get_ecg_tensor(row.filename)
        return ecg_tensor.to(self.device)
    
    
def get_model(model_path)->torch.nn.Module:
    """USER CUSTOM FUNCTION: build the classifier to analyse with TCAV.

    This is a template placeholder; replace the body with project-specific
    model construction / checkpoint loading code.

    Args:
        model_path: location of the model checkpoint.

    Returns:
        The model used for TCAV analysis.

    Raises:
        NotImplementedError: always, until the user supplies an implementation.
    """
    # Fail with an explicit message instead of a NameError on an undefined name.
    raise NotImplementedError(
        "get_model is a user-defined hook: implement it to return the "
        "torch.nn.Module loaded from model_path"
    )


In [None]:
device = "cuda:3"  # device used for the classifier and the concept datasets
label_df_path = "LVSD_label.csv" # this is label file for target concept
label_df = pd.read_csv(label_df_path)

# Records whose column '1' exceeds 0.5; 1000 of them are drawn with the shared seed.
label_df_target = label_df[label_df['1']>0.5].sample(1000,random_state=random_seed)
target_tensor_list = list()
failed_file_list = list()
first_error = None

for oid in label_df_target.filename:
    # Loading stays best-effort, but failures are recorded instead of being
    # silently discarded, and KeyboardInterrupt is no longer swallowed.
    try:
        out = get_ecg_tensor(oid)
    except Exception as err:
        failed_file_list.append(oid)
        if first_error is None:
            first_error = err
    else:
        target_tensor_list.append(out)

if failed_file_list:
    print(f"[Caution] {len(failed_file_list)} of {len(label_df_target)} target files "
          f"could not be loaded; first error: {first_error!r}")

if not target_tensor_list:
    # torch.stack would fail on an empty list anyway; say why.
    raise RuntimeError(
        f"no target ECG could be loaded from {label_df_path}; first error: {first_error!r}"
    )

target_tensor = torch.stack(target_tensor_list).squeeze()

In [None]:
# Checkpoint of the classifier that TCAV will analyse.
model_path = "checkpoint.pth" #this is model path for TCAV analysis
classifier = get_model(model_path)

# Switch to evaluation mode and move the model to the analysis device.
classifier.eval()
classifier.to(device)

# TCAV with captum

In [None]:

from captum.attr import LayerIntegratedGradients

from captum.concept import TCAV
from captum.concept import Concept

from captum.concept._utils.data_iterator import dataset_to_dataloader
from captum.concept._utils.common import concepts_to_str

In [None]:
# Maps concept name -> captum Concept; filled in by the next cell.
tcav_concept_dict = dict()

In [None]:
# Wrap every sampled concept table in a captum Concept backed by a DataLoader.
# The concept id is the position of the concept in `concept_file_dict`.
for concept_id, (concept_name, concept_rows) in enumerate(concept_file_dict.items()):
    concept_loader = dataset_to_dataloader(TCAV_dataset(concept_rows, device))
    tcav_concept_dict[concept_name] = Concept(
        id=concept_id,
        name=concept_name,
        data_iter=concept_loader,
    )

In [None]:
# Names of the model layers probed by TCAV (must match the classifier's module names).
layers = ["blk1d.0.2.conv2","blk1d.1.2.conv2","blk1d.2.2.conv2","blk1d.3.2.conv2"]
mytcav = TCAV(
    model=classifier,
    layers=layers,
    layer_attr_method=LayerIntegratedGradients(classifier, None, multiply_by_inputs=False),
)
print(tcav_concept_dict.keys())

# One experimental set holding every concept (targets and random controls).
experimental_set_rand = [list(tcav_concept_dict.values())]

# The classifier was moved to `device`; move the inputs there as well
# (a no-op when they are already on that device).
tcav_scores_w_random = mytcav.interpret(inputs=target_tensor.to(device),
                                        experimental_sets=experimental_set_rand,
                                        target=1,
                                        n_steps=5)

In [None]:
# Visualise the scores of the all-concepts experimental set for every probed layer.
# NOTE(review): plot_tcav_scores comes from the local `tcav` module; its exact output is not visible here.
from tcav import plot_tcav_scores
plot_tcav_scores(experimental_set_rand,tcav_scores_w_random,layers)

## Statistical significance test

In [None]:
# Display the names of all registered concepts (targets and random controls).
tcav_concept_dict.keys()

In [None]:
# For every target concept, build one [target, random] pair per random concept.
# The result has one entry per target concept, each a list of
# `random_concept_n` two-concept experimental sets.
exp_sets_for_each = []

for concept_name, target_concept in tcav_concept_dict.items():
    # Random concepts only serve as the control side of each pair.
    if "random_concept" in concept_name:
        continue

    paired_sets = [
        [target_concept, tcav_concept_dict[f"random_concept_{i}"]]
        for i in range(random_concept_n)
    ]
    exp_sets_for_each.append(paired_sets)

In [None]:
# Per-layer results: block_tcav_result_list[b] == [mean_list, h_list] with one
# entry per target concept; block_tcav_random_score_list[b] holds the last
# element returned by get_confidnece_plot for each target concept.
block_tcav_result_list = list()
block_tcav_random_score_list = list()
score_type = "sign_count" #'magnitude'

# Target concept name of every experimental-set group (same for all layers).
name_list = [target_exp_set[0][0].name for target_exp_set in exp_sets_for_each]

# Iterate the same layer names the TCAV object was built with, so the layer
# list only has to be edited in one place.
for target_layer in layers:

    p_val_out_list = list()
    random_score_each_block = list()
    for target_exp_set in exp_sets_for_each:
        out = get_confidnece_plot(mytcav,target_exp_set,target_layer,score_type,target_tensor,device,label_name=target_exp_set[0][0].name)
        p_val_out_list.append(out)
        random_score_each_block.append(out[-1])

    # NOTE(review): out[1] is read as a (mean, interval) pair -- confirm
    # against get_confidnece_plot in the local `tcav` module.
    mean_list = [out[1][0] for out in p_val_out_list]
    h_list = [out[1][1] for out in p_val_out_list]
    block_tcav_result_list.append([mean_list,h_list])
    block_tcav_random_score_list.append(random_score_each_block)




In [None]:
from itertools import chain

# For each of the four blocks, flatten the random-concept scores gathered for
# all target concepts and summarise them with mean_confidence_interval.
block_tcav_radom_result_list = []
for block_pos in range(4):
    pooled_random_scores = list(chain.from_iterable(block_tcav_random_score_list[block_pos]))
    block_tcav_radom_result_list.append(mean_confidence_interval(pooled_random_scores))
print(block_tcav_radom_result_list)

In [None]:
# Column selection/order for the final figure; -1 picks the last column,
# which is the random-concept entry appended to every row below.
reindex_list = [6,0,1,5,7,4,8,3,2,-1]

# One row per block: the per-concept values (element 0 of each result pair)
# followed by the random-concept value, then reordered by `reindex_list`.
block_tav_score_list = np.array([
    block_tcav_result_list[block_pos][0] + [block_tcav_radom_result_list[block_pos][0]]
    for block_pos in range(4)
])[:, reindex_list]

# Same layout built from element 1 of each result pair
# (presumably the confidence-interval size -- TODO confirm).
block_tav_ci_list = np.array([
    block_tcav_result_list[block_pos][1] + [block_tcav_radom_result_list[block_pos][1]]
    for block_pos in range(4)
])[:, reindex_list]

In [None]:
from tcav import draw_heatmap

# Heatmap of the score matrix: one row per block, one column per selected concept.
rows = ["Block1", "Block2", "Block3","Block4"]

# Column labels come from the concept names in insertion order, reordered like the matrix.
# NOTE(review): index -1 labels the last column with the final random concept's
# name -- confirm that is the intended label for the random column.
concept_name_array = np.array(list(tcav_concept_dict))
cols = list(concept_name_array[reindex_list])

matrix = block_tav_score_list
fig = draw_heatmap(
    matrix,
    block_tav_ci_list,
    row_names=rows,
    col_names=cols,
    cell_width=2,
    cell_height=1,
    vmin=0.1,
    vmax=1,
    cmap='Reds',
)


In [None]:
# Write the heatmap figure to disk at 250 dpi.
fig.savefig('TCAV_block_figure.png',dpi=250)