In [9]:
# Evaluation settings stored on `args` (the object itself is not created in the visible cells).
# Step count shown in the '<N>-step predictions' legend label drawn by fancy_evaluation.
args['prediction_window_size']= 10
# NOTE(review): presumably the beta of the F-beta score (get_precision_recall takes a
# `beta` parameter) -- no live code in view reads this key; confirm it is still needed.
args['beta'] = 1.0
# Disabled options; the code paths that would read them are commented out as well.
#args['save_fig'] = False
#args['compensate'] = True
# Dataset name; fancy_evaluation inserts it into the plot titles.
args['data'] = "XYZ"

In [11]:
def normalization(seqData,max,min):
    '''
    Min-max scale ``seqData`` with the supplied bounds.

    :param seqData: value(s) to rescale
    :param max: upper bound, mapped to 1
    :param min: lower bound, mapped to 0
    :return: ``(seqData - min) / (max - min)``; there is no guard for ``max == min``
    '''
    shifted = seqData - min
    value_range = max - min
    return shifted / value_range

def standardization(seqData,mean,std):
    '''
    Centre ``seqData`` on ``mean`` and scale it by ``std`` (z-score).

    :param seqData: value(s) to standardize
    :param mean: value subtracted from ``seqData``
    :param std: value the centred data is divided by
    :return: ``(seqData - mean) / std``
    '''
    centred = seqData - mean
    return centred / std

def reconstruct(seqData,mean,std):
    '''
    Undo :func:`standardization`: scale by ``std`` and shift by ``mean``.

    :param seqData: standardized value(s)
    :param mean: mean that was subtracted during standardization
    :param std: standard deviation that was divided out during standardization
    :return: ``seqData * std + mean``
    '''
    rescaled = seqData * std
    return rescaled + mean


def get_accuracy_precision_recall_fscore(ground_truth, prediction):
    '''
    Compute accuracy plus binary precision, recall and F-score.

    NOTE(review): ``accuracy_score`` and ``prf`` are imported outside the visible
    cells -- presumably sklearn.metrics.accuracy_score and
    sklearn.metrics.precision_recall_fscore_support; confirm.

    :param ground_truth: true labels
    :param prediction: predicted labels
    :return: tuple ``(accuracy, precision, recall, f_score)``
    '''
    accuracy = accuracy_score(ground_truth, prediction)
    # prf yields four values; the last one (support) is not reported.
    prf_values = prf(ground_truth, prediction, average='binary')
    precision, recall, f_score = prf_values[:3]
    return accuracy, precision, recall, f_score

def get_precision_recall(score, label, num_samples, beta=1.0, sampling='log', predicted_score=None):
    '''
    Sweep a range of thresholds over the anomaly scores and compute precision,
    recall and F-beta for each threshold.

    :param score: anomaly scores (torch tensor)
    :param label: anomaly labels aligned with ``score``; 1 marks an anomaly, 0 a normal point
    :param num_samples: the number of threshold samples
    :param beta: weight of recall relative to precision in the F-beta score
    :param sampling: ``'log'`` samples thresholds logarithmically between 10^0 and the
        maximum score; any other value samples them linearly between 0 and the maximum score
    :param predicted_score: optional values subtracted from ``score`` before thresholding
    :return: ``(precision, recall, f_beta)`` float tensors. Thresholds at which precision
        or recall is zero are dropped, so the tensors may hold fewer than ``num_samples``
        entries and are not index-aligned with the sampled thresholds.
    '''
    if predicted_score is not None:
        # Place the correction on the same device as the scores instead of relying
        # on a module-level ``device`` variable.
        score = score - torch.FloatTensor(predicted_score).squeeze().to(score.device)

    maximum = score.max()
    if sampling == 'log':
        # Thresholds logarithmically spaced between 10^0 and 10^log10(maximum).
        # as_tensor avoids the copy-construct warning torch.tensor() emits for tensor input.
        log_end = torch.log10(torch.as_tensor(maximum)).item()
        th = torch.logspace(0, log_end, num_samples)
    else:
        # Thresholds equally spaced between 0 and the maximum score.
        th = torch.linspace(0, float(maximum), num_samples)

    precision = []
    recall = []

    for threshold in th:
        anomaly = (score > threshold).float()
        # Encode each point's confusion-matrix cell as 2 * predicted + actual:
        # 0 = tn, 1 = fn, 2 = fp, 3 = tp.
        idx = anomaly * 2 + label
        fn = (idx == 1.0).sum().item()
        fp = (idx == 2.0).sum().item()
        tp = (idx == 3.0).sum().item()

        # The small epsilon keeps the ratios defined when a denominator is zero.
        p = tp / (tp + fp + 1e-7)
        r = tp / (tp + fn + 1e-7)

        if p != 0 and r != 0:
            precision.append(p)
            recall.append(r)

    precision = torch.FloatTensor(precision)
    recall = torch.FloatTensor(recall)

    f_beta = (1 + beta ** 2) * (precision * recall).div(beta ** 2 * precision + recall + 1e-7)

    return precision, recall, f_beta

