Permalink
Fetching contributors…
Cannot retrieve contributors at this time
356 lines (305 sloc) 12.5 KB
#!/usr/bin/env python
from __future__ import print_function
import os
import sys
import neuroshare as ns
import numpy as np
import time
import h5py
import argparse
try:
import nix
except ImportError:
import nixio as nix
class ProgressIndicator(object):
    """Silent progress tracker; subclasses override `progress` to report.

    The indicator keeps a running count of completed steps. Callers announce
    the total with `setup` and advance the count with ``indicator + n``.
    """

    def __init__(self, offset=0):
        """Start counting from `offset` completed steps."""
        self._cur_value = offset
        self._max_value = 0

    def setup(self, max_value):
        """Record the total number of steps and emit an initial report.

        The initial report always passes 0 as the current value, regardless
        of the offset given to the constructor.
        """
        self._max_value = max_value
        self.progress(self._max_value, 0)

    def __add__(self, other):
        """Advance the counter by `other` steps in place and report it.

        Unlike a conventional ``__add__`` this mutates the indicator, so a
        bare ``indicator + 1`` statement is enough to record progress.

        Returns:
            The indicator itself.
        """
        self._cur_value += other
        self.progress(self._max_value, self._cur_value)
        return self

    def progress(self, max_value, cur_value):
        """Report hook called on every update; a no-op in the base class."""
        pass

    def cleanup(self):
        """Tear-down hook for reporters that need it; a no-op here."""
        pass
