# 1D-Segmentation

# Test GPU
Before Starting, kindly check the available GPU from the Google Server, GPU model and other related information. It might help!

In [None]:
import torch
print("Is CUDA enabled GPU Available?", torch.cuda.is_available())
print("GPU Number:", torch.cuda.device_count())
print("Current GPU Index:", torch.cuda.current_device())
print("GPU Type:", torch.cuda.get_device_name(device=None))
print("GPU Capability:", torch.cuda.get_device_capability(device=None))
print("Is GPU Initialized yet?", torch.cuda.is_initialized())
print(torch.__version__)

In [None]:
import tensorflow as tf
print(tf.__version__)
print(tf.config.experimental.list_physical_devices())
print(tf.config.list_physical_devices('GPU'))

# Import Libraries

In [3]:
import os
import h5py
import scipy
import random
import pickle
import json
import torch
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.io as sio
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, classification_report, confusion_matrix, root_mean_squared_error
from sklearn.model_selection import train_test_split
from pathlib import Path
from tqdm import tqdm
from scipy.stats import pearsonr

In [4]:
# from segmentation_models import *
from segmentation_models.unet_variants import *
# from segmentation_models.MLMRSNet import *
# from segmentation_models.saunet_variants import *

In [5]:
%matplotlib inline
sns.set_style('white')

# Set Number of Channels

In [6]:
num_channel = 3
fold = 1
device = 'Kettle'
phase = 1

# Import and Prepare Data

Import dataset

Prepare Dataset for Training

Overlap Train Set

Cleanse Data

Plot

Save Train, Test and Validation Sets

In [15]:
import os
Checkpoint = {}
Checkpoint['X'] = X_Train
Checkpoint['Y'] = Y_Train
RawDataPath = 'Train_Set.pt'
torch.save(Checkpoint, RawDataPath)

In [16]:
import os
Checkpoint = {}
Checkpoint['X'] = X_Val
Checkpoint['Y'] = Y_Val
RawDataPath = 'Val_Set.pt'
torch.save(Checkpoint, RawDataPath)

In [17]:
import os
Checkpoint = {}
Checkpoint['X'] = X_Test
Checkpoint['Y'] = Y_Test
RawDataPath = 'Test_Set.pt'
torch.save(Checkpoint, RawDataPath)

### Garbage Collector

In [None]:
import gc #Garbage Collector
fl_Data = None
X_Test = None
X_Train = None
X_Val = None
Y_Test = None
Y_Train = None
Y_Val = None
gc.collect()

0

# MAIN (Sample Application)

## Configurations

In [9]:
# Configurations
## General Configurations
signal_length = 21600  # Length of each Segment
model_name = 'LDNet'  # UNet, UNetPP, etc.
model_depth = 4  # Number of Level in the CNN Model
model_width = 64  # Width of the Initial Layer, subsequent layers start from here
kernel_size = 3  # Size of the Kernels/Filter
num_channel = 3 # Number of Channels in the Model
D_S = 1  # Turn on Deep Supervision
A_E = 0  # Turn on AutoEncoder Mode for Feature Extraction
A_G = 0  # Turn on for Guided Attention
LSTM = 0  # Turn on BiConvLSTM Block
problem_type = 'Regression'
output_nums = 1  # Number of Class for Classification Problems, always '1' for Regression Problems
is_transconv = True # True: Transposed Convolution, False: UpSampling
feature_number = 1024  # Number of Features to be Extracted, only required if the AutoEncoder Mode is turned on

## Model Specific Configurations
alpha = 1  # Model Width Expansion Parameter, for MultiResUNet only
pooling_type = 'mix' # pooling_type, only for EMARS
cardinality = 5  # Cardinality, only for EMARS
q = 3
t = 1

## Data Configurations
DS_Model_Type = 'UNetPP' # UNet or UNetPP

