In [None]:
import sleap_io as sio
import imageio.v3 as iio
import numpy as np
from pathlib import Path
from rich.progress import track

In [None]:
# --- Output location ---------------------------------------------------------
# Converted data is written to <dataset_path>/<dataset_name>/<split>/.
dataset_name = "flies13"
dataset_path = "datasets"

# --- Input labels ------------------------------------------------------------
# One SLEAP labels package (.pkg.slp) per split; the keys become the names of
# the split sub-folders.
labels_paths = dict(
    train=r"D:\sleap-data\datasets\wt_gold.13pt\tracking_split2\train.pkg.slp",
    val=r"D:\sleap-data\datasets\wt_gold.13pt\tracking_split2\val.pkg.slp",
    test=r"D:\sleap-data\datasets\wt_gold.13pt\tracking_split2\test.pkg.slp",
)

# --- Skeleton symmetries -----------------------------------------------------
# Pairs of node names that swap places when an image is flipped horizontally.
symmetries = [
    ("wingL", "wingR"),
    ("forelegL4", "forelegR4"),
    ("midlegL4", "midlegR4"),
    ("hindlegL4", "hindlegR4"),
    ("eyeL", "eyeR"),
]

# --- Conversion settings -----------------------------------------------------
overwrite = True  # re-export frames even if their files already exist
quality = 90  # compression quality passed to the image writer
class_index = 0  # class id written at the start of every label row

In [None]:
def convert_instance(instance: "sio.Instance", img_height: int, img_width: int, class_index: int = 0) -> str:
    """Convert a SLEAP Instance to a row in the Ultralytics pose format.

    Args:
        instance: A SLEAP Instance representing a single subject in a frame.
            Only its `numpy()` method is used; it must return an
            `(n_nodes, 2)` array of `(x, y)` pixel coordinates in which
            missing points are NaN.
        img_height: Height of the image this instance comes from in pixels.
        img_width: Width of the image this instance comes from in pixels.
        class_index: An integer representing the class of the object. Defaults
            to 0.

    Returns:
        A string with the Ultralytics-formatted row.

    Notes:
        The row will be formatted as:
        ```
        <class-index> <x> <y> <width> <height> <px1> <py1> <p1-visibility> <px2> <py2> <p2-visibility> ... <pxn> <pyn> <pn-visibility>
        ```

        All coordinates are normalized by the image size. The bounding box is
        the tight box around the visible points, given as center and size.

        A point counts as visible only if both of its coordinates are not NaN;
        invisible points are written as `0 0 0`. If the instance has no visible
        points at all, the bounding box is written as all zeros so that the row
        never contains `nan`.

        Reference: https://docs.ultralytics.com/datasets/pose/
    """
    pts = np.asarray(instance.numpy(), dtype=float)

    # A point is usable only when both x and y are present.
    visible = ~np.isnan(pts).any(axis=1)

    if visible.any():
        x0, y0 = pts[visible].min(axis=0)
        x1, y1 = pts[visible].max(axis=0)
    else:
        # Nothing to bound: fall back to an empty box instead of NaNs.
        x0 = y0 = x1 = y1 = 0.0

    bbox_midx = ((x0 + x1) / 2) / img_width
    bbox_midy = ((y0 + y1) / 2) / img_height

    bbox_width = (x1 - x0) / img_width
    bbox_height = (y1 - y0) / img_height

    row = [
        f"{class_index:d}",
        f"{bbox_midx:.6f}",
        f"{bbox_midy:.6f}",
        f"{bbox_width:.6f}",
        f"{bbox_height:.6f}",
    ]
    for (px, py), is_visible in zip(pts, visible):
        if is_visible:
            px, py, vis = px / img_width, py / img_height, 1
        else:
            px, py, vis = 0.0, 0.0, 0
        row.extend([
            f"{px:.6f}",
            f"{py:.6f}",
            f"{vis:d}",
        ])
    return " ".join(row)


