## Ablation Study about loss function with penalty term for a high number of false positives

This notebook recreates the results shown in the short ablation study about the epsilon parameter of the loss function.

We try 6 differen values for epsilon: 0, 0.1, 0.2, 0.3, 0.4, 0.5

To run this notebook in the codespace, choose the base kernel (Python 3.10.0).

In [3]:
%reload_ext autoreload
%autoreload 2

import sys
sys.path.append('../artitect/')

In [4]:
import pickle
from itertools import repeat
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
from artifact import Saw
from sliding_window_detector import SlidingWindowTransformerDetector, ConvolutionalSlidingWindowDetector, SlidingWindowLinearDetector
from mask_detector import WindowLinearDetector, WindowTransformerDetector, ConvolutionDetector

from data import RealisticArtifactDataset, CachedArtifactDataset, TestArtifactDataset, CenteredArtifactDataset

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
torch.set_grad_enabled(False)
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)

In [4]:
test_width = 512

test_path = Path("../data/test_files/test_label_CinCECGTorso512.pkl")
test = CachedArtifactDataset(file=test_path)

val_path = Path("../data/val_files/val_SW_noCiECGT512.pkl")
val = CachedArtifactDataset(file=val_path)

In [6]:
paths_SW = "../models/ablationFP/SW_FCNTrans_fpbosst_0.ckpt" # SW FCN
adaFCNTrans_0 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

paths_SW = "../models/ablationFP/SW_FCNTrans_fpboost_01.ckpt" # SW FCN
adaFCNTrans_0_1 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

paths_SW = "../models/ablationFP/SW_FCNTrans_fpboost_02.ckpt" # SW FCN
adaFCNTrans_0_2 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

paths_SW = "../models/ablationFP/SW_FCNTrans_fpboost_03.ckpt" # SW FCN
adaFCNTrans_0_3 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

paths_SW = "../models/ablationFP/SW_FCNTrans_fpboost_04.ckpt" # SW FCN
adaFCNTrans_0_4 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

paths_SW = "../models/ablationFP/SW_FCNTrans_fpboost_05.ckpt" # SW FCN
adaFCNTrans_0_5 = SlidingWindowTransformerDetector.load_from_checkpoint(paths_SW).cpu()