## Experimental Configurations8
load_weights = False
max_epoch_stop = 20
max_epoch_lr_change = 10
lr = 3e-4
lr_factor = 0.1
monitor_param = 'loss'  # 'loss' or 'acc' for monitoring

# Loss Weight Dictionary for Deep Supervision
if D_S == 1:
    loss_weights = np.zeros(model_depth)
    for k in range(0, model_depth):
        loss_weights[k] = 1-(k*0.1)

## Helper Functions

In [10]:
def prepareTrainDict(y, model_depth, signal_length, model_name, num_channel=1):
    def approximate(inp, w_len, signal_length, num_channel):
        ops = np.zeros((len(inp), signal_length//w_len, num_channel))
        for j in range(0,num_channel):
            op = np.zeros((len(inp), signal_length//w_len))
            for i in range(0, signal_length, w_len):
                try:
                    op[:, i//w_len] = np.mean(inp[:, i:i+w_len, j], axis=1)
                except Exception as e:
                    print(e)
                    print(i)
            ops[:,:,j] = op
        return ops

    out = {}
    Y_Train_dict = {}
    out['out'] = np.array(y)
    Y_Train_dict['out'] = out['out']
    for i in range(1, (model_depth+1)):
        name = f'level{i}'
        if model_name == 'UNet':
            out[name] = approximate(y, 2**i, signal_length, num_channel)
        elif model_name == 'UNetPP':
            out[name] = y
        Y_Train_dict[f'level{i}'] = out[f'level{i}']

    return Y_Train_dict

## Train and Test

Build Model for 1D Segmentation

In [None]:
# Build model for PPG2ABP Segmentation - Deep UNet Architecture
SMDisAgg_Network = MLMRSNet(signal_length, model_depth, num_channel, model_width, kernel_size, problem_type='Regression', output_nums=output_nums,
                            ds=D_S, ae=A_E, cardinality=cardinality, pooling_type=pooling_type).LDNet()
# print(SMDisAgg_Network.summary())

In [None]:
# Load Data
Train_Data = torch.load('Train_Set.pt')
Test_Data = torch.load('Test_Set.pt')
Val_Data = torch.load('Train_Set.pt')
X_Train = Train_Data['X']
Y_Train = Train_Data['Y']
X_Test = Test_Data['X']
Y_Test = Test_Data['Y']
X_Val = Val_Data['X']
Y_Val = Val_Data['Y']
if D_S == 0:  # Deep Supervision OFF
    # Compile Built Model
    SMDisAgg_Network.compile(loss=tf.keras.losses.MeanAbsoluteError(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=tf.keras.metrics.MeanSquaredError())
    # Directory for Saving Trained Models
    save_directory = f'trained_models/{model_name}/'+model_name+'_'+str(signal_length)+'_'+str(model_width)+'_'+str(num_channel)+'_'+str(D_S)+'.h5'
    # Load Pretrained Weights (if available)
    if (os.path.exists(save_directory) and load_weights == True):
        print('\nLoading Pretrained Weights...')
        # Load Previously Trained Weights for Transfer Learning
        SMDisAgg_Network.load_weights(save_directory)
    # Declare Callbacks
    if monitor_param == 'loss':
        callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=max_epoch_stop, mode='min'),
                    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=lr_factor, patience=max_epoch_lr_change, verbose=1, mode='min'),
                    tf.keras.callbacks.ModelCheckpoint(save_directory, verbose=1, monitor='val_loss', save_best_only=True, mode='min')]
    elif monitor_param == 'acc':
        callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_acc', patience=max_epoch_stop, mode='max'),
                    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_acc', factor=lr_factor, patience=max_epoch_lr_change, verbose=1, mode='max'),
                    tf.keras.callbacks.ModelCheckpoint(save_directory, verbose=1, monitor='val_acc', save_best_only=True, mode='max')]
    # Train Model
    print('Starting Training...')
    model_history = SMDisAgg_Network.fit(X_Train, Y_Train, epochs=500, batch_size=1, verbose=1, validation_data=(X_Val, Y_Val), shuffle=True, callbacks=callbacks)
    # Test and Evaluation
    Y_Pred = SMDisAgg_Network.predict(X_Test, verbose=0)
    # Save History
    print('\n')
    print('Save History')
    # Get the dictionary containing each metric and the loss for each epoch
    history_dict = model_history.history
    history_path = f'Outcomes/{model_name}_{device}_History.h5'
    json.dump(history_dict.item(), open(history_path, 'w'))
    print('\n')
    # Save Outcomes
    print('Save Results')
    File = h5py.File(f'Outcomes/{model_name}_Ch{num_channel}_F{fold}_P{phase}_D{device}.h5', 'w')
    File.create_dataset('SM', data=X_Test)
    File.create_dataset('App', data=Y_Test)
    File.create_dataset('App_Pred', data=Y_Pred)
    File.close()
    print('\n')
elif D_S == 1:  # Deep Supervision ON
    # Prepare Train and Test Sets for Deep Supervision
    Y_Train_dict = prepareTrainDict(Y_Train, model_depth, signal_length, DS_Model_Type)
    Y_Val_dict = prepareTrainDict(Y_Val, model_depth, signal_length, DS_Model_Type)
    # Generate Custom Loss Weights for Deep Supervision
    loss_weights = np.zeros(model_depth)
    for k in range(0, model_depth):
        loss_weights[k] = 1-(k*0.1)
    # Compile Built Model
    SMDisAgg_Network.compile(loss=tf.keras.losses.MeanAbsoluteError(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=tf.keras.metrics.MeanSquaredError(), loss_weights=loss_weights)
    # Directory for Saving Trained Models
    save_directory = f'trained_models/{model_name}/'+model_name+'_'+str(signal_length)+'_'+str(model_width)+'_'+str(num_channel)+'_'+str(D_S)+'.h5'
    # Load Pretrained Weights (if available)
    if (os.path.exists(save_directory) and load_weights == True):
        print('Loading Pretrained Weights...')
        # Load Previously Trained Weights for Transfer Learning
        SMDisAgg_Network.load_weights(save_directory)
    # Declare Callbacks
    if monitor_param == 'loss':
        callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_out_loss', patience=max_epoch_stop, mode='min'),
                    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_out_loss', factor=lr_factor, patience=max_epoch_lr_change, verbose=1, mode='min'),
                    tf.keras.callbacks.ModelCheckpoint(save_directory, verbose=1, monitor='val_out_loss', save_best_only=True, mode='min')]
    elif monitor_param == 'acc':
        callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_out_acc', patience=max_epoch_stop, mode='max'),
                    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_out_acc', factor=lr_factor, patience=max_epoch_lr_change, verbose=1, mode='max'),
                    tf.keras.callbacks.ModelCheckpoint(save_directory, verbose=1, monitor='val_out_acc', save_best_only=True, mode='max')]
    # Train Model
    print('Starting Training...')
    model_history = SMDisAgg_Network.fit(X_Train, Y_Train_dict, epochs=500, batch_size=2, verbose=1, validation_data=(X_Val, Y_Val_dict), shuffle=True, callbacks=callbacks)
    # Test and Evaluation
    Y_Pred = SMDisAgg_Network.predict(X_Test, verbose=0)
    Y_Pred = Y_Pred[0]
    '''Test and Evaluation'''
    # Save History
    print('\n')
    print('Save History')
    # Get the dictionary containing each metric and the loss for each epoch
    history_dict = model_history.history
    history_path = f'History/{model_name}_{device}_History.h5'
    with open(history_path, 'wb') as file:
        history_dict = model_history.history
        pickle.dump(history_dict, file, pickle.HIGHEST_PROTOCOL)
    print('\n')
    # Save Outcomes
    print('Save Results')
    File = h5py.File(f'Outcomes/{model_name}_Ch{num_channel}_F{fold}_P{phase}_{device}.h5', 'w')
    File.create_dataset('SM', data=X_Test)
    File.create_dataset('App', data=Y_Test)
    File.create_dataset('App_Pred', data=Y_Pred)
    File.close()
    print('\n')

In [None]:
# Save History
print('\n')
print('Save History')
# Get the dictionary containing each metric and the loss for each epoch
history_dict = model_history.history
history_path = f'History/{model_name}_{device}_History.h5'
with open(history_path, 'wb') as file:
    history_dict = model_history.history
    pickle.dump(history_dict, file, pickle.HIGHEST_PROTOCOL)
print('\n')

Load History

In [None]:
with open(history_path, 'rb') as file:
    history=pickle.load(file)
print(history.keys())

External Evaluation

In [None]:
SMDisAgg_Network = MLMRSNet(signal_length, model_depth, num_channel, model_width, kernel_size, problem_type='Regression', output_nums=output_nums,
                            ds=D_S, ae=A_E, cardinality=cardinality, pooling_type=pooling_type).LDNet()
if D_S == 0:
    SMDisAgg_Network.compile(loss=tf.keras.losses.MeanAbsoluteError(), optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), metrics=tf.keras.metrics.MeanSquaredError())
