# Metrics

Evaluates a trained model according to the metrics specified in the paper.

In [None]:
import numpy as np
import sys

sys.path.append("../src")

from ModelHandler import ModelHandler
import pickle
from sklearn.metrics import f1_score

# Experiment configuration used throughout this notebook; the same dict is
# handed to the project's PostProcessing and DataHandler helpers further down.
configs = {
    "N_GRIDS": 5,
    "SIGNAL_BASE_LENGTH": 12800,
    "N_CLASS": 26,
    "USE_NO_LOAD": False,
    "AUGMENTATION_RATIO": 5,
    "MARGIN_RATIO": 0.15,
    "DATASET_PATH": "../Synthetic_Full_iHall.hdf5",
    "TRAIN_SIZE": 0.8,
    "FOLDER_PATH": "drive/MyDrive/YOLO_NILM/final/001/",
    "FOLDER_DATA_PATH": "drive/MyDrive/YOLO_NILM/final/001/",
    "N_EPOCHS_TRAINING": 250,
    "INITIAL_EPOCH": 0,
    "TOTAL_MAX_EPOCHS": 250,
    "SNRdb": None,  # Noise level in dB
}

# Convenience aliases for the configuration entries.
folderPath = configs["FOLDER_PATH"]
folderDataPath = configs["FOLDER_DATA_PATH"]
signalBaseLength = configs["SIGNAL_BASE_LENGTH"]
ngrids = configs["N_GRIDS"]
trainSize = configs["TRAIN_SIZE"]

# Load the pickled dataset. The context manager guarantees the file handle is
# closed even if unpickling fails (the previous bare open() leaked it).
# NOTE: pickle can execute arbitrary code while loading; only point this at a
# data.p file that comes from a trusted source.
with open(folderDataPath + "data.p", "rb") as data_file:
    dict_data = pickle.load(data_file)



## Choose best performing model

At this point, the model that performs best on the validation set is chosen.

The choice is based on the average of the macro F1 scores computed separately for the ON, OFF and NO EVENT examples:

$$
F_1 = \frac{F1_{ON} + F1_{OFF} + F1_{NO\ EVENT}}{3}
$$

In [None]:
def choose_model(dict_data, folderPath, n_folds=10, threshold=0.5):
    """Return the 1-based index of the fold whose model scores best on validation.

    For every fold ``k`` in ``1..n_folds`` the function reads
    ``<folderPath>/<k>/train_index.npy``, ``validation_index.npy`` and
    ``model_class_opt.h5``, scales the validation signals with a
    ``MaxAbsScaler`` fitted on that fold's training signals, and predicts each
    validation example. The macro F1 score is computed separately for the
    examples of event type 0, 1 and 2, and the three scores are averaged.

    Args:
        dict_data: Dataset dict providing ``"x_train"`` and
            ``"y_train"["classification"]`` / ``"y_train"["type"]`` arrays.
        folderPath: Directory (with trailing slash) holding one sub-folder
            per fold.
        n_folds: Number of folds to evaluate; defaults to the 10 folds the
            notebook originally hard-coded.
        threshold: Value above which a score/label counts as positive.

    Returns:
        The fold number (starting at 1) with the highest averaged macro F1.
    """
    from tqdm import tqdm
    from sklearn.preprocessing import MaxAbsScaler
    from sklearn.metrics import f1_score

    x_all = dict_data["x_train"]
    y_class_all = dict_data["y_train"]["classification"]
    y_type_all = dict_data["y_train"]["type"]

    f1_macro = []
    for fold in tqdm(range(1, n_folds + 1)):
        foldFolderPath = folderPath + str(fold) + "/"

        train_index = np.load(foldFolderPath + "train_index.npy")
        validation_index = np.load(foldFolderPath + "validation_index.npy")

        bestModel = ModelHandler.loadModel(foldFolderPath + "model_class_opt.h5", type_weights=None)  # Load model

        # Fit the scaler on the training split only, so no validation
        # statistics leak into the scaling.
        scaler = MaxAbsScaler()
        scaler.fit(np.squeeze(x_all[train_index], axis=2))
        x_validation = np.expand_dims(scaler.transform(np.squeeze(x_all[validation_index], axis=2)), axis=2)

        # NOTE(review): the test-set evaluation later in this notebook passes
        # the scaled signals through scattering_model.h5 before predict(),
        # whereas here they are fed to the model directly — confirm which
        # input the loaded model expects.
        # Third model output, first (only) batch item, reduced with max over
        # its first axis to obtain one score per class.
        final_prediction = np.array([
            np.max(bestModel.predict(np.expand_dims(xi, axis=0))[2][0], axis=0)
            for xi in x_validation
        ])
        # Same reduction for the labels, done for all examples at once.
        final_groundTruth = np.max(y_class_all[validation_index], axis=1)

        # One event-type index per example: smallest argmax across its rows.
        event_type = np.min(np.argmax(y_type_all[validation_index], axis=2), axis=1)

        fold_scores = []
        for event in (0, 1, 2):
            selected = event_type == event
            fold_scores.append(
                f1_score(
                    final_groundTruth[selected] > threshold,
                    final_prediction[selected] > threshold,
                    average='macro',
                    zero_division=0,
                )
            )
        f1_macro.append(fold_scores)
        print(f"Fold {fold}: F1 Macro avg: {np.average(f1_macro[-1]) * 100:.1f}")

    # argmax is 0-based, fold folders are 1-based.
    return np.argmax(np.average(f1_macro, axis=1)) + 1

