In [None]:
import numpy as np
import matplotlib.pyplot as plt
import brian2
from IPython.core.display import SVG

# A guide to the OSCFAR SNN

Let's first generate some data with a peak at the position 7.

In [None]:
gen = np.random.default_rng()
data_1D = gen.random(15)
data_1D[7] += 1

plt.plot(data_1D)
plt.grid(True)
plt.show()

## Traditional OSCFAR algorithm

<img src="../data/images/CFAR.svg" />

The figure shows how the OSCFAR algorithm processes the data. The boxes on top correspond to a numpy array. We want to figure out if the yellow box contains valuable information or not. In other words, does it contain a signal or is it background noise. The output will be 1 or 0, respectively. 

In the picture above, the blue cells are called neighbour cells, the red cells are guarding cells and the yellow cell is the test cell. The OS-CFAR algorithm computes the $k$-th largest value $x_k$ among the neighbouring cells ($f(x)$ in the figure). Afterwards, it compares it to a scaled version of the test cell, e.g. $S\cdot x_{test} - x_k$, where $S$ is the scale factor. The final output is determined by a Heaviside function, e.g. it returns 1 if $S\cdot x_{test} - x_k > 0$ and zero else.

We implemented this algorithm in ```oscfar_core_1d```. The algorithm can also be applied to larger arrays with a sliding window approach. This is implemented in ```oscfar_1d```.


In [None]:
def oscfar_core_1d(np_array, neighbour_cells, guarding_cells, k, scale_factor):
    '''
    OSCFAR implementation. 
    @param np_array : array to analyze
    @param neighbour_cells : cells to determine background. 
        Number refers to cells left of the test cell.
    @param guarding_cells : cells shielding test cell.
        Number refers to cells left of the test cell.
    @param k : k-th largest value for statistical measure
    @param scale_factor : scale factor S for test cell.
    '''
    
    # test array size
    if np_array.size != 2*neighbour_cells+2*guarding_cells+1:
        raise ValueError('oscfar_core_1d: array size and cell sizes do not match.')
    
    # save value of test cell
    center_value = np_array[neighbour_cells+guarding_cells]
    
    # dense array with only neighbour cells
    neighbours = np.zeros(2*neighbour_cells)
    neighbours[:neighbour_cells] = np_array[:neighbour_cells]
    neighbours[neighbour_cells:] = np_array[neighbour_cells+2*guarding_cells+1:]
    
    # compute k-th largest neighbour value
    threshold = np.partition(neighbours,-k)[-k]
    
    # compare scaled center value against background threshold
    comparison = scale_factor*center_value - threshold
    
    # return Heaviside of comparision
    if comparison > 0:
        return 1
    else:
        return 0

    
def oscfar_1d(np_array, neighbour_cells, guarding_cells, k, scale_factor):
    '''
    OSCFAR implementation. Handles longer arrays with sliding window approach.
    Does not handle boundaries, only uses full window size for OSCFAR.
    @param np_array : array to analyze
    @param neighbour_cells : cells to determine background. 
        Number refers to cells left of the test cell.
    @param guarding_cells : cells shielding test cell.
        Number refers to cells left of the test cell.
    @param k : k-th largest value for statistical measure
    @param scale_factor : scale factor S for test cell.
    '''
    
    # prepare array for results (smaller than original array)
    # window of certain size n only fits int an array of length N 
    # N-n+1 times. 
    window_size = 2*neighbour_cells+2*guarding_cells+1
    result = np.zeros(np_array.size-2*(neighbour_cells+guarding_cells))
    
    # slide window and apply osfar_core_1d to each window
    for i in range(result.size):
        result[i] = oscfar_core_1d(np_array[i:i+window_size], neighbour_cells, 
                                guarding_cells, k, scale_factor)
    
    return result


Let's test out implementation on the sample data generated above.

In [None]:
# define parameters
neighbour_cells, guarding_cells, k, scale_factor = 4, 1, 3, .5

# run OSCFAR on sample data with parameters as above
oscfar_1d_result = oscfar_1d(data_1D, neighbour_cells, guarding_cells, k, scale_factor)

# visualize the data together with the results of OSCFAR
plt.plot(data_1D, label='data')
plt.scatter(range(neighbour_cells+guarding_cells,data_1D.size - neighbour_cells - guarding_cells), 
         oscfar_1d_result, label='oscfar')
plt.legend()
plt.grid(True)
plt.show()

## A SNN to replace the OSCFAR algorithm

<img src="../data/images/OSCFAR_SNN.svg" />

To replace the OSCFAR algorithm, we only need a single spiking neuron. This neuron is an IF neuron. An IF neuron obeys the differential equation $dv/dt = 0$, which means the membrane potential does not change over time. If an IF neuron receives a spike from a presynaptic neuron via a connection associated with a weight $w_i$ at $t_i^f$, we observe a sudden (non-continous) jump in the membrane potential by the value value $w_i$:

