In [7]:
from __future__ import annotations

from pathlib import Path
import numpy as np
import h5py
import sidpy
import pyNSID
from igor2 import binarywave as igor_binarywave

# Reaching this line means every import above succeeded.
print('Imports OK')
# IPython shell escape (not Python): runs the external `run_server_afm` command.
# Per the output recorded for this cell, it starts a server in the background
# and writes its logs to server.log.
# NOTE(review): presumably this is the server test_with_mic_server() talks to — confirm.
!run_server_afm


Imports OK
Server started and running in the background. Logs are being written to server.log.


In [8]:
def parse_note(note_bytes: bytes | str | None) -> dict:
    """Parse an Igor wave note into a dict with one ``key: value`` pair per line.

    Args:
        note_bytes: The raw wave note. ``None`` yields an empty dict, a ``str``
            is used as-is, and ``bytes``/``bytearray`` are decoded first.

    Returns:
        Mapping of stripped key to stripped value. Lines that are blank or
        contain no ``":"`` are skipped; only the first ``":"`` on a line
        separates key from value; a repeated key keeps its last value.
    """
    if note_bytes is None:
        return {}
    if isinstance(note_bytes, str):
        text = note_bytes
    else:
        try:
            # Already valid UTF-8: decode directly. Patching the degree/micro
            # bytes here would double-encode them (b"\xc2\xb0" -> b"\xc2\xc2\xb0")
            # and inject U+FFFD replacement characters.
            text = note_bytes.decode("utf-8")
        except UnicodeDecodeError:
            # Single-byte encoded note: promote the lone degree/micro bytes to
            # their UTF-8 form, and replace whatever else cannot be decoded.
            text = (
                note_bytes
                .replace(b"\xb0", b"\xc2\xb0")  # °
                .replace(b"\xb5", b"\xc2\xb5")  # µ
                .decode(errors="replace")
            )

    # Accept CR, LF and CRLF line endings by normalising everything to CR.
    normalised = text.replace("\r\n", "\r").replace("\n", "\r")

    note = {}
    for line in normalised.split("\r"):
        s = line.strip()
        if not s or ":" not in s:
            continue
        k, v = s.split(":", 1)
        note[k.strip()] = v.strip()
    return note


def extract_channel_labels(wave: dict, n_channels: int) -> list[str]:
    """Return exactly ``n_channels`` channel names for an Igor wave dict.

    Names are read from the third entry of ``wave["labels"]`` when that entry
    is a list/tuple with more than one element; its first element is skipped
    (presumably the label of the dimension as a whole — confirm). Bytes are
    decoded with undecodable bytes replaced, anything else goes through
    ``str()``. Missing names are filled with ``Channel_NNN`` and surplus names
    are dropped.
    """
    def _as_text(entry) -> str:
        # Igor labels may arrive as raw bytes; everything else is stringified.
        if isinstance(entry, (bytes, bytearray)):
            return entry.decode(errors="replace")
        return str(entry)

    names: list[str] = []
    per_dim = wave.get("labels", None)

    if isinstance(per_dim, (list, tuple)) and len(per_dim) >= 3:
        layer_entries = per_dim[2]
        if isinstance(layer_entries, (list, tuple)) and len(layer_entries) > 1:
            names = [_as_text(entry) for entry in layer_entries[1:]]

    # Pad with generic names up to n_channels (no-op when enough were found).
    names.extend(f"Channel_{idx:03d}" for idx in range(len(names), n_channels))

    return names[:n_channels]


