From 8d05c89b72741c0eeb743d9e1c52cb36ae3502dc Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 12:19:16 -0700 Subject: [PATCH 1/8] Move to memory2 --- dimos/utils/cli/dtop.py | 60 ++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 7ca82d3690..64284c7ca7 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -21,7 +21,6 @@ from __future__ import annotations from collections import deque -import json import threading import time from typing import TYPE_CHECKING, Any @@ -153,6 +152,19 @@ def _cpu_metric(line: Text, cpu: float, stale: bool, cpu_hist: deque[float] | No _ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} | set(_IO_KEYS) +LOGGED_METRICS = ( + "cpu_percent", "pss", "num_threads", "num_children", "num_fds", + "cpu_time_user", "cpu_time_system", "cpu_time_iowait", + "io_read_bytes", "io_write_bytes", +) + + +def _role_name(worker: dict[str, Any]) -> str: + modules = worker.get("modules") or [] + if modules: + return "_".join(str(m) for m in modules) + return f"worker_{worker.get('worker_id', 'unknown')}" + def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: """(min, max) per metric across all processes (for relative coloring).""" @@ -192,16 +204,23 @@ class ResourceSpyApp(App[None]): BINDINGS = [("q", "quit"), ("ctrl+c", "quit")] def __init__( - self, topic_name: str = "/dimos/resource_stats", log_path: str | None = None + self, topic_name: str = "/dimos/resource_stats", db_path: str | None = None ) -> None: super().__init__() self._topic_name = topic_name - self._log_file = open(log_path, "a") if log_path else None # Warn about missing system config before entering TUI raw mode. from dimos.protocol.service.lcmservice import autoconf autoconf(check_only=True) + if db_path is not None: + from dimos.memory2.store.sqlite import SqliteStore + self._store: SqliteStore | None = SqliteStore(path=db_path) + self._store.start() + else: + self._store = None + self._mem_streams: dict[str, Any] = {} + self._lcm = PickleLCM() self._lcm.subscribe(Topic(self._topic_name), self._on_msg) self._lcm.start() @@ -220,16 +239,28 @@ def on_mount(self) -> None: async def on_unmount(self) -> None: self._lcm.stop() - if self._log_file: - self._log_file.close() + if self._store is not None: + self._store.stop() def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: with self._lock: self._latest = msg self._last_msg_time = time.monotonic() - if self._log_file: - self._log_file.write(json.dumps({"ts": time.time(), **msg}) + "\n") - self._log_file.flush() + if self._store is not None: + ts = time.time() + self._log_role("coordinator", msg.get("coordinator") or {}, ts) + for worker in msg.get("workers") or []: + self._log_role(_role_name(worker), worker, ts) + + def _log_role(self, role: str, data: dict[str, Any], ts: float) -> None: + for metric in LOGGED_METRICS: + val = data.get(metric) + if val is None: + continue + name = f"{role}_{metric}" + if name not in self._mem_streams: + self._mem_streams[name] = self._store.stream(name, float) + self._mem_streams[name].append(float(val), ts=ts) def _refresh(self) -> None: with self._lock: @@ -509,17 +540,16 @@ def main() -> None: ) parser.add_argument( "--log", - nargs="?", - const=f"dtop_{time.strftime('%Y%m%d_%H%M%S')}.ignore.jsonl", - metavar="PATH", - help="Log stats to a JSONL file. Uses a timestamped filename if no path is given.", + action="store_true", + help="Log stats to a memory2 SQLite database (dtop_{timestamp}.ignore.db).", ) args = parser.parse_args() - if args.log: - print(f"Logging to {args.log}") + db_path = f"dtop_{time.strftime('%Y%m%d_%H%M%S')}.ignore.db" if args.log else None + if db_path: + print(f"Logging to {db_path}") - ResourceSpyApp(topic_name=args.topic, log_path=args.log).run() + ResourceSpyApp(topic_name=args.topic, db_path=db_path).run() if __name__ == "__main__": From 68f77a88f668d7bb368390405e42cde470d38f14 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 12:33:28 -0700 Subject: [PATCH 2/8] Fix legend --- dimos/memory2/vis/plot/svg.py | 6 ++++-- dimos/utils/cli/dtop.py | 10 +++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/dimos/memory2/vis/plot/svg.py b/dimos/memory2/vis/plot/svg.py index 0249f2bcc2..8ba3bc7d52 100644 --- a/dimos/memory2/vis/plot/svg.py +++ b/dimos/memory2/vis/plot/svg.py @@ -234,6 +234,8 @@ def mpl_color(c: Color, opacity: float) -> tuple[float, float, float, float]: facecolor="#1a1a2e", edgecolor="#2a2a4a", framealpha=0.9, + loc="center left", + bbox_to_anchor=(1.02, 0.5), ) ax.set_xlabel("time (s)") @@ -246,10 +248,10 @@ def mpl_color(c: Color, opacity: float) -> tuple[float, float, float, float]: n_twins = sum(1 for k in axes if k is not None) if n_twins >= 2: extras = n_twins - 1 - right_margin = max(0.6, 0.95 - twin_offset_step * extras) + right_margin = max(0.5, 0.75 - twin_offset_step * extras) fig.subplots_adjust(left=0.08, right=right_margin, top=0.95, bottom=0.18) else: - fig.tight_layout() + fig.subplots_adjust(left=0.08, right=0.75, top=0.95, bottom=0.18) buf = io.StringIO() fig.savefig(buf, format="svg") diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 64284c7ca7..00dcff10e6 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -159,12 +159,6 @@ def _cpu_metric(line: Text, cpu: float, stale: bool, cpu_hist: deque[float] | No ) -def _role_name(worker: dict[str, Any]) -> str: - modules = worker.get("modules") or [] - if modules: - return "_".join(str(m) for m in modules) - return f"worker_{worker.get('worker_id', 'unknown')}" - def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: """(min, max) per metric across all processes (for relative coloring).""" @@ -250,7 +244,9 @@ def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: ts = time.time() self._log_role("coordinator", msg.get("coordinator") or {}, ts) for worker in msg.get("workers") or []: - self._log_role(_role_name(worker), worker, ts) + modules = worker.get("modules") or [] + if modules: + self._log_role("_".join(str(m) for m in modules), worker, ts) def _log_role(self, role: str, data: dict[str, Any], ts: float) -> None: for metric in LOGGED_METRICS: From 681db9620c2cad8771cc3d4dc76314972f08d7c7 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 12:42:25 -0700 Subject: [PATCH 3/8] Delete dtop plot --- dimos/utils/cli/dtop_plot.py | 139 ----------------------------------- 1 file changed, 139 deletions(-) delete mode 100644 dimos/utils/cli/dtop_plot.py diff --git a/dimos/utils/cli/dtop_plot.py b/dimos/utils/cli/dtop_plot.py deleted file mode 100644 index 16f1d8f594..0000000000 --- a/dimos/utils/cli/dtop_plot.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""dtop-plot — Plot resource stats from a dtop JSONL log file. - -Usage: - dtop-plot [--metrics cpu_percent,pss] [--out plot.png] -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - import pandas as pd - -_COORDINATOR = "coordinator" - -_METRIC_LABELS: dict[str, str] = { - "cpu_percent": "CPU %", - "pss": "PSS (MB)", - "num_threads": "Threads", - "num_children": "Children", - "num_fds": "File Descriptors", - "cpu_time_user": "User CPU Time (s)", - "cpu_time_system": "Sys CPU Time (s)", - "cpu_time_iowait": "IO Wait Time (s)", - "io_read_bytes": "IO Read (MB)", - "io_write_bytes": "IO Write (MB)", -} - -_SCALE: dict[str, float] = { - "pss": 1 / 1048576, - "io_read_bytes": 1 / 1048576, - "io_write_bytes": 1 / 1048576, -} - - -def _load(path: str) -> tuple[pd.DataFrame, dict[str, str]]: - import pandas as pd - - raw = pd.read_json(path, lines=True) - - rows = [] - for _, msg in raw.iterrows(): - ts = msg["ts"] - rows.append({"ts": ts, "role": _COORDINATOR, **msg[_COORDINATOR]}) - for w in msg.get("workers", []): - wid = w.get("worker_id", 0) - rows.append({"ts": ts, "role": f"worker_{wid}", **w}) - - df = pd.DataFrame(rows) - df["ts"] = pd.to_datetime(df["ts"], unit="s") - - labels: dict[str, str] = {_COORDINATOR: _COORDINATOR} - for role, group in df.groupby("role"): - role = str(role) - if role == _COORDINATOR: - continue - mods = next((m for m in group.get("modules", []) if m), None) - labels[role] = ", ".join(mods) if mods else role - - df["label"] = df["role"].map(labels) - return df, labels - - -def _plot( - df: pd.DataFrame, labels: dict[str, str], metrics: list[str], out: str, show: bool = False -) -> None: - import matplotlib.pyplot as plt - - fig, axes = plt.subplots(len(metrics), 1, figsize=(12, 3 * len(metrics)), sharex=True) - if len(metrics) == 1: - axes = [axes] - - for ax, metric in zip(axes, metrics, strict=False): - if metric not in df.columns: - ax.set_visible(False) - continue - scale = _SCALE.get(metric, 1.0) - for role, group in df.groupby("role"): - ax.plot(group["ts"], group[metric] * scale, label=labels[str(role)]) - ax.set_ylabel(_METRIC_LABELS.get(metric, metric)) - ax.legend(fontsize=8, loc="center left", bbox_to_anchor=(1.01, 0.5), borderaxespad=0) - ax.grid(True, alpha=0.3) - - axes[-1].set_xlabel("Time") - fig.tight_layout() - - fig.savefig(out, dpi=150, bbox_inches="tight") - print(f"Saved to {out}") - if show: - plt.show() - - -def _default_out(log_path: str) -> str: - base = log_path.removesuffix(".ignore.jsonl") - return f"{base}.ignore.png" - - -def main() -> None: - import argparse - - parser = argparse.ArgumentParser( - prog="dtop-plot", description="Plot resource stats from a dtop JSONL log file." - ) - parser.add_argument("log", metavar="LOG", help="Path to a dtop JSONL log file.") - parser.add_argument( - "--metrics", - default="cpu_percent,pss,num_threads", - help="Comma-separated list of metrics to plot (default: cpu_percent,pss,num_threads).", - ) - parser.add_argument( - "--out", metavar="PATH", help="Output image path (default: .ignore.png)." - ) - parser.add_argument( - "--show", action="store_true", help="Open the plot interactively after saving." - ) - args = parser.parse_args() - - out = args.out or _default_out(args.log) - metrics = [m.strip() for m in args.metrics.split(",")] - df, labels = _load(args.log) - _plot(df, labels, metrics, out, args.show) - - -if __name__ == "__main__": - main() From 0874f8fe4e4e56eb552536b6591bf3d8ce167053 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 16:16:46 -0700 Subject: [PATCH 4/8] Pre-commit --- dimos/utils/cli/dtop.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 00dcff10e6..0676f0ad47 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -153,13 +153,19 @@ def _cpu_metric(line: Text, cpu: float, stale: bool, cpu_hist: deque[float] | No _ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} | set(_IO_KEYS) LOGGED_METRICS = ( - "cpu_percent", "pss", "num_threads", "num_children", "num_fds", - "cpu_time_user", "cpu_time_system", "cpu_time_iowait", - "io_read_bytes", "io_write_bytes", + "cpu_percent", + "pss", + "num_threads", + "num_children", + "num_fds", + "cpu_time_user", + "cpu_time_system", + "cpu_time_iowait", + "io_read_bytes", + "io_write_bytes", ) - def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: """(min, max) per metric across all processes (for relative coloring).""" ranges: dict[str, tuple[float, float]] = {} @@ -209,6 +215,7 @@ def __init__( if db_path is not None: from dimos.memory2.store.sqlite import SqliteStore + self._store: SqliteStore | None = SqliteStore(path=db_path) self._store.start() else: From 7d87cce1195677d98151131b5dce40fb04c75c63 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 16:18:35 -0700 Subject: [PATCH 5/8] Remove old reference --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b375477fa9..55a7f0fbd7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,7 +103,6 @@ dimos = "dimos.robot.cli.dimos:main" rerun-bridge = "dimos.visualization.rerun.bridge:app" doclinks = "dimos.utils.docs.doclinks:main" dtop = "dimos.utils.cli.dtop:main" -dtop-plot = "dimos.utils.cli.dtop_plot:main" [project.urls] Homepage = "https://dimensionalos.com" From ee704f870e93d90b0bc8321867517bed385db2b4 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 17:16:46 -0700 Subject: [PATCH 6/8] Multi-module worker fix --- dimos/memory2/vis/plot/svg.py | 2 +- dimos/utils/cli/dtop.py | 30 ++++++++++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/dimos/memory2/vis/plot/svg.py b/dimos/memory2/vis/plot/svg.py index 8ba3bc7d52..b0a514f2dd 100644 --- a/dimos/memory2/vis/plot/svg.py +++ b/dimos/memory2/vis/plot/svg.py @@ -254,7 +254,7 @@ def mpl_color(c: Color, opacity: float) -> tuple[float, float, float, float]: fig.subplots_adjust(left=0.08, right=0.75, top=0.95, bottom=0.18) buf = io.StringIO() - fig.savefig(buf, format="svg") + fig.savefig(buf, format="svg", bbox_inches="tight") plt.close(fig) return buf.getvalue() diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 0676f0ad47..5683ba79b1 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -247,15 +247,25 @@ def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: with self._lock: self._latest = msg self._last_msg_time = time.monotonic() - if self._store is not None: - ts = time.time() - self._log_role("coordinator", msg.get("coordinator") or {}, ts) - for worker in msg.get("workers") or []: - modules = worker.get("modules") or [] - if modules: - self._log_role("_".join(str(m) for m in modules), worker, ts) - - def _log_role(self, role: str, data: dict[str, Any], ts: float) -> None: + if self._store is None: + return + ts = time.time() + coord = msg.get("coordinator") + if coord: + self._log_role("coordinator", coord, ts, None) + for i, worker in enumerate(msg.get("workers") or []): + worker_id = worker.get("worker_id", i) + modules = worker.get("modules") or [] + self._log_role(f"worker_{worker_id}", worker, ts, modules) + + def _log_role( + self, + role: str, + data: dict[str, Any], + ts: float, + modules: list[str] | None, + ) -> None: + tags = {"modules": list(modules)} if modules is not None else None for metric in LOGGED_METRICS: val = data.get(metric) if val is None: @@ -263,7 +273,7 @@ def _log_role(self, role: str, data: dict[str, Any], ts: float) -> None: name = f"{role}_{metric}" if name not in self._mem_streams: self._mem_streams[name] = self._store.stream(name, float) - self._mem_streams[name].append(float(val), ts=ts) + self._mem_streams[name].append(float(val), ts=ts, tags=tags) def _refresh(self) -> None: with self._lock: From 9d2c2b9d2e4791444d322ada5d24faf2a0eb2fa3 Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 17:28:55 -0700 Subject: [PATCH 7/8] Nit --- dimos/memory2/vis/plot/svg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/memory2/vis/plot/svg.py b/dimos/memory2/vis/plot/svg.py index b0a514f2dd..206596b05a 100644 --- a/dimos/memory2/vis/plot/svg.py +++ b/dimos/memory2/vis/plot/svg.py @@ -248,10 +248,10 @@ def mpl_color(c: Color, opacity: float) -> tuple[float, float, float, float]: n_twins = sum(1 for k in axes if k is not None) if n_twins >= 2: extras = n_twins - 1 - right_margin = max(0.5, 0.75 - twin_offset_step * extras) + right_margin = max(0.6, 0.95 - twin_offset_step * extras) fig.subplots_adjust(left=0.08, right=right_margin, top=0.95, bottom=0.18) else: - fig.subplots_adjust(left=0.08, right=0.75, top=0.95, bottom=0.18) + fig.tight_layout() buf = io.StringIO() fig.savefig(buf, format="svg", bbox_inches="tight") From a4355b89c573d72c46727e46067efda78a1d923f Mon Sep 17 00:00:00 2001 From: Andrew Lauer Date: Thu, 7 May 2026 19:33:09 -0700 Subject: [PATCH 8/8] Fix mypy --- dimos/utils/cli/dtop.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 5683ba79b1..43ad506d22 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -265,6 +265,8 @@ def _log_role( ts: float, modules: list[str] | None, ) -> None: + if self._store is None: + return tags = {"modules": list(modules)} if modules is not None else None for metric in LOGGED_METRICS: val = data.get(metric)