### Prototypical Projection Based Anomaly Detector
###   Code by F. Vides, E. Segura, C. Vargas
###   For Paper, "On Operator Theory-Based Anomaly Detection in Cyber-Physical Systems"
###   by F. Vides, E. Segura, C. Vargas
### @authors: F. Vides, E. Segura, C. Vargas

In [1]:
from matplotlib.pyplot import plot,subplot,show,grid,tight_layout, figure, axvspan
from numpy.linalg import svd, eigh
from numpy import zeros, real, where, nan, append, ones, maximum
from numpy.random import randn
from scipy.linalg import hankel
from pandas import read_excel,read_csv
from statistics import mean, stdev
import numpy as np
import time

In [2]:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [3]:
def InversePowerMethod(A,tol, kIterMax,q0):
    
    import numpy as np
    
    n = A.shape[0]

    x = np.random.rand(n).reshape(-1,1)

    B = A.copy()

    error = 1
    
    #First Run
    for od in range(n):
            B[od,od] = A[od,od] - q0
        
    x = np.linalg.solve(B,x)
    x = x/np.linalg.norm(x)
    lamOld = x.T@A@x
    
    kIter = 1
    
    while kIter < kIterMax:
        
        #od = on diagonal
        for od in range(n):
            B[od,od] = A[od,od] - lamOld
        
        x = np.linalg.solve(B,x)
        x = x/np.linalg.norm(x)
        lamNew = x.T@A@x
        kIter = kIter + 1
        
        error = np.abs(lamNew - lamOld)
        if error < tol:
            return (x, lamNew, kIter)
        else:
            lamOld = lamNew
        
        
        
    return (x, lamNew, kIter)

In [4]:
def intermediateSVD(signal, L,N_hankel, figsize):
        
    x1 = signal.copy()
    xm = mean(x1)
    x0 = x1 - xm
    H = hankel(x0[:L],x0[(L-1):N_hankel])


    startTime = time.time()
    
    H1 = H@H.T
    u,s,v = svd(H,full_matrices=0)
    p = u[:,-1]
    
    finishTime = time.time()
    print("Time taken for SVD: {:.4f} seconds".format(finishTime-startTime))

    lp = len(p)
    lx = len(x0)
    N = lx-lp

    
    d0 = zeros(lx)
    d1 = zeros(lx)

    for k in range(N):
        d0[k+lp-1] = abs(p.T@x0[(k):(k+lp)])
        d1[N-k-1]  = abs(p.T@x0[(lx-k-lp):(lx-k)])

    std_d0 = stdev(d0)
    std_d1 = stdev(d1)

    return(d0,d1,std_d0,std_d1,x1,x0,xm,lp,lx,N,figsize)   

In [5]:
def intermediateEigH(signal, L,N_hankel, figsize):
        
    x1 = signal.copy()
    xm = mean(x1)
    x0 = x1 - xm
    H = hankel(x0[:L],x0[(L-1):N_hankel])


    startTime = time.time()
    
    H1 = H@H.T
    eigvec = eigh(H1)[1]
    p = eigvec[:,0]
    
    finishTime = time.time()
    print("Time taken for EigH: {:.4f} seconds".format(finishTime-startTime))

    lp = len(p)
    lx = len(x0)
    N = lx-lp

    
    d0 = zeros(lx)
    d1 = zeros(lx)

    for k in range(N):
        d0[k+lp-1] = abs(p.T@x0[(k):(k+lp)])
        d1[N-k-1]  = abs(p.T@x0[(lx-k-lp):(lx-k)])

    std_d0 = stdev(d0)
    std_d1 = stdev(d1)

    return(d0,d1,std_d0,std_d1,x1,x0,xm,lp,lx,N,figsize) 

In [6]:
def intermediateIPM(signal, L,N_hankel, tolIPM, kIterMax, q0, figsize):
        
    x1 = signal.copy()
    xm = mean(x1)
    x0 = x1 - xm
    H = hankel(x0[:L],x0[(L-1):N_hankel])


    startTime = time.time()
    
    H1 = H@H.T
    p, lam, kIter = InversePowerMethod(A=H1, tol=tolIPM,
                                      kIterMax = kIterMax,
                                      q0 = q0)

    
    finishTime = time.time()
    print("Number of iteration of IPM. kIter = {:d}".format(kIter))
    print("Time taken for IPM: {:.4f} seconds".format(finishTime-startTime))

    lp = len(p)
    lx = len(x0)
    N = lx-lp

    
    d0 = zeros(lx)
    d1 = zeros(lx)

    for k in range(N):
        d0[k+lp-1] = abs(p.T@x0[(k):(k+lp)])
        d1[N-k-1] = abs(p.T@x0[(lx-k-lp):(lx-k)])

    std_d0 = stdev(d0)
    std_d1 = stdev(d1)

    return(d0,d1,std_d0,std_d1,x1,x0,xm,lp,lx,N,figsize)

    

In [7]:
def calibration(tolerance, and_or, rest):
    
    from numpy import logical_or
    from numpy import logical_and
    
    d0,d1,std_d0,std_d1,x1,x0,xm,lp,lx,N,figsize = rest
    
    
    threshold0 = tolerance*std_d0
    threshold1 = tolerance*std_d1    
    
    d0 = (d0 >= threshold0)
    d1 = (d1 >= threshold1)

    
    if and_or == "AND": 
        d = logical_and(d0,d1)
    if and_or == "OR":
        d = logical_or(d0,d1)

    di = where(d==1)[0]
    y = nan*ones(lx)
    #y[di] = x0[di]
    y[di] = x1[di]
    
    figure(figsize =figsize)
    
    ax1 = subplot(3,1,1)
    ax1.set_xlim(0,len(x0)+1)
    
    plot(x1,'blue')
    grid(color='k', linestyle='--', linewidth=0.5)
    ax1.set_title('Signal', fontsize=12)
    
    ax2 = subplot(3,1,2)
    
    #Put this or it doesn't plot the whole region
    ax2.set_xlim(0,len(x0)+1)
    ax2.set_ylim(0,1)
        
    
    for anom in di:
        ax2.axvspan(anom-0.5, anom+0.5, facecolor = "seagreen", alpha = 1)
    
    plot(di,'darkorange',)
    grid(color='k', linestyle='--', linewidth=0.5)
    ax2.set_title('Identified scanning region', fontsize=12)
    tight_layout()
    
    ax3 = subplot(3,1,3)
    
    ax3.set_xlim(0,len(x0)+1)
    for anom in di:
        ax3.axvspan(anom-0.5, anom+0.5, facecolor = "seagreen", alpha = 1)

    plot(x1,'blue')
    #plot(y,"darkorange")
    #plot(d*x1,'darkorange')
    plot(d*x0+xm,'darkorange')
    grid(color='k', linestyle='--', linewidth=0.5)
    ax3.set_title('Identified anomalies', fontsize=12)
    tight_layout()
    show()

### Let's define the figure size here

In [8]:
figsize = (16,7)

# Let's calibrate the synthetic signal

In [9]:
signal = read_csv('../Data/synthetic_signal.csv', header = None)
signal = signal.values.reshape(-1)

### Set some value for L and N_hankel here

In [10]:
L = 150
N_hankel = 1200

## Using SVD

### The intermediate step is for the widget not recalculating the SVD decomposition, and only changing the sensitivity.
### It's easier on the computer to only adjust with the widget the sensitivity and change manually the L and S in the cell before for the calibration