def build_channel_selection(labels: list[str]) -> list[tuple[int, str, str]]:
    """Pick the four output channels out of the labels found in the file.

    Returns one ``(source_index, output_signal_name, unit)`` tuple per required
    channel, always four of them and always in the same order.

    A required channel that is not present gets ``source_index == -1`` and a
    warning listing all missing channels is printed. Downstream code must
    handle ``source_index == -1`` by writing a placeholder array.
    """
    # (label looked up in the file, signal name used in the output, unit)
    wanted = (
        ("HeightRetrace",    "HeightRetrace",     "m"),
        ("AmplitudeRetrace", "AmplitudeRetrace",  "m"),
        ("ZSensorRetrace",   "DeflectionRetrace", "m"),
        ("PhaseRetrace",     "PhaseRetrace",      "deg"),
    )

    # If a label occurs more than once, the last occurrence wins.
    position_of = dict(zip(labels, range(len(labels))))

    picks: list[tuple[int, str, str]] = [
        (position_of.get(src_name, -1), out_name, unit_name)
        for src_name, out_name, unit_name in wanted
    ]
    absent: list[str] = [src_name for src_name, _, _ in wanted if src_name not in position_of]

    if absent:
        print(f"[IBW WARNING] Missing required channels: {absent}. Labels found: {labels}")

    return picks


def make_channel_map_from_selection(selection: list[tuple[int, str, str]], measurement: str = "Measurement_000") -> dict:
    """Turn a channel selection into a channel-map dict.

    Each ``(src_index, signal, unit)`` tuple becomes one ``Channel_NNN`` entry
    (numbered by its position in ``selection``) that records the source index,
    signal name, units, scan number (same as the position) and the H5 location
    (``measurement`` plus the signal name as both group and dataset).
    ``spectrum`` / ``point_cloud`` are always ``None`` and the top-level
    ``spectra`` / ``point_clouds`` lists are always empty.
    """
    def _describe(position: int, source_index, signal_name, unit_name) -> dict:
        label = str(signal_name)
        return {
            "src_index": int(source_index),
            "signal": label,
            "units": str(unit_name),
            "scan": int(position),
            "spectrum": None,
            "point_cloud": None,
            "h5": {
                "measurement": measurement,
                "signal_group": label,
                "dataset": label,
            },
        }

    channels = {
        f"Channel_{position:03d}": _describe(position, source_index, signal_name, unit_name)
        for position, (source_index, signal_name, unit_name) in enumerate(selection)
    }
    return {"channels": channels, "spectra": [], "point_clouds": []}


def channel_map_to_dataset_info(cm: dict) -> list:
    """Flatten a channel map into a list of ``(name, values)`` pairs.

    Channels are ordered by the integer after the first underscore in their
    key. The result holds, in this order: channel names, signals, units,
    scans, then the map's ``spectra`` and ``point_clouds`` lists (an empty
    list for either one when the key is absent).
    """
    channels = cm["channels"]
    ordered = sorted(channels, key=lambda name: int(name.split("_")[1]))

    def _column(field: str) -> list:
        # One value per channel, in the sorted channel order.
        return [channels[name][field] for name in ordered]

    return [
        ("channels", ordered),
        ("signals", _column("signal")),
        ("units", _column("units")),
        ("scans", _column("scan")),
        ("spectra", cm.get("spectra", [])),
        ("point_clouds", cm.get("point_clouds", [])),
    ]


