### Aeon data file structure on Ceph

**Raw data**: `/ceph/aeon/aeon/data/raw/<acquisition_pc>/<experiment_name>/<acquisition_epoch>/<device>/<stream>`

e.g. `/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-01T14-30-34/Patch1/Patch1_90_2023-12-02T12-00-00.bin`

**Processed data (e.g. trained and exported SLEAP model)**: `/ceph/aeon/aeon/data/processed/<processed_pc>/<job_id>/<job_time>/<model_name>/frozen_graph.pb`

e.g. `/ceph/aeon/aeon/data/processed/test-node1/0000005/2023-11-30T01-29-00/topdown_multianimal_id/frozen_graph.pb`
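
As a minimal sketch of the raw-data path template above, the helper below splits a file path into its named components using only `pathlib`. `parse_raw_path` and `RAW_ROOT` are hypothetical illustrative names, not part of `aeon`. The sketch assumes every raw file sits exactly five levels below the raw root, as in the example path.

```python
from pathlib import PurePosixPath

# Root of the raw-data tree on Ceph, taken from the path template above.
RAW_ROOT = PurePosixPath("/ceph/aeon/aeon/data/raw")
FIELDS = ("acquisition_pc", "experiment_name", "acquisition_epoch", "device", "stream")


def parse_raw_path(path):
    """Split a raw-data file path into the components of the template (hypothetical helper)."""
    # relative_to raises ValueError if the path is not under RAW_ROOT.
    parts = PurePosixPath(path).relative_to(RAW_ROOT).parts
    if len(parts) != len(FIELDS):
        raise ValueError(f"unexpected raw-data path layout: {path}")
    return dict(zip(FIELDS, parts))


info = parse_raw_path(
    "/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-01T14-30-34/Patch1/Patch1_90_2023-12-02T12-00-00.bin"
)
print(info["acquisition_epoch"], info["device"])  # 2023-12-01T14-30-34 Patch1
```

Paths outside the raw tree, such as the processed-model example, are rejected with a `ValueError` rather than being silently mis-parsed.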

### Reading Aeon data in Python

#### Terminology

**_Chunk Duration_**: The time duration over which experiment data files are written out. Currently, all Aeon experiments write out acquired data to files every hour (1-hour chunks).
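
Because data are written in 1-hour chunks, you can list the chunk files covering a time range by flooring the start time to the hour. The sketch below assumes chunks begin exactly on the hour, as filenames like `Patch1_90_2023-12-02T12-00-00.bin` suggest. `chunk_start` and `chunks_between` are hypothetical helpers, not the logic `aeon` itself uses to find files.

```python
from datetime import datetime, timedelta

CHUNK_DURATION = timedelta(hours=1)  # chunk duration stated above
CHUNK_FORMAT = "%Y-%m-%dT%H-%M-%S"  # datetime format used in chunk filenames


def chunk_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its chunk (assumes 1-hour chunks starting on the hour)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def chunks_between(start: datetime, end: datetime) -> list:
    """Return the labels of all chunks overlapping the half-open interval [start, end)."""
    labels = []
    current = chunk_start(start)
    while current < end:
        labels.append(current.strftime(CHUNK_FORMAT))
        current += CHUNK_DURATION
    return labels


labels = chunks_between(datetime(2023, 12, 2, 10, 30), datetime(2023, 12, 2, 12, 30))
print(labels)  # ['2023-12-02T10-00-00', '2023-12-02T11-00-00', '2023-12-02T12-00-00']
```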

**_Acquisition Epoch_**: One run of an experiment workflow. When an experiment workflow restarts, a new epoch starts.

E.g. `/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-03T13-05-15` is an acquisition epoch in the Social0.1 experiment. Because the next epoch directory is `/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-03T13-30-30`, we know this first epoch lasted at most ~25 minutes.
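
Epoch directories are named by their start time. The gap between consecutive epoch names is therefore an upper bound on how long each epoch ran. Here is a minimal sketch using a hypothetical `epoch_durations` helper:

```python
from datetime import datetime

EPOCH_FORMAT = "%Y-%m-%dT%H-%M-%S"  # acquisition-epoch directory name format


def epoch_durations(epoch_dirs):
    """Return the gaps between consecutive epoch start times (upper bounds on epoch lengths)."""
    starts = sorted(datetime.strptime(name, EPOCH_FORMAT) for name in epoch_dirs)
    return [later - earlier for earlier, later in zip(starts, starts[1:])]


durations = epoch_durations(["2023-12-03T13-05-15", "2023-12-03T13-30-30"])
print(durations[0])  # 0:25:15
```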

**_Stream_**: Data that comes from a single source.

A single data file is associated with each stream (per chunk), so 'stream' and 'file' can often be used interchangeably. If the stream comes from a Harp device, the stream-file's name encodes the register of the Harp device that generated the stream, as well as the associated chunk datetime.

For a harp stream, the filename format is as follows:<br>
`<device><id>_<harp_register>_<datetime>` e.g. `Patch1_90_2023-12-02T12-00-00.bin`<br>
By convention, Harp streams acquired in software use register numbers starting at 200; e.g. the largest-blob-centroid-tracking stream filename is `CameraTop_200*.bin`
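
The Harp filename convention above can be parsed with a regular expression. This is a sketch under stated assumptions. The hypothetical `parse_harp_filename` only accepts numeric registers with a `.bin` extension, so files such as `Patch1_State_...` return `None`. It also reads the software convention as "register >= 200".

```python
import re
from datetime import datetime

# Hypothetical pattern for "<device>_<register>_<datetime>.bin" Harp stream files.
HARP_FILE = re.compile(
    r"^(?P<device>[^_]+)_(?P<register>\d+)_(?P<chunk>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.bin$"
)


def parse_harp_filename(name):
    """Return device, register, chunk datetime and a software flag, or None if `name` doesn't match."""
    match = HARP_FILE.match(name)
    if match is None:
        return None
    register = int(match["register"])
    return {
        "device": match["device"],
        "register": register,
        "chunk": datetime.strptime(match["chunk"], "%Y-%m-%dT%H-%M-%S"),
        "software": register >= 200,  # assumed reading of the register-200 convention
    }


print(parse_harp_filename("Patch1_90_2023-12-02T12-00-00.bin"))
```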

Each stream can contain single- or multi-dimensional data. For example, a patch wheel's magnetic encoder stream contains information about both the magnetic field strength and the angle; each dimension is associated with a unique bitmask, and can thus be isolated by applying this bitmask to the stream.

**_Reader_**: A Python class whose instantiated objects each read one particular stream. Simple working principle: each `Reader` has a `read` method which takes in a single stream-file and reads the data in that file into a pandas `DataFrame` (see `aeon/io/reader.py` and `aeon/schema/*.py`).

e.g. `Encoder` readers read values from `Patch<patch_id>_<encoder_register>_<datetime>` files (these contain a patch wheel's magnetic encoder readings, to determine how much the wheel has been spun).

Whenever a new device is implemented in an Aeon experiment, a new `Reader` should be created for the acquired data, such that the data can be read and returned in the form of a pandas `DataFrame`.

**_Device_**: A collection of streams, usually related ones, grouped together for convenience.

On Ceph, we organize streams into device folders, e.g. `/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-01T14-30-34/Patch1` contains the following streams:

- `Patch1_8`: patch heartbeat
- `Patch1_32`: patch beambreak
- `Patch1_35`: patch pellet-delivery pin set
- `Patch1_36`: patch pellet-delivery pin cleared
- `Patch1_90`: patch wheel magnetic encoder
- `Patch1_91`: patch wheel magnetic encoder mode
- `Patch1_200`: patch feeder dispenser state
- `Patch1_201`: patch pellet manual delivery
- `Patch1_202`: patch missed pellet
- `Patch1_203`: patch pellet-delivery retry
- `Patch1_State`: patch state

In code, we create logical devices via the `Device` class (see `aeon/io/device.py`).<br>
e.g. we often define 'Patch' devices that contain `Reader` objects for only a subset of the streams in a `Patch` device folder on Ceph (e.g. wheel-magnetic-encoder, state, pellet-delivery-pin-set, and beambreak), as experimenters may not care about analyzing all of them.

**_Schema_**: A list of devices grouped within a `DotMap` object (see `aeon/docs/examples/schemas.py`). Each experiment is associated with a schema. If a schema changes, the experiment must necessarily be distinguished (by name or version number), as the acquired data is now different.

**_Dataset_**: All data belonging to a particular experiment. 

e.g. all data in `/ceph/aeon/aeon/data/raw/AEON3/social0.1`

#### Code

With this terminology in mind, let's get to the code!

In [1]:
"""Imports"""

%load_ext autoreload
%autoreload 2
# %flow mode reactive

from datetime import date
import ipdb
from itertools import product
from pathlib import Path

from dotmap import DotMap
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import seaborn as sns

import aeon
import aeon.io.binder.core as stream
from aeon.io import api
from aeon.io import reader
from aeon.io.device import Device
from aeon.io.binder.schemas import exp02, exp01
from aeon.analysis.utils import visits, distancetravelled

In [2]:
"""Set experiment root path and time range / set to load data"""

root = Path("/ceph/aeon/aeon/data/raw/AEON3/social0.1")
start_time = pd.Timestamp("2023-12-02 10:30:00")
end_time = pd.Timestamp("2023-12-02 12:30:00")
time_set = pd.concat(
    [
        pd.Series(pd.date_range(start_time, start_time + pd.Timedelta(hours=1), freq="1s")),
        pd.Series(pd.date_range(end_time, end_time + pd.Timedelta(hours=1), freq="1s"))
    ]
)



In [3]:
"""Creating a new `Reader` class"""

# All readers are subclassed from the base `Reader` class. They thus all contain a `read` method,
# for returning data from a file in the form of a pandas DataFrame, and the following attributes, 
# which must be specified on object construction:
# `pattern`: a prefix in the filename used by `aeon.io.api.load` to find matching files to load
# `columns`: a list of column names for the returned DataFrame
# `extension`: the file extension of the files to be read

# Using these principles, we can recreate a simple reader for reading subject weight data from the 
# social0.1 experiments, which are saved in .csv format.

# First, we'll create a general Csv reader, subclassed from `Reader`.
class Csv(reader.Reader):
    """Reads data from csv text files, where the first column stores the Aeon timestamp, in seconds."""

    def __init__(self, pattern, columns, extension="csv"):
        super().__init__(pattern, columns, extension)

    def read(self, file):
        return pd.read_csv(file, header=0, names=self.columns, index_col=0)
    
# Next, we'll create a reader for the subject weight data, subclassed from `Csv`.

# We know from our data that the files of interest start with 'Environment_SubjectWeight' and columns are: 
# 1) Aeon timestamp in seconds from 1904/01/01 (1904 date system)
# 2) Weight in grams
# 3) Weight stability confidence (0-1)
# 4) Subject ID (string)
# 5) Subject ID (integer)
# Since the first column (Aeon timestamp) will be set as the index, we'll use the rest as DataFrame columns.
# And we don't need to define `read`, as it will use the `Csv` class's `read` method.

class Subject_Weight(Csv):
    """Reads subject weight data from csv text files."""
    
    def __init__(
        self, 
        pattern="Environment_SubjectWeight*",
        columns=["weight", "confidence", "subject_id", "int_id"], 
        extension="csv"
    ):
        super().__init__(pattern, columns, extension)
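
The `pattern` and `extension` attributes together select which files are loaded. The sketch below illustrates this with glob-style matching from Python's `fnmatch` module. It assumes `load` matches `<pattern>.<extension>` against filenames in a glob-like way. `find_stream_files` and the file listing are hypothetical, modelled on filenames used in this notebook.

```python
from fnmatch import fnmatchcase


def find_stream_files(filenames, pattern, extension):
    """Return the filenames matching the glob `<pattern>.<extension>` (case-sensitive)."""
    glob = f"{pattern}.{extension}"
    return [name for name in filenames if fnmatchcase(name, glob)]


# Hypothetical directory listing, modelled on filenames in this notebook.
filenames = [
    "Environment_SubjectWeight_2023-12-02T11-00-00.csv",
    "Environment_SubjectWeight_2023-12-02T12-00-00.csv",
    "Environment_SubjectState_2023-12-02T12-00-00.csv",
    "Patch1_90_2023-12-02T12-00-00.bin",
]

print(find_stream_files(filenames, "Environment_SubjectWeight*", "csv"))
print(find_stream_files(filenames, "Environment_Weight*", "csv"))  # [] : no file starts this way
```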


In [4]:
"""Loading data via a `Reader` object"""

# We can now read data from a single file via the reader's `read` method.
subject_weight_reader = Subject_Weight()
acq_epoch = "2023-12-01T14-30-34"
weight_file = root / acq_epoch / "Environment/Environment_SubjectWeight_2023-12-02T12-00-00.csv"
display(subject_weight_reader.read(weight_file))

# And we can use `api.load` to load data across many same-stream files, given a time range or a set of times.
display(api.load(root, subject_weight_reader, start=start_time, end=end_time))
display(api.load(root, subject_weight_reader, time=time_set.values))

| | weight | confidence | subject_id | int_id |
|---|---|---|---|---|
| 3.784363e+09 | 29.799999 | 1 | CAA-1120746 | 0 |
| 3.784363e+09 | 29.799999 | 1 | CAA-1120746 | 0 |
| 3.784363e+09 | 29.799999 | 1 | CAA-1120746 | 0 |
| 3.784363e+09 | 29.799999 | 1 | CAA-1120746 | 0 |
| 3.784363e+09 | 29.799999 | 1 | CAA-1120746 | 0 |
| ... | ... | ... | ... | ... |
| 3.784367e+09 | 31.200001 | 1 | CAA-1120747 | 0 |
| 3.784367e+09 | 31.200001 | 1 | CAA-1120747 | 0 |
| 3.784367e+09 | 31.200001 | 1 | CAA-1120747 | 0 |
| 3.784367e+09 | 31.200001 | 1 | CAA-1120747 | 0 |


| time | weight | confidence | subject_id | int_id |
|---|---|---|---|---|
| 2023-12-02 10:31:12.840000153 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:12.940000057 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.039999962 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.099999905 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.199999809 | 29.5 | 1 | CAA-1120746 | 0 |
| ... | ... | ... | ... | ... |
| 2023-12-02 12:27:29.460000038 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.559999943 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.619999886 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.719999790 | 31.1 | 1 | CAA-1120747 | 1 |


| | weight | confidence | subject_id | int_id |
|---|---|---|---|---|
| 2023-12-02 10:30:00 | 30.000000 | 1.0 | CAA-1120747 | 0.0 |
| 2023-12-02 10:30:01 | 30.000000 | 1.0 | CAA-1120747 | 0.0 |
| 2023-12-02 10:30:02 | 30.000000 | 1.0 | CAA-1120747 | 0.0 |
| 2023-12-02 10:30:03 | 30.000000 | 1.0 | CAA-1120747 | 0.0 |
| 2023-12-02 10:30:04 | 30.000000 | 1.0 | CAA-1120747 | 0.0 |
| ... | ... | ... | ... | ... |
| 2023-12-02 13:18:25 | 29.799999 | 1.0 | CAA-1120746 | 0.0 |
| 2023-12-02 13:18:26 | 29.799999 | 1.0 | CAA-1120746 | 0.0 |
| 2023-12-02 13:22:05 | 29.900000 | 1.0 | CAA-1120746 | 0.0 |
| 2023-12-02 13:22:14 | 29.799999 | 1.0 | CAA-1120746 | 0.0 |
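
The raw `read` output above indexes rows by Aeon timestamps, in seconds since 1904-01-01. The `load` outputs, by contrast, show a datetime `time` index. Below is a minimal conversion sketch using a hypothetical `aeon_to_datetime` helper. For example, 3784363200 s is 12:00:00 on 2023-12-02, the start of the `Environment_SubjectWeight_2023-12-02T12-00-00.csv` chunk read above. The displayed index values (e.g. `3.784363e+09`) are rounded, so convert the underlying values rather than the printed ones.

```python
from datetime import datetime, timedelta

# Reference date of the 1904 date system used for Aeon timestamps.
AEON_EPOCH = datetime(1904, 1, 1)


def aeon_to_datetime(seconds: float) -> datetime:
    """Convert Aeon seconds (since 1904-01-01) to a naive datetime, keeping microseconds."""
    return AEON_EPOCH + timedelta(seconds=seconds)


print(aeon_to_datetime(3784363200))  # 2023-12-02 12:00:00
```

With pandas, the same conversion over a whole index can be written as `pd.Timestamp("1904-01-01") + pd.to_timedelta(df.index, unit="s")`.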


In [5]:
"""Updating a `Reader` object"""

# Occasionally, we may want to tweak the output of a `Reader` object's `read` method, or changes to
# streams on the acquisition side may require corresponding changes to a `Reader` object to ensure it
# still works properly. We'll cover some of these cases here.

# 1. Column changes

# First, if we want to simply change the output from `read`, we can change the columns of an instantiated
# `Reader` object. Let's change `subject_id` to `id`, and after reading, drop the `confidence` and `int_id`
# columns.
subject_weight_reader.columns = ["weight", "confidence", "id", "int_id"]
data = subject_weight_reader.read(weight_file)
data.drop(["confidence", "int_id"], axis=1, inplace=True)
display(data)


# 2. Pattern changes

# Next, occasionally a stream's filename may change, in which case we'll need to update the `Reader` 
# object's `pattern` to find the new files using `load`: 

# Let's simulate a case where the old SubjectWeight stream was called Weight, and create a `Reader` class.
class Subject_Weight(Csv):
    """Reads subject weight data from csv text files."""
    
    def __init__(
        self, 
        pattern="Environment_Weight*",
        columns=["weight", "confidence", "subject_id", "int_id"], 
        extension="csv"
    ):
        super().__init__(pattern, columns, extension)

# We'll see that we can't find any files with this pattern.
subject_weight_reader = Subject_Weight()
data = api.load(root, subject_weight_reader, start=start_time, end=end_time)
display(data)  # empty dataframe

# But if we just update the pattern, `load` will find the files.
subject_weight_reader.pattern = "Environment_SubjectWeight*"
data = api.load(root, subject_weight_reader, start=start_time, end=end_time)
display(data)


# 3. Bitmask changes for Harp streams

# Lastly, some Harp streams use bitmasks to distinguish writing out different events to the same file.
# e.g. The beambreak stream `Patch<id>_32*` writes out events both for when the beam is broken and when
# it gets reset. Given a Harp stream, we can find all bitmasks associated with it, and choose which one
# to use to filter the data:

# Given a stream, we can create a `Harp` reader object to find all bitmasks associated with it.
pattern = "Patch1_32*"
event_name = "beambreak"
harp_reader = reader.Harp(pattern=pattern, columns=[event_name])
data = api.load(root, harp_reader, start=start_time, end=end_time)
bitmasks = np.unique(data[event_name].values)
print(f"bitmasks: {bitmasks}")

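# Let's set the bitmask to the first unique value returned, and create a new `Reader` object that uses it.
# Note that 34 (0b100010) also has the 32 bit (0b100000) set, so with a bitmask of 32 both event values
# are kept, as the output below shows.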
bitmask = bitmasks[0]
beambreak_reader = reader.BitmaskEvent(pattern, bitmask, event_name)
bitmasked_data = api.load(root, beambreak_reader, start=start_time, end=end_time)

print(f"raw data:\n {data.head()}\n\n")
print(f"bitmasked data:\n {bitmasked_data.head()}")

| | weight | id |
|---|---|---|
| 3.784363e+09 | 29.799999 | CAA-1120746 |
| 3.784363e+09 | 29.799999 | CAA-1120746 |
| 3.784363e+09 | 29.799999 | CAA-1120746 |
| 3.784363e+09 | 29.799999 | CAA-1120746 |
| 3.784363e+09 | 29.799999 | CAA-1120746 |
| ... | ... | ... |
| 3.784367e+09 | 31.200001 | CAA-1120747 |
| 3.784367e+09 | 31.200001 | CAA-1120747 |
| 3.784367e+09 | 31.200001 | CAA-1120747 |
| 3.784367e+09 | 31.200001 | CAA-1120747 |


| time | weight | confidence | subject_id | int_id |
|---|---|---|---|---|


| time | weight | confidence | subject_id | int_id |
|---|---|---|---|---|
| 2023-12-02 10:31:12.840000153 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:12.940000057 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.039999962 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.099999905 | 29.5 | 1 | CAA-1120746 | 0 |
| 2023-12-02 10:31:13.199999809 | 29.5 | 1 | CAA-1120746 | 0 |
| ... | ... | ... | ... | ... |
| 2023-12-02 12:27:29.460000038 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.559999943 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.619999886 | 31.1 | 1 | CAA-1120747 | 0 |
| 2023-12-02 12:27:29.719999790 | 31.1 | 1 | CAA-1120747 | 1 |


bitmasks: [32 34]
raw data:
                                beambreak
time                                    
2023-12-02 11:33:03.942463875         34
2023-12-02 11:33:03.951744080         32
2023-12-02 11:33:18.500351906         34
2023-12-02 11:33:18.503456115         32
2023-12-02 11:33:18.509632111         34


bitmasked data:
                                    event
time                                    
2023-12-02 11:33:03.942463875  beambreak
2023-12-02 11:33:03.951744080  beambreak
2023-12-02 11:33:18.500351906  beambreak
2023-12-02 11:33:18.503456115  beambreak
2023-12-02 11:33:18.509632111  beambreak
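
The raw beambreak output above contains the values 34 and 32, yet every row survives the bitmask of 32. Here is a sketch of why. It assumes `BitmaskEvent` keeps rows where `(value & bitmask) == bitmask`; that is an assumption about its implementation, though it is consistent with the output above. 34 is `0b100010`, which also has the 32 bit set. The hypothetical `bitmask_filter` below reproduces this on the raw values.

```python
def bitmask_filter(values, mask):
    """Keep values that have every bit of `mask` set, i.e. (value & mask) == mask."""
    return [value for value in values if (value & mask) == mask]


raw = [34, 32, 34, 32, 34]  # beambreak values from the raw output above
print([format(v, "06b") for v in (32, 34)])  # ['100000', '100010']
print(bitmask_filter(raw, 32))  # [34, 32, 34, 32, 34]
print(bitmask_filter(raw, 34))  # [34, 34, 34]
```

Under the same assumption, using 34 as the bitmask would be expected to keep only the events whose value is 34.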


In [6]:
"""Instantiating a `Device` object"""

# A `Device` object is instantiated from a name, followed by one or more 'binder functions', which 
# return a dictionary of a name paired with a `Reader` object. We call such a dictionary of `<name>:Reader`
# key-value pairs a 'registry'. Each binder function must take in a `pattern` argument, which is used to
# set the pattern of the `Reader` object it returns.
def subject_weight_binder(pattern):  # an example subject weight binder function
    return {"subject_weight": Subject_Weight(pattern=pattern)}

def subject_state_binder(pattern):  # an example subject state binder function (must also accept `pattern`)
    return {"subject_state": reader.Subject(pattern=pattern)}

d = Device("SubjectMetadata", subject_weight_binder, subject_state_binder)


# On creation, the `Device` object merges all registries into a single registry, which is accessible via
# the `registry` attribute.


# This is done so that we can create a 'schema' (a DotMap of a list of `Device` objects), where each
# `Device` object's name is a key of the schema, and the `registry` names (the keys of that `Device`
# object) map to the `Reader` objects associated with that `Device` object.
# This works because, when a list of `Device` objects is passed into the `DotMap` constructor, the
# `__iter__` method of each `Device` object returns a tuple of the object's name and its `stream`
# attribute, which is passed directly to the DotMap constructor to create a nested DotMap:
# device_name -> stream_name -> stream `Reader` object.

# d = Device("ExperimentalMetadata", stream.environment, stream.messageLog)
# print(d.registry)
# DotMap(d.registry)
# DotMap([d])
# s = DotMap([d])
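
The `Device`/`DotMap` mechanism described in the comments above can be mimicked in plain Python. In this sketch, `ToyDevice` is a hypothetical stand-in for `Device`, not its actual implementation. Strings stand in for `Reader` objects, and each binder is assumed to receive the device name as its pattern. Passing a list of such objects to `dict` uses `__iter__` to produce the nested device -> stream -> reader mapping.

```python
class ToyDevice:
    """Hypothetical stand-in for `Device`: merges binder registries, iterates as (name, registry)."""

    def __init__(self, name, *binders):
        self.name = name
        self.registry = {}
        for binder in binders:
            # Assumption for this sketch: each binder is called with the device name as its pattern.
            self.registry.update(binder(name))

    def __iter__(self):
        # Yields exactly two items, so dict([device]) treats it as a key/value pair.
        return iter((self.name, self.registry))


def weight_binder(pattern):  # strings stand in for Reader objects
    return {"subject_weight": f"{pattern}_SubjectWeight_*"}


def state_binder(pattern):
    return {"subject_state": f"{pattern}_SubjectState_*"}


schema = dict([ToyDevice("Environment", weight_binder, state_binder)])
print(schema["Environment"]["subject_state"])  # Environment_SubjectState_*
```

A `DotMap` built the same way, `DotMap([device])`, would additionally allow attribute access such as `schema.Environment.subject_state`.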

#### Social0.1 Data Streams

Now that we've covered streams, devices, and schemas, let's build a schema for the Social0.1 Experiment!

First, we'll need to know all the streams we recorded during the Social0.1 experiment. These can be found by
looking through all devices in an acquisition epoch
(e.g. `/ceph/aeon/aeon/data/raw/AEON3/social0.1/2023-12-01T14-30-34`).

And here they are: (*note: register 8 is always the harp heartbeat for any device that has this stream.*)

- Metadata.yml
- Environment
  - BlockState
  - EnvironmentState
  - LightEvents
  - MessageLog
  - SubjectState
  - SubjectVisits
  - SubjectWeight
- CameraTop (200, 201, avi, csv, <model_path>)
    - 200: position
    - 201: region
- CameraNorth (avi, csv)
- CameraEast (avi, csv)
- CameraSouth (avi, csv)
- CameraWest (avi, csv)
- CameraPatch1 (avi, csv)
- CameraPatch2 (avi, csv)
- CameraPatch3 (avi, csv)
- CameraNest (avi, csv)
- ClockSynchronizer (8, 36)
    - 36: heartbeat_out
- Nest (200, 201, 202, 203, 204)
    - 200: weight_raw
    - 201: weight_tare
    - 202: weight_filtered
    - 203: weight_baseline
    - 204: weight_subject
- Patch1 (8, 32, 35, 36, 87, 90, 91, 200, 201, 202, 203, State)
    - 32: beam_break
    - 35: delivery_set
    - 36: delivery_clear
    - 87: expansion_board
    - 90: encoder_read
    - 91: encoder_mode
    - 200: dispenser_state
    - 201: delivery_manual
    - 202: missed_pellet
    - 203: delivery_retry
- Patch2 (8, 32, 35, 36, 87, 90, 91, State)
- Patch3 (8, 32, 35, 36, 87, 90, 91, 200, 203, State)
- RfidEventsGate (8, 32, 35)
    - 32: entry_id
    - 35: hardware_notifications
- RfidEventsNest1 (8, 32, 35)
- RfidEventsNest2 (8, 32, 35)
- RfidEventsPatch1 (8, 32, 35)
- RfidEventsPatch2 (8, 32, 35)
- RfidEventsPatch3 (8, 32, 35)
- VideoController (8, 32, 33, 34, 35, 36, 45, 52)
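
Before building a schema, one way to act on the stream list above is to keep only the registers of interest and turn them into filename patterns following the `<device>_<register>_*` convention. This plain-dict sketch is not an `aeon` schema. `AVAILABLE`, `WANTED` and `harp_patterns` are hypothetical names, and the register sets are transcribed from the Patch entries above. Note that Patch2 has no register 200.

```python
# Registers per patch, transcribed from the Social0.1 stream list above.
AVAILABLE = {
    "Patch1": {8, 32, 35, 36, 87, 90, 91, 200, 201, 202, 203},
    "Patch2": {8, 32, 35, 36, 87, 90, 91},
    "Patch3": {8, 32, 35, 36, 87, 90, 91, 200, 203},
}

# Hypothetical selection of streams we want to analyze (register -> stream name).
WANTED = {32: "beam_break", 35: "delivery_set", 90: "encoder_read", 200: "dispenser_state"}


def harp_patterns(device, available, wanted):
    """Map each wanted stream the device actually has to a '<device>_<register>_*' glob."""
    return {name: f"{device}_{register}_*" for register, name in wanted.items() if register in available}


selection = {device: harp_patterns(device, regs, WANTED) for device, regs in AVAILABLE.items()}
print(selection["Patch2"])
```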

In [None]:
"""Creating the Social 0.1 schema"""

# Above we've listed all the streams we recorded during Social0.1, but we won't need to analyze all of them.
# Instead, we'll create a schema that only contains the streams we want to analyze:

# Metadata

# Environment



In [None]:
DotMap(
    [
        Device("Metadata", stream.metadata),
        Device("ExperimentalMetadata", stream.environment, stream.messageLog),
        Device("CameraEast", stream.video),
        Device("CameraNest", stream.video),
        Device("CameraNorth", stream.video),
        Device("CameraPatch1", stream.video),
        Device("CameraPatch2", stream.video),
        Device("CameraSouth", stream.video),
        Device("CameraWest", stream.video),
    ]
)

In [None]:
d = Device("ExperimentalMetadata", stream.environment, stream.messageLog)
print(d.registry)
DotMap(d.registry)
DotMap([d])
s = DotMap([d])
s.ExperimentalMetadata.EnvironmentState.pattern

In [None]:
"""Metadata"""

data = api.load(root, reader.Metadata(), start=start_time, end=end_time)
data.metadata.iloc[0]  # get device metadata dotmap

In [None]:
d = Device("ExperimentalMetadata", stream.environment, stream.messageLog)

In [None]:
Device("Metadata", stream.metadata).stream

In [None]:
d = Device("test", reader.Video, reader.Metadata)

In [None]:
d.name

In [None]:
exp02.ExperimentalMetadata

In [None]:
exp02.Metadata

In [None]:
exp02.ExperimentalMetadata.keys()

In [None]:
api.load(root, exp02.ExperimentalMetadata., start=start_time, end=end_time)

In [None]:
"""Environment"""


In [None]:
"""CameraTop"""

In [None]:
"""Top quadrant and zoomed in patch cameras"""

In [None]:
"""Nest"""

In [None]:
"""Patches"""

In [None]:
"""Rfids"""