In [13]:
rest = intermediateSVD(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for SVD: 0.0357 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

## Using Eigen Hermitian

### The intermediate step is for the widget not recalculating the Eigen Hermitian decomposition, and only changing the sensitivity.
### It's easier on the computer to only adjust with the widget the sensitivity and change manually the L and S in the cell before for the calibration

In [16]:
L = 150
N_hankel = 1200

In [17]:
rest = intermediateEigH(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for EigH: 0.0045 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

## Using Inverse Power Method

### The intermediate step is for the widget not recalculating the IPM decomposition, and only changing the sensitivity.
### It's easier on the computer to only adjust with the widget the sensitivity and change manually the L and S in the cell before for the calibration

In [18]:
L = 150
N_hankel = 1200

tolIPM = 1e-7
kIterMax = 400
#This q0 is an approximation to the lowest singular value
#This can be estimated by other methods, but here we use it as known
q0 = 0.01975  + 1e-5

In [19]:
rest = intermediateIPM(signal = signal, L = L, N_hankel = N_hankel,
                       tolIPM = tolIPM,
                       kIterMax = kIterMax,
                       q0 = q0,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Number of iteration of IPM. kIter = 3
Time taken for IPM: 0.0064 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

# Let's calibrate the first real signal
### First read the file that has the signal

In [21]:
signal = read_csv('../Data/real_signal_1.csv', header = None)
signal = signal.values.reshape(-1)

### The intermediate step is for the widget not recalculating the SVD decomposition, and only changing the sensitivity.
### It's easier on the computer to only adjust with the widget the sensitivity and change manually the L and S in the cell before for the calibration

## Let's do it with the SVD Method

### Set some value for L and N_hankel here

In [22]:
L = 75
N_hankel = 1300

In [23]:
rest = intermediateSVD(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for SVD: 0.0196 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

## Now with the Eigen Hermitian method

In [24]:
L = 75
N_hankel = 1300

In [25]:
rest = intermediateEigH(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for EigH: 0.0127 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

### Let's do it with the Inverse Power Method

In [28]:
L = 75
N_hankel = 1300

tolIPM = 1e-7
kIterMax = 400
#This q0 is an approximation to the lowest singular value
#This can be estimated by other methods, but here we use it as known
q0 = 2313.020907 + 1e-1
#q0 = 0

In [29]:
rest = intermediateIPM(signal = signal, L = L, N_hankel = N_hankel,
                       tolIPM = tolIPM,
                       kIterMax = kIterMax,
                       q0 = q0,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Number of iteration of IPM. kIter = 3
Time taken for IPM: 0.0114 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…

# Let's do it with the second real signal

In [30]:
signal = read_csv('../Data/real_signal_2.csv', header = None)
signal = signal.values.reshape(-1)

## Let's do it with the SVD Method

### Set some value for L and N_hankel here

In [31]:
L = 90
N_hankel = 900

In [32]:
rest = intermediateSVD(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,1.8,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for SVD: 0.0110 seconds


interactive(children=(FloatSlider(value=0.9, description='tolerance', max=1.8, step=0.01), Dropdown(descriptio…

## Let's do it with the Eigen Hermitian Method

### Set some value for L and N_hankel here

In [34]:
rest = intermediateEigH(signal = signal, L = L, N_hankel = N_hankel,
                       figsize = figsize)
interact(calibration, tolerance=(0,1.8,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Time taken for EigH: 0.0056 seconds


interactive(children=(FloatSlider(value=0.9, description='tolerance', max=1.8, step=0.01), Dropdown(descriptio…

### Let's do it now with the Inverse Power Method

In [35]:
L = 90
N_hankel = 900

tolIPM = 1e-8
kIterMax = 100
#This q0 is an approximation to the lowest singular value
#This can be estimated by other methods, but here we use it as known
q0 = 2.5697 + 1e-3

In [36]:
rest = intermediateIPM(signal = signal, L = L, N_hankel = N_hankel,
                       tolIPM = tolIPM,
                       kIterMax = kIterMax,
                       q0 = q0,
                       figsize = figsize)
interact(calibration, tolerance=(0,2.4,0.01),
         and_or = ["AND","OR"],
        rest = fixed(rest) );

Number of iteration of IPM. kIter = 3
Time taken for IPM: 0.0436 seconds


interactive(children=(FloatSlider(value=1.2, description='tolerance', max=2.4, step=0.01), Dropdown(descriptio…