# Data analysis

This feature lets the user develop real-time data analysis. It consists of a complete Python-powered environment with a set of custom methods for agile development.

<img src='images/analisys-bare_minimum.gif'></img>

## Bare minimum

In [None]:
from bci_framework.extensions.data_analysis import DataAnalysis

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

if __name__ == '__main__':
    Analysis()

## Data stream access

The data stream is accessed asynchronously with the `loop_consumer` decorator from `bci_framework.extensions.data_analysis`. This decorator takes the Kafka topics to consume as arguments.

There are four default topics available in BCI-Framework: `eeg`, `markers`, `annotations` and `commands`.

In [None]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
import logging

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        logging.debug('Incoming data...')

if __name__ == '__main__':
    Analysis()

The decorated method accepts three optional arguments: `data`, `topic` and `frame`.

### data
The message object received from the Kafka stream.

### topic
The topic of the Kafka stream. It is also available as `data.topic`.

### frame
Incremental counter of the data packets streamed so far.

In [None]:
@loop_consumer('eeg')
def stream(self, data, topic, frame):
    eeg, aux = data.value['data']
    
    logging.debug(f'Incoming data #{frame}')
    logging.debug(f'Data({data})')
    logging.debug(f'EEG{eeg.shape}')
    logging.debug(f'AUX{aux.shape}')
    logging.debug(f'Topic: {topic}')
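
How a decorator can make `data`, `topic` and `frame` optional is easiest to see in a stand-alone sketch. The following is not BCI-Framework's implementation: `toy_loop_consumer` and the in-memory `messages` list are hypothetical stand-ins for `loop_consumer` and the Kafka stream, and starting `frame` at 1 is an assumption. The wrapper inspects the decorated method's signature and passes only the arguments the method declares.

```python
import functools
import inspect
from types import SimpleNamespace


def toy_loop_consumer(messages):
    """Hypothetical stand-in for `loop_consumer`: iterates over an in-memory list."""

    def decorator(method):
        # Names the decorated method actually declares (besides `self`).
        wanted = set(inspect.signature(method).parameters) - {'self'}

        @functools.wraps(method)
        def wrapper(self):
            results = []
            for frame, message in enumerate(messages, start=1):
                available = {'data': message, 'topic': message.topic, 'frame': frame}
                # Pass only the arguments the method asked for.
                kwargs = {name: value for name, value in available.items() if name in wanted}
                results.append(method(self, **kwargs))
            return results

        return wrapper

    return decorator


# Hand-made messages; a real stream would deliver them from Kafka.
messages = [SimpleNamespace(topic='eeg', value={'timestamp': t}) for t in (0.0, 0.1)]


class Demo:
    @toy_loop_consumer(messages)
    def no_args(self):
        return 'called'

    @toy_loop_consumer(messages)
    def with_frame(self, frame, topic):
        return frame, topic


demo = Demo()
print(demo.no_args())     # → ['called', 'called']
print(demo.with_frame())  # → [(1, 'eeg'), (2, 'eeg')]
```

Both methods are driven by the same two messages; only the second one asks for `frame` and `topic`, so only it receives them.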

The data of main interest are the **EEG**, the **auxiliary** data and the **timestamp**.

In [None]:
@loop_consumer('eeg')
def stream(self, data, topic, frame):
    eeg, aux = data.value['data']
    timestamp = data.value['timestamp']

## Simulate data stream

Using `fake_loop_consumer` instead of `loop_consumer` makes it possible to create a fake data stream.

In [None]:
from bci_framework.extensions.data_analysis import DataAnalysis, fake_loop_consumer
import logging

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @fake_loop_consumer('eeg')
    def stream(self):
        logging.debug('Incoming data...')

if __name__ == '__main__':
    Analysis()
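
To picture what a simulated stream delivers, the sketch below generates random packets laid out like the messages used above (`data.value['data']` holding the EEG and auxiliary blocks, `data.value['timestamp']` holding the time). It is not the implementation of `fake_loop_consumer`: `fake_packets`, the channel counts, the packet size and the sample rate are assumptions chosen for illustration, and plain nested lists stand in for arrays.

```python
import random
from types import SimpleNamespace


def fake_packets(n_packets, channels=8, aux_channels=3, samples=100, sample_rate=1000, seed=0):
    """Yield hypothetical EEG-like messages (all sizes here are illustrative)."""
    rng = random.Random(seed)  # Seeded so the fake stream is reproducible.
    timestamp = 0.0
    for _ in range(n_packets):
        eeg = [[rng.gauss(0, 1) for _ in range(samples)] for _ in range(channels)]
        aux = [[rng.gauss(0, 1) for _ in range(samples)] for _ in range(aux_channels)]
        yield SimpleNamespace(topic='eeg', value={'data': (eeg, aux), 'timestamp': timestamp})
        # Each packet covers `samples / sample_rate` seconds.
        timestamp += samples / sample_rate


packets = list(fake_packets(3))
eeg, aux = packets[0].value['data']
print(len(eeg), len(eeg[0]))  # channels, samples per packet
print(len(aux), len(aux[0]))  # auxiliary channels, samples per packet
```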

## Built-in methods

### Buffer

We can use `self.create_buffer` to implement an automatic buffer with a fixed time view, for example, a buffer of 30 seconds:

In [None]:
self.create_buffer(seconds=30)

The data can be accessed with `self.buffer_eeg` and `self.buffer_aux`:

In [None]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg
        aux = self.buffer_aux

The `self.create_buffer` method accepts other arguments: `aux_shape`, `fill` and `resampling`.

In [None]:
self.create_buffer(seconds=30, aux_shape=3, fill=0, resampling=1000)

**aux_shape:** The dimension of the auxiliary data, 3 by default.  
**fill:** Initialize the buffer with this value, 0 by default.  
**resampling:** This value is used to resample the data.  
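
A fixed time view means the buffer always holds the most recent `seconds` of data: new samples enter on one side and the oldest ones fall off the other. A minimal sketch of that idea, assuming a known sample rate and using a hypothetical `RollingBuffer` class (not the framework's implementation):

```python
from collections import deque


class RollingBuffer:
    """Hypothetical fixed-length buffer holding the last `seconds` of samples per channel."""

    def __init__(self, seconds, sample_rate, channels, fill=0):
        size = int(seconds * sample_rate)
        # One deque per channel, pre-filled so the view always has the same length.
        self.channels = [deque([fill] * size, maxlen=size) for _ in range(channels)]

    def append(self, packet):
        """Add a packet shaped (channels, samples); the oldest samples are discarded."""
        for channel, samples in zip(self.channels, packet):
            channel.extend(samples)

    def view(self):
        """Return a copy of the current window as a list of channels."""
        return [list(channel) for channel in self.channels]


buffer = RollingBuffer(seconds=2, sample_rate=4, channels=2, fill=0)
buffer.append([[1, 2, 3], [10, 20, 30]])
buffer.append([[4, 5, 6, 7, 8, 9], [40, 50, 60, 70, 80, 90]])
print(buffer.view()[0])  # → [2, 3, 4, 5, 6, 7, 8, 9]
```

The window holds 8 samples per channel (2 seconds at 4 Hz), so after nine samples have arrived the fill values and the first sample have already been pushed out.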

### Resampling

The resampling is defined when the buffer is created, through the `resampling` argument. This value is not used strictly; instead, a nearby optimal value is calculated from the sampling rate and the buffer size.

In [None]:
class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        eeg = self.buffer_eeg_resampled
        aux = self.buffer_aux_resampled

        logging.debug(f'EEG{eeg.shape}')
        logging.debug(f'AUX{aux.shape}')

Resampling does not affect the buffer: both the original and the resampled data are accessible at all times.
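
The text does not spell out how the adjusted resampling value is chosen. One plausible reading is that the target size is moved to the nearest value that splits the buffer into equal blocks. The sketch below is an assumption for illustration, not the rule the framework applies: the hypothetical `nearest_divisor` picks the divisor of the buffer length closest to the requested size, and `block_average` downsamples one channel by averaging each block. The 250 Hz sample rate is also assumed.

```python
def nearest_divisor(buffer_size, requested):
    """Divisor of `buffer_size` closest to `requested` (ties go to the smaller one)."""
    divisors = [d for d in range(1, buffer_size + 1) if buffer_size % d == 0]
    return min(divisors, key=lambda d: (abs(d - requested), d))


def block_average(samples, target_size):
    """Downsample by averaging consecutive blocks; `target_size` must divide len(samples)."""
    block = len(samples) // target_size
    return [sum(samples[i:i + block]) / block for i in range(0, len(samples), block)]


buffer_size = 30 * 250  # 30 seconds at an assumed 250 Hz sample rate
target = nearest_divisor(buffer_size, requested=700)
print(target)  # → 750

signal = list(range(buffer_size))  # Stand-in for one buffered channel.
resampled = block_average(signal, target)
print(len(signal), len(resampled))  # → 7500 750
```

The original `signal` is left untouched, so the full-resolution and the resampled views stay available side by side.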

## Markers

The markers and annotations can be accessed by specifying their topics in `loop_consumer`.

In [None]:
@loop_consumer('eeg', 'marker')
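
When one method consumes several topics, the `topic` argument tells the messages apart. A minimal stand-alone sketch, with a hypothetical `handle` function and hand-made messages in place of the Kafka stream (the payload keys and values are made up for illustration):

```python
from types import SimpleNamespace


def handle(data, topic, log):
    """Route one message by topic; `log` collects what was seen (illustrative only)."""
    if topic == 'eeg':
        log.append(('eeg', data.value['timestamp']))
    elif topic == 'marker':
        log.append(('marker', data.value['marker']))


# Hand-made messages standing in for the Kafka stream; the payload keys are assumptions.
messages = [
    SimpleNamespace(topic='eeg', value={'timestamp': 0.0}),
    SimpleNamespace(topic='marker', value={'marker': 'Right'}),
    SimpleNamespace(topic='eeg', value={'timestamp': 0.1}),
]

log = []
for message in messages:
    handle(message, message.topic, log)

print(log)  # → [('eeg', 0.0), ('marker', 'Right'), ('eeg', 0.1)]
```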

## Commands and Annotations

The commands are used to communicate outputs to the real world or to other systems; they can also be read in **Stimuli delivery** to create neurofeedback applications. To activate this feature, pass the `enable_produser` argument as `True` when instantiating the `DataAnalysis` subclass.

In [None]:
if __name__ == '__main__':
    Analysis(enable_produser=True)

Once the producer is activated, the methods `self.send_command` and `self.send_annotation` are available.

In [None]:
@loop_consumer('eeg')
def stream(self):
    eeg = self.buffer_eeg_resampled
    aux = self.buffer_aux_resampled

    [...]  # amazing data analysis

    self.send_command('MyCommand', value=45)

The `self.send_annotation` method also accepts the optional argument `duration`.

In [None]:
self.send_annotation('Data record start')
self.send_annotation('The subject yawn', duration=5)

A generic producer is also available:

In [None]:
self.generic_produser(topic, data)

## Data exchange

Let's build a script that acts as a **Kafka transformer**: it reads the raw EEG data, calculates its spectrum using the Fourier transform and injects the result back into the stream. The same pattern applies to more advanced processing tasks, such as classification with neural networks.

In [None]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer
from bci_framework.extensions import properties as prop
from gcpds.utils.processing import fourier

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.create_buffer(seconds=30, resampling=1000)
        self.stream()

    @loop_consumer('eeg')
    def stream(self):
        W, EEG = fourier(self.buffer_eeg, fs=prop.SAMPLE_RATE, axis=1)
        data = {'amplitude': EEG,
                'frequency': W}
        self.generic_produser('spectrum', data)

if __name__ == '__main__':
    Analysis(enable_produser=True)
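
The spectrum step can be pictured without the `gcpds` helper. The sketch below is a plain discrete Fourier transform written with the standard library, returning frequencies and amplitudes for one channel. `spectrum` is a hypothetical function and is not claimed to match the output or scaling of `fourier`; the 64 Hz sample rate is an assumption that keeps the example small.

```python
import cmath
import math


def spectrum(signal, fs):
    """Return (frequencies, amplitudes) of a real signal using a direct DFT."""
    n = len(signal)
    frequencies, amplitudes = [], []
    for k in range(n // 2 + 1):
        # k-th DFT coefficient: sum of x[t] * exp(-2*pi*i*k*t/n).
        coefficient = sum(x * cmath.exp(-2j * math.pi * k * t / n) for t, x in enumerate(signal))
        frequencies.append(k * fs / n)
        # Simple scaling so a unit sine shows an amplitude close to 1
        # (the DC and Nyquist bins are overscaled by this factor).
        amplitudes.append(2 * abs(coefficient) / n)
    return frequencies, amplitudes


fs = 64  # Assumed sample rate in Hz
signal = [math.sin(2 * math.pi * 5 * t / fs) for t in range(fs)]  # 1 s of a 5 Hz sine
W, amplitude = spectrum(signal, fs)

peak = max(range(len(amplitude)), key=amplitude.__getitem__)
print(W[peak])  # → 5.0
```

The direct sum costs O(n²) operations, which is fine for a short demonstration; on a 30 second buffer an FFT routine such as `numpy.fft.rfft` would be the practical choice.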

Now, in another script, we will write a **Kafka consumer** that consumes from the previously created stream.

In [None]:
from bci_framework.extensions.data_analysis import DataAnalysis, loop_consumer

class Analysis(DataAnalysis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stream()

    @loop_consumer('spectrum')
    def stream(self, data):
        data = data.value['data']
        
        EEG = data['amplitude']
        W = data['frequency']

if __name__ == '__main__':
    Analysis()

## Framework integration

This interface can execute any number of scripts as independent processes; the system handles their interruption and shows information about CPU and memory usage.

<img src='images/analysis_process.png'></img>