\begin{equation}
\text{lim}_{\epsilon > 0, \ \epsilon \rightarrow 0} \ \ 
v(t_i^f + \epsilon) = v(t_i^f - \epsilon) + w_i .
\end{equation}

The behaviour is shown by with a dotted brown line in the voltage time diagram above. To evaluate the OSCFAR algorithm, we make use of the time dimenstion of the IF neuron. The values $x_i$ of the neighbour, guarding and test cell(s) are encoded in the precise spike timing according to

\begin{equation}
t_i = (t_{max}-t_{min}) \cdot \left(1- \frac{x_i - x_{min}}{x_{max}-x_{min}} \right) + t_{min} \ .
\end{equation}

The parameters $t_{min}, t_{max}, x_{min}, x_{max}$ need to be chosen appropriately. For instance, $x_{min}, x_{max}$ need bound all $x_i$'s : $x_{min} \leq x_i \leq x_{max} \ \forall i$. The larger the encoded value $x_i$, the earlier the associated neuron spikes ($x_j < x_i \Rightarrow t_i < t_j$).


In [None]:
def time_encoding(np_array, t_max, t_min, x_max, x_min):
    return (t_max-t_min)*(1-(np_array - x_min) / (x_max-x_min)) + t_min


We now want to describe a possible choice of parameters to express the OSCFAR algorithm. Let's fix our initial conditions by $v(t_0) = 0, t_0 < t_{min}$. Further, our IF neuron spikes at $v_{thres} \geq 1$. As larger input values correspond to earlier spike times, figuring out if the test cell is larger than the $k$-th largest value is equivalent to asking if less than $k-1$ spikes arrived before the test spike. Hence, we choose the weights of all neighbour cells to be $w_n = -1$ (blue lines above) and the test weight as $w_t = k$. If less then $k$ spikes arrive before the test spike, the membrane potential is $v \geq -(k-1)$. If this is followed by a test spike, the membrane potential becomes $v\geq -k+1 +k = 1$ and the IF neuron spikes. 

### An educational graphical example

Let's quickly demonstrate how this works. Feel free to try different values for ```k``` and ```test_value```. If the ouput neuron spikes, you see a vertical orange line in the final graph and the test value is large enough to be a signal.

In [None]:
# define parameteres k and test_value
k = 2
test_value = .48

# generate the test array and the weights as described above
inputs = [.3,.55,.6,test_value,.7,.41,.2]
no_inputs = len(inputs)
weights = [-1,-1,0,k,0,-1,-1]

# start Brian2 SNN simulation
brian2.start_scope()

# encode the values in the time domain
simulation_time = 50*brian2.ms
input_times = time_encoding(inputs, 50, 0, 
                            np.amax(inputs)+.5, 
                            np.amin(inputs)-.5)
input_times = [x*brian2.ms for x in input_times]

# define input neurons that spike at specified times
input_neurons = brian2.NeuronGroup(no_inputs, 'tspike:second',
                                   threshold='t>tspike', 
                                   refractory= simulation_time)
input_neurons.tspike = input_times

# define IF neuron to compare k-th largest value
tau = 10*brian2.ms
eqs = '''
dv/dt = 0./tau  : 1 
'''
compute_neuron = brian2.NeuronGroup(1, eqs, threshold='v>=1', 
                                    reset='v = 0', method ='euler')

# enforce connectivity with weights as above
S = brian2.Synapses(input_neurons,compute_neuron,'w:1',
                    on_pre='v_post += w')
S.connect()
S.w = weights

# monitor membrane potential and spikes of IF neuron
statemon = brian2.StateMonitor(compute_neuron,'v',record=True)
spikemon = brian2.SpikeMonitor(compute_neuron)

# run brian2 simulation
brian2.run(simulation_time)

# visualize results
brian2.plot(statemon.t/brian2.ms, statemon.v[0])
for spike in spikemon.t:
    brian2.axvline(spike/brian2.ms, ls='--', c='C1', lw=3)
brian2.xlabel('Time (ms)')
brian2.ylabel('v')
brian2.grid(True);

### The OSCFAR SNN

We have explained all the details and now we need to put everything together. As before, we implement the core functionality of the OSCFAR separately from the sliding window version. ```oscfar_core_1d``` has the same functionality as ```oscfar_snn_core_1d``` but the latter is based on the SNN logic. The same is true for the relation between ```oscfar_1d``` and ```oscfar_snn_1d```. 