elif D_S == 1:
    SMDisAgg_Network.compile(loss=tf.keras.losses.MeanAbsoluteError(), optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), metrics=tf.keras.metrics.MeanSquaredError(), loss_weights= loss_weights)

In [11]:
# Load Model and Predict
save_directory = f'trained_models/{model_name}/'+model_name+'_'+str(signal_length)+'_'+str(model_width)+'_'+str(num_channel)+'_'+str(D_S)+'.h5'
SMDisAgg_Network.load_weights(save_directory)
# Load Test Set
Test_Data = torch.load('Test_Set.pt')
X_Test = Test_Data['X']
Y_Test = Test_Data['Y']
house_labels = Test_Data['Z']
print(X_Test.shape)
print(Y_Test.shape)
print(house_labels.shape)
# Predict
Y_Pred = SMDisAgg_Network.predict(X_Test, verbose=0)
Y_Pred = Y_Pred[0]
print(Y_Pred.shape)

(76, 21600, 3)
(76, 21600, 1)
(76,)


2024-05-30 18:05:13.704342: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:454] Loaded cuDNN version 8902
2024-05-30 18:05:14.038925: I external/local_tsl/tsl/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2024-05-30 18:05:14.282044: I external/local_tsl/tsl/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory


(76, 21600, 1)