SW_detectors = [adaFCNTrans_0.eval(), adaFCNTrans_0_1.eval(), adaFCNTrans_0_2.eval(), adaFCNTrans_0_3.eval(), adaFCNTrans_0_4.eval(), adaFCNTrans_0_5.eval()]

  rank_zero_warn(


In [7]:
def baseline_detector(input: torch.Tensor) -> int:   
    input.squeeze(0)
    prediction = 0

    center = int(input.shape[1]/2)
    # flag points with very high increment as artifact
    # Calculate increments by subtracting the tensor shifted by one from the original tensor
    increments = (input[0][1:] - input[0][:-1]).abs()
    mean_increment = torch.mean(increments)
    std_increment = torch.std(increments)

    if increments[center-1] > (mean_increment + 3*std_increment):
        prediction = 1
    
    return prediction

## Predictions on Validation set for threshold calculation with fbeta score

In case you do not want to perform the threshold search, skip to the cell that contains the hard coded thresholds and run it.

In [9]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, mean_squared_error, confusion_matrix
import pandas as pd

all_predictions_valSet = pd.DataFrame(columns=['Detector_id', 'predictions'])
index = 0
gt = list()

for detector in SW_detectors:

    preds = list()

    for sample in val:
        example = sample["data"]
        window  = detector.window
        length  = len(example)

        # add artifact to data
        example_data = torch.tensor(example + sample["artifact"])

        # set detector to evaluation mode
        detector.eval()
        # make prediction and insert into prediction
        prediction = detector(example_data.unsqueeze(0))

        # update count
        preds = preds + [prediction.numpy()]

        if index == 0 :
            gt = gt + [sample["label"]]
    

    new_row = pd.DataFrame([{
        'Detector_id': index +1,
        'predictions': preds
    }])

    all_predictions_valSet = pd.concat([all_predictions_valSet,new_row])
    index = index +1
    print(len(all_predictions_valSet))

all_predictions_valSet = pd.concat([
    all_predictions_valSet,
    pd.DataFrame([{
        'Detector_id': 0,
        'predictions': gt
    }])
    ])


1
2


  return F.conv1d(input, weight, bias, self.stride,


3
4
5
6


In [10]:
from sklearn.metrics import fbeta_score

gt = all_predictions_valSet.iloc[-1]["predictions"]
metrics = pd.DataFrame()
for index, row in all_predictions_valSet.iterrows():

    max_fbeta = 0

    for threshold in np.linspace(0,  1,  100):
        predictions = np.where(np.array(row['predictions']) > threshold, 1, 0)

        fbeta = fbeta_score(gt, predictions, average='macro', beta=0.5)

        if (fbeta > max_fbeta):
            max_fbeta = fbeta
            best_threshold_fbeta = threshold

    predictions = np.where(np.array(row['predictions']) > best_threshold_fbeta, 1, 0)

    tn, fp, fn, tp = confusion_matrix(gt, predictions, labels=[0, 1]).ravel()

    metric = pd.DataFrame([{
        'index': index,
        'detector': f"detector{row['Detector_id']}",
        'threshold': best_threshold_fbeta,
        'fbeta_score': fbeta_score(gt, predictions, beta=0.5),
        'accuracy': accuracy_score(gt, predictions),
        'precision': precision_score(gt, predictions),
        'recall': recall_score(gt, predictions),
        'mse': mean_squared_error(gt, predictions), 
        'tn': tn,
        'fp': fp, 
        'fn': fn, 
        'tp': tp
    }])

    metrics = pd.concat([metrics, metric])


In [11]:
thresholds = list(metrics["threshold"])

In [14]:
thresholds = [0] + thresholds

Continue with this cell for skipping the threshold calculation

In [2]:
thresholds = [0, 0.464, 0.363, 0.252, 0.262, 0.484, 0.212, 0]

In [10]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, mean_squared_error, confusion_matrix
import pandas as pd

all_predictions = pd.DataFrame(columns=['Detector_id', 'predictions'])
index = 1
gt = list()
preds_baseline = list()

for sample in test:
    example = sample["data"]
    stride  = 64
    window  = test_width
    length  = len(example)

    example_data = torch.tensor(example + sample["artifact"])
    prediction_baseline = baseline_detector(example_data.unsqueeze(0))
    preds_baseline = preds_baseline + [prediction_baseline]

    gt = gt + [sample["label"]]

all_predictions = pd.concat([
    all_predictions,
    pd.DataFrame([{
        'Detector_id': index,
        'predictions': preds_baseline
    }])
    ], ignore_index=True)

index = 2

for detector in SW_detectors:

    preds = list()

    for sample in test:
        example = sample["data"]
        window  = detector.window
        length  = len(example)

        # add artifact to data
        example_data = torch.tensor(example + sample["artifact"])

        # set detector to evaluation mode
        detector.eval()
        # make prediction and insert into prediction
        prediction = detector(example_data.unsqueeze(0))

        # update count
        preds = preds + [prediction.numpy()]
    

    new_row = pd.DataFrame([{
        'Detector_id': index,
        'predictions': preds
    }])

    all_predictions = pd.concat([all_predictions,new_row], ignore_index=True)
    index = index +1
    print(len(all_predictions))


all_predictions = pd.concat([
    all_predictions,
    pd.DataFrame([{
        'Detector_id': 0,
        'predictions': gt
    }])
    ], ignore_index=True)



2
3


  return F.conv1d(input, weight, bias, self.stride,


4
5
6
7


In [18]:
from sklearn.metrics import fbeta_score

gt = all_predictions.iloc[-1]["predictions"]
metrics = pd.DataFrame()
for index, row in all_predictions.iterrows():

    predictions = np.where(np.array(row['predictions']) > thresholds[index], 1, 0)

    tn, fp, fn, tp = confusion_matrix(gt, predictions, labels=[0, 1]).ravel()

    metric = pd.DataFrame([{
        'index': index,
        'detector': f"detector{row['Detector_id']}",
        'threshold': thresholds[index],
        'fbeta_score': fbeta_score(gt, predictions, beta=0.5),
        'accuracy': accuracy_score(gt, predictions),
        'precision': precision_score(gt, predictions),
        'recall': recall_score(gt, predictions),
        'mse': mean_squared_error(gt, predictions), 
        'tn': tn,
        'fp': fp, 
        'fn': fn, 
        'tp': tp
    }])

    metrics = pd.concat([metrics, metric])


In [19]:
metrics

Unnamed: 0,index,detector,threshold,fbeta_score,accuracy,precision,recall,mse,tn,fp,fn,tp
0,0,detector1,0.0,0.974131,0.966797,0.979675,0.952569,0.033203,1016,20,48,964
0,1,detector2,0.464,0.985772,0.976074,0.992835,0.958498,0.023926,1029,7,42,970
0,2,detector3,0.363,0.980431,0.977051,0.982983,0.970356,0.022949,1019,17,30,982
0,3,detector4,0.252,0.993512,0.984375,1.0,0.968379,0.015625,1036,0,32,980
0,4,detector5,0.262,0.983157,0.974121,0.989785,0.95751,0.025879,1026,10,43,969
0,5,detector6,0.484,0.983157,0.974121,0.989785,0.95751,0.025879,1026,10,43,969
0,6,detector7,0.212,0.980509,0.978516,0.982072,0.974308,0.021484,1018,18,26,986
0,7,detector0,0.0,1.0,1.0,1.0,1.0,0.0,1036,0,0,1012


## Evaluation on the manually labeled test set

In [12]:
import pandas as pd
test_width =512

real_data_df = pd.read_pickle('../data/real/normalized_deviation_updated_TEST.pickle') 
ground_truth = pd.read_csv('../data/real/gt_changes_only_relabeled_200K.csv')

In [13]:
ground_truth.drop(columns=["Unnamed: 0"], inplace=True)

In [14]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, mean_squared_error, confusion_matrix
import pandas as pd

all_predictions_real = pd.DataFrame(columns=['Detector_id', 'predictions'])
dist = test_width // 2

index = 1
gt = list()
preds_baseline = list()

for index, row in ground_truth[:494].iterrows():
    example_data = torch.tensor(real_data_df[0][int(row["position"]-dist) : int(row["position"]+dist)])
    
    prediction_baseline = baseline_detector(example_data.unsqueeze(0))
    preds_baseline = preds_baseline + [prediction_baseline]

    gt = gt + [row["gt"]]

all_predictions_real = pd.concat([
    all_predictions_real,
    pd.DataFrame([{
        'Detector_id': index,
        'predictions': preds_baseline
    }])
    ], ignore_index=True)

index = 2

for detector in SW_detectors:

    preds = list()

    for index, row in ground_truth[:494].iterrows():
        example_data = torch.tensor(real_data_df[0][int(row["position"]-dist) : int(row["position"]+dist)])
        # make prediction and insert into prediction
        prediction = detector(example_data.unsqueeze(0))

        # update count
        preds = preds + [prediction.numpy()] 

    new_row = pd.DataFrame([{
        'Detector_id': index ,
        'predictions': preds
    }])

    all_predictions_real = pd.concat([all_predictions_real,new_row], ignore_index=True)
    index = index +1
    print(len(all_predictions_real))


all_predictions_real = pd.concat([
    all_predictions_real,
    pd.DataFrame([{
        'Detector_id': 0,
        'predictions': gt
    }])
    ], ignore_index=True)

2
3


  return F.conv1d(input, weight, bias, self.stride,


4
5
6
7


In [15]:
all_predictions_real.drop(all_predictions_real.tail(1).index,inplace=True)

In [16]:
from sklearn.metrics import fbeta_score

gt_real = ground_truth[:494]["gt"]
metrics_real = pd.DataFrame()

for index, row in all_predictions_real.iterrows():

    predictions = np.where(np.array(row['predictions']) > thresholds[index], 1, 0)

    tn, fp, fn, tp = confusion_matrix(gt_real, predictions, labels=[0, 1]).ravel()

    metric = pd.DataFrame([{
        'index': index,
        'detector': f"Detector{row['Detector_id']}",
        'threshold': thresholds[index],
        'fbeta_score': fbeta_score(gt_real, predictions, beta=0.5),
        'accuracy': accuracy_score(gt_real, predictions),
        'precision': precision_score(gt_real, predictions),
        'recall': recall_score(gt_real, predictions),
        'mse': mean_squared_error(gt_real, predictions), 
        'tn': tn,
        'fp': fp, 
        'fn': fn, 
        'tp': tp, 
    }])

    metrics_real = pd.concat([metrics_real, metric])


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [17]:
metrics_real

Unnamed: 0,index,detector,threshold,fbeta_score,accuracy,precision,recall,mse,tn,fp,fn,tp
0,0,Detector493,0.0,0.0,0.591093,0.0,0.0,0.408907,292,0,202,0
0,1,Detector493,0.464,0.679443,0.718623,0.83871,0.386139,0.281377,277,15,124,78
0,2,Detector493,0.363,0.708419,0.759109,0.715026,0.683168,0.240891,237,55,64,138
0,3,Detector493,0.252,0.464481,0.645749,0.829268,0.168317,0.354251,285,7,168,34
0,4,Detector493,0.262,0.748441,0.789474,0.757895,0.712871,0.210526,246,46,58,144
0,5,Detector493,0.484,0.72,0.751012,0.788321,0.534653,0.248988,263,29,94,108
0,6,Detector493,0.212,0.717017,0.771255,0.7109,0.742574,0.228745,231,61,52,150