In [9]:
def convert_one_ibw_to_h5(
    ibw_path: str | Path,
    out_h5_path: str | Path,
    rotate_k: int = 1,
    measurement_prefix: str = "Measurement_",
) -> dict:
    """Convert a single IBW to an NSID H5 with the 4-channel sample-like output.

    Args:
        ibw_path: Path of the Igor binary wave file to read.
        out_h5_path: Path of the H5 file to write (overwritten if it exists);
            missing parent folders are created.
        rotate_k: Number of 90-degree rotations applied to every image via
            ``np.rot90``.
        measurement_prefix: Prefix of the indexed measurement group created at
            the root of the H5 file.

    Returns:
        Dict with the keys ``"ibw"``, ``"h5"``, ``"labels"``, ``"channel_map"``
        and ``"dataset_info_preview"``.

    Raises:
        ValueError: If the wave data is not 3-dimensional.

    A required channel that is absent from the file (``src_index == -1`` in the
    channel map) is written as a NaN-filled placeholder image with
    ``source_label`` set to ``"<missing>"``. Indexing the data with -1 would
    otherwise silently store the *last* channel under the wrong signal name.
    """
    ibw_path = Path(ibw_path)
    out_h5_path = Path(out_h5_path)
    out_h5_path.parent.mkdir(parents=True, exist_ok=True)

    wave = igor_binarywave.load(str(ibw_path))["wave"]
    arr = np.asarray(wave["wData"])
    if arr.ndim != 3:
        raise ValueError(f"Expected wData to be 3D (X,Y,C). Got shape={arr.shape}")

    note = parse_note(wave.get("note", None))

    n_channels = arr.shape[2]
    labels = extract_channel_labels(wave, n_channels)

    selection = build_channel_selection(labels)
    cm = make_channel_map_from_selection(selection)

    # NaN needs a floating dtype; keep the file's own dtype when it already is one.
    placeholder_dtype = arr.dtype if np.issubdtype(arr.dtype, np.floating) else np.float64

    # Physical scan sizes are the same for every channel.
    # NOTE(review): the swap matches an odd rotate_k (np.rot90 then exchanges
    # the two image axes); for an even rotate_k the axes are not exchanged —
    # confirm whether the swap should depend on rotate_k.
    slow = float(note.get("FastScanSize", 1.0))  # swapped
    fast = float(note.get("SlowScanSize", 1.0))  # swapped

    # Build sidpy datasets for only the selected 4 channels
    data_sets: dict[str, sidpy.Dataset] = {}
    for ch_key, meta in cm["channels"].items():
        src_i = meta["src_index"]
        signal = meta["signal"]
        unit = meta["units"]

        if src_i < 0:
            # Channel not present in this file: write a placeholder of the right shape.
            plane = np.full(arr.shape[:2], np.nan, dtype=placeholder_dtype)
            source_label = "<missing>"
        else:
            plane = arr[:, :, src_i]
            source_label = labels[src_i]

        ds = sidpy.Dataset.from_array(np.rot90(plane, k=rotate_k), name=signal)
        ds.data_type = "image"
        ds.quantity = signal
        ds.units = unit
        ds.title = signal
        ds.modality = "AFM"

        # Axis lengths come from the rotated image itself: a dimension vector
        # has to be exactly as long as the array axis it describes, and the
        # note's ScanPoints/ScanLines refer to the un-rotated scan, so they do
        # not line up with the rotated axes of a non-square image.
        nx, ny = ds.shape[0], ds.shape[1]

        ds.set_dimension(0, sidpy.Dimension(np.linspace(0, fast, nx), name="x", units="m",
                                            quantity="Length", dimension_type="spatial"))
        ds.set_dimension(1, sidpy.Dimension(np.linspace(0, slow, ny), name="y", units="m",
                                            quantity="Length", dimension_type="spatial"))

        ds.metadata["source_file"] = ibw_path.name
        ds.metadata["source_label"] = source_label
        ds.metadata["note"] = note

        data_sets[ch_key] = ds

    # Write NSID H5 in the layout: /Measurement_000/Channel_000/<signal>/<signal>
    with h5py.File(out_h5_path, mode="w") as h5:
        meas = sidpy.hdf.prov_utils.create_indexed_group(h5, measurement_prefix)

        for ch_key, ds in data_sets.items():
            ch_grp = meas.create_group(ch_key)
            sig_grp = ch_grp.create_group(ds.quantity)
            pyNSID.hdf_io.write_nsid_dataset(ds, sig_grp)

        h5.flush()

    return {
        "ibw": str(ibw_path),
        "h5": str(out_h5_path),
        "labels": labels,
        "channel_map": cm,
        "dataset_info_preview": channel_map_to_dataset_info(cm),
    }


