In [None]:
from dandi.dandiapi import DandiAPIClient
from pynwb import NWBHDF5IO

with DandiAPIClient() as client:
    asset = client.get_dandiset("000231").get_asset_by_path(
        "sub-KF134/sub-KF134_ses-20180211T154918_behavior+ecephys+image.nwb"
    )
    url = asset.get_content_url(follow_redirects=1, strip_query=True)

io = NWBHDF5IO(url, mode="r", load_namespaces=True, driver="ros3")
nwb = io.read()

# What's inside?
print("Acquisition:", list(nwb.acquisition.keys()))
print("Processing:", list(nwb.processing.keys()))
print("Units:", nwb.units if nwb.units else "None")
print("Trials:", nwb.trials.to_dataframe().head() if nwb.trials else "None")
print("Electrodes:", nwb.electrodes.to_dataframe().head())

In [None]:
from pathlib import Path
import numpy as np
from numpy.typing import NDArray


class GenericEphysLoader:
    """Auto-detecting ephys loader. Uses Neo for known formats,
    falls back to memmap for raw binary.
    
    For known formats (.rhd, .rhs, .oebin, .ns5, .abf, .edf, .nwb),
    Neo extracts dtype, gain, rate, and channel count from headers.
    
    For raw binary (.dat, .bin), user must provide n_channels and
    sampling_rate at minimum.
    """

    KNOWN_EXTENSIONS = {
        ".rhd": "IntanRawIO",
        ".rhs": "IntanRawIO",
        ".oebin": "OpenEphysBinaryRawIO",
        ".nwb": "NWBIO",
        ".ns5": "BlackrockRawIO",
        ".ns6": "BlackrockRawIO",
        ".nev": "BlackrockRawIO",
        ".abf": "AxonRawIO",
        ".edf": "EdfRawIO",
        ".bdf": "EdfRawIO",
        ".vhdr": "BrainVisionRawIO",
    }

    def __init__(
        self,
        path: str | Path,
        n_channels: int | None = None,
        sampling_rate: float | None = None,
        dtype: str = "int16",
        stream_id: str = "0",
    ):
        self.path = Path(path)
        self._stream_id = stream_id
        self._reader = None

        ext = self.path.suffix.lower()
        rawio_class = self.KNOWN_EXTENSIONS.get(ext)

        if rawio_class:
            self._init_neo(rawio_class, stream_id)
        elif ext in (".dat", ".bin", ".raw"):
            if n_channels is None or sampling_rate is None:
                raise ValueError(
                    f"Raw binary '{ext}' requires n_channels and sampling_rate. "
                    "Use a format with headers (.rhd, .oebin, .edf, ...) "
                    "for automatic detection."
                )
            self._init_memmap(n_channels, sampling_rate, dtype)
        else:
            self._init_neo_auto(n_channels, sampling_rate, stream_id)

    def _init_neo(self, rawio_name: str, stream_id: str):
        import neo.rawio

        rawio_cls = getattr(neo.rawio, rawio_name)

        if rawio_name == "OpenEphysBinaryRawIO":
            self._reader = rawio_cls(dirname=str(self.path.parent))
        else:
            self._reader = rawio_cls(filename=str(self.path))

        self._reader.parse_header()
        self._resolve_stream(stream_id)

    def _init_neo_auto(self, n_channels, sampling_rate, stream_id):
        """Try Neo's auto-detection for unknown extensions."""
        from neo.rawio import get_rawio

        rawio_cls = get_rawio(str(self.path))
        if rawio_cls is None:
            if n_channels is None or sampling_rate is None:
                raise ValueError(
                    f"Cannot auto-detect format for '{self.path.name}'. "
                    "Provide n_channels and sampling_rate for raw binary, "
                    "or use a supported format."
                )
            self._init_memmap(n_channels, sampling_rate, "int16")
            return

        self._reader = rawio_cls(filename=str(self.path))
        self._reader.parse_header()
        self._resolve_stream(stream_id)

    def _resolve_stream(self, stream_id: str):
        streams = self._reader.header["signal_streams"]
        stream_ids = list(streams["id"])

        if stream_id not in stream_ids:
            stream_id = stream_ids[0]

        self._stream_idx = stream_ids.index(stream_id)
        self._n_samples = self._reader.get_signal_size(
            block_index=0, seg_index=0, stream_index=self._stream_idx,
        )

        channels = self._reader.header["signal_channels"]
        mask = channels["stream_id"] == stream_id
        self._n_channels = int(np.sum(mask))
        self.rate = float(channels[mask]["sampling_rate"][0])
        self._is_memmap = False

    def _init_memmap(self, n_channels: int, sampling_rate: float, dtype: str):
        self._mmap = np.memmap(str(self.path), dtype=np.dtype(dtype), mode="r")
        total = len(self._mmap) // n_channels
        self._mmap = self._mmap[:total * n_channels].reshape(total, n_channels)
        self._n_channels = n_channels
        self._n_samples = total
        self.rate = sampling_rate
        self._is_memmap = True

    def __len__(self) -> int:
        return self._n_samples

    @property
    def n_channels(self) -> int:
        return self._n_channels

    @property
    def streams(self) -> dict | None:
        if self._reader is None:
            return None
        streams = self._reader.header["signal_streams"]
        channels = self._reader.header["signal_channels"]
        info = {}
        for sid, name in zip(streams["id"], streams["name"]):
            mask = channels["stream_id"] == sid
            info[sid] = {
                "name": name,
                "n_channels": int(np.sum(mask)),
                "rate": float(channels[mask]["sampling_rate"][0]),
            }
        return info

    def __getitem__(self, key) -> NDArray[np.float64]:
        if isinstance(key, slice):
            start, stop, _ = key.indices(self._n_samples)
        else:
            start, stop = key, key + 1

        if self._is_memmap:
            return self._mmap[start:stop].astype(np.float64)

        raw = self._reader.get_analogsignal_chunk(
            block_index=0, seg_index=0,
            i_start=start, i_stop=stop,
            stream_index=self._stream_idx,
        )
        return self._reader.rescale_signal_raw_to_float(
            raw, dtype="float64", stream_index=self._stream_idx,
        )
        
from ethograph.gui.plots_ephystrace import EphysTracePlot

# Point to the info.rhd in your Intan folder
loader = GenericEphysLoader(r"C:\Users\aksel\Desktop\ephys\Ivy_250309_111721\info.rhd")

# What's in there?
print(f"Channels: {loader.n_channels}")
print(f"Rate: {loader.rate} Hz")
print(f"Duration: {len(loader) / loader.rate:.1f} s")
print(f"Streams: {loader.streams}")

# Quick plot
plot = EphysTracePlot(loader, channel=0)
plot.show()
plot.set_xrange(0, 2.0)  # first 2 seconds

: 