# imports 

In [15]:
from captum.attr import ShapleyValueSampling
from tqdm import trange

from load_data import load_data
from train_models import *
from segmentation import *
from utils import *
import torch
import os
from tqdm import tqdm

# device for torch

In [16]:
from torch.cuda import is_available as is_GPU_available
device = "cuda" if is_GPU_available() else "cpu"

# dictionary mapping predictors to torch vs other, step necessary for Captum 
predictors = {
	'torch' : ['resNet'],
	'scikit' : ['miniRocket','randomForest']
}
segmentation_dict = {"clasp":get_claSP_segmentation, "infogain": get_InformationGain_segmentation, "greedygaussian": get_GreedyGaussian_segmentation, "equal": get_equal_segmentation, "nnsegment": get_NNSegment_segmentation}

# hyper-parameters

In [17]:
# hyperparameters
n_background = 50

# settings
dataset_names = {'gunpointz'}
predictor_names = {'randomForest'} # {"randomForest", 'miniRocket', 'resNet'}
segmentation_names = {"clasp","greedygaussian", "equal", "nnsegment"}  #, "infogain"
background_names = {"average", "sampling", "zero"} #{"average", "zero", "sampling"}
normalization_names = {"default", "normalized"}

demo_mode = False
# demo
if demo_mode:
    dataset_names = {'gunpoint'}
    predictor_names = {"randomForest"}
    segmentation_names = {"clasp"}
    background_names = {"average", "zero"}
    normalization_names = {"default", "normalized"}

In [18]:
# results_demo = {"dataset_name":
#                     {'segmentations' :
#                             {"segmentation_name":
#                                 {"predictors":
#                                         {"predictor_name":{
#                                                 "backgrounds":{
#                                                     "background_name":{
#                                                             "default": None, "normalized": None},
#                                                 },
#                                                 'y_test_pred' : None,        
#                                         }, 
#                                 "segments": None},
# }
#                             },
#                     'y_test_true': None,
#                     'label_mapping': None}
#                         }

In [19]:
results = {'y_test_true':
                {"dataset_name":None},
            'label_mapping': 
                {"dataset_name":None},
            "segments":
                {"dataset_name":
                    {"segmentation_name":None}
                },
            'y_test_pred':
                {"dataset_name":
                    {"segmentation_name":
                        {"predictor_name": None}
                    }
                },
            "attributions":
                {"dataset_name":
                    {"segmentation_name":
                        {"predictor_name":
                            {"background_name":
                                {"normalization_name": None}
                            }
                        }
                    }
                }
            }

# train model

In [26]:
results = dict.fromkeys(('y_test_true', 'label_mapping', "segments", 'y_test_pred', "attributions"))
for key in results.keys():
    results[key] = dict.fromkeys(dataset_names)
normalization_names = normalization_names | {"default"}

for dataset_name in dataset_names:
    # init dataset
    # load data
    X_train, X_test, y_train, y_test, enc = load_data(subset='all', dataset_name=dataset_name)
    # for debugging only
    if demo_mode:
        X_test = X_test[:2]
        y_test = y_test[:2]
    n_samples, n_chs, ts_length = X_test.shape

    results['y_test_true'][dataset_name] = y_test
    results['label_mapping'][dataset_name] = enc
    results["attributions"][dataset_name] = dict.fromkeys(segmentation_names)
    results["segments"][dataset_name] = dict.fromkeys(segmentation_names)
    results["y_test_pred"][dataset_name] = dict.fromkeys(segmentation_names)

    predictor_dict = dict()
    for predictor_name in predictor_names:

        if predictor_name=='resNet':
            clf,preds = train_ResNet(X_train, y_train, X_test, y_test, dataset_name,device=device)
        elif predictor_name=='miniRocket':
            clf,preds = train_miniRocket(X_train, y_train, X_test, y_test, dataset_name)
        elif predictor_name=="randomForest":
            clf, preds = train_randomForest(X_train, y_train, X_test, y_test, dataset_name)
        else:
            raise ValueError("predictor not found")

        predictor_dict[predictor_name] = {"clf": clf, "preds": preds}


training random forest
random forest accuracy is 1.0