In [10]:
def convert_folder_ibw_to_h5(in_folder: str | Path, out_folder: str | Path | None = None) -> None:
    """Convert every ``*.ibw`` file directly inside ``in_folder`` to an ``.h5`` file.

    Args:
        in_folder: Folder that is searched (non-recursively) for ``.ibw`` files.
        out_folder: Destination folder; defaults to a sibling of ``in_folder``
            named ``<in_folder name>_h5``. Created if it does not exist.

    Files are processed in sorted order. A file that raises is reported and
    counted as failed without stopping the batch. The dataset-info preview is
    printed once, for the first file that converts successfully.
    """
    src_dir = Path(in_folder)
    dst_dir = src_dir.with_name(src_dir.name + "_h5") if out_folder is None else Path(out_folder)
    dst_dir.mkdir(parents=True, exist_ok=True)

    sources = sorted(src_dir.glob("*.ibw"))
    print(f"Found {len(sources)} .ibw files in: {src_dir}")
    print(f"Writing .h5 files to: {dst_dir}")

    n_ok = 0
    n_fail = 0
    for src in sources:
        try:
            target = dst_dir / f"{src.stem}.h5"
            result = convert_one_ibw_to_h5(src, target)
            n_ok += 1
            print(f"[OK] {src.name} -> {target.name}")
            # Preview the dataset-info format the server expects:
            if n_ok == 1:
                print("Preview (server-style):")
                print(result["dataset_info_preview"])
        except Exception as err:
            # Best effort: report the failure and carry on with the next file.
            n_fail += 1
            print(f"[FAIL] {src.name}: {type(err).__name__}: {err}")

    print(f"\nDone. Success: {n_ok}, Failed: {n_fail}")


In [11]:
import Pyro5.api
def test_with_mic_server(h5_filename: str, uri: str = "PYRO:microscope.server@localhost:9092", data_path: str = "data/AFM"):
    """Load one H5 file into the microscope server and return its dataset info.

    Args:
        h5_filename: Name of the H5 file inside ``data_path``.
        uri: Pyro URI of the microscope server.
        data_path: Folder holding the H5 file; it is sent to the server
            prefixed with ``../../``.
            NOTE(review): presumably relative to the server's working
            directory — confirm.

    Returns:
        Whatever the server's ``get_dataset_info()`` returns.
    """
    h5_path = "/".join(["..", "..", data_path, h5_filename])

    # Use the proxy as a context manager so the connection is released when
    # done, including when one of the remote calls raises.
    with Pyro5.api.Proxy(uri) as mic_server:
        mic_server.initialize_microscope("AFM", data_path=h5_path)
        mic_server.setup_microscope(data_source="Compound_Dataset_1")
        return mic_server.get_dataset_info()


In [12]:
# --- EDIT THESE ---
IN_FOLDER = r"last_batch"      # folder containing .ibw files
OUT_FOLDER = r"last_batch_h5"  # output folder for .h5

# Convert every .ibw file in IN_FOLDER and write the results to OUT_FOLDER.
convert_folder_ibw_to_h5(IN_FOLDER, OUT_FOLDER)


Found 2 .ibw files in: last_batch
Writing .h5 files to: last_batch_h5


  warn('validate_h5_dimension may be removed in a future version',
  warn('validate_h5_dimension may be removed in a future version',
  warn('validate_h5_dimension may be removed in a future version',
  warn('validate_h5_dimension may be removed in a future version',


[OK] cufoil_high.ibw -> cufoil_high.h5
Preview (server-style):
[('channels', ['Channel_000', 'Channel_001', 'Channel_002', 'Channel_003']), ('signals', ['HeightRetrace', 'AmplitudeRetrace', 'DeflectionRetrace', 'PhaseRetrace']), ('units', ['m', 'm', 'm', 'deg']), ('scans', [0, 1, 2, 3]), ('spectra', []), ('point_clouds', [])]


  warn('validate_h5_dimension may be removed in a future version',
  warn('validate_h5_dimension may be removed in a future version',
  warn('validate_h5_dimension may be removed in a future version',


[OK] cufoil_low.ibw -> cufoil_low.h5

Done. Success: 2, Failed: 0


  warn('validate_h5_dimension may be removed in a future version',