fold = choose_model(dict_data, folderPath)
fold

 10%|█         | 1/10 [01:07<10:10, 67.81s/it]

Fold 1: F1 Macro avg: 72.6


 20%|██        | 2/10 [02:12<08:46, 65.82s/it]

Fold 2: F1 Macro avg: 74.7


 30%|███       | 3/10 [03:17<07:39, 65.67s/it]

Fold 3: F1 Macro avg: 80.8


 40%|████      | 4/10 [04:24<06:35, 65.96s/it]

Fold 4: F1 Macro avg: 76.3


 50%|█████     | 5/10 [05:28<05:27, 65.48s/it]

Fold 5: F1 Macro avg: 77.7


 60%|██████    | 6/10 [06:35<04:23, 65.85s/it]

Fold 6: F1 Macro avg: 79.5


 70%|███████   | 7/10 [07:39<03:15, 65.26s/it]

Fold 7: F1 Macro avg: 76.7


 80%|████████  | 8/10 [08:43<02:09, 64.98s/it]

Fold 8: F1 Macro avg: 77.2


 90%|█████████ | 9/10 [09:47<01:04, 64.73s/it]

Fold 9: F1 Macro avg: 75.3


100%|██████████| 10/10 [10:53<00:00, 65.32s/it]

Fold 10: F1 Macro avg: 75.6





3

## Extract Scattering Features

In [None]:
from sklearn.preprocessing import MaxAbsScaler

def _scale_signals(fitted_scaler, signals):
    """Drop axis 2, apply the already-fitted scaler, then restore axis 2."""
    flattened = np.squeeze(signals, axis=2)
    return np.expand_dims(fitted_scaler.transform(flattened), axis=2)


# Folder of the fold selected above.
foldFolderPath = f"{folderPath}{fold}/"

train_index = np.load(foldFolderPath + "train_index.npy")
validation_index = np.load(foldFolderPath + "validation_index.npy")

# Classifier of the selected fold, plus the scattering model stored at the
# root of the experiment folder.
bestModel = ModelHandler.loadModel(foldFolderPath + "model_class_opt.h5", type_weights=None)
scattering_extract = ModelHandler.loadModel(folderPath + "scattering_model.h5")

# The scaler is fitted on the training split only and reused for every split.
scaler = MaxAbsScaler()
scaler.fit(np.squeeze(dict_data["x_train"][train_index], axis=2))

