> Copyright 2022 University of Luxembourg
> 
> Licensed under the Apache License, Version 2.0 (the "License");  
> you may not use this file except in compliance with the License.  
> You may obtain a copy of the License at  
>
>    https://www.apache.org/licenses/LICENSE-2.0
>
> Unless required by applicable law or agreed to in writing, software  
> distributed under the License is distributed on an "AS IS" BASIS,  
> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
> See the License for the specific language governing permissions and  
> limitations under the License.  
>
***

Author: Andrzej Mizera (andrzej.mizera@uni.lu)

***

# Called by the AtMonSat anomaly detection algorithm to compute prediction errors for individual abnormal datasets.

In [None]:
# Counters produced by getIterationsSinceLastChangeMicro for the abnormal dataset
# (helper defined outside this section; presumably one counter per row and feature -- TODO confirm).
islc_abnorm = getIterationsSinceLastChangeMicro(abnormal_df,abnormal_features)
# Wrapped in a DataFrame; a later cell joins it column-wise (axis=1) with the raw feature values.
df_abnorm_values = pd.DataFrame(islc_abnorm)
# Selected feature columns of the abnormal dataset as a float32 NumPy array.
feature_abnorm_values = abnormal_df[abnormal_features].astype('float32').values

---

In [None]:
def getInterpolationInfo(df,features):
    """Locate the rows at which at least one of the given features changes value.

    Consecutive rows of ``df`` are compared feature by feature with ``==``
    (so a NaN never equals its predecessor and counts as a change).

    Args:
        df: pandas DataFrame holding one column per entry of ``features``.
        features: iterable of column labels of ``df`` to monitor.

    Returns:
        A list of ``(row_position, nilc)`` tuples of Python ints, in row order.
        ``row_position`` is the positional index (>= 1) of a row where some
        feature differs from the previous row. ``nilc`` is one plus the
        smallest per-feature count of consecutive unchanged steps ending at the
        previous row, i.e. the number of rows back to the most recent row at
        which any feature changed (or to row 0 if none changed yet).
    """
    fs = list(features)
    n_rows = len(df.index)

    # Without at least two rows and one feature there is no pair to compare.
    if n_rows < 2 or not fs:
        return []

    # Pull each column out once instead of rebuilding a Series for every cell access.
    columns = [df[f].to_numpy() for f in fs]

    # Per-feature number of consecutive steps without a change (exact integers).
    unchanged_steps = [0] * len(columns)

    result = []
    for row in range(1, n_rows):
        # The margin must be taken from the counters as they were *before* this row.
        min_before = min(unchanged_steps)

        any_changed = False
        for k, col in enumerate(columns):
            if col[row - 1] == col[row]:
                unchanged_steps[k] += 1
            else:
                unchanged_steps[k] = 0
                any_changed = True

        if any_changed:
            result.append((row, min_before + 1))

    return result

In [None]:
# For every anomaly time, find its row position and how many unchanged rows lie between
# the previous feature change and the anomaly (the "detection margin before anomaly").
def getInterpolationInfoForAnomalies(df,features,t_anomalies):
    """Map each anomaly time to ``(anomaly_position, detection_margin_before_anomaly)``.

    Args:
        df: DataFrame whose index is compared against the anomaly times with ``>=``.
        features: column labels forwarded to ``getInterpolationInfo``.
        t_anomalies: iterable of anomaly times; they become the keys of the result.

    Returns:
        A dict keyed by anomaly time. An anomaly with no feature change at or
        after its position gets no entry.

    Raises:
        ValueError: if no index entry is ``>=`` an anomaly time.
        AssertionError: if a computed margin is negative.
    """
    change_points = getInterpolationInfo(df,features)

    info_by_anomaly = {}

    for t_anomaly in t_anomalies:

        # Position of the first row whose index is not earlier than the anomaly time.
        anomaly_pos = list(df.index >= t_anomaly).index(True)

        # c x x x x x a x x x c
        # c - change, x - no change, a - anomaly
        # The first change point at or after the anomaly is the right-hand 'c'.
        first_change_after = next(
            (point for point in change_points if point[0] >= anomaly_pos),
            None,
        )
        if first_change_after is None:
            continue

        change_pos, rows_since_previous_change = first_change_after

        # Rows strictly between the left-hand 'c' and the anomaly 'a'.
        margin = rows_since_previous_change - (change_pos - anomaly_pos) - 1

        assert margin >= 0

        info_by_anomaly[t_anomaly] = (anomaly_pos,margin)

    return info_by_anomaly