In [30]:
with torch.no_grad():
    for dataset_name in dataset_names:
    
        for segmentation_name in segmentation_names:

            init_segments = np.empty( (X_test.shape[0] , X_test.shape[1]), dtype=object) if X_test.shape[1] > 1  else (np.empty( X_test.shape[0] , dtype=object))
            results["segments"][dataset_name][segmentation_name] = init_segments.copy()
            results["attributions"][dataset_name][segmentation_name] = dict.fromkeys(predictor_names)
            results["y_test_pred"][dataset_name][segmentation_name] = dict.fromkeys(predictor_names)
            for predictor_name in predictor_names:
                results["attributions"][dataset_name][segmentation_name][predictor_name] = dict.fromkeys(background_names)
            segmentation_method = segmentation_dict[segmentation_name]
    
            ts_list = []
            mask_list = []
            y_list = []
    
            for i in range(n_samples) : #
                
                # get current sample and label
                ts, y = X_test[i] , torch.tensor( y_test[i:i+1] )
    
                # get segment and its tensor representation
                current_segments = segmentation_method(ts)[:X_test.shape[1]]
                results['segments'][dataset_name][segmentation_name][i] = current_segments
                mask = get_feature_mask(current_segments,ts.shape[-1])
                mask_list.append(mask)
                ts = torch.tensor(ts).repeat(1,1,1)	#TODO use something similar to np.expand_dim?
                ts_list.append(ts)
                y_list.append(y)
    
            for background_name in background_names:
    
                results["attributions"][dataset_name][segmentation_name][predictor_name][background_name] = dict.fromkeys(normalization_names)
                # background data
                if background_name=="zero":
                    background_dataset = torch.zeros((1,) + X_train.shape[1:])
                elif background_name=="sampling":
                    background_dataset = sample_background(X_train, n_background)
                elif background_name=="average":
                    background_dataset = sample_background(X_train, n_background).mean(axis=0, keepdim=True)
    
                for predictor_name in predictor_names:
    
                    clf = predictor_dict[predictor_name]["clf"]
                    results['y_test_pred'][dataset_name][segmentation_name][predictor_name] = predictor_dict[predictor_name]["preds"]

                    init_attributions = np.zeros(X_test.shape, dtype=np.float32)
                    for normalization_name in normalization_names:
                        results['attributions'][dataset_name][segmentation_name][predictor_name][background_name][normalization_name] = init_attributions.copy()
                    
                    SHAP = ShapleyValueSampling(clf) if predictor_name in predictors['torch'] \
                        else ShapleyValueSampling(forward_classification)

                    #print (background_name, predictor_name , segmentation_name)

                    with tqdm(total=len(ts_list)) as pbar:
                        for i, (ts, mask, y) in enumerate(zip(ts_list, mask_list, y_list)):
    
                            # for sampling strategy repeat the ts many times as the background dataset size
                            ts = ts.repeat(background_dataset.shape[0],1,1) if background_name=="sampling" else ts
                            
                            # different call depending on predictor type
                            if predictor_name in predictors['scikit']:
                                # if using random forest flat everything
                                if predictor_name=="randomForest":
                                    ts = ts.reshape( -1, n_chs*ts_length)
                                    mask = mask.reshape( -1, n_chs*ts_length)
                                    background_dataset = background_dataset.reshape( -1, n_chs*ts_length)
                                
                                tmp = SHAP.attribute( ts, target=y , feature_mask=mask, baselines=background_dataset, additional_forward_args=clf)
                            
                            elif predictor_name in predictors['torch']:
                                # if use torch make sure everything is on selected device
                                ts = ts.to(device); y = y.to(device)
                                mask = mask.to(device) ; background_dataset =  background_dataset.to(device)
                                tmp = SHAP.attribute( ts, target=y , feature_mask=mask, baselines=background_dataset)
                            
                            # 'un-flatten' for randomForest
                            if predictor_name=="randomForest":
                                tmp = tmp.reshape(-1,X_test.shape[1],X_test.shape[2])
    
                            # store current explanation in the data structure; if sampling store the mean
                            results['attributions'][dataset_name][segmentation_name][predictor_name][background_name]["default"][i] = torch.mean(tmp, dim=0).cpu().numpy() if \
                                background_name=="sampling" else tmp[0].cpu().numpy()
                            
                            # update tqdm
                            pbar.update(1)
                    
                    pbar.close()

                    if "normalized" in normalization_names:
                        weights = np.array(list(map(lambda x: list(map(lambda y: lengths_to_weights(change_points_to_lengths(y, X_train.shape[-1])), x)), results["segments"][dataset_name][segmentation_name])))

                        results['attributions'][dataset_name][segmentation_name][predictor_name][background_name]["normalized"] = results['attributions'][dataset_name][segmentation_name][predictor_name][background_name]["default"] * weights
                    if "default" not in normalization_names:
                        del results['attributions'][dataset_name][segmentation_name][predictor_name][background_name]["default"]
                        


            
    


100%|██████████| 2/2 [00:09<00:00,  4.95s/it]
100%|██████████| 2/2 [00:10<00:00,  5.05s/it]


In [31]:
results

{'y_test_true': {'gunpoint': array([0, 1])},
 'label_mapping': {'gunpoint': array(['1', '2'], dtype='<U1')},
 'segments': {'gunpoint': {'clasp': array([array([array([  0,   7,  16,  47,  96, 109, 118], dtype=int64)],
                dtype=object)                                             ,
          array([array([  0,  35,  45,  68,  78, 101, 132], dtype=int64)],
                dtype=object)                                             ],
         dtype=object)}},
 'y_test_pred': {'gunpoint': {'clasp': {'randomForest': array([0, 1], dtype=int64)}}},
 'attributions': {'gunpoint': {'clasp': {'randomForest': {'zero': {'normalized': array([[[ 3.02857081e-03,  3.02857081e-03,  3.02857081e-03,
                3.02857081e-03,  3.02857081e-03,  3.02857081e-03,
                3.02857081e-03,  6.04444328e-03,  6.04444328e-03,
                6.04444328e-03,  6.04444328e-03,  6.04444328e-03,
                6.04444328e-03,  6.04444328e-03,  6.04444328e-03,
                6.04444328e-03,  5.50

In [None]:
# dump result to disk
file_name = "all_results_resNet" # "_".join ( ( predictor_name, dataset_name ) )+".npy"
file_path = os.path.join("attributions", file_name)
np.save( file_path, results )

# OLD keep temporarily -  to be deleted

In [None]:
print ("gpu available",torch.cuda.is_available())

gpu avaliable True