x_train = _scale_signals(scaler, dict_data["x_train"][train_index])
x_validation = _scale_signals(scaler, dict_data["x_train"][validation_index])
x_test = _scale_signals(scaler, dict_data["x_test"])

# Replace the scaled test signals by the scattering model's output.
x_test = scattering_extract.predict(x_test)

## Evaluates the identification

This step generates a dict with the ground truth and the prediction for each test example

In [None]:
# Per-example class scores and labels for the whole test set.
final_prediction, final_groundTruth = [], []

test_examples = zip(
    x_test,
    dict_data["y_test"]["classification"],
    dict_data["y_test"]["type"],
)
for features, class_grid, _type_grid in test_examples:
    # Predict one example at a time (batch of size 1).
    outputs = bestModel.predict(np.expand_dims(features, axis=0))

    # Third model output, first batch item, max over its first axis;
    # the labels are reduced the same way.
    final_prediction.append(np.max(outputs[2][0], axis=0))
    final_groundTruth.append(np.max(class_grid, axis=0))

# Keep independent (shallow) copies of both lists under a single dict.
y = {
    "true": list(final_groundTruth),
    "pred": list(final_prediction),
}

### F1 Score

#### F1 Macro:
$$
\begin{gather*}
F1_{Macro} = \frac{1}{Y} \sum_{i=1}^{Y} \frac{2 \cdot tp_i}{2 \cdot tp_i + fp_i + fn_i}
\end{gather*}
$$

#### F1 Micro:
$$
\begin{gather*}
F1_{Micro} = \frac{2 \cdot \sum_{i=1}^{Y} tp_i}{\sum_{i=1}^{Y} \left( 2 \cdot tp_i + fp_i + fn_i \right)}
\end{gather*}
$$

- $tp_i$: True positive classifications for appliance $i$
- $fp_i$: False positive classifications for appliance $i$
- $fn_i$: False negative classifications for appliance $i$

In [None]:
from sklearn.metrics import f1_score

threshold = 0.5

# Binarise labels and scores once and reuse them for both averaging modes.
y_true_binary = np.array(y["true"]) > threshold
y_pred_binary = np.array(y["pred"]) > threshold

f1_macro, f1_micro = (
    f1_score(y_true_binary, y_pred_binary, average=averaging)
    for averaging in ("macro", "micro")
)

print(f"Fold {fold} - F1 Macro: {f1_macro * 100:.1f}, F1 Micro: {f1_micro * 100:.1f}")

Fold 3 - F1 Macro: 83.2, F1 Micro: 82.9


### Accuracy (ACC)

$$
\begin{gather*}
ACC_i = \frac{CCE_i}{TNE_i} \\ \\
ACC = \frac{1}{Y} \sum_{i = 1}^{Y} ACC_i
\end{gather*}
$$

- $ACC_i$: Accuracy for appliance $i$
- $CCE_i$: Number of events in which the connected appliance $i$ was correctly identified
- $TNE_i$: Total number of events in which appliance $i$ was connected

In [None]:
threshold = 0.5
# Number of appliance classes; taken from the configuration instead of
# repeating the literal 26.
n_class = configs["N_CLASS"]

# Per-appliance counters, one pair (correct, total) per event type.
correct_on = np.zeros((n_class, 1))
total_on = np.zeros((n_class, 1))
correct_off = np.zeros((n_class, 1))
total_off = np.zeros((n_class, 1))
correct_no_event = np.zeros((n_class, 1))
total_no_event = np.zeros((n_class, 1))


def _mean_accuracy(correct, total):
    """Return 100 * mean(correct / total), counting empty classes as 0.

    Classes with ``total == 0`` contribute 0 to the mean, which matches the
    previous ``nan_to_num(correct / total)`` result but does not trigger a
    divide-by-zero RuntimeWarning.
    """
    ratio = np.divide(correct, total, out=np.zeros_like(correct), where=total > 0)
    return 100 * np.average(ratio)