---

### Preparation of the data for anomaly detection

In [None]:
# Build X_abnorm / Y_abnorm from the counters joined column-wise with the feature values.
# Unknown model names are rejected before any data is touched.
if model_name not in ('AutoEncoder', 'CNN', 'LSTM', 'LSTM_2'):
    raise ValueError('Wrong model name!')

if model_name == 'AutoEncoder':

    _, _, X_abnorm, Y_abnorm = create_subseq_AE(
        np.append(df_abnorm_values,feature_abnorm_values,axis=1),
        window_length,
    )

else:

    # CNN / LSTM / LSTM_2: the same joined matrix is passed as both array arguments.
    # Two separate arrays are built on purpose so the callee never sees aliased inputs.
    _, _, X_abnorm, Y_abnorm = create_subseq(
        np.append(df_abnorm_values,feature_abnorm_values,axis=1),
        np.append(df_abnorm_values,feature_abnorm_values,axis=1),
        window_length,
        1,
    )

### Computation of the prediction errors on the test data subset, i.e., test errors.

In [None]:
# Index of the abnormal dataset; the plotting cells below use it as the x-axis
# when DeltaLastChangeTimes_analysis is set.
cet_abnorm = abnormal_df.index

In [None]:
# Run the model on the abnormal sub-sequences.
pred = model(np.array(X_abnorm))
pred_np = pred.numpy()

# A 2-D prediction gets a singleton axis at position 1 so that it can be
# handled exactly like a 3-D prediction below.
if pred_np.ndim == 2:
    pred_np = pred_np[:, np.newaxis, :]

# Element-wise prediction error.
te = pred_np - np.array(Y_abnorm)

# Merge the last two axes: one flat error vector per entry along the first axis.
test_errors = te.reshape((te.shape[0], te.shape[1]*te.shape[2]))

In [None]:
# window_length zeros; the error plot below prepends them to the per-row errors
# before plotting against cet_abnorm.
padding = np.zeros(window_length)

def score(x):
    """Return the sum of squared entries of ``x``, each divided by a PCA variance.

    The variances are taken from the module-level ``pca`` object, starting at
    component ``first_higher_order_PCA``. ``x`` must be broadcastable against
    that slice.

    NOTE(review): ``pca`` is presumably a scikit-learn ``PCA`` (``kPCA`` false)
    or ``KernelPCA`` (``kPCA`` true) instance -- confirm against the cell that
    fits it.
    """
    if not kPCA:
        pca_variances = pca.explained_variance_[first_higher_order_PCA:]
    else:
        # scikit-learn renamed KernelPCA.lambdas_ to eigenvalues_ (deprecated in
        # 1.0, removed in 1.2); prefer the new name and fall back to the old one.
        kernel_eigenvalues = getattr(pca, 'eigenvalues_', None)
        if kernel_eigenvalues is None:
            kernel_eigenvalues = pca.lambdas_
        pca_variances = kernel_eigenvalues[first_higher_order_PCA:]
    # Built-in sum is kept so the values stay bit-identical to earlier runs.
    return sum(np.divide(np.square(x),pca_variances))

def vec_length(x):
    """Return the Euclidean length of the 1-D vector ``x``."""
    sum_of_squares = sum(np.square(x))
    return np.sqrt(sum_of_squares)

# Per-row error measure used by the plots: the variance-weighted score when the
# higher-order PCA analysis is enabled, the plain vector length otherwise.
error_fun = score if PCA_higher_order_analysis else vec_length