In [12]:
# Save Outcomes
print('Save Results')
File = h5py.File(f'Outcomes/{model_name}_Ch{num_channel}_F{fold}_P{phase}_{device}.h5', 'w')
File.create_dataset('SM', data=X_Test)
File.create_dataset('App', data=Y_Test)
File.create_dataset('App_Pred', data=Y_Pred)
File.close()
print('\n')

Save Results




In [13]:
# Load Resuts with Ground Truth and Inputs
fl_Data = h5py.File(os.path.join('Outcomes/LDNet_Ch3_F1_P1_Kettle.h5'),'r')
print(fl_Data.keys())
X_Test = fl_Data['SM']
Y_Test = fl_Data['App']
Y_Pred = fl_Data['App_Pred']
# Y_Test_Denorm = fl_Data['App_Denorm']
# Y_Pred_Denorm = fl_Data['App_Pred_Denorm']

<KeysViewHDF5 ['App', 'App_Pred', 'SM']>


In [14]:
# Load Test Set and Print Shape
Test_Data = torch.load('Test_Set.pt')
X_Test = Test_Data['X']
Y_Test = Test_Data['Y']
house_labels = np.int16(Test_Data['Z'])
normalization_factors = Test_Data['F']
print(X_Test.shape)
print(Y_Test.shape)
print(house_labels.shape)
print(normalization_factors.shape)
house_labels_unique = np.unique(house_labels)
num_houses = np.size(house_labels_unique)
# Denormalize Waveforms for Evaluation
Y_Test_shape = Y_Test.shape
Y_Test_Denorm = np.zeros((Y_Test_shape[0],Y_Test_shape[1],Y_Test_shape[2]))
for i in range(Y_Test_shape[0]):
    Y_Test_Norm_Temp = Y_Test[i,:,:]
    house_label_Temp = house_labels[0]
    Y_Test_Denorm_Temp = Y_Test_Norm_Temp*normalization_factors[house_label_Temp*2-2]
    Y_Test_Denorm[i,:,:] = Y_Test_Denorm_Temp