In [5]:
def fancy_evaluation(X_full, X_test, predicted_scores, prediction_errors):
    '''
    Compute classification metrics and draw diagnostic plots for every channel.

    NOTE(review): besides its arguments this function reads ``nfeatures``,
    ``test_dataset``, ``TimeseriesData``, ``score``, ``predicted_score``, ``args``
    and ``plt``, and appends to the lists ``scores``, ``targets``,
    ``mean_predictions``, ``oneStep_predictions``, ``Nstep_predictions``,
    ``precisions``, ``recalls`` and ``f_betas``. None of these is defined inside
    the function, so they must exist at module level before it is called.

    :param X_full: indexed per channel; each channel's ``mean()`` / ``std()`` is used
        to undo the standardization of targets and predictions
    :param X_test: mapping whose ``'y'`` entry holds the ground-truth labels
    :param predicted_scores: predictions; passed as the predicted labels to the metric
        helper and sliced as ``[:, -1]`` / ``[:, 0]`` for the 1-step / N-step curves
    :param prediction_errors: object exposing ``.abs().mean()``
    :return: None
    '''
    # Line style shared by every curve; only colour and line width vary.
    line_style = dict(marker='.', linestyle='--', markersize=1)

    # For each channel in the dataset
    for channel_idx in range(nfeatures):
        # Evaluate the result: accuracy, precision, recall and F-score of the
        # predictions against the ground-truth labels.
        print('=> calculating precision, recall, and f_beta')
        _, precision, recall, f_score = get_accuracy_precision_recall_fscore(X_test['y'], predicted_scores)

        # Map the standardized series back to the original scale of this channel.
        channel_mean = X_full[channel_idx].mean()
        channel_std = X_full[channel_idx].std()
        target = reconstruct(test_dataset[channel_idx], channel_mean, channel_std)
        mean_prediction = reconstruct(predicted_scores.mean(), channel_mean, channel_std)
        oneStep_prediction = reconstruct(predicted_scores[:, -1], channel_mean, channel_std)
        Nstep_prediction = reconstruct(predicted_scores[:, 0], channel_mean, channel_std)

        sorted_errors_mean = prediction_errors.abs().mean()
        sorted_errors_mean *= TimeseriesData.std[channel_idx]

        # Accumulate per-channel results in the module-level lists.
        # NOTE(review): the previous code also ran
        # ``predicted_scores.append(predicted_score)``, but ``predicted_scores`` is the
        # array-like argument sliced above, not a list, so that call was dropped.
        scores.append(score)
        targets.append(target)
        mean_predictions.append(mean_prediction)
        oneStep_predictions.append(oneStep_prediction)
        Nstep_predictions.append(Nstep_prediction)
        precisions.append(precision)
        recalls.append(recall)
        # The F-score returned by the metric helper stands in for the former ``f_beta``.
        f_betas.append(f_score)

        title = 'Anomaly Detection on ' + args['data'] + ' Dataset'

        # Metric figure. Saving is disabled, so the figure is built and closed
        # without being displayed.
        plt.plot(precision, label='precision')
        plt.plot(recall, label='recall')
        plt.plot(f_score, label='f1')
        plt.legend()
        plt.xlabel('Threshold (log scale)')
        plt.ylabel('Value')
        plt.title(title, fontsize=18, fontweight='bold')
        plt.close()

        # Prediction figure: targets, predictions and errors on the left axis,
        # anomaly scores on the right axis.
        fig, ax1 = plt.subplots(figsize=(15,5))
        ax1.plot(target, label='Target',
                 color='black', linewidth=0.5, **line_style)
        ax1.plot(mean_prediction, label='Mean predictions',
                 color='purple', linewidth=0.5, **line_style)
        ax1.plot(oneStep_prediction, label='1-step predictions',
                 color='green', linewidth=0.5, **line_style)
        ax1.plot(Nstep_prediction, label=str(args['prediction_window_size']) + '-step predictions',
                 color='blue', linewidth=0.5, **line_style)
        ax1.plot(sorted_errors_mean, label='Absolute mean prediction errors',
                 color='orange', linewidth=1.0, **line_style)
        ax1.legend(loc='upper left')
        ax1.set_ylabel('Value', fontsize=15)
        ax1.set_xlabel('Index', fontsize=15)

        ax2 = ax1.twinx()
        # NOTE(review): ``score`` and ``predicted_score`` come from module scope; nothing
        # in this function computes them -- confirm they are set before the call.
        ax2.plot(score.numpy().reshape(-1, 1), label='Anomaly scores from \nmultivariate normal distribution',
                 color='red', linewidth=1, **line_style)
        ax2.plot(predicted_score, label='Predicted anomaly scores from SVR',
                 color='cyan', linewidth=1, **line_style)
        ax2.legend(loc='upper right')
        ax2.set_ylabel('anomaly score', fontsize=15)

        plt.title(title, fontsize=18, fontweight='bold')
        plt.tight_layout()
        plt.xlim([0,len(test_dataset)])
        plt.show()
        plt.close()

=> calculating precision, recall, and f_beta


NameError: name 'score' is not defined