In [None]:
# Plot the per-row prediction error, the anomaly markers and the error threshold,
# then save the figure as a PDF in output_folder.
fs = plt.rcParams.get('font.size')
plt.rcParams.update({'font.size': 20})
try:
    figure(figsize=(20, 7))

    # Evaluate the error function once; it is reused for the curve and the marker height.
    row_errors = np.apply_along_axis(error_fun, 1, test_errors)
    padded_errors = np.concatenate((padding, row_errors))

    if DeltaLastChangeTimes_analysis:
        plt.plot(cet_abnorm, padded_errors, color='r', label='Euclidean error')
    else:
        plt.plot(padded_errors, color='r', label='Euclidean error')
    plt.xlabel('Timestamp [Month-Day Hour]')
    plt.ylabel('Euclidean error')

    if len(anomaly_times) > 0:
        peak_error = max(row_errors)
        if DeltaLastChangeTimes_analysis:
            plt.vlines(t_anomalies,0,peak_error,colors=ANOMALY_COLOR)
        else:
            # Without a time axis the anomalies are drawn at their row positions.
            ind_anomalies = [list(abnormal_df_full.index >= t_anomaly).index(True) for t_anomaly in t_anomalies]
            plt.vlines(ind_anomalies,0,peak_error,colors=ANOMALY_COLOR)

    # The threshold line spans the whole plotted range in both modes. It must not
    # depend on ind_anomalies, which is undefined when there are no anomalies.
    if DeltaLastChangeTimes_analysis:
        plt.hlines(error_threshold,cet_abnorm[0],cet_abnorm[-1],'g','dashed')
    else:
        plt.hlines(error_threshold,0,len(padded_errors) - 1,'g','dashed')

    plot_file_name = os.path.join(output_folder,'error_' + anomaly_ds + '.pdf')
    plt.savefig(plot_file_name, format='pdf', bbox_inches='tight', pad_inches=0.1)

    plt.show()
finally:
    # Restore the previous font size even if plotting fails.
    plt.rcParams.update({'font.size': fs})

### Compute the Mahalanobis distances for the prediction errors on the test data subset.

In [None]:
# Mahalanobis quadratic form of a vector with respect to a mean and covariance.
def Mahala_distance(x,mean,cov):
    """Return ``(x - mean) @ inv(cov) @ (x - mean).T``.

    For a 1-D ``x`` this is the *squared* Mahalanobis distance; no square root
    is taken.

    Args:
        x: observation vector.
        mean: mean vector, broadcastable against ``x``.
        cov: square, invertible covariance matrix.

    Raises:
        numpy.linalg.LinAlgError: if ``cov`` is singular or not square.
    """
    centered = x - mean
    cov_inverse = np.linalg.inv(cov)
    return np.dot(np.dot(centered, cov_inverse), centered.T)

# window_length leading zeros, followed by one Mahalanobis value per row of test_errors.
m_dist = [0]*window_length
m_dist.extend(Mahala_distance(err_row,mean,cov) for err_row in test_errors)

### Plot the Mahalanobis distance for the test dataset predictions.

In [None]:
from matplotlib.dates import DateFormatter

# Plot the Mahalanobis values, the anomaly markers and the Mahalanobis threshold,
# then save the figure as a PDF in output_folder.
fs = plt.rcParams.get('font.size')
plt.rcParams.update({'font.size': 20})
try:
    figure(figsize=(20, 7))
    if DeltaLastChangeTimes_analysis:
        plt.plot(cet_abnorm, m_dist, color='r', label='Mahalanobis Distance')
    else:
        plt.plot(m_dist, color='r', label='Mahalanobis Distance')

    plt.ylabel('Mahalanobis error')

    if len(anomaly_times) > 0:
        peak_m_dist = max(m_dist)
        if DeltaLastChangeTimes_analysis:
            plt.vlines(t_anomalies,0,peak_m_dist,colors=ANOMALY_COLOR)
        else:
            # Without a time axis the anomalies are drawn at their row positions.
            ind_anomalies = [list(abnormal_df_full.index >= t_anomaly).index(True) for t_anomaly in t_anomalies]
            plt.vlines(ind_anomalies,0,peak_m_dist,colors=ANOMALY_COLOR)

    # The threshold line spans the whole plotted range in both modes. It must not
    # depend on ind_anomalies, which is undefined when there are no anomalies.
    if DeltaLastChangeTimes_analysis:
        plt.hlines(mahalanobis_error_threshold,cet_abnorm[0],cet_abnorm[-1],'g','dashed',
                   label='Mahalanobis error threshold')
    else:
        plt.hlines(mahalanobis_error_threshold,0,len(m_dist) - 1,'g','dashed',
                   label='Mahalanobis error threshold')

    date_form = DateFormatter("%H:%M")
    ax = plt.gca()
    # A time-of-day formatter only makes sense when the x-axis carries cet_abnorm
    # (presumably timestamps -- confirm); integer row positions keep the default ticks.
    if DeltaLastChangeTimes_analysis:
        ax.xaxis.set_major_formatter(date_form)

    plot_file_name = os.path.join(output_folder,'mahalanobis_error_' + anomaly_ds + '.pdf')
    plt.savefig(plot_file_name, format='pdf', bbox_inches='tight', pad_inches=0.1)

    plt.show()
finally:
    # Restore the previous font size even if plotting fails.
    plt.rcParams.update({'font.size': fs})