# Load Resuts
fl_Data = h5py.File(os.path.join('Outcomes/LDNet_Ch3_F1_P1_Kettle.h5'), 'r')
print(fl_Data.keys())
Y_Pred = fl_Data['App_Pred']
Y_Pred_shape = Y_Pred.shape
Y_Pred_Denorm = np.zeros((Y_Pred_shape[0],Y_Pred_shape[1],Y_Pred_shape[2]))
for i in range(Y_Pred_shape[0]):
    Y_Pred_Norm_Temp = Y_Pred[i,:,:]
    house_label_Temp = house_labels[0]
    Y_Pred_Denorm_Temp = Y_Pred_Norm_Temp*(normalization_factors[house_label_Temp*2-2])
    Y_Pred_Denorm[i,:,:] = Y_Pred_Denorm_Temp

(68, 21600, 3)
(68, 21600, 1)
(68,)
(12,)
<KeysViewHDF5 ['App', 'App_Pred', 'SM']>


In [None]:
# Plot
data_shape = X_Test.shape
i = random.randint(0, data_shape[0])
plt.figure(figsize=(12,8))
plt.subplot(3,1,1)
plt.plot(X_Test[i,:,0], label='GT')
plt.ylim(0,1)
plt.title(f"Smart Meter (Power All Phase) -- Sample Number {i}", fontdict={'fontsize': 16})
plt.legend()
plt.subplot(3,1,2)
plt.plot(Y_Test_Denorm[i,:,0], label='Target')
# plt.ylim(0,1)
plt.title(f"Appliance GT", fontdict={'fontsize': 16})
plt.legend()
plt.subplot(3,1,3)
plt.plot(Y_Pred_Denorm[i,:,0], label='Pred')
# plt.ylim(0,1)
plt.title(f"Appliance Estimated", fontdict={'fontsize': 16})
plt.legend()
plt.tight_layout()

In [16]:
# Save Outcomes
print('Saving Results')
File = h5py.File(f'Outcomes/LDNet_5_32_DS_Kettle_SM3_21600_Overlapped.h5', 'w')
File.create_dataset('SM', data=X_Test)
File.create_dataset('App', data=Y_Test)
File.create_dataset('App_Pred', data=Y_Pred)
File.create_dataset('App_Denorm', data=Y_Test_Denorm)
File.create_dataset('App_Pred_Denorm', data=Y_Pred_Denorm)
File.close()

Saving Results


# Evaluate

Load Data

In [63]:
# Load Resuts
fl_Data = h5py.File(os.path.join('Outcomes/Phase_1/LDNet/LDNet_5_32_DS_Dishwasher_SM3_21600_Overlapped.h5'), 'r')
print(fl_Data.keys())
X_Test = fl_Data['SM']
Y_Test = fl_Data['App']
Y_Pred = fl_Data['App_Pred']
Y_Test_Denorm = fl_Data['App_Denorm']
Y_Pred_Denorm = fl_Data['App_Pred_Denorm']

<KeysViewHDF5 ['App', 'App_Denorm', 'App_Pred', 'App_Pred_Denorm', 'SM']>


Construction Errors

