In [1]:
import pickle
import logging
import rawutil
import struct
import numpy as np
from datetime import datetime
# from typing import Dict, Any
import time
from kafka import KafkaProducer

from multiprocessing import Process, Queue
from typing import TypeVar, List, Dict, Tuple, Any
from multiprocessing import Pool

In [2]:
RAW = None

In [3]:
id_ = 0
data = {}
data['context'] = {'daisy': False,
                   'boardmode': 'default',
                   'montage': {i: ch for i, ch in enumerate('Fp1,Fp2,T3,C3,C4,T4,O1,O2'.split(','))},
                   'connection': 'wifi',
                   'gain': [24, 24, 24, 24, 24, 24, 24, 24]
                   }


data['context']['created'] = datetime.now().timestamp()

def aux_(v): return list(struct.pack('>hhh', *(np.array([v / 3] * 3) * (16 / 0.002)).astype(int).tolist()))

def eeg_(v): return list(rawutil.pack('>u', -v // 24)) * 8
def t0(): return ((time.time() * 10) // 1)

In [45]:
if (time.time() // 1) % 2:
    aux = aux_(1)
    eeg = eeg_(1)
else:
    aux = aux_(-1)
    eeg = eeg_(-1)

data['context']['created'] = datetime.now().timestamp()
data['data'] = [0xa0,  # header
                id_ % 256,  # ID 0-255
                *eeg,
                *aux,
                0xc0,  # footer
                ] * 10000

data['data'] = bytes(data['data'])
len(data['data'])

330000

In [42]:

########################################################################
class Deserialize:
    """"""

    # ----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""

        # self.deserialize(data, context)

        self._last_marker = 0
        self.counter = 0

        self.remnant = b''
        self.offset = None, None
        self._last_aux_shape = 0

    # ----------------------------------------------------------------------
    def deserialize(self, data: np.ndarray, context: Dict[str, Any]) -> None:
        """From signed 24-bits integer to signed 32-bits integer.

        Parameters
        ----------
        data
            Numpy array of shape (`33, LENGTH`)
        context
            Information from the acquisition side useful for deserializing and
            that will be packaged back in the stream.
        """
        
        # EGG
        eeg_data = data[:, 2:26]
        eeg_data = getattr(self, f'deserialize_eeg_{context["connection"]}')(
            eeg_data, data[:, 1], context)
        

        # Auxiliar
        # stop_byte = data[0][-1]
        stop_byte = int((np.median(data[:, -1])))
        
        aux = self.deserialize_aux(stop_byte, data[:, 26:32], context)
        self._last_aux_shape = aux.shape
        

        # Stream
        channels = list(context['montage'].keys())
        return eeg_data.round(3).T[channels], aux.round(3).T

    # ----------------------------------------------------------------------

    def deserialize_eeg_wifi(self, eeg: np.ndarray, ids: np.ndarray, context: Dict[str, Any]) -> np.ndarray:
        """From signed 24-bits integer to signed 32-bits integer by channels.

        The `Cyton data format <https://docs.openbci.com/docs/02Cyton/CytonDataFormat>`_
        says that only can send packages of 33 bits, when a Daisy board is
        attached these same packages will be sent at double speed in favor to
        keep the desired sample rate for 16 channels.

        Parameters
        ----------
        eeg
            Numpy array in signed 24-bits integer (`8, LENGTH`)
        ids
            List of IDs for eeg data.
        context
            Information from the acquisition side useful for deserializing and
            that will be packaged back in the stream.

        Returns
        -------
        eeg_data
            EEG data in microvolts, signed 32-bits integer, (`CHANNELS, LENGTH`),
            if there is a Daisy board `CHANNELS` is 16, otherwise is 8.
        """
        global RAW
        
        RAW = eeg
        eeg_data = np.array([[rawutil.unpack('>u', bytes(ch))[0] for ch in row.reshape(-1, 3).tolist()] for row in eeg])
        
        eeg_data = eeg_data * self.scale_factor_eeg

        if context['daisy']:

            # # If offset, the pair index condition must change
            if np.array(self.offset[0]).any():
                eeg_data = np.concatenate([[self.offset[0]], eeg_data], axis=0)
                ids = np.concatenate([[self.offset[1]], ids], axis=0)
                # pair = not pair

            if ids[0] != ids[1]:
                eeg_data = np.delete(eeg_data, 0, axis=0)
                ids = np.delete(ids, 0, axis=0)

            # if not pair dataset, create an offeset
            if eeg_data.shape[0] % 2:
                self.offset = eeg_data[-1], ids[-1]
                eeg_data = np.delete(eeg_data, -1, axis=0)
                ids = np.delete(ids, -1, axis=0)
            else:
                self.offset = None, None

            return eeg_data.reshape(-1, 16)
        

        return eeg_data

    # ----------------------------------------------------------------------
    def deserialize_eeg_serial(self, eeg: np.ndarray, ids: np.ndarray, context: Dict[str, Any]) -> np.ndarray:
        """From signed 24-bits integer to signed 32-bits integer by channels.

        The `Cyton data format <https://docs.openbci.com/docs/02Cyton/CytonDataFormat>`_
        says that only can send packages of 33 bits, over serial (RFduino) this
        limit is absolute, when a Daisy board is attached these same amount of
        packages will be sent, in this case, the data must be distributed and
        interpolated in order to complete the sample rate.

        Parameters
        ----------
        eeg
            Numpy array in signed 24-bits integer (`8, LENGTH`)
        ids
            List of IDs for eeg data.
        context
            Information from the acquisition side useful for deserializing and
            that will be packaged back in the stream.

        Returns
        -------
        eeg_data
            EEG data in microvolts, signed 32-bits integer, (`CHANNELS, LENGTH`),
            if there is a Daisy board `CHANNELS` is 16, otherwise is 8.
        """

        eeg_data = np.array([[rawutil.unpack('>u', bytes(ch))[0]
                              for ch in row.reshape(-1, 3).tolist()] for row in eeg])
        eeg_data = eeg_data * self.scale_factor_eeg
        
        eeg_data = eeg_data.round(5)

        if context['daisy']:

            even = not ids[0] % 2

            # If offset, the even index condition must change
            if np.array(self.offset[0]).any():
                eeg_data = np.concatenate([[self.offset[0]], eeg_data], axis=0)
                ids = np.concatenate([[self.offset[1]], ids], axis=0)
                even = not even

            # if not even dataset, create an offset
            if eeg_data.shape[0] % 2:
                self.offset = eeg_data[-1], ids[-1]
                eeg_data = np.delete(eeg_data, -1, axis=0)
                ids = np.delete(ids, -1, axis=0)

            # Data can start with a even or odd id
            if even:
                board = eeg_data[::2]
                daisy = eeg_data[1::2]
            else:
                daisy = eeg_data[::2]
                board = eeg_data[1::2]

            board = np.array([np.interp(np.arange(0, p.shape[0], 0.5), np.arange(
                p.shape[0]), p) for p in board.T]).T
            daisy = np.array([np.interp(np.arange(0, p.shape[0], 0.5), np.arange(
                p.shape[0]), p) for p in daisy.T]).T

            eeg = np.concatenate([np.stack(board), np.stack(daisy)], axis=1)

        else:
            eeg = eeg_data

        return eeg

    # ----------------------------------------------------------------------

    @property
    def scale_factor_eeg(self) -> float:
        """Vector with the correct factors for scale eeg data samples."""
        gain = 24
        # vref = 4.5  # for V
        vref = 4500000  # for uV

        return vref / (gain * ((2 ** 23) - 1))

    # ----------------------------------------------------------------------

    def deserialize_aux(self, stop_byte: int, aux: int, context: Dict[str, Any]) -> np.ndarray:
        """Determine the content of `AUX` bytes and format it.

        Auxialiar data could contain different kind of information: accelometer,
        user defined, time stamped and digital or analog inputs.
        The context of `AUX` bytes are determined by the stop byte.

        If `stop_byte` is `0xc0` the `AUX` bytes contain `Standard with accel`,
        this data are packaged at different frequency, they will be show up each
        10 or 11 packages, the final list will contain accelometer value in `G`
        units for axis `X`, `Y` and `Z` respectively and `None` when are not
        availables.

        If `stop_byte` is `0xc1` the `AUX` bytes contain `Standard with raw aux`,
        there are 3 types of raw data: `digital` in wich case the final list
        will contain the values for `D11`, `D12`, `D13`, `D17`, `D18`; `analog`
        with the values for `A7` (`D13`), `A6` (`D12`), `A5` (`D11`); `markers`
        data contain the the marker sended with `send_marker()` method.

        Parameters
        ----------
        stop_byte
             0xCX where X is 0-F in hex.
        aux
            6 bytes of data defined and parsed based on the `Footer` bytes.
        context
            Information from the acquisition side useful for deserializing and
            that will be packaged back in the stream.

        Returns
        -------
        list
            Correct data formated.

        """

        # Standard with accel
        if stop_byte == 0xc0:
            return 0.002 * \
                np.array([struct.unpack('>hhh', a.astype('i1').tobytes())
                          for a in aux]) / 16

        # Standard with raw aux
        elif stop_byte == 0xc1:

            if context['boardmode'] == 'analog':
                # A7, A6, A5
                # D13, D12, D11
                return aux[:, 1::2]

            elif context['boardmode'] == 'digital':
                # D11, D12, D13, D17, D18
                return np.delete(aux, 4, axis=1)

            elif context['boardmode'] == 'marker':
                # Some time for some reason, marker not always send back from
                # OpenBCI, so this module implement a strategy to send a burst of
                # markers but read back only one.
                a = aux[:, 1]
                a[a > ord('Z')] = 0
                a[a < ord('A')] = 0
                return a

        # User defined
        elif stop_byte == 0xc2:
            pass

        # Time stamped set with accel
        elif stop_byte == 0xc3:
            pass

        # Time stamped with accel
        elif stop_byte == 0xc4:
            pass

        # Time stamped set with raw auxcalculate_sample_rate
        elif stop_byte == 0xc5:
            pass

        # Time stamped with raw aux
        elif stop_byte == 0xc6:
            pass

        return np.zeros(self._last_aux_shape)

    # ----------------------------------------------------------------------
    def stream(self, eeg_queue, data, samples, context):
        """Kafka produser.

        Stream data to network.

        Parameters
        ----------
        data : list
            The EEG data format.
        samples : int
            The number of samples in this package.

        """
        context.update({'samples': samples})
        # context['created'] = datetime.now().timestamp()

        data_ = {'context': context,
                 'data': data,
                 # 'binary_created': self.created,
                 # 'created': datetime.now().timestamp(),
                 # 'samples': samples,
                 }

        # self.producer_eeg.send('eeg', data_)
        eeg_queue.put(data_)

        if DEBUG:
            logging.info(f"streamed {samples} samples")


In [6]:
BIN_HEADER = 0xa0
DEBUG = True

# ----------------------------------------------------------------------
def align_data(binary: bytes) -> Tuple[np.ndarray, bytes]:
    """Align data following the headers and footers.

    Parameters
    ----------
    binary
        Data raw from OpenBCI board.

    Returns
    -------
    data_aligned
        Numpy array of shape (`33, LENGTH`) with headers and footer aligned.
    remnant
        This bytes could be used for complete next binary input.
    """

    data = np.array(list(binary))

    # Search for the the first index with a `BIN_HEADER`
    start = [np.median(np.roll(data, -i, axis=0)[::33])
             == BIN_HEADER for i in range(33)].index(True)

    if (start == 0) and (data.shape[0] % 33 == 0):
        data_aligned = data
        remnant = b''
    else:
        # Fix the offset to complete 33 bytes divisible array
        end = (data.shape[0] - start) % 33
        data_aligned = data[start:-end]
        remnant = binary[-end:]

    data_aligned = data_aligned.reshape(-1, 33)

    return data_aligned, remnant

In [36]:
des = Deserialize()
# def dess(d):
#     return des.deserialize(d, data['context'])

dess = lambda d:des.deserialize(d, data['context'])

In [41]:
t0 = time.time()
data_, _ = align_data(data['data'])
dess(data_)

t1 = time.time()
print('Total:', (t1-t0)*1000)

Total: 667.4270629882812


In [47]:
t0 = time.time()
data_, _ = align_data(data['data'])
N = 1
with Pool(N) as pool:
    q = pool.map(dess, np.array_split(data_, N))
    e, a = zip(*q)
#     print(e[0].shape)
    
np.concatenate(e, axis=1).shape
t1 = time.time()
print((t1-t0)*1000)

822.4489688873291


In [114]:
RAW.shape

(10000, 24)

In [142]:


t0 = time.time()
out = np.array([[rawutil.unpack('>u', bytes(ch))[0] for ch in row.reshape(-1, 3).tolist()] for row in RAW])

print(out.shape)
print((time.time()-t0)*1000)


print("#"*70)
t0 = time.time()
with Pool(3) as pool:
    out = np.array(pool.map(_24to8, RAW))

print(out.shape)
print((time.time()-t0)*1000)



print("#"*70)
t0 = time.time()

out = np.array([rawutil.unpack('>u', bytes(ch))[0] for ch in RAW.reshape(-1, 3)]).reshape(-1, 8)
print(out.shape)

print((time.time()-t0)*1000)

(10000, 8)
619.3513870239258
######################################################################
(10000, 8)
744.1587448120117
######################################################################
(10000, 8)
661.1278057098389


In [120]:
rawutil.unpack('>u', bytes(a3))

[0]

In [98]:
bytes(a3[0])

b'\x00\x00\x00'

In [82]:
signs = (a3[..., 0] >> 0x80) * 0xff

In [87]:
a4 = np.concatenate((signs.reshape(-1, 1), a3), axis=1)

In [198]:
t0 = time.time()
print(np.array(list(rawutil.iter_unpack('>24u', RAW.reshape(-1, 3)))).shape)
print((time.time()-t0)*1000)

OperationError: In format '24u' : No data remaining to read element 'u'

In [74]:
RAW.shape

(10000, 24)

In [156]:
[rawutil.unpack('8>u', bytes(d)) for d in RAW.reshape(-1, 8, 3)]

FormatError: In format '8>u', in subformat '8>u', at position 0 : Unrecognised character '>'

In [130]:
from pathos.multiprocessing import Pool

In [2]:
import ntplib
client = ntplib.NTPClient()
ntp_offset = client.request('192.168.1.1').offset * 1000
print(f" NTP offset: {ntp_offset :.2f} ms")

 NTP offset: -9586893258.83 ms