#CLASS DEFINITION FOR NIX-CONVERTER
class Converter(object):
    """Convert a file opened through `neuroshare` into a NIX file.

    One NIX block and one root metadata section are created, both named after
    the input path. Entities are dispatched on their numeric `entity_type`
    (1 event, 2 analog, 3 segment, 4 neural); each type gets its own NIX
    group and metadata sub-section, created on first use.
    """

    # entity_type -> name of the NIX group holding that type's data arrays
    _GROUP_NAMES = {1: 'Event',
                    2: 'Analog',
                    3: 'Segment',
                    4: 'Neural'}

    # entity_type -> name of the metadata section for that type
    _SECTION_NAMES = {1: 'events',
                      2: 'analogs',
                      3: 'segments',
                      4: 'neurals'}

    def __init__(self, filepath, output, progress=None):
        """Open the input file and create (overwriting) the NIX output file.

        Args:
            filepath: path of the input file, handed to ``ns.File``.
            output: path of the NIX file; opened in overwrite mode.
            progress: optional progress indicator; a silent
                `ProgressIndicator` is used when omitted or falsy.
        """
        nf = ns.File(filepath)
        nixF = nix.File.open(output, nix.FileMode.Overwrite)
        nixB = nixF.create_block(filepath, "nix.session")
        nixS = nixF.create_section(filepath, "odml.recording")
        self._nf = nf
        self._nixF = nixF
        self._nixB = nixB
        self._nixS = nixS
        self._groups = {}
        self._sections = {}
        self.convert_map = {1: self.convert_event,
                            2: self.convert_analog,
                            3: self.convert_segment,
                            4: self.convert_neural}
        if not progress:
            progress = ProgressIndicator()
        self._progress = progress

    def get_group_for_type(self, entity_type):
        """Return the NIX group for `entity_type`, creating it on first use.

        Raises:
            KeyError: if `entity_type` is not one of 1-4.
        """
        if entity_type not in self._groups:
            name = self._GROUP_NAMES[entity_type]
            self._groups[entity_type] = self._nixB.create_group(name, "group")
        return self._groups[entity_type]

    def get_section_for_type(self, entity_type):
        """Return the metadata section for `entity_type`, creating it lazily.

        Raises:
            KeyError: if `entity_type` is not one of 1-4.
        """
        if entity_type not in self._sections:
            name = self._SECTION_NAMES[entity_type]
            self._sections[entity_type] = self._nixS.create_section(
                name, "odml.recording")
        return self._sections[entity_type]

    def convert(self):
        """Copy file-level metadata and every entity, then close the file.

        The NIX file is closed even when a conversion step raises, so a
        failed run does not leave the output file handle open.
        """
        progress = self._progress
        try:
            progress.setup(len(self._nf.entities))
            for key, value in self._nf.metadata_raw.items():
                self._nixS.create_property(key, nix.Value(value))
            for entity in self._nf.entities:
                self.convert_map[entity.entity_type](entity)
                # The indicator's __add__ advances it in place; the result
                # of the expression is intentionally discarded.
                progress + 1
        finally:
            self._nixF.close()

    def convert_event(self, event):
        """Store an event entity as a data array with a time dimension.

        Entities without items are skipped entirely (no array, no metadata).
        """
        nitems = event.item_count
        if nitems < 1:
            return
        times = np.empty([nitems], np.double)
        values = np.empty([nitems], np.double)
        # NOTE(review): values are stored as doubles; non-numeric event
        # payloads presumably cannot be converted this way - confirm.
        for n in range(nitems):
            item = event.get_data(n)  # fetched once: (time, value, ...)
            times[n] = item[0]
            values[n] = item[1]
        group = self.get_group_for_type(event.entity_type)
        dset = self._nixB.create_data_array(event.label, "nix.event",
                                            dtype=nix.DataType.Double,
                                            data=values)
        dset.label = "timepoints"
        dim = dset.append_range_dimension(times)
        dim.unit = 's'
        dim.label = 'time'
        group.data_arrays.append(dset)
        section = self.get_section_for_type(event.entity_type)
        self.copy_metadata(section, dset, event.metadata_raw)

    def convert_analog(self, analog):
        """Store an analog entity as a data array with a time dimension."""
        data, times, _ = analog.get_data()
        group = self.get_group_for_type(analog.entity_type)
        dset = self._nixB.create_data_array(analog.label, "nix.analog",
                                            dtype=nix.DataType.Double,
                                            data=data.T)
        self._assign_unit(dset, analog.units)
        dset.label = "waveform"
        dim = dset.append_range_dimension(times)
        dim.unit = "s"
        dim.label = "time"
        group.data_arrays.append(dset)
        section = self.get_section_for_type(analog.entity_type)
        self.copy_metadata(section, dset, analog.metadata_raw)

    def convert_segment(self, segment):
        """Store every item of a segment entity as its own data array.

        Segment and per-source metadata go into a dedicated sub-section;
        each item additionally gets a small section holding its unit id.
        Segments without items are skipped.

        Raises:
            KeyError: if the segment metadata has no 'SampleRate' entry.
        """
        if not segment.item_count:
            return
        seg_group = self.get_group_for_type(segment.entity_type)
        section = self.get_section_for_type(segment.entity_type)
        sub_section = section.create_section(segment.label+"_metadata",
                                             'odml.recording')
        for key, value in segment.metadata_raw.items():
            sub_section.create_property(key, nix.Value(value))
        for index in range(segment.source_count):
            source = segment.sources[index]
            source_section = sub_section.create_section(
                'SourceInfo.%d' % index, "odml.recording")
            for key, value in source.metadata_raw.items():
                source_section.create_property(key, nix.Value(value))
        # Loop-invariant; 1.0 forces true division on Python 2 as well.
        step_size = 1.0 / segment.metadata_raw['SampleRate']
        unit_name = segment.metadata_raw.get("Units")
        for index in range(segment.item_count):
            (data, timestamp, _samples, unit) = segment.get_data(index)
            name = '%d - %f' % (index, timestamp)
            dset = self._nixB.create_data_array(
                str(segment.label)+"_"+name, "nix.segment",
                dtype=nix.DataType.Double, data=np.squeeze(data.T))
            dset.label = "cutout waveform"
            self._assign_unit(dset, unit_name)
            dim = dset.append_sampled_dimension(step_size)
            dim.unit = 's'
            dim.offset = timestamp
            dim.label = "time"
            sec = sub_section.create_section(
                str(segment.id)+"_"+name+"_metadata", "odml.unit")
            sec.create_property('unitID', nix.Value(unit))
            dset.metadata = sec
            seg_group.data_arrays.append(dset)

    def convert_neural(self, neural):
        """Store a neural entity as a data array of timestamps."""
        data = neural.get_data()
        group = self.get_group_for_type(neural.entity_type)
        name = "%d - %s" % (neural.id, neural.label)
        dset = self._nixB.create_data_array(name, "nix.neural",
                                            dtype=nix.DataType.Double,
                                            data=data)
        dim = dset.append_alias_range_dimension()
        dim.label = "timestamps"
        group.data_arrays.append(dset)
        # Was `event.entity_type`: an undefined name in this method.
        section = self.get_section_for_type(neural.entity_type)
        self.copy_metadata(section, dset, neural.metadata_raw)

    @staticmethod
    def _assign_unit(target, unit):
        """Best-effort assignment of `unit` to ``target.unit``.

        If the target rejects the unit and the unit is the literal "raw",
        "V" is assigned instead; any other rejected unit (or a missing one)
        leaves the target's unit untouched.
        """
        if unit is None:
            return
        try:
            target.unit = unit
        except Exception:
            # Deliberately tolerant: an unacceptable unit string must not
            # abort the conversion.
            if unit == "raw":
                target.unit = "V"

    @classmethod
    def copy_metadata(cls, parentSection, target, metadata, prefix=None):
        """Copy `metadata` into a new section and attach it to `target`.

        Args:
            parentSection: section under which the new one is created.
            target: object receiving the section via its `metadata`
                attribute; its `name` (or "File" if unavailable) names the
                new section.
            metadata: mapping of property names to values.
            prefix: optional string prepended to the section name and to
                every property key.
        """
        try:
            name = target.name
        except Exception:
            name = "File"
        if prefix is not None:
            name = prefix+name
        sec = parentSection.create_section(name+' metadata', 'odml.recording')
        for key, value in metadata.items():
            if prefix is not None:
                key = prefix + key
            sec.create_property(key, nix.Value(value))
        target.metadata = sec