In [None]:
def oscfar_snn_core_1d(np_array, neighbour_cells, guarding_cells, k, scale_factor, 
                       t_max = 50, t_min = 0, x_max = 2, x_min = -1):
    '''
    OSCFAR SNN implementation. 
    @param np_array : array to analyze
    @param neighbour_cells : cells to determine background. 
        Number refers to cells left of the test cell.
    @param guarding_cells : cells shielding test cell.
        Number refers to cells left of the test cell.
    @param k : k-th largest value for statistical measure
    @param scale_factor : scale factor S for test cell.
    @param t_max : latest possible spike time
    @param t_min : earliest possible spike time
    @param x_max : upper bound for encoded values
    @param x_max : lower bound for encoded values
    '''
    
    
    # time-encode the input values
    inputs = np_array.copy()
    inputs[neighbour_cells+guarding_cells] *= scale_factor
    inputs = time_encoding(inputs,t_max, t_min, x_max, x_min)
    #print(inputs) # DEBUG statement
    
    # define weights
    weights = np.zeros_like(np_array)
    weights[:neighbour_cells] = -1
    weights[neighbour_cells+2*guarding_cells+1:] = -1
    weights[neighbour_cells+guarding_cells] = k+0.1
    #print(weights) # DEBUG statement

    # initialize brian simulation
    brian2.start_scope()
    
    # define simulation time
    simulation_time = (np.max(inputs)+5)*brian2.ms
    
    # adjust input spike times for Brian2
    input_times = [x*brian2.ms for x in inputs]
    no_inputs = len(input_times)
    
    # define neuron group that spikes at specific input times
    input_neurons = brian2.NeuronGroup(no_inputs, 'tspike:second',
                                       threshold='t>tspike', 
                                       refractory= simulation_time)
    input_neurons.tspike = input_times

    # define IF neuron according to previous explanations 
    tau = 10*brian2.ms
    eqs = '''
    dv/dt = 0./tau  : 1
    '''
    compute_neuron = brian2.NeuronGroup(1, eqs, threshold='v>=1', 
                                        refractory= simulation_time, #refractory not working
                                        reset='v = 0', method ='euler')

    # establish connectivity of the network, weights from above
    S = brian2.Synapses(input_neurons,compute_neuron,'w:1',
                        on_pre='v_post += w')
    S.connect()
    S.w = weights

    # monitor spiking behaviour 
    spikemon = brian2.SpikeMonitor(compute_neuron)
    
    # run Brian2 simulation
    brian2.run(simulation_time)
    
    # return 1 if spike occurs, 0 if not.
    return len(spikemon.t)


def oscfar_snn_1d(np_array, neighbour_cells, guarding_cells, k, scale_factor, 
                  t_max=50,t_min=0, x_max = 1, x_min=-1):
    '''
    OSCFAR SNN implementation. 
    @param np_array : array to analyze
    @param neighbour_cells : cells to determine background. 
        Number refers to cells left of the test cell.
    @param guarding_cells : cells shielding test cell.
        Number refers to cells left of the test cell.
    @param k : k-th largest value for statistical measure
    @param scale_factor : scale factor S for test cell.
    @param t_max : latest possible spike time
    @param t_min : earliest possible spike time
    @param x_max : upper bound for encoded values
    @param x_max : lower bound for encoded values
    '''
    # prepare array for results (smaller than original array)
    # window of certain size n only fits int an array of length N 
    # N-n+1 times. 
    window_size = 2*neighbour_cells+2*guarding_cells+1
    result = np.zeros(np_array.size-2*(neighbour_cells+guarding_cells))
    
    # slide window and apply osfar_core_1d to each window
    for i in range(result.size):
        result[i] = oscfar_snn_core_1d(np_array[i:i+window_size], 
                                       neighbour_cells, guarding_cells, 
                                       k, scale_factor, t_max, t_min, x_max, x_min)
    return result

### The results

Let's apply the SNN CFAR to the same data as the traditional CFAR algorithm. If the work correctly, you should see that the blue dots and the orange x's coincide. 

In [None]:
# define parameters (same as for traditional OSCFAR)
neighbour_cells, guarding_cells, k, scale_factor = 4, 1, 3, .5
t_max, t_min, x_max, x_min = 50, 0, 2, 0

# run OSCFAR SNN simulation
oscfar_snn_1d_result = oscfar_snn_1d(data_1D, neighbour_cells, 
                                     guarding_cells, k, scale_factor, 
                                     t_max, t_min, x_max, x_min)

# visualize data, OSCFAR, OSCFAR-SNN results
plt.plot(data_1D, label='data')
plt.scatter(range(neighbour_cells+guarding_cells,data_1D.size - neighbour_cells - guarding_cells), 
         oscfar_1d_result, label='oscfar')
plt.scatter(range(neighbour_cells+guarding_cells,data_1D.size - neighbour_cells - guarding_cells), 
         oscfar_1d_result, label='oscfar snn', marker ='x')
plt.legend()
plt.grid(True)
plt.show()