A collection of utility functions for working with aeon raw data 

In [16]:
import os

import aeon.io.api as aeon
import aeon.io.reader
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from aeon.analysis.utils import visits
from aeon.io.video import frames
from aeon.schema.dataset import exp02
from dotmap import DotMap

In [4]:
def get_experiment_times(
    root: str | os.PathLike, start_time: pd.Timestamp, end_time: pd.Timestamp
) -> DotMap:
    """
    Retrieve experiment start and stop times from environment states
    (i.e. times outside of maintenance mode) occurring within the
    given start and end times.

    Args:
        root (str or os.PathLike): The root path where epoch data is stored.
        start_time (pandas.Timestamp): Start time.
        end_time (pandas.Timestamp): End time.

    Returns:
        DotMap: A DotMap object containing two keys: 'start' and 'stop',
        corresponding to pairs of experiment start and stop times.

    Notes:
    This function uses the last 'Maintenance' event as the last 'Experiment'
    stop time. If the first retrieved state is 'Maintenance' (e.g.
    'Experiment' mode entered before `start`), `start` is used
    as the first 'Experiment' start time.
    """

    experiment_times = DotMap()
    env_states = aeon.load(
        root,
        exp02.ExperimentalMetadata.EnvironmentState,
        start_time,
        end_time,
    )
    # Use the last 'maintenance' event as end time
    end_time = (env_states[env_states.state == "Maintenance"]).index[-1]
    env_states = env_states[~env_states.index.duplicated(keep="first")]
    # Retain only events between visit start and stop times
    env_states = env_states.iloc[
        env_states.index.get_indexer([start_time], method="bfill")[
            0
        ] : env_states.index.get_indexer([end_time], method="ffill")[0] + 1
    ]
    # Retain only events where state changes (experiment-maintenance pairs)
    env_states = env_states[env_states["state"].ne(env_states["state"].shift())]
    if env_states["state"].iloc[0] == "Maintenance":
        # Pad with an "Experiment" event at the start
        env_states = pd.concat(
            [
                pd.DataFrame(
                    "Experiment",
                    index=[start_time],
                    columns=env_states.columns,
                ),
                env_states,
            ]
        )
    else:
        # Use start time as the first "Experiment" event
        env_states.rename(index={env_states.index[0]: start_time}, inplace=True)
    experiment_times.start = env_states[
        env_states["state"] == "Experiment"
    ].index.values
    experiment_times.stop = env_states[
        env_states["state"] == "Maintenance"
    ].index.values

    return experiment_times


def exclude_maintenance_data(
    data: pd.DataFrame, experiment_times: DotMap
) -> pd.DataFrame:
    """
    Exclude rows not in experiment times (i.e., corresponding to maintenance times)
    from the given dataframe.

    Args:
        data (pandas.DataFrame): The data to filter. Expected to have a DateTimeIndex.
        experiment_times (DotMap): A DotMap object containing experiment start and stop times.

    Returns:
        pandas.DataFrame: The filtered data.
    """
    filtered_data = pd.concat(
        [
            data.loc[start:stop]
            for start, stop in zip(experiment_times.start, experiment_times.stop)
        ]
    )
    return filtered_data

In [12]:
def get_single_frame(
    root: str | os.PathLike,
    video_reader: aeon.io.reader.Video,
    time: pd.Timestamp,
) -> np.ndarray:
    """
    Retrieve a single frame from the given root directory,
    Video reader, and time.

    Args:
        root (str or os.PathLike): The root path where epoch data
            is stored.
        video_reader (aeon.io.reader.Video): The Video reader.
        time (pd.Timestamp): The timestamp of the frame to retrieve.

    Returns:
        numpy.ndarray: The raw frame.
    """
    vdata = aeon.load(
        root, video_reader, start=time, end=time + pd.Timedelta(seconds=1)
    )
    vframe = frames(vdata.iloc[:1])
    return np.squeeze(list(vframe))


def show_single_frame(raw_frame: np.ndarray, width: int = 1440, height: int = 1080):
    """
    Display a single frame retrieved from the given root directory,
    Video reader, and time.

    Args:
        raw_frame (numpy.ndarray): The raw frame.
        width (int): The width of the display layout.
        height (int): The height of the display layout.

    Returns:
        None
    """
    width = width
    height = height
    fig = go.Figure(
        data=[go.Image(z=raw_frame)],
        layout=go.Layout(
            width=width,
            height=height,
            xaxis=dict(
                visible=False,
            ),
            yaxis=dict(
                visible=False,
                scaleanchor="x",
            ),
            margin=dict(l=0, r=0, t=0, b=0),
        ),
    )
    fig.show()

In [None]:
# example
root = "/ceph/aeon/aeon/data/raw/AEON2/experiment0.2/"
subject_data = aeon.load(root, exp02.ExperimentalMetadata.SubjectState)
subject_data = subject_data[subject_data.id.str.startswith("BAA")]
subject_data = subject_data[subject_data.event != "Remain"]
subject_visits = visits(subject_data)
visit_start = subject_visits.enter[0]
visit_end = subject_visits.exit[0]
experiment_times = get_experiment_times(root, visit_start, visit_end)
pos_data = aeon.load(root, exp02.CameraTop.Position, visit_start, visit_end)
pos_data = exclude_maintenance_data(pos_data, experiment_times)

In [None]:
# retrieve and show single frame
vframe = get_single_frame(
    root, exp02.CameraTop.Video, pd.Timestamp("2022-02-23 14:21:25.550335884")
)
show_single_frame(vframe, width=1440 / 4, height=1080 / 4)

# export frame as png
import cv2

cv2.imwrite("test.png", vframe)