def convert_frames(labels: sio.Labels, save_folder: str, class_index: int = 0, quality: int = 90, overwrite: bool = False):
    """Generate images and text files for individual labeled frames.

    Each labeled frame is written as a pair of files named after its
    zero-padded position in `labels`: `<index>.jpg` with the frame image and
    `<index>.txt` with one Ultralytics pose row per user instance.

    Args:
        labels: SLEAP Labels object.
        save_folder: Folder that will contain images and text files. It is
            created (including parents) if it does not exist.
        class_index: An integer representing the class of the object. Defaults
            to 0.
        quality: Image compression quality to save at. Defaults to 90.
        overwrite: If False (the default), skip frames that already have saved data.
    """
    # Resolve and create the output folder once instead of on every frame.
    save_folder = Path(save_folder)
    save_folder.mkdir(exist_ok=True, parents=True)

    try:
        iter_labels = track(enumerate(labels), total=len(labels))
    except Exception:
        # The progress bar is a nicety: fall back to plain iteration if it
        # cannot be set up, but let KeyboardInterrupt/SystemExit propagate.
        iter_labels = enumerate(labels)

    for i, lf in iter_labels:
        name = f"{i:06d}"
        img_path = save_folder / f"{name}.jpg"
        txt_path = save_folder / f"{name}.txt"

        # A frame counts as done only when both of its files are present.
        if (not overwrite) and img_path.exists() and txt_path.exists():
            continue

        # Drop singleton dimensions (e.g. a single grayscale channel).
        img = lf.image.squeeze()
        img_height, img_width = img.shape[:2]
        instances = "\n".join(
            convert_instance(instance, img_height, img_width, class_index=class_index)
            for instance in lf.user_instances
        )

        iio.imwrite(img_path, img, quality=quality)
        with open(txt_path, "w") as f:
            f.write(instances)


def parse_skeleton(skel: sio.Skeleton, symmetries: list = None):
    """Return the node count and the horizontal-flip index of a skeleton.

    Args:
        skel: Skeleton to describe. Its length is the node count and its
            `index()` method maps each symmetry entry to a node position.
        symmetries: Pairs of nodes that swap places under a flip. If None
            (the default), `skel.symmetries` is used instead.

    Returns:
        A tuple `(n_nodes, flip_idx)`. `flip_idx` is a list of `n_nodes`
        integers in which paired nodes hold each other's position and every
        other node holds its own.
    """
    node_count = len(skel)
    pairs = skel.symmetries if symmetries is None else symmetries

    # Start from the identity mapping: every node flips onto itself.
    mapping = np.arange(node_count)
    if len(pairs) == 0:
        return node_count, mapping.tolist()

    # One column of node positions per side of each symmetric pair.
    first, second = np.array([(skel.index(a), skel.index(b)) for a, b in pairs]).T
    mapping[first] = second
    mapping[second] = first

    return node_count, mapping.tolist()


def write_dataset_yaml(dataset_path, dataset_name, n_nodes, flip_idx):
    """Write the Ultralytics dataset description file for a pose dataset.

    The file is saved as `<dataset_path>/<dataset_name>/<dataset_name>.yaml`.
    Any missing folders on that path are created.

    Args:
        dataset_path: Folder that contains the dataset folder.
        dataset_name: Name of the dataset. Used as the dataset folder name, as
            the YAML file name, and as the `path` entry in the file.
        n_nodes: Number of keypoints per instance, written to `kpt_shape`.
        flip_idx: Keypoint order after a horizontal flip, written to
            `flip_idx` using its default string formatting.
    """
    dataset_yaml_path = Path(dataset_path) / dataset_name / f"{dataset_name}.yaml"

    # Create the dataset folder itself, not just its parent: nothing else
    # guarantees it exists by the time the YAML file is written.
    dataset_yaml_path.parent.mkdir(exist_ok=True, parents=True)

    with open(dataset_yaml_path, "w") as f:
        f.write(
f"""# Train/val/test sets as 1) dir: path/to/imgs, 2) file: path/to/imgs.txt, or 3) list: [path/to/imgs1, path/to/imgs2, ..]
path: {dataset_name}  # dataset root dir
train: train  # train images (relative to 'path')
val: val  # val images (relative to 'path')
test: test  # test images (optional)

# Keypoints
kpt_shape: [{n_nodes}, 3]  # number of keypoints, number of dims (2 for x,y or 3 for x,y,visible)
flip_idx: {flip_idx}

# Classes dictionary
names:
  0: instance
"""
        )

In [None]:
# Convert every split listed in `labels_paths` and then write the dataset YAML.
dataset_path = Path(dataset_path)

# Skeleton info is taken from the first split that is loaded; the `None`
# sentinel keeps later splits from overwriting it.
# NOTE(review): later splits are presumably expected to share that skeleton;
# nothing here checks it.
n_nodes = None
for split_name, labels_path in labels_paths.items():
    labels = sio.load_slp(labels_path)

    if n_nodes is None:
        # The user-defined `symmetries` are passed explicitly, so the
        # skeleton's own symmetries are not consulted.
        n_nodes, flip_idx = parse_skeleton(labels.skeletons[0], symmetries)

    # Each split gets its own sub-folder: <dataset_path>/<dataset_name>/<split_name>.
    img_folder = dataset_path / dataset_name / split_name

    convert_frames(labels, save_folder=img_folder, class_index=class_index, quality=quality, overwrite=overwrite)

# `n_nodes` and `flip_idx` are only set inside the loop, so `labels_paths`
# must contain at least one split.
write_dataset_yaml(dataset_path, dataset_name, n_nodes, flip_idx)