# Event-type index -> counters; any other index is treated as "no event".
counters_by_type = {
    0: (correct_on, total_on),
    1: (correct_off, total_off),
}

for ytype, ytrue, ypred in zip(dict_data["y_test"]["type"], y["true"], y["pred"]):
    # Smallest argmax across the rows of the type matrix decides the event type.
    event_type = np.min(np.argmax(ytype, axis=1))
    correct, total = counters_by_type.get(int(event_type), (correct_no_event, total_no_event))

    active = ytrue > threshold  # appliances labelled as connected
    # Boolean masks index the first axis, so the counters are updated in place.
    correct[np.logical_and(active, ypred > threshold)] += 1
    total[active] += 1

acc_on = _mean_accuracy(correct_on, total_on)
acc_off = _mean_accuracy(correct_off, total_off)
acc_no_event = _mean_accuracy(correct_no_event, total_no_event)
acc_total = _mean_accuracy(
    correct_on + correct_off + correct_no_event,
    total_on + total_off + total_no_event,
)

print(f"Fold {fold} - Acc on: {acc_on:.1f}, Acc off: {acc_off:.1f}, Acc no event: {acc_no_event:.1f} Acc total: {acc_total:.1f}")

Fold 3 - Acc on: 94.7, Acc off: 98.7, Acc no event: 99.4 Acc total: 98.5


## Detection Metrics

### D
$$
\begin{gather*}
D = \frac{ \sum_{i=1}^{A} |d(i) - ev(i)|}{A}
\end{gather*}
$$

- `A`: Total of events correctly detected ($\pm$ 10 semi cycles tolerance)
- `d(i)`: Detected position of the $i$-th correctly detected event
- `ev(i)`: Ground-truth position of the $i$-th correctly detected event

### PC

$$
\begin{gather*}
PC = \frac{A}{N}
\end{gather*}
$$

- `A`: Total of events correctly detected ($\pm$ 10 semi cycles tolerance)
- `N`: Total of events

In [None]:
from PostProcessing import PostProcessing
from DataHandler import DataHandler

# Instantiate the project helpers with the notebook configuration.
postProcessing = PostProcessing(configs=configs)
# NOTE(review): dataHandler is not used in the visible part of this notebook.
dataHandler = DataHandler(configs=configs)

# "group" entry of the test labels, forwarded to checkModel as general_qtd.
general_qtd_test = dict_data["y_test"]["group"]

print(f"-------------- FOLD {fold} ---------------")
# checkModel returns two values; presumably the PC and D metrics described
# above — confirm against PostProcessing.checkModel.
pcMetric, dMetric = postProcessing.checkModel(bestModel, x_test, dict_data["y_test"], general_qtd=general_qtd_test, print_error=False)

-------------- FOLD 3 ---------------
Total time: 69.59573629798615, Average Time: 0.033315335709902416
LIT-SYN-1 PCmetric: (0.9428571428571428, 0.9523809523809523, 0.948051948051948)
LIT-SYN-1 Dmetric: (1.1818181818181819, 1.075, 1.1232876712328768)
LIT-SYN-2 PCmetric: (0.8968253968253969, 0.8832116788321168, 0.8897338403041825)
LIT-SYN-2 Dmetric: (0.8849557522123894, 0.8429752066115702, 0.8632478632478633)
LIT-SYN-3 PCmetric: (0.7987421383647799, 0.7554347826086957, 0.7755102040816326)
LIT-SYN-3 Dmetric: (1.2440944881889764, 1.381294964028777, 1.3157894736842106)
LIT-SYN-8 PCmetric: (0.5747126436781609, 0.5416666666666666, 0.559748427672956)
LIT-SYN-8 Dmetric: (1.38, 1.5384615384615385, 1.449438202247191)
LIT-SYN-All PCmetric: (0.7936117936117936, 0.7793103448275862, 0.7862232779097387)
LIT-SYN-All Dmetric: (1.13312693498452, 1.1710914454277286, 1.1525679758308156)
