3 changes: 1 addition & 2 deletions robot_log_visualizer/__init__.py
@@ -1,4 +1,3 @@
import os

# Prefer the PySide6 backend when QtPy resolves the Qt binding.
os.environ.setdefault("QT_API", "pyside2")
os.environ.setdefault("QT_API", "pyqt5")
268 changes: 206 additions & 62 deletions robot_log_visualizer/signal_provider/realtime_signal_provider.py
@@ -10,11 +10,10 @@
import numpy as np

from robot_log_visualizer.signal_provider.signal_provider import (
ProviderType,
SignalProvider,
)
ProviderType, SignalProvider)
from robot_log_visualizer.utils.utils import PeriodicThreadState

ROBOT_REALTIME_KEY = "robot_realtime"

def are_deps_installed():
try:
@@ -115,16 +114,16 @@ def __init__(self, period: float, signal_root_name: str):
# Track signals to buffer
self.buffered_signals = set()
# Always include joints_state
self.buffered_signals.add("robot_realtime::joints_state::positions")

# TODO: implement a logic to remove signals that are not needed anymore
self.buffered_signals.add(
f"{ROBOT_REALTIME_KEY}::joints_state::positions"
) # TODO: implement a logic to remove signals that are not needed anymore
def add_signals_to_buffer(self, signals: Union[str, Iterable[str]]):
"""Add signals to the buffer set."""
if isinstance(signals, str):
signals = {signals}
self.buffered_signals.update(signals)
# Always include joints_state
self.buffered_signals.add("robot_realtime::joints_state::positions")
self.buffered_signals.add(f"{ROBOT_REALTIME_KEY}::joints_state::positions")
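
As a usage illustration (not part of this diff), a caller holding a provider instance could register additional signal paths either one at a time or as an iterable; the signal keys below are made-up examples:

# Hypothetical usage; `provider` is an already-constructed realtime signal provider.
provider.add_signals_to_buffer("robot_realtime::joints_state::velocities")
provider.add_signals_to_buffer(
    [
        "robot_realtime::joints_state::torques",
        "robot_realtime::motors_state::currents",
    ]
)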

def __len__(self):
"""
@@ -145,6 +144,9 @@ def _update_data_buffer(
Any sample older than the fixed time window is removed.
"""

if not keys:
return

if keys[0] not in raw_data:
raw_data[keys[0]] = DequeToNumpyLeaf()

@@ -171,26 +173,78 @@ def _populate_realtime_logger_metadata(self, raw_data: dict, keys: list, value)
def _populate_realtime_logger_metadata(self, raw_data: dict, keys: list, value):
"""
Recursively populate metadata into raw_data.
Here we simply store metadata (e.g. elements names) into a list.

- Creates only missing nested nodes.
- At a leaf: initialize buffers if missing and merge elements_names
(do not overwrite existing elements_names).
- Returns True if the call created or extended metadata for the given path,
False otherwise.
"""

if not isinstance(keys, list) or not keys:
raise ValueError(
f"Invalid keys parameter: {keys}. Expected a non-empty list."
)
if not all(isinstance(k, str) for k in keys):
raise ValueError(
f"Invalid keys elements: {keys}. All elements must be strings."
)

if not isinstance(raw_data, (dict, DequeToNumpyLeaf)):
raise ValueError(
f"Invalid raw_data parameter: {raw_data}. Expected a dictionary-like object."
)

if not isinstance(value, (list, tuple, str, int, float)):
raise ValueError(
f"Invalid value parameter: {value}. Expected a list, tuple, or scalar."
)

if keys[0] == "timestamps":
return
return False

# ensure node exists
if keys[0] not in raw_data:
raw_data[keys[0]] = DequeToNumpyLeaf()
created = True
else:
created = False

if len(keys) == 1:
# leaf
if not value:
if keys[0] in raw_data:
del raw_data[keys[0]]
return
if "elements_names" not in raw_data[keys[0]]:
raw_data[keys[0]]["elements_names"] = []
# Also create empty buffers (which will later be updated in run())
raw_data[keys[0]]["data"] = deque()
raw_data[keys[0]]["timestamps"] = deque()
# do not delete existing node on empty value; just no-op
return created

node = raw_data[keys[0]]

# initialize leaf buffers if missing
if "elements_names" not in node:
node["elements_names"] = (
list(value) if isinstance(value, (list, tuple)) else value
)
node["data"] = deque()
node["timestamps"] = deque()
return True

raw_data[keys[0]]["elements_names"] = value
# merge element names (append only new entries)
if isinstance(node["elements_names"], list) and isinstance(
value, (list, tuple)
):
added = False
for v in value:
if v not in node["elements_names"]:
node["elements_names"].append(v)
added = True
return added or created

# fallback: if elements_names is not a list, don't overwrite
return created
else:
self._populate_realtime_logger_metadata(raw_data[keys[0]], keys[1:], value)
# recurse into the subtree
return self._populate_realtime_logger_metadata(
raw_data[keys[0]], keys[1:], value
)
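
To make the recursion easier to follow, here is a self-contained sketch of the same idea with plain dicts standing in for DequeToNumpyLeaf; the key string and element names are made up:

from collections import deque

def populate(tree: dict, keys: list, names) -> bool:
    # Mirrors _populate_realtime_logger_metadata: walk the "::"-separated path,
    # create missing nodes, and merge element names at the leaf.
    if keys[0] == "timestamps":
        return False
    created = keys[0] not in tree
    node = tree.setdefault(keys[0], {})
    if len(keys) == 1:
        if "elements_names" not in node:
            node["elements_names"] = list(names)
            node["data"] = deque()
            node["timestamps"] = deque()
            return True
        added = False
        for name in names:
            if name not in node["elements_names"]:
                node["elements_names"].append(name)
                added = True
        return added or created
    return populate(node, keys[1:], names)

data = {}
populate(data, "robot_realtime::joints_state::positions".split("::"), ["joint_0", "joint_1"])
# data now holds an "elements_names" list plus empty "data"/"timestamps" deques at the leaf.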

def open(self, source: str) -> bool:
"""
@@ -224,10 +278,12 @@ def open(self, source: str) -> bool:
return False

self.realtime_network_init = True
self.joints_name = self.rt_metadata_dict["robot_realtime::description_list"]
self.robot_name = self.rt_metadata_dict["robot_realtime::yarp_robot_name"][
0
self.joints_name = self.rt_metadata_dict[
f"{ROBOT_REALTIME_KEY}::description_list"
]
self.robot_name = self.rt_metadata_dict[
f"{ROBOT_REALTIME_KEY}::yarp_robot_name"
][0]

# Populate metadata into self.data recursively.
for key_string, value in self.rt_metadata_dict.items():
@@ -254,6 +310,75 @@ def index(self):
finally:
self.index_lock.unlock()

def check_for_new_metadata(self) -> bool:
"""
Check if new metadata is available using the client's streaming data flag.
This avoids expensive RPC calls.

Returns:
bool: True if new metadata is available, False otherwise.
"""
client = self.vector_collections_client
if client is None:
return False

try:
return client.is_new_metadata_available()
except Exception as exc:
print(f"Error checking for new metadata: {exc}")
return False

def update_metadata(self):
"""
Refresh the metadata from the remote realtime logger.
New metadata items are added to self.rt_metadata_dict and
the corresponding data buffers are created in self.data.

Returns:
dict: New metadata items added, or None if no new items.
"""

client = self.vector_collections_client
if client is None:
print("Refresh metadata: realtime client unavailable.")
return

try:
updated_md = client.get_metadata().vectors
except Exception as exc:
print(f"Error fetching metadata: {exc}")
return

existing_md = self.rt_metadata_dict or {}
new_items = {k: v for k, v in updated_md.items() if k not in existing_md}

if not new_items:
return

existing_md.update(new_items)
self.rt_metadata_dict = existing_md

desc_key = f"{ROBOT_REALTIME_KEY}::description_list"
yarp_name_key = f"{ROBOT_REALTIME_KEY}::yarp_robot_name"
if desc_key in new_items:
self.joints_name = existing_md[desc_key]
if yarp_name_key in new_items:
names = existing_md.get(yarp_name_key, [])
if names:
self.robot_name = names[0]

# Populate metadata into self.data recursively.
for key_string, value in self.rt_metadata_dict.items():
keys = key_string.split("::")
self._populate_realtime_logger_metadata(self.data, keys, value)

# Remove keys that are not needed for the realtime plotting.
if self.root_name in self.data:
self.data[self.root_name].pop("description_list", None)
self.data[self.root_name].pop("yarp_robot_name", None)

return new_items
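
As an illustration of how these two methods are meant to be combined by a consumer (the provider variable and the surrounding logic are hypothetical, not part of this diff):

# Cheap flag check first; only pay for the metadata fetch when something changed.
if provider.check_for_new_metadata():
    new_items = provider.update_metadata()
    if new_items:
        print(f"Discovered {len(new_items)} new signal group(s): {sorted(new_items)}")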

def get_item_from_path_at_index(self, path, index, default_path=None, neighbor=0):
"""
Get the latest data item from the given path at the latest index.
@@ -288,49 +413,68 @@ def run(self):
"""
while True:
start = time.time()
if self.state == PeriodicThreadState.closed:
return

if self.state == PeriodicThreadState.running:
# Read the latest data from the realtime logger.
vc_input = self.vector_collections_client.read_data(True).vectors
new_samples_read = False

while True:
try:
packet = self.vector_collections_client.read_data(False)
except Exception as exc: # noqa: BLE001 - surface runtime issues
print(f"Error reading realtime data: {exc}")
break

if packet is None:
break

vc_input = getattr(packet, "vectors", None)
if not vc_input:
break

timestamps_key = f"{ROBOT_REALTIME_KEY}::timestamps"
if timestamps_key not in vc_input or not vc_input[timestamps_key]:
continue

recent_timestamp = vc_input[timestamps_key][0]

if vc_input:
self.index_lock.lock()
# Retrieve the most recent timestamp from the input.
recent_timestamp = vc_input["robot_realtime::timestamps"][0]
self._timestamps.append(recent_timestamp)
# Keep the global timestamps within the fixed plot window.
while self._timestamps and (
recent_timestamp - self._timestamps[0]
> self.realtime_fixed_plot_window
):
self._timestamps.popleft()

# Update initial and end times.
if self._timestamps:
self.initial_time = self._timestamps[0]
self.end_time = self._timestamps[-1]

# For each signal selected by the user that is in the received data (except timestamps),
# update the appropriate buffer.
for key_string, value in vc_input.items():
if key_string == "robot_realtime::timestamps":
continue

# Check if any selected signal starts with this path
match = any(
sel.startswith(key_string) for sel in self.buffered_signals
)
if not match:
continue

keys = key_string.split("::")
self._update_data_buffer(
self.data, keys, value, recent_timestamp
)

self.index_lock.unlock()

# Signal that new data are available.
self.update_index_signal.emit()
try:
self._timestamps.append(recent_timestamp)

while self._timestamps and (
recent_timestamp - self._timestamps[0]
> self.realtime_fixed_plot_window
):
self._timestamps.popleft()

if self._timestamps:
self.initial_time = self._timestamps[0]
self.end_time = self._timestamps[-1]

for key_string, value in vc_input.items():
if key_string == timestamps_key:
continue

match = any(
sel.startswith(key_string)
for sel in self.buffered_signals
)
if not match:
continue

keys = key_string.split("::")
self._update_data_buffer(
self.data, keys, value, recent_timestamp
)
finally:
self.index_lock.unlock()

new_samples_read = True

if new_samples_read:
self.update_index_signal.emit()

# Sleep until the next period.
sleep_time = self.period - (time.time() - start)
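
The fixed-window trimming applied to the timestamp deque above can be shown in isolation; the window length and sample times here are made up:

from collections import deque

window = 5.0  # seconds, stands in for realtime_fixed_plot_window
timestamps = deque()

for t in [0.0, 2.0, 6.0, 9.5]:
    timestamps.append(t)
    # Drop samples older than `window` seconds relative to the newest one.
    while timestamps and t - timestamps[0] > window:
        timestamps.popleft()

print(list(timestamps))  # [6.0, 9.5]: only samples within the last 5 seconds remain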