In [17]:
def Construction_Error(GRND, Pred):
    mae_construction_err = []
    mse_construction_err = []
    rmse_construction_err = []
    cc = []
    for i in range(len(GRND)):
        MAE = np.mean(np.abs(Pred[i].ravel() - GRND[i].ravel()))
        MSE = mean_squared_error(Pred[i].ravel(), GRND[i].ravel())
        RMSE = root_mean_squared_error(Pred[i].ravel(), GRND[i].ravel())
        if ~(np.std(Pred[i].ravel()) == 0 or np.std(GRND[i].ravel()) == 0):
            corr, _ = pearsonr(Pred[i].ravel(), GRND[i].ravel())
        else:
            continue
        mae_construction_err.append(MAE)
        mse_construction_err.append(MSE)
        rmse_construction_err.append(RMSE)
        cc.append(corr)
    print(f'MAE Construction Error: {round(np.mean(mae_construction_err), 3)} +/- {round(np.std(mae_construction_err), 3)}')
    print(f'MSE Construction Error: {round(np.mean(mse_construction_err), 3)} +/- {round(np.std(mse_construction_err), 3)}')
    print(f'RMSE Construction Error: {round(np.mean(rmse_construction_err), 3)} +/- {round(np.std(rmse_construction_err), 3)}')
    print(f'Pearson Correlation: {round(np.mean(cc)*100, 3)}% +/- {round(np.std(cc)*100, 3)}')
    MAE = round(np.mean(mae_construction_err), 3)
    MSE = round(np.mean(mse_construction_err), 3)
    RMSE = round(np.mean(rmse_construction_err), 3)
    PCC = round(np.mean(cc)*100, 3)
    return MAE, MSE, RMSE, PCC

In [None]:
Construction_Error(Y_Pred, Y_Test)

SAE

In [19]:
def Calculate_SAE(GRND, Pred):
    EP = 0
    EG = 0

    for i in range(len(GRND)):
        EG = EG + np.sum(GRND[i].ravel())
        EP = EP + np.sum(Pred[i].ravel())
        
    SAE = np.abs(EP - EG)/EG
    print(f'SAE Construction Error: {round(SAE, 3)}')
    return SAE

In [None]:
Calculate_SAE(Y_Test_Denorm, Y_Pred_Denorm)

EA

In [21]:
def Calculate_EA(GRND, Pred):
    EA = []
    for i in range(len(GRND)):
        GRND_Temp = GRND[i].ravel()
        Pred_Temp = Pred[i].ravel()
        NOM_Temp_TOT = 0
        DENOM_Temp_TOT = 2*(np.sum(GRND_Temp))
        for ii in range(len(GRND_Temp)):
            NOM_Temp = np.abs(GRND_Temp[ii] - Pred_Temp[ii])
            NOM_Temp_TOT = NOM_Temp_TOT + NOM_Temp
        NOM_Temp = 1 - (NOM_Temp_TOT/DENOM_Temp_TOT)
        EA.append(NOM_Temp)
    EA_OVR = round(np.mean(EA), 3)
    print(f'Estimation Accuracy (EA): {EA_OVR}')

In [None]:
Calculate_EA(Y_Test_Denorm, Y_Pred_Denorm)

JEOI

