diff --git a/neo/rawio/biocamrawio.py b/neo/rawio/biocamrawio.py index 8af28a3ea..495f26b15 100644 --- a/neo/rawio/biocamrawio.py +++ b/neo/rawio/biocamrawio.py @@ -200,13 +200,24 @@ def open_biocam_file_header(filename): scale_factor = experiment_settings["ValueConverter"]["ScaleFactor"] sampling_rate = experiment_settings["TimeConverter"]["FrameRate"] - for key in rf: - if key[:5] == "Well_": - num_channels = len(rf[key]["StoredChIdxs"]) - if len(rf[key]["Raw"]) % num_channels: - raise RuntimeError(f"Length of raw data array is not multiple of channel number in {key}") - num_frames = len(rf[key]["Raw"]) // num_channels - break + if "EventsBasedRawRanges" in experiment_settings["DataSettings"]: + for key in rf: + if key[:5] == "Well_": + num_channels = len(rf[key]["StoredChIdxs"]) + num_frames = np.array(rf["TOC"])[-1][-1] + break + read_function = readHDF5t_brw4_event_based + + else: + for key in rf: + if key[:5] == "Well_": + num_channels = len(rf[key]["StoredChIdxs"]) + if len(rf[key]["Raw"]) % num_channels: + raise RuntimeError(f"Length of raw data array is not multiple of channel number in {key}") + num_frames = len(rf[key]["Raw"]) // num_channels + break + read_function = readHDF5t_brw4 + try: num_channels_x = num_channels_y = int(np.sqrt(num_channels)) except NameError: @@ -217,7 +228,6 @@ def open_biocam_file_header(filename): gain = scale_factor * (max_uv - min_uv) / (max_digital - min_digital) offset = min_uv - read_function = readHDF5t_brw4 return dict( file_handle=rf, @@ -231,6 +241,111 @@ def open_biocam_file_header(filename): ) +def GenerateSyntheticNoise_arr(file, wellID, startFrame, endFrame, numFrames, n_channels): + # collect the TOCs + toc = np.array(file["TOC"]) + noiseToc = np.array(file[wellID + "/NoiseTOC"]) + # from the given start position in frames, localize the corresponding noise positions + # using the TOC + tocStartIdx = np.searchsorted(toc[:, 1], startFrame) + noiseStartPosition = noiseToc[tocStartIdx] + noiseEndPosition = noiseStartPosition + for i in range(tocStartIdx + 1, len(noiseToc)): + nextPosition = noiseToc[i] + if nextPosition > noiseStartPosition: + noiseEndPosition = nextPosition + break + if noiseEndPosition == noiseStartPosition: + for i in range(tocStartIdx - 1, 0, -1): + previousPosition = noiseToc[i] + if previousPosition < noiseStartPosition: + noiseEndPosition = noiseStartPosition + noiseStartPosition = previousPosition + break + + # obtain the noise info at the start position + noiseChIdxs = file[wellID + "/NoiseChIdxs"][noiseStartPosition:noiseEndPosition] + noiseMean = file[wellID + "/NoiseMean"][noiseStartPosition:noiseEndPosition] + noiseStdDev = file[wellID + "/NoiseStdDev"][noiseStartPosition:noiseEndPosition] + noiseLength = noiseEndPosition - noiseStartPosition + + noiseInfo = {} + meanCollection = [] + stdDevCollection = [] + + for i in range(1, noiseLength): + noiseInfo[noiseChIdxs[i]] = [noiseMean[i], noiseStdDev[i]] + meanCollection.append(noiseMean[i]) + stdDevCollection.append(noiseStdDev[i]) + + # calculate the median mean and standard deviation of all channels to be used for + # invalid channels + dataMean = np.median(meanCollection) + dataStdDev = np.median(stdDevCollection) + + arr = np.zeros((numFrames, n_channels)) + + for chIdx in range(n_channels): + if chIdx in noiseInfo: + arr[:, chIdx] = np.array( + np.random.normal(noiseInfo[chIdx][0], noiseInfo[chIdx][1], numFrames), dtype=np.int16 + ) + else: + arr[:, chIdx] = np.array(np.random.normal(dataMean, dataStdDev, numFrames), dtype=np.int16) + + return arr + + +def DecodeEventBasedRawData_arr(rf, wellID, t0, t1, nch): + + toc = np.array(rf["TOC"]) # Main table of contents + eventsToc = np.array(rf[wellID + "/EventsBasedSparseRawTOC"]) # Events table of contents + + # Only select the chunks in the desired range + tocStartIdx = np.searchsorted(toc[:, 1], t0) + tocEndIdx = min(np.searchsorted(toc[:, 1], t1, side="right") + 1, len(toc) - 1) + eventsStartPosition = eventsToc[tocStartIdx] + eventsEndPosition = eventsToc[tocEndIdx] + + numFrames = t1 - t0 + + binaryData = rf[wellID + "/EventsBasedSparseRaw"][eventsStartPosition:eventsEndPosition] + binaryDataLength = len(binaryData) + + synth = True + + if synth: + arr = GenerateSyntheticNoise_arr(rf, wellID, t0, t1, numFrames, nch) + + else: + arr = np.zeros((numFrames, nch)) + + pos = 0 + while pos < binaryDataLength: + chIdx = int.from_bytes(binaryData[pos : pos + 4], byteorder="little", signed=True) + pos += 4 + chDataLength = int.from_bytes(binaryData[pos : pos + 4], byteorder="little", signed=True) + pos += 4 + chDataPos = pos + while pos < chDataPos + chDataLength: + fromInclusive = int.from_bytes(binaryData[pos : pos + 8], byteorder="little", signed=True) + pos += 8 + toExclusive = int.from_bytes(binaryData[pos : pos + 8], byteorder="little", signed=True) + pos += 8 + rangeDataPos = pos + for j in range(fromInclusive, toExclusive): + if j >= t0 + numFrames: + break + if j >= t0: + arr[j - t0, chIdx] = int.from_bytes( + binaryData[rangeDataPos : rangeDataPos + 2], byteorder="little", signed=True + ) + rangeDataPos += 2 + pos += (toExclusive - fromInclusive) * 2 + + return arr + + def readHDF5t_100(rf, t0, t1, nch): return rf["3BData/Raw"][t0:t1] @@ -251,3 +366,9 @@ def readHDF5t_brw4(rf, t0, t1, nch): for key in rf: if key[:5] == "Well_": return rf[key]["Raw"][nch * t0 : nch * t1].reshape((t1 - t0, nch), order="C") + + +def readHDF5t_brw4_event_based(rf, t0, t1, nch): + for key in rf: + if key[:5] == "Well_": + return DecodeEventBasedRawData_arr(rf, key, t0, t1, nch) diff --git a/neo/rawio/intanrawio.py b/neo/rawio/intanrawio.py index 81b23f8b4..61222f495 100644 --- a/neo/rawio/intanrawio.py +++ b/neo/rawio/intanrawio.py @@ -46,7 +46,7 @@ class IntanRawIO(BaseRawIO): ignore_integrity_checks: bool, default: False If True, data that violates integrity assumptions will be loaded. At the moment the only integrity check we perform is that timestamps are continuous. Setting this to True will ignore this check and set - the attribute `discontinuous_timestamps` to True if the timestamps are not continous. This attribute can be checked + the attribute `discontinuous_timestamps` to True if the timestamps are not continous. This attribute can be checked after parsing the header to see if the timestamps are continuous or not. Notes ----- @@ -83,7 +83,7 @@ class IntanRawIO(BaseRawIO): 11: 'Stim channel', * For the "header-attached" and "one-file-per-signal" formats, the structure of the digital input and output channels is - one long vector, which must be post-processed to extract individual digital channel information. + one long vector, which must be post-processed to extract individual digital channel information. See the intantech website for more information on performing this post-processing. Examples @@ -108,7 +108,6 @@ def __init__(self, filename="", ignore_integrity_checks=False): self.ignore_integrity_checks = ignore_integrity_checks self.discontinuous_timestamps = False - def _source_name(self): return self.filename @@ -207,7 +206,7 @@ def _parse_header(self): elif self.file_format == "one-file-per-channel": time_stream_index = max(self._raw_data.keys()) timestamp = self._raw_data[time_stream_index][0] - + discontinuous_timestamps = np.diff(timestamp) != 1 timestamps_are_not_contiguous = np.any(discontinuous_timestamps) if timestamps_are_not_contiguous: @@ -267,7 +266,7 @@ def _parse_header(self): # are in a list we just take the first channel in each list of channels else: self._max_sigs_length = max([raw_data[0].size for raw_data in self._raw_data.values()]) - + # No events event_channels = [] event_channels = np.array(event_channels, dtype=_event_channel_dtype) diff --git a/neo/test/iotest/test_intanio.py b/neo/test/iotest/test_intanio.py index 83ccd8a3a..1e527f052 100644 --- a/neo/test/iotest/test_intanio.py +++ b/neo/test/iotest/test_intanio.py @@ -15,13 +15,13 @@ class TestIntanIO( ioclass = IntanIO entities_to_download = ["intan"] entities_to_test = [ - "intan/intan_rhs_test_1.rhs", # Format header-attached + "intan/intan_rhs_test_1.rhs", # Format header-attached "intan/intan_rhd_test_1.rhd", # Format header-attached "intan/rhs_fpc_multistim_240514_082243/rhs_fpc_multistim_240514_082243.rhs", # Format header-attached newer version "intan/intan_fpc_test_231117_052630/info.rhd", # Format one-file-per-channel "intan/intan_fps_test_231117_052500/info.rhd", # Format one file per signal "intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel - "intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal + "intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal "intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd ] diff --git a/neo/test/rawiotest/test_intanrawio.py b/neo/test/rawiotest/test_intanrawio.py index b5e4549a6..df3723288 100644 --- a/neo/test/rawiotest/test_intanrawio.py +++ b/neo/test/rawiotest/test_intanrawio.py @@ -1,5 +1,5 @@ import unittest -import numpy as np +import numpy as np from neo.rawio.intanrawio import IntanRawIO from neo.test.rawiotest.common_rawio_test import BaseTestRawIO @@ -12,32 +12,46 @@ class TestIntanRawIO( rawioclass = IntanRawIO entities_to_download = ["intan"] entities_to_test = [ - "intan/intan_rhs_test_1.rhs", # Format header-attached + "intan/intan_rhs_test_1.rhs", # Format header-attached "intan/intan_rhd_test_1.rhd", # Format header-attached "intan/rhs_fpc_multistim_240514_082243/rhs_fpc_multistim_240514_082243.rhs", # Format header-attached newer version "intan/intan_fpc_test_231117_052630/info.rhd", # Format one-file-per-channel "intan/intan_fps_test_231117_052500/info.rhd", # Format one file per signal "intan/intan_fpc_rhs_test_240329_091637/info.rhs", # Format one-file-per-channel - "intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal + "intan/intan_fps_rhs_test_240329_091536/info.rhs", # Format one-file-per-signal "intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd ] - def test_annotations(self): - + intan_reader = IntanRawIO(filename=self.get_local_path("intan/intan_rhd_test_1.rhd")) intan_reader.parse_header() - + raw_annotations = intan_reader.raw_annotations annotations = raw_annotations["blocks"][0]["segments"][0] # Intan is mono segment signal_annotations = annotations["signals"][0] # As in the other exmaples, annotaions are duplicated - - + # Scalar annotations - exepcted_annotations = {'intan_version': '1.5', 'desired_impedance_test_frequency': 1000.0, 'desired_upper_bandwidth': 7500.0, 'note1': '', 'notch_filter_mode': 1, 'notch_filter': False, 'nb_signal_group': 7, - 'dsp_enabled': 1, 'actual_impedance_test_frequency': 1000.0, 'desired_lower_bandwidth': 0.1, 'note3': '', 'actual_dsp_cutoff_frequency': 1.165828, - 'desired_dsp_cutoff_frequency': 1.0, 'actual_lower_bandwidth': 0.0945291, 'eval_board_mode': 0, 'note2': '', 'num_temp_sensor_channels': 0} - + exepcted_annotations = { + "intan_version": "1.5", + "desired_impedance_test_frequency": 1000.0, + "desired_upper_bandwidth": 7500.0, + "note1": "", + "notch_filter_mode": 1, + "notch_filter": False, + "nb_signal_group": 7, + "dsp_enabled": 1, + "actual_impedance_test_frequency": 1000.0, + "desired_lower_bandwidth": 0.1, + "note3": "", + "actual_dsp_cutoff_frequency": 1.165828, + "desired_dsp_cutoff_frequency": 1.0, + "actual_lower_bandwidth": 0.0945291, + "eval_board_mode": 0, + "note2": "", + "num_temp_sensor_channels": 0, + } + for key in exepcted_annotations: if isinstance(exepcted_annotations[key], float): self.assertAlmostEqual(signal_annotations[key], exepcted_annotations[key], places=2) @@ -47,8 +61,11 @@ def test_annotations(self): # Array annotations signal_array_annotations = signal_annotations["__array_annotations__"] np.testing.assert_array_equal(signal_array_annotations["native_order"][:10], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) - np.testing.assert_array_equal(signal_array_annotations["spike_scope_digital_edge_polarity"][:10], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + np.testing.assert_array_equal( + signal_array_annotations["spike_scope_digital_edge_polarity"][:10], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + ) np.testing.assert_array_equal(signal_array_annotations["board_stream_num"][:10], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + if __name__ == "__main__": unittest.main()