diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index d101b62..71eb370 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -744,6 +744,7 @@ def read_openephys( oe_version = parse(info_chain.find("VERSION").text) neuropix_pxi_processor = None onebox_processor = None + onix_processor = None for signal_chain in root.findall("SIGNALCHAIN"): for processor in signal_chain: if "PROCESSOR" == processor.tag: @@ -752,10 +753,12 @@ def read_openephys( neuropix_pxi_processor = processor if "OneBox" in name: onebox_processor = processor + if "ONIX" in name: + onix_processor = processor - if neuropix_pxi_processor is None and onebox_processor is None: + if neuropix_pxi_processor is None and onebox_processor is None and onix_processor is None: if raise_error: - raise Exception("Open Ephys can only be read when the Neuropix-PXI or the " "OneBox plugin is used.") + raise Exception("Open Ephys can only be read from Neuropix-PXI, OneBox or ONIX plugins.") return None if neuropix_pxi_processor is not None: @@ -769,6 +772,8 @@ def read_openephys( if onebox_processor is not None: assert neuropix_pxi_processor is None, "Only one processor should be present" processor = onebox_processor + if onix_processor is not None: + processor = onix_processor if "NodeId" in processor.attrib: node_id = processor.attrib["NodeId"] @@ -797,6 +802,9 @@ def read_openephys( has_streams = False probe_names_used = None + if onix_processor is not None: + probe_names_used = [probe_name for probe_name in probe_names_used if "Probe" in probe_name] + # for Open Ephys version < 1.0 np_probes is in the EDITOR field. # for Open Ephys version >= 1.0 np_probes is in the CUSTOM_PARAMETERS field. editor = processor.find("EDITOR") @@ -804,7 +812,19 @@ def read_openephys( np_probes = editor.findall("NP_PROBE") else: custom_parameters = editor.find("CUSTOM_PARAMETERS") - np_probes = custom_parameters.findall("NP_PROBE") + if onix_processor is not None: + possible_probe_names = ["NEUROPIXELSV1E", "NEUROPIXELSV1F", "NEUROPIXELSV2E"] + parent_np_probe = "" + for possible_probe_name in possible_probe_names: + parent_np_probe = custom_parameters.findall(possible_probe_name) + if len(parent_np_probe) > 0: + break + if possible_probe_name == "NEUROPIXELSV2E": + np_probes = [parent_np_probe[0].findall(f"PROBE{a}")[0] for a in range(2)] + else: + np_probes = [parent_np_probe[0]] + else: + np_probes = custom_parameters.findall("NP_PROBE") if len(np_probes) == 0: if raise_error: @@ -829,117 +849,155 @@ def read_openephys( # make sure we have at least as many NP_PROBE as the number of used probes if len(np_probes) < len(probe_names_used): if raise_error: - raise Exception( - f"Not enough NP_PROBE entries ({len(np_probes)}) " f"for used probes: {probe_names_used}" - ) + raise Exception(f"Not enough NP_PROBE entries ({len(np_probes)}) for used probes: {probe_names_used}") return None probe_features = _load_np_probe_features() + list_of_probes = [] + np_probes_info = [] + # now load probe info from NP_PROBE fields np_probes_info = [] for probe_idx, np_probe in enumerate(np_probes): - slot = np_probe.attrib["slot"] - port = np_probe.attrib["port"] - dock = np_probe.attrib["dock"] - probe_part_number = np_probe.attrib["probe_part_number"] - probe_serial_number = np_probe.attrib["probe_serial_number"] - # read channels + # selected_electrodes is the preferred way to instantiate the probe + # if this field is available, a full probe is created from the probe_part_number + # and then sliced using the selected electrodes. + # if not available, the xpos and ypos fields are used to create the probe + slot = np_probe.attrib.get("slot") + port = np_probe.attrib.get("port") + dock = np_probe.attrib.get("dock") + probe_part_number = np_probe.attrib.get("probe_part_number") or np_probe.attrib.get("probePartNumber") + probe_serial_number = np_probe.attrib.get("probe_serial_number") or np_probe.attrib.get("probeSerialNumber") + selected_electrodes = np_probe.find("SELECTED_ELECTRODES") channels = np_probe.find("CHANNELS") - channel_names = np.array(list(channels.attrib.keys())) - channel_ids = np.array([int(ch[2:]) for ch in channel_names]) - channel_order = np.argsort(channel_ids) - - # sort channel_names and channel_values - channel_names = channel_names[channel_order] - channel_values = np.array(list(channels.attrib.values()))[channel_order] - - # check if shank ids is present - if all(":" in val for val in channel_values): - shank_ids = np.array([int(val.split(":")[1]) for val in channel_values]) - elif all("_" in val for val in channel_names): - shank_ids = np.array([int(val.split("_")[1]) for val in channel_names]) - else: - shank_ids = None - electrode_xpos = np_probe.find("ELECTRODE_XPOS") - electrode_ypos = np_probe.find("ELECTRODE_YPOS") + pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) - if electrode_xpos is None or electrode_ypos is None: - if raise_error: - raise Exception("ELECTRODE_XPOS or ELECTRODE_YPOS is not available in settings!") - return None - xpos = np.array([float(electrode_xpos.attrib[ch]) for ch in channel_names]) - ypos = np.array([float(electrode_ypos.attrib[ch]) for ch in channel_names]) - positions = np.array([xpos, ypos]).T + if selected_electrodes is not None: + selected_electrodes_values = selected_electrodes.attrib.values() - probe_part_number = np_probe.get("probe_part_number", None) - pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) + num_shank = pt_metadata["num_shanks"] + contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] - shank_pitch = pt_metadata["shank_pitch_um"] - - if fix_x_position_for_oe_5 and oe_version < parse("0.6.0") and shank_ids is not None: - positions[:, 1] = positions[:, 1] - shank_pitch * shank_ids - - # x offset so that the first column is at 0x - offset = np.min(positions[:, 0]) - # if some shanks are not used, we need to adjust the offset - if shank_ids is not None: - offset -= np.min(shank_ids) * shank_pitch - positions[:, 0] -= offset - - # - y_pitch = pt_metadata["electrode_pitch_vert_um"] # Vertical spacing between the centers of adjacent contacts - x_pitch = pt_metadata[ - "electrode_pitch_horz_um" - ] # Horizontal spacing between the centers of contacts within the same row - number_of_columns = pt_metadata["cols_per_shank"] - probe_stagger = ( - pt_metadata["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - - pt_metadata["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - ) - num_shanks = pt_metadata["num_shanks"] + if num_shank == 1: + elec_ids = np.arange(contact_per_shank) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contact_per_shank) for i in range(num_shank)]) + shank_ids = np.concatenate([np.zeros(contact_per_shank) + i for i in range(num_shank)]) - description = pt_metadata.get("description") + full_probe = _make_npx_probe_from_description( + pt_metadata, probe_part_number, elec_ids, shank_ids, mux_info=mux_info + ) - elec_ids = [] - for i, pos in enumerate(positions): - # Do not calculate contact ids if the model name is not known - if description is None: - elec_ids = None - break + selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] + + sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) - x_pos = pos[0] - y_pos = pos[1] + np_probe_dict = { + "pt_metadata": pt_metadata, + "serial_number": probe_serial_number, + "part_number": probe_part_number, + "mux_info": mux_info, + "probe": sliced_probe, + } + else: - # Adds a shift to rows in the staggered configuration - is_row_staggered = np.mod(y_pos / y_pitch + 1, 2) == 1 - row_stagger = probe_stagger if is_row_staggered else 0 + channel_names = np.array(list(channels.attrib.keys())) + channel_ids = np.array([int(ch[2:]) for ch in channel_names]) + channel_order = np.argsort(channel_ids) - # Map the positions to the contacts ids - shank_id = shank_ids[i] if num_shanks > 1 else 0 + # sort channel_names and channel_values + channel_names = channel_names[channel_order] + channel_values = np.array(list(channels.attrib.values()))[channel_order] - # Electrode ids are computed from the positions of the electrodes. The computation - # is different for probes with one row of electrodes, or more than one. - if x_pitch == 0: - elec_id = int(number_of_columns * y_pos / y_pitch) + # check if shank ids is present + if all(":" in val for val in channel_values): + shank_ids = np.array([int(val.split(":")[1]) for val in channel_values]) + elif all("_" in val for val in channel_names): + shank_ids = np.array([int(val.split("_")[1]) for val in channel_names]) else: - elec_id = int( - (x_pos - row_stagger - shank_pitch * shank_id) / x_pitch + number_of_columns * y_pos / y_pitch - ) - elec_ids.append(elec_id) - - np_probe_dict = { - "shank_ids": shank_ids, - "elec_ids": elec_ids, - "pt_metadata": pt_metadata, - "slot": slot, - "port": port, - "dock": dock, - "serial_number": probe_serial_number, - "part_number": probe_part_number, - "mux_info": mux_info, - } + shank_ids = None + + electrode_xpos = np_probe.find("ELECTRODE_XPOS") + electrode_ypos = np_probe.find("ELECTRODE_YPOS") + + if electrode_xpos is None or electrode_ypos is None: + if raise_error: + raise Exception("ELECTRODE_XPOS or ELECTRODE_YPOS is not available in settings!") + return None + xpos = np.array([float(electrode_xpos.attrib[ch]) for ch in channel_names]) + ypos = np.array([float(electrode_ypos.attrib[ch]) for ch in channel_names]) + positions = np.array([xpos, ypos]).T + + shank_pitch = pt_metadata["shank_pitch_um"] + + if fix_x_position_for_oe_5 and oe_version < parse("0.6.0") and shank_ids is not None: + positions[:, 1] = positions[:, 1] - shank_pitch * shank_ids + + # x offset so that the first column is at 0x + offset = np.min(positions[:, 0]) + # if some shanks are not used, we need to adjust the offset + if shank_ids is not None: + offset -= np.min(shank_ids) * shank_pitch + positions[:, 0] -= offset + + # + y_pitch = pt_metadata[ + "electrode_pitch_vert_um" + ] # Vertical spacing between the centers of adjacent contacts + x_pitch = pt_metadata[ + "electrode_pitch_horz_um" + ] # Horizontal spacing between the centers of contacts within the same row + number_of_columns = pt_metadata["cols_per_shank"] + probe_stagger = ( + pt_metadata["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + - pt_metadata["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + ) + num_shanks = pt_metadata["num_shanks"] + + description = pt_metadata.get("description") + + elec_ids = [] + for i, pos in enumerate(positions): + # Do not calculate contact ids if the model name is not known + if description is None: + elec_ids = None + break + + x_pos = pos[0] + y_pos = pos[1] + + # Adds a shift to rows in the staggered configuration + is_row_staggered = np.mod(y_pos / y_pitch + 1, 2) == 1 + row_stagger = probe_stagger if is_row_staggered else 0 + + # Map the positions to the contacts ids + shank_id = shank_ids[i] if num_shanks > 1 else 0 + + # Electrode ids are computed from the positions of the electrodes. The computation + # is different for probes with one row of electrodes, or more than one. + if x_pitch == 0: + elec_id = int(number_of_columns * y_pos / y_pitch) + else: + elec_id = int( + (x_pos - row_stagger - shank_pitch * shank_id) / x_pitch + number_of_columns * y_pos / y_pitch + ) + elec_ids.append(elec_id) + + np_probe_dict = { + "shank_ids": shank_ids, + "elec_ids": elec_ids, + "pt_metadata": pt_metadata, + "slot": slot, + "port": port, + "dock": dock, + "serial_number": probe_serial_number, + "part_number": probe_part_number, + "mux_info": mux_info, + } + # Sequentially assign probe names if "custom_probe_name" in np_probe.attrib and np_probe.attrib["custom_probe_name"] != probe_serial_number: name = np_probe.attrib["custom_probe_name"] @@ -948,10 +1006,11 @@ def read_openephys( np_probe_dict.update({"name": name}) np_probes_info.append(np_probe_dict) - # now select correct probe (if multiple) + # now select find the selected probe (if multiple) if len(np_probes) > 1: found = False probe_names = [p["name"] for p in np_probes_info] + if stream_name is not None: assert probe_name is None and serial_number is None, ( "Use one of 'stream_name', 'probe_name', " "or 'serial_number'" @@ -1039,34 +1098,39 @@ def read_openephys( np_probe_info = np_probes_info[probe_idx] np_probe = np_probes[probe_idx] - shank_ids = np_probe_info["shank_ids"] - elec_ids = np_probe_info["elec_ids"] - pt_metadata = np_probe_info["pt_metadata"] - mux_info = np_probe_info["mux_info"] - - # check if subset of channels - chans_saved = get_saved_channel_indices_from_openephys_settings(settings_file, stream_name=stream_name) - - # if a recording state is found, slice probe - if chans_saved is not None: - positions = positions[chans_saved] - if shank_ids is not None: - shank_ids = np.array(shank_ids)[chans_saved] - if elec_ids is not None: - elec_ids = np.array(elec_ids)[chans_saved] - - probe = _make_npx_probe_from_description( - pt_metadata, probe_part_number, elec_ids, shank_ids=shank_ids, mux_info=mux_info - ) + probe = np_probe_info.get("probe") + + if probe is None: + # check if subset of channels + chans_saved = get_saved_channel_indices_from_openephys_settings(settings_file, stream_name=stream_name) + shank_ids = np_probe_info["shank_ids"] + elec_ids = np_probe_info["elec_ids"] + pt_metadata = np_probe_info["pt_metadata"] + mux_info = np_probe_info["mux_info"] + + # if a recording state is found, slice probe + if chans_saved is not None: + positions = positions[chans_saved] + if shank_ids is not None: + shank_ids = np.array(shank_ids)[chans_saved] + if elec_ids is not None: + elec_ids = np.array(elec_ids)[chans_saved] + + probe = _make_npx_probe_from_description( + pt_metadata, probe_part_number, elec_ids, shank_ids=shank_ids, mux_info=mux_info + ) probe.serial_number = np_probe_info["serial_number"] probe.name = np_probe_info["name"] probe.annotate( part_number=np_probe_info["part_number"], - slot=np_probe_info["slot"], - dock=np_probe_info["dock"], - port=np_probe_info["port"], ) + if "slot" in np_probe_info: + probe.annotate(slot=np_probe_info["slot"]) + if "port" in np_probe_info: + probe.annotate(port=np_probe_info["port"]) + if "dock" in np_probe_info: + probe.annotate(dock=np_probe_info["dock"]) return probe diff --git a/tests/data/openephys/OE_ONIX-NP/settings_NP2.xml b/tests/data/openephys/OE_ONIX-NP/settings_NP2.xml new file mode 100644 index 0000000..9794438 --- /dev/null +++ b/tests/data/openephys/OE_ONIX-NP/settings_NP2.xml @@ -0,0 +1,382 @@ + + + + + 1.0.1 + 10 + 11 Sep 2025 11:52:51 + Windows 11 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/openephys/OE_ONIX-NP/settings_NP2_1.xml b/tests/data/openephys/OE_ONIX-NP/settings_NP2_1.xml new file mode 100644 index 0000000..23f8b40 --- /dev/null +++ b/tests/data/openephys/OE_ONIX-NP/settings_NP2_1.xml @@ -0,0 +1,367 @@ + + + + + 1.0.1 + 10 + 8 Oct 2025 11:56:44 + Windows 11 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +