8 changes: 8 additions & 0 deletions README.md
@@ -102,3 +102,11 @@ print(first.list_sensors())
### Audio utilities
- `play_audio(sampling_rate=48000)`: play stereo microphone data in a Jupyter environment.
- `save_audio(path, sampling_rate=48000)`: export microphone audio to WAV.
- `get_audio_dataframe(sampling_rate=48000)`: return microphone PCM as a timestamp-indexed DataFrame (`mic.inner`, `mic.outer`).

Example:

```python
audio_df = recording.get_audio_dataframe()
print(audio_df.head())
```
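
For completeness, a minimal sketch of the other two audio helpers on the same `recording` object; the output path is only an example:

```python
# Play the stereo microphone stream inline (Jupyter environments only).
recording.play_audio(sampling_rate=48000)

# Export the same stream to a WAV file; the path is illustrative.
recording.save_audio("recording_audio.wav", sampling_rate=48000)
```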
190 changes: 95 additions & 95 deletions src/open_earable_python/dataset.py
Expand Up @@ -20,6 +20,7 @@
"ppg": ["ppg.red", "ppg.ir", "ppg.green", "ppg.ambient"],
"bone_acc": ["bone_acc.x", "bone_acc.y", "bone_acc.z"],
"optical_temp": ["optical_temp"],
"microphone": ["mic.inner", "mic.outer"],
}

COLORS: Dict[str, List[str]] = {
@@ -131,107 +132,43 @@ class SensorDataset:
def __init__(self, filename: str, verbose: bool = False):
self.filename = filename
self.verbose = verbose
self.parse_result: Dict[int, List] = defaultdict(list)
self.parse_result: parser.ParseResult = parser.ParseResult(
sensor_dfs={},
mic_samples=[],
)
# Per-SID dataframes built in _build_accessors
self.sensor_dfs: Dict[int, pd.DataFrame] = {}
self.audio_stereo: Optional[np.ndarray] = None
self.audio_df: pd.DataFrame = pd.DataFrame()
self._audio_df_sampling_rate: Optional[int] = None
self.bone_sound: Optional[np.ndarray] = None
self.df: pd.DataFrame = pd.DataFrame()

self.imu = _SensorAccessor(pd.DataFrame(columns=LABELS["imu"]), LABELS["imu"])
self.barometer = _SensorAccessor(pd.DataFrame(columns=LABELS["barometer"]), LABELS["barometer"])
self.ppg = _SensorAccessor(pd.DataFrame(columns=LABELS["ppg"]), LABELS["ppg"])
self.bone_acc = _SensorAccessor(pd.DataFrame(columns=LABELS["bone_acc"]), LABELS["bone_acc"])
self.optical_temp = _SensorAccessor(pd.DataFrame(columns=LABELS["optical_temp"]), LABELS["optical_temp"])

self.parser: parser.Parser = parser.Parser({
self.SENSOR_SID["imu"]: parser.SchemePayloadParser(scheme.SensorScheme(
name='imu',
sid=self.SENSOR_SID["imu"],
groups=[
scheme.SensorComponentGroupScheme(
name='acc',
components=[
scheme.SensorComponentScheme('x', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('y', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('z', scheme.ParseType.FLOAT),
]
),
scheme.SensorComponentGroupScheme(
name='gyro',
components=[
scheme.SensorComponentScheme('x', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('y', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('z', scheme.ParseType.FLOAT),
]
),
scheme.SensorComponentGroupScheme(
name='mag',
components=[
scheme.SensorComponentScheme('x', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('y', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('z', scheme.ParseType.FLOAT),
]
),
])),
self.SENSOR_SID["barometer"]: parser.SchemePayloadParser(scheme.SensorScheme(
name='barometer',
sid=self.SENSOR_SID["barometer"],
groups=[
scheme.SensorComponentGroupScheme(
name='barometer',
components=[
scheme.SensorComponentScheme('temperature', scheme.ParseType.FLOAT),
scheme.SensorComponentScheme('pressure', scheme.ParseType.FLOAT),
]
),
])),
self.SENSOR_SID["ppg"]: parser.SchemePayloadParser(scheme.SensorScheme(
name='ppg',
sid=self.SENSOR_SID["ppg"],
groups=[
scheme.SensorComponentGroupScheme(
name='ppg',
components=[
scheme.SensorComponentScheme('red', scheme.ParseType.UINT32),
scheme.SensorComponentScheme('ir', scheme.ParseType.UINT32),
scheme.SensorComponentScheme('green', scheme.ParseType.UINT32),
scheme.SensorComponentScheme('ambient', scheme.ParseType.UINT32),
]
),
])),
self.SENSOR_SID["optical_temp"]: parser.SchemePayloadParser(scheme.SensorScheme(
name='optical_temp',
sid=self.SENSOR_SID["optical_temp"],
groups=[
scheme.SensorComponentGroupScheme(
name='optical_temp',
components=[
scheme.SensorComponentScheme('optical_temp', scheme.ParseType.FLOAT),
]
),
])),
self.SENSOR_SID["bone_acc"]: parser.SchemePayloadParser(scheme.SensorScheme(
name='bone_acc',
sid=self.SENSOR_SID["bone_acc"],
groups=[
scheme.SensorComponentGroupScheme(
name='bone_acc',
components=[
scheme.SensorComponentScheme('x', scheme.ParseType.INT16),
scheme.SensorComponentScheme('y', scheme.ParseType.INT16),
scheme.SensorComponentScheme('z', scheme.ParseType.INT16),
]
),
])),
self.SENSOR_SID["microphone"]: parser.MicPayloadParser(
sample_count=48000,
),
}, verbose=verbose)
for sensor_name, labels in LABELS.items():
setattr(
self,
sensor_name,
_SensorAccessor(pd.DataFrame(columns=labels), labels),
)

self.parser: parser.Parser = self._build_parser(verbose=verbose)

self.parse()
self._build_accessors()

@classmethod
def _build_parser(cls, verbose: bool = False) -> parser.Parser:
sensor_schemes = scheme.build_default_sensor_schemes(cls.SENSOR_SID)
dataset_parser = parser.Parser.from_sensor_schemes(
sensor_schemes=sensor_schemes,
verbose=verbose,
)
dataset_parser.parsers[cls.SENSOR_SID["microphone"]] = parser.MicPayloadParser(
sample_count=48000,
verbose=verbose,
)
return dataset_parser

def parse(self) -> None:
"""Parse the binary recording file into structured sensor data."""
with open(self.filename, "rb") as f:
@@ -245,10 +182,17 @@ def _build_accessors(self) -> None:
The combined DataFrame over all sensors is built lazily in
:meth:`get_dataframe`.
"""
self.audio_stereo = self.parse_result.audio_stereo
self.audio_df = pd.DataFrame()
self._audio_df_sampling_rate = None
self.sensor_dfs = {}

data_dict = self.parse_result.sensor_dfs
for name, sid in self.SENSOR_SID.items():
labels = LABELS.get(name, [f"val{i}" for i in range(0)])
if sid in data_dict and isinstance(data_dict[sid], pd.DataFrame):
labels = LABELS.get(name, [])
if name == "microphone":
df = self.get_audio_dataframe()
elif sid in data_dict and isinstance(data_dict[sid], pd.DataFrame):
df = data_dict[sid]
df = df[~df.index.duplicated(keep="first")]
else:
@@ -263,8 +207,6 @@ def _build_accessors(self) -> None:
# Clear combined dataframe; it will be built lazily on demand
self.df = pd.DataFrame()

self.audio_stereo = self.parse_result.audio_stereo

def list_sensors(self) -> List[str]:
"""Return a list of available sensor names in the dataset."""
available_sensors = []
@@ -330,6 +272,64 @@ def get_dataframe(self) -> pd.DataFrame:

return self.df

def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame:
"""Return microphone audio as a timestamp-indexed stereo DataFrame.

The returned DataFrame has:
- index: ``timestamp`` in seconds
- columns: ``mic.inner`` and ``mic.outer`` (int16 PCM)
"""
if sampling_rate <= 0:
raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}")

if self._audio_df_sampling_rate == sampling_rate:
return self.audio_df

mic_packets = getattr(self.parse_result, "mic_packets", [])
if not mic_packets:
self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"])
self.audio_df.index.name = "timestamp"
self._audio_df_sampling_rate = sampling_rate
return self.audio_df

timestamps: List[np.ndarray] = []
stereo_frames: List[np.ndarray] = []

for packet in mic_packets:
ts, stereo = parser.mic_packet_to_stereo_frames(
packet=packet,
sampling_rate=sampling_rate,
)
if stereo.size == 0:
continue
timestamps.append(ts)
stereo_frames.append(stereo)

if not timestamps:
self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"])
self.audio_df.index.name = "timestamp"
self._audio_df_sampling_rate = sampling_rate
return self.audio_df

all_ts = np.concatenate(timestamps)
all_stereo = np.vstack(stereo_frames)

self.audio_df = pd.DataFrame(
{
"mic.inner": all_stereo[:, 0],
"mic.outer": all_stereo[:, 1],
},
index=all_ts,
)
self.audio_df.index.name = "timestamp"
self.audio_df = self.audio_df[~self.audio_df.index.duplicated(keep="first")]
self._audio_df_sampling_rate = sampling_rate

if sampling_rate == 48000:
self.sensor_dfs[self.SENSOR_SID["microphone"]] = self.audio_df

return self.audio_df

def export_csv(self) -> None:
base_filename, _ = os.path.splitext(self.filename)
self.save_csv(base_filename + ".csv")
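
`get_audio_dataframe` caches its result per sampling rate and indexes rows by timestamp in seconds, so repeated calls with the same rate return the same frame. A minimal usage sketch, assuming the import path from the source layout; the file path and one-second window are illustrative:

```python
from open_earable_python.dataset import SensorDataset

ds = SensorDataset("recording.bin")  # path is an example

audio_df = ds.get_audio_dataframe()     # parsed and cached at 48 kHz
cached = ds.get_audio_dataframe(48000)  # returns the same cached DataFrame

# Select the first second of inner-microphone PCM via the timestamp index.
start = audio_df.index.min()
window = audio_df[(audio_df.index >= start) & (audio_df.index < start + 1.0)]
print(window["mic.inner"].describe())
```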
66 changes: 56 additions & 10 deletions src/open_earable_python/parser.py
@@ -1,10 +1,24 @@
import struct
from open_earable_python.scheme import SensorScheme, ParseType
import pandas as pd
from typing import BinaryIO, Dict, List, Optional
from dataclasses import dataclass
from typing import BinaryIO, Dict, List, Optional, Tuple, TypedDict, Union
from dataclasses import dataclass, field
import numpy as np


def interleaved_mic_to_stereo(
samples: Union[np.ndarray, List[int], tuple[int, ...]],
) -> np.ndarray:
"""Convert interleaved [outer, inner, ...] int16 samples to [inner, outer] frames."""
interleaved = np.asarray(samples, dtype=np.int16)
if interleaved.size < 2:
return np.empty((0, 2), dtype=np.int16)

frame_count = interleaved.size // 2
interleaved = interleaved[: frame_count * 2]
return np.column_stack((interleaved[1::2], interleaved[0::2]))


class PayloadParser:
"""Abstract base class for payload parsers.

@@ -34,6 +48,11 @@ def should_build_df(self) -> bool:

# MARK: - ParseResult dataclass

class MicPacket(TypedDict):
timestamp: float
samples: tuple[int, ...]


@dataclass
class ParseResult:
"""Result of parsing a stream.
@@ -45,18 +64,35 @@ class ParseResult:

sensor_dfs: Dict[int, pd.DataFrame]
mic_samples: List[int]
mic_packets: List[MicPacket] = field(default_factory=list)
audio_stereo: Optional[np.ndarray] = None

@staticmethod
def mic_samples_to_stereo(mic_samples: List[int]) -> Optional[np.ndarray]:
if not mic_samples:
return None
mic_array = np.array(mic_samples, dtype=np.int16)
# If odd number of samples, drop the last one to ensure even pairing
if len(mic_array) % 2 != 0:
mic_array = mic_array[:-1]
# Original behavior: [inner, outer] = [odd, even]
return np.column_stack((mic_array[1::2], mic_array[0::2]))
stereo = interleaved_mic_to_stereo(mic_samples)
if stereo.size == 0:
return None
return stereo


def mic_packet_to_stereo_frames(
packet: MicPacket,
sampling_rate: int,
) -> Tuple[np.ndarray, np.ndarray]:
"""Return timestamps and stereo frames for a parsed microphone packet."""
if sampling_rate <= 0:
raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}")

stereo = interleaved_mic_to_stereo(packet["samples"])
if stereo.size == 0:
return np.empty((0,), dtype=np.float64), stereo

timestamps = float(packet["timestamp"]) + (
np.arange(stereo.shape[0], dtype=np.float64) / sampling_rate
)
return timestamps, stereo

class Parser:
def __init__(self, parsers: dict[int, PayloadParser], verbose: bool = False):
@@ -115,6 +151,7 @@ def parse(
buffer = bytearray()
packet_idx = 0
mic_samples: List[int] = []
mic_packets: List[MicPacket] = []

def flush_to_dataframes() -> Dict[int, pd.DataFrame]:
result: Dict[int, pd.DataFrame] = {}
@@ -199,6 +236,10 @@ def flush_to_dataframes() -> Dict[int, pd.DataFrame]:
continue
# `samples` is a tuple of int16; extend global list
mic_samples.extend(list(samples))
mic_packets.append({
"timestamp": timestamp_s,
"samples": samples,
})
if self.verbose:
if isinstance(parser, MicPayloadParser):
print(
@@ -249,7 +290,12 @@ def flush_to_dataframes() -> Dict[int, pd.DataFrame]:

sensor_dfs = flush_to_dataframes()
audio_stereo = ParseResult.mic_samples_to_stereo(mic_samples)
return ParseResult(sensor_dfs=sensor_dfs, mic_samples=mic_samples, audio_stereo=audio_stereo)
return ParseResult(
sensor_dfs=sensor_dfs,
mic_samples=mic_samples,
mic_packets=mic_packets,
audio_stereo=audio_stereo,
)

def _parse_header(self, header: bytes) -> tuple[int, int, int]:
"""Parse a 10-byte packet header into (sid, size, time)."""
@@ -448,4 +494,4 @@ def parse_packet(self, data: bytes) -> dict:
group_data[component.name] = value
parsed_data[group.name] = group_data

return parsed_data
return parsed_data
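
To make the new microphone helpers concrete, a small sketch exercising `interleaved_mic_to_stereo` and `mic_packet_to_stereo_frames` with made-up sample values (the import path is assumed from the source layout):

```python
from open_earable_python import parser

# Interleaved int16 samples ordered [outer, inner, outer, inner, ...].
raw = [10, 11, 20, 21, 30, 31]

stereo = parser.interleaved_mic_to_stereo(raw)
# Each row is one frame ordered [inner, outer]:
# [[11 10]
#  [21 20]
#  [31 30]]
print(stereo)

# A parsed mic packet pairs the packet timestamp with its samples.
packet = {"timestamp": 1.25, "samples": tuple(raw)}
timestamps, frames = parser.mic_packet_to_stereo_frames(packet=packet, sampling_rate=48000)
# Timestamps advance by 1/48000 s per frame, starting at the packet timestamp.
print(timestamps, frames.shape)
```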