# CLASS DEFINITION FOR HDF5-CONVERTER
class H5Converter(object):
    """Convert a file opened through `neuroshare` into a plain HDF5 file.

    Entities are dispatched on their numeric `entity_type` (1 event,
    2 analog, 3 segment, 4 neural). Each type gets a top-level HDF5 group,
    created on first use; metadata is stored as HDF5 attributes.
    """

    # entity_type -> name of the top-level HDF5 group for that type
    _GROUP_NAMES = {1: 'Event',
                    2: 'Analog',
                    3: 'Segment',
                    4: 'Neural'}

    def __init__(self, filepath, output, progress=None):
        """Open the input file and create (truncating) the HDF5 output.

        Args:
            filepath: path of the input file, handed to ``ns.File``.
            output: path of the HDF5 file; opened with mode 'w'.
            progress: optional progress indicator; a silent
                `ProgressIndicator` is used when omitted or falsy.
        """
        nf = ns.File(filepath)
        h5 = h5py.File(output, 'w')
        self._nf = nf
        self._h5 = h5
        self._groups = {}
        self.convert_map = {1: self.convert_event,
                            2: self.convert_analog,
                            3: self.convert_segment,
                            4: self.convert_neural}
        if not progress:
            progress = ProgressIndicator()
        self._progress = progress

    def get_group_for_type(self, entity_type):
        """Return the HDF5 group for `entity_type`, creating it on first use.

        Raises:
            KeyError: if `entity_type` is not one of 1-4.
        """
        if entity_type not in self._groups:
            name = self._GROUP_NAMES[entity_type]
            self._groups[entity_type] = self._h5.create_group(name)
        return self._groups[entity_type]

    def convert(self):
        """Copy file-level metadata and every entity, then close the file.

        The HDF5 file is closed even when a conversion step raises.
        """
        progress = self._progress
        try:
            progress.setup(len(self._nf.entities))
            self.copy_metadata(self._h5, self._nf.metadata_raw)
            for entity in self._nf.entities:
                self.convert_map[entity.entity_type](entity)
                # The indicator's __add__ advances it in place; the result
                # of the expression is intentionally discarded.
                progress + 1
        finally:
            self._h5.close()

    def convert_event(self, event):
        """Store an event entity as one structured (timestamp, value) dataset."""
        dtype = self.dtype_by_event(event)
        nitems = event.item_count
        data = np.empty([nitems], dtype)
        for n in range(nitems):
            data[n] = event.get_data(n)
        group = self.get_group_for_type(event.entity_type)
        dset = group.create_dataset(event.label, data=data)
        self.copy_metadata(dset, event.metadata_raw)

    def convert_analog(self, analog):
        """Store an analog entity as a dataset of stacked time/data columns."""
        data, times, _ = analog.get_data()
        group = self.get_group_for_type(analog.entity_type)
        # Times are stacked on top of the data and then transposed, so the
        # first column of the stored array holds the times.
        d_t = np.vstack((times, data)).T
        dset = group.create_dataset(analog.label, data=d_t)
        self.copy_metadata(dset, analog.metadata_raw)

    def convert_segment(self, segment):
        """Store a segment entity as a sub-group with one dataset per item.

        Segment metadata and prefixed per-source metadata become attributes
        of the sub-group. Segments without items are skipped.
        """
        if not segment.item_count:
            return
        group = self.get_group_for_type(segment.entity_type)
        seg_group = group.create_group(segment.label)
        self.copy_metadata(seg_group, segment.metadata_raw)
        for index in range(segment.source_count):
            source = segment.sources[index]
            self.copy_metadata(seg_group, source.metadata_raw,
                               prefix='SourceInfo.%d.' % index)
        for index in range(segment.item_count):
            (data, timestamp, _samples, unit) = segment.get_data(index)
            name = '%d - %f' % (index, timestamp)
            dset = seg_group.create_dataset(name, data=data.T)
            dset.attrs['Timestamp'] = timestamp
            dset.attrs['Unit'] = unit
            dset.attrs['Index'] = index

    def convert_neural(self, neural):
        """Store a neural entity as a dataset named "<id> - <label>"."""
        data = neural.get_data()
        # Go through the lazy accessor: indexing self._groups directly
        # raised KeyError because nothing else creates the Neural group.
        group = self.get_group_for_type(neural.entity_type)
        name = "%d - %s" % (neural.id, neural.label)
        dset = group.create_dataset(name, data=data)
        self.copy_metadata(dset, neural.metadata_raw)

    @classmethod
    def copy_metadata(cls, target, metadata, prefix=None):
        """Copy every item of `metadata` into ``target.attrs``.

        Args:
            target: object exposing a mapping-like `attrs` attribute.
            metadata: mapping of attribute names to values.
            prefix: optional string prepended to every key.
        """
        for key, value in metadata.items():
            if prefix is not None:
                key = prefix + key
            target.attrs[key] = value

    @classmethod
    def dtype_by_event(cls, event):
        """Build the structured dtype used to store `event`'s items.

        Returns:
            A dtype with a double 'timestamp' field and a 'value' field
            chosen from the event type; text and CSV events use a
            fixed-width byte string of ``event.max_data_length`` bytes.

        Raises:
            KeyError: if the event type is not one of the five known ones.
        """
        # 'S' is the supported spelling of the byte-string code; the
        # equivalent 'a' alias is deprecated in recent NumPy releases.
        type_map = {ns.EventEntity.EVENT_TEXT: 'S',
                    ns.EventEntity.EVENT_CSV: 'S',
                    ns.EventEntity.EVENT_BYTE: 'b',
                    ns.EventEntity.EVENT_WORD: 'h',
                    ns.EventEntity.EVENT_DWORD: 'i'}
        val_type = type_map[event.event_type]
        if val_type == 'S':
            val_type += str(event.max_data_length)
        return np.dtype([('timestamp', 'd'), ('value', val_type)])
