Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions neo/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@

.. autoclass:: neo.io.RawBinarySignalIO

.. autoclass:: neo.io.RawMCSIO

.. autoclass:: neo.io.StimfitIO

.. autoclass:: neo.io.TdtIO
Expand Down Expand Up @@ -133,6 +135,7 @@
from neo.io.pickleio import PickleIO
from neo.io.plexonio import PlexonIO
from neo.io.rawbinarysignalio import RawBinarySignalIO
from neo.io.rawmcsio import RawMCSIO
from neo.io.spike2io import Spike2IO
from neo.io.stimfitio import StimfitIO
from neo.io.tdtio import TdtIO
Expand Down Expand Up @@ -170,6 +173,7 @@
PickleIO,
PlexonIO,
RawBinarySignalIO,
RawMCSIO,
Spike2IO,
StimfitIO,
TdtIO,
Expand Down
12 changes: 12 additions & 0 deletions neo/io/rawmcsio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-

from neo.io.basefromrawio import BaseFromRaw
from neo.rawio.rawmcsrawio import RawMCSRawIO


class RawMCSIO(RawMCSRawIO, BaseFromRaw):
_prefered_signal_group_mode = 'group-by-same-units'

def __init__(self, filename):
RawMCSRawIO.__init__(self, filename=filename)
BaseFromRaw.__init__(self, filename)
2 changes: 2 additions & 0 deletions neo/rawio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from neo.rawio.nixrawio import NIXRawIO
from neo.rawio.plexonrawio import PlexonRawIO
from neo.rawio.rawbinarysignalrawio import RawBinarySignalRawIO
from neo.rawio.rawmcsrawio import RawMCSRawIO
from neo.rawio.spike2rawio import Spike2RawIO
from neo.rawio.tdtrawio import TdtRawIO
from neo.rawio.winedrrawio import WinEdrRawIO
Expand All @@ -39,6 +40,7 @@
NIXRawIO,
PlexonRawIO,
RawBinarySignalRawIO,
RawMCSRawIO,
Spike2RawIO,
TdtRawIO,
WinEdrRawIO,
Expand Down
155 changes: 155 additions & 0 deletions neo/rawio/rawmcsrawio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
"""
Class for reading data from "Raw" Multi Channel System (MCS) format.
This format is NOT the native MCS format (*.mcd).
This format is a raw format with an internal binary header exported by the
"MC_DataTool binary conversion" with the option header slected.

The internal header contain sampling rate, channel names, gain and units.
Not so bad : everything that neo need, so this IO is without parameters.

If some MCS custumers read this you should lobby to get the real specification
of the real MCS format (.mcd) and so the MCSRawIO could be done instead of this
ersatz.


Author: Samuel Garcia
"""
from __future__ import unicode_literals, print_function, division, absolute_import

from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
_event_channel_dtype)

import numpy as np

import os
import sys


class RawMCSRawIO(BaseRawIO):
extensions = ['raw']
rawmode = 'one-file'

def __init__(self, filename=''):
BaseRawIO.__init__(self)
self.filename = filename

def _source_name(self):
return self.filename

def _parse_header(self):
self._info = info = parse_mcs_raw_header(self.filename)

self.dtype = 'uint16'
self.sampling_rate = info['sampling_rate']
self.nb_channel = len(info['channel_names'])

self._raw_signals = np.memmap(self.filename, dtype=self.dtype, mode='r',
offset=info['header_size']).reshape(-1, self.nb_channel)

sig_channels = []
for c in range(self.nb_channel):
chan_id = c
group_id = 0
sig_channels.append((info['channel_names'][c], chan_id, self.sampling_rate,
self.dtype, info['signal_units'], info['signal_gain'],
info['signal_offset'], group_id))
sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

# No events
event_channels = []
event_channels = np.array(event_channels, dtype=_event_channel_dtype)

# No spikes
unit_channels = []
unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)

# fille into header dict
self.header = {}
self.header['nb_block'] = 1
self.header['nb_segment'] = [1]
self.header['signal_channels'] = sig_channels
self.header['unit_channels'] = unit_channels
self.header['event_channels'] = event_channels

# insert some annotation at some place
self._generate_minimal_annotations()

def _segment_t_start(self, block_index, seg_index):
return 0.

def _segment_t_stop(self, block_index, seg_index):
t_stop = self._raw_signals.shape[0] / self.sampling_rate
return t_stop

def _get_signal_size(self, block_index, seg_index, channel_indexes):
return self._raw_signals.shape[0]

def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
return 0.

def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
if channel_indexes is None:
channel_indexes = slice(None)
raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]

return raw_signals


def parse_mcs_raw_header(filename):
"""
This is a mix with stuff on github.
https://github.com/spyking-circus/spyking-circus/blob/master/circus/files/mcs_raw_binary.py
https://github.com/jeffalstott/Multi-Channel-Systems-Import/blob/master/MCS.py
"""
MAX_HEADER_SIZE = 5000

with open(filename, mode='rb') as f:
raw_header = f.read(MAX_HEADER_SIZE)

header_size = raw_header.find(b'EOH')
assert header_size != -1, 'Error in reading raw mcs header'
header_size = header_size + 5
raw_header = raw_header[:header_size]
raw_header = raw_header.replace(b'\r', b'')

info = {}
info['header_size'] = header_size

def parse_line(line, key):
if key + b' = ' in line:
v = line.replace(key, b'').replace(b' ', b'').replace(b'=', b'')
return v

keys = (b'Sample rate', b'ADC zero', b'ADC zero', b'El', b'Streams')

for line in raw_header.split(b'\n'):
for key in keys:
v = parse_line(line, key)
if v is None:
continue

if key == b'Sample rate':
info['sampling_rate'] = float(v)

elif key == b'ADC zero':
info['adc_zero'] = int(v)

elif key == b'El':
v = v.decode('Windows-1252')
v = v.replace('/AD', '')
split_pos = 0
while v[split_pos] in '1234567890.':
split_pos += 1
if split_pos == len(v):
split_pos = None
break
assert split_pos is not None, 'Impossible to find units and scaling'
info['signal_gain'] = float(v[:split_pos])
info['signal_units'] = v[split_pos:].replace(u'µ', u'u')
info['signal_offset'] = -info['signal_gain'] * info['adc_zero']

elif key == b'Streams':
info['channel_names'] = v.decode('Windows-1252').split(';')

return info
19 changes: 19 additions & 0 deletions neo/rawio/tests/test_rawmcsrawio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-

# needed for python 3 compatibility
from __future__ import unicode_literals, print_function, division, absolute_import

import unittest

from neo.rawio.rawmcsrawio import RawMCSRawIO
from neo.rawio.tests.common_rawio_test import BaseTestRawIO


class TestRawMCSRawIO(BaseTestRawIO, unittest.TestCase, ):
rawioclass = RawMCSRawIO
entities_to_test = ['raw_mcs_with_header_1.raw']
files_to_download = entities_to_test


if __name__ == "__main__":
unittest.main()
21 changes: 21 additions & 0 deletions neo/test/iotest/test_rawmcsio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-

# needed for python 3 compatibility
from __future__ import absolute_import, division

import sys

import unittest

from neo.io import RawMCSIO
from neo.test.iotest.common_io_test import BaseTestIO


class TestRawMcsIO(BaseTestIO, unittest.TestCase, ):
ioclass = RawMCSIO
files_to_test = ['raw_mcs_with_header_1.raw']
files_to_download = files_to_test


if __name__ == "__main__":
unittest.main()