In [23]:
def Calculate_JEOI(GRND, Pred):
    JEOI = []
    for i in range(len(GRND)):
        GRND_Temp = GRND[i].ravel()
        Pred_Temp = Pred[i].ravel()
        EO_Temp_TOT = 0
        EE_Temp_TOT = 0
        EM_Temp_TOT = 0
        EP_Temp_TOT = np.sum(Pred_Temp)
        EG_Temp_TOT = np.sum(GRND_Temp)
        for ii in range(len(GRND_Temp)):
            if Pred_Temp[ii] < 0:
                Pred_Temp[ii] = 0
            if GRND_Temp[ii] > Pred_Temp[ii]:
                EO_Temp = Pred_Temp[ii]
                EE_Temp = 0
                EM_Temp = np.abs(GRND_Temp[ii] - Pred_Temp[ii])
            elif GRND_Temp[ii] < Pred_Temp[ii]:
                EO_Temp = GRND_Temp[ii]
                EE_Temp = np.abs(Pred_Temp[ii] - GRND_Temp[ii])
                EM_Temp = 0
            elif GRND_Temp[ii] == Pred_Temp[ii]:
                EO_Temp = GRND_Temp[ii]
                EE_Temp = 0
                EM_Temp = 0
            # EO_Temp = min(GRND_Temp[ii], Pred_Temp[ii])
            # EE_Temp = max(GRND_Temp[ii], Pred_Temp[ii])
            EO_Temp_TOT = EO_Temp_TOT + EO_Temp
            EE_Temp_TOT = EE_Temp_TOT + EE_Temp
            EM_Temp_TOT = EM_Temp_TOT + EM_Temp
            
        EO_Temp_Norm = EO_Temp_TOT/EG_Temp_TOT
        EE_Temp_Norm = EE_Temp_TOT/EG_Temp_TOT
        EM_Temp_Norm = EM_Temp_TOT/EG_Temp_TOT
        JEOI_Temp = EO_Temp_TOT/(EO_Temp_TOT+EE_Temp_TOT+EM_Temp_TOT)
        JEOI.append(JEOI_Temp)
    JEOI_OVR = round(np.mean(JEOI), 4)
    print(f'JEOI: {JEOI_OVR}')
    return JEOI_OVR

In [None]:
Calculate_JEOI(Y_Test, Y_Pred)

DEOI

In [25]:
def Calculate_DEOI(GRND, Pred):
    DEOI = []
    for i in range(len(GRND)):
        GRND_Temp = GRND[i].ravel()
        Pred_Temp = Pred[i].ravel()
        EO_Temp_TOT = 0
        EE_Temp_TOT = 0
        EM_Temp_TOT = 0
        EP_Temp_TOT = np.sum(Pred_Temp)
        EG_Temp_TOT = np.sum(GRND_Temp)
        for ii in range(len(GRND_Temp)):
            if Pred_Temp[ii] < 0:
                Pred_Temp[ii] = 0
            if GRND_Temp[ii] > Pred_Temp[ii]:
                EO_Temp = Pred_Temp[ii]
                EE_Temp = 0
                EM_Temp = np.abs(GRND_Temp[ii] - Pred_Temp[ii])
            elif GRND_Temp[ii] < Pred_Temp[ii]:
                EO_Temp = GRND_Temp[ii]
                EE_Temp = np.abs(Pred_Temp[ii] - GRND_Temp[ii])
                EM_Temp = 0
            elif GRND_Temp[ii] == Pred_Temp[ii]:
                EO_Temp = GRND_Temp[ii]
                EE_Temp = 0
                EM_Temp = 0
            # EO_Temp = min(GRND_Temp[ii], Pred_Temp[ii])
            # EE_Temp = max(GRND_Temp[ii], Pred_Temp[ii])
            EO_Temp_TOT = EO_Temp_TOT + EO_Temp
            EE_Temp_TOT = EE_Temp_TOT + EE_Temp
            EM_Temp_TOT = EM_Temp_TOT + EM_Temp
        EO_Temp_Norm = EO_Temp_TOT/EG_Temp_TOT
        EE_Temp_Norm = EE_Temp_TOT/EG_Temp_TOT
        EM_Temp_Norm = EM_Temp_TOT/EG_Temp_TOT
        DEOI_Temp = (2*EO_Temp_TOT)/((2*EO_Temp_TOT)+EE_Temp_TOT+EM_Temp_TOT)
        DEOI.append(DEOI_Temp)
    DEOI_OVR = round(np.mean(DEOI), 4)
    print(f'DEOI: {DEOI_OVR}')
    return DEOI_OVR

In [None]:
Calculate_DEOI(Y_Test, Y_Pred)

# Infinite Loop to Keep the Tab Alive

In [None]:
while True:
    pass