class ConsoleIndicator(ProgressIndicator):
    """Progress indicator that draws a text progress bar on stdout.

    Every update rewrites the same console line (the message ends with a
    carriage return); `cleanup` blanks that line afterwards.
    """

    def __init__(self):
        super(ConsoleIndicator, self).__init__()
        self._size = 60       # width of the bar in characters
        self._last_msg = ""   # last message written, used by cleanup()

    def progress(self, max_value, cur_value):
        """Redraw the bar for `cur_value` out of `max_value` steps."""
        size = self._size
        prefix = "Converting"
        if max_value > 0:
            filled = int(size * cur_value / max_value)
            # Keep the bar inside its frame even if the counter overshoots.
            filled = max(0, min(size, filled))
        else:
            # Nothing to convert: avoid dividing by zero, draw an empty bar.
            filled = 0
        msg = "%s [%s%s] %i/%i\r" % (prefix, "#" * filled,
                                     "." * (size - filled),
                                     cur_value, max_value)
        self._last_msg = msg
        sys.stdout.write(msg)
        sys.stdout.flush()

    def cleanup(self):
        """Overwrite the last progress line with spaces."""
        sys.stdout.write('%s\r' % (' '*len(self._last_msg)))
        sys.stdout.flush()
def main():
    """Command-line entry point: convert one input file to NIX or HDF5.

    When no output name is given, it is derived from the input name by
    replacing its extension with ".hdf5" (with --hdf5) or ".h5" (NIX).

    Returns:
        0 on success; failures propagate as exceptions.
    """
    parser = argparse.ArgumentParser(description='Convert Neuroshare readable files to nix/hdf5 format')
    parser.add_argument("-o", "--output", type=str, help="name of the output file")
    parser.add_argument("--hdf5", action="store_true", help="For plain conversion to hdf5 format")
    parser.add_argument("InputFile", type=str, help="File to be converted")
    args = parser.parse_args()

    filename = args.InputFile
    output = args.output
    if not output:
        basefile = os.path.splitext(filename)[0]
        if args.hdf5:
            output = "%s.hdf5" % basefile
        else:
            output = "%s.h5" % basefile

    ci = ConsoleIndicator()
    if args.hdf5:
        converter = H5Converter(filename, output, progress=ci)
    else:
        converter = Converter(filename, output, progress=ci)

    start = time.time()
    try:
        converter.convert()
    finally:
        # Clear the progress line even when the conversion fails, so a
        # traceback is not printed on top of a half-drawn bar.
        ci.cleanup()
    sys.stderr.write("Converted from %s to %s in %.2f seconds\n"
                     % (filename, output, (time.time()-start)))
    return 0
# Run the converter only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())