diff --git a/leads_gui/__init__.py b/leads_gui/__init__.py index 14832eb8..e78f7b31 100644 --- a/leads_gui/__init__.py +++ b/leads_gui/__init__.py @@ -11,7 +11,6 @@ from leads import LEADS as _LEADS, Controller as _Controller from leads_gui.prototype import * -from leads_gui.runtime import * from leads_gui.config import * from leads_gui.icons import * from leads_gui.accelerometer import * diff --git a/leads_gui/prototype.py b/leads_gui/prototype.py index 3bb42089..e03e8b04 100644 --- a/leads_gui/prototype.py +++ b/leads_gui/prototype.py @@ -1,3 +1,6 @@ +from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod +from json import dumps as _dumps +from time import time as _time from tkinter import Misc as _Misc, Event as _Event from typing import Callable as _Callable, Self as _Self, TypeVar as _TypeVar, Generic as _Generic, Any as _Any, \ Literal as _Literal @@ -8,9 +11,9 @@ set_appearance_mode as _set_appearance_mode from numpy import lcm as _lcm -from leads import require_config as _require_config +from leads import require_config as _require_config, DataContainer as _DataContainer +from leads.comm import Server as _Server from leads_gui.performance_checker import PerformanceChecker -from leads_gui.runtime import RuntimeData from leads_gui.system import _ASSETS_PATH from leads_gui.types import Widget as _Widget, Color as _Color, Font as _Font @@ -180,6 +183,44 @@ def detach(self) -> None: self._trace_cb_name = None +class FrequencyGenerator(object, metaclass=_ABCMeta): + def __init__(self, period: int, loops: int = -1) -> None: + """ + :param period: the period in milliseconds + :param loops: the number of loops or -1 to indicate infinite loops + """ + self._period: int = period + self._loops: int = loops + self._last_run: float = 0 + + @_abstractmethod + def do(self) -> None: + raise NotImplementedError + + def attempt(self) -> bool: + """ + Attempt to run. + :return: `True`: active; `False`: deprecated + """ + if self._loops == 0: + return False + if (t := _time()) - self._last_run >= self._period * .001: + self.do() + self._loops -= 1 + self._last_run = t + return True + + +class RuntimeData(object): + start_time: int = int(_time()) + lap_time: list[int] = [] + comm: _Server | None = None + + def comm_notify(self, d: _DataContainer | dict[str, _Any]) -> None: + if self.comm: + self.comm.broadcast(d.encode() if isinstance(d, _DataContainer) else _dumps(d).encode()) + + T = _TypeVar("T", bound=RuntimeData) @@ -204,10 +245,12 @@ def __init__(self, self._width: int = sw if fullscreen else width self._height: int = sh if fullscreen else height self._root.geometry( - f"{self._width}x{self._height}+{int((sw - self._width) / 2)}+{int((sh - self._height) / 2)}") + f"{self._width}x{self._height}+{int((sw - self._width) / 2)}+{int((sh - self._height) / 2)}" + ) self._refresh_rate: int = refresh_rate self._runtime_data: T = runtime_data self._on_refresh: _Callable[[Window], None] = on_refresh + self._frequency_generators: dict[str, FrequencyGenerator] = {} self._active: bool = False self._performance_checker: PerformanceChecker = PerformanceChecker() @@ -237,6 +280,18 @@ def runtime_data(self) -> T: def set_on_refresh(self, on_refresh: _Callable[[_Self], None]) -> None: self._on_refresh = on_refresh + def add_frequency_generator(self, tag: str, frequency_generator: FrequencyGenerator) -> None: + self._frequency_generators[tag] = frequency_generator + + def remove_frequency_generator(self, tag: str) -> None: + try: + self._frequency_generators.pop(tag) + except KeyError: + pass + + def clear_frequency_generators(self) -> None: + self._frequency_generators.clear() + def active(self) -> bool: return self._active @@ -245,6 +300,9 @@ def show(self) -> None: def wrapper() -> None: self._on_refresh(self) + for tag, fg in self._frequency_generators.items(): + if not fg.attempt(): + self.remove_frequency_generator(tag) self._performance_checker.record_frame(self._last_interval) if self._active: self._root.after(int((ni := self._performance_checker.next_interval()) * 1000), wrapper) @@ -264,16 +322,16 @@ def __init__(self, window: Window) -> None: self._widgets: dict[str, _Widget] = {} def __setitem__(self, key: str, widget: _Widget) -> None: - self._widgets[key] = widget + self.set(key, widget) def __getitem__(self, key: str) -> _Widget: - return self._widgets[key] + return self.get(key) def set(self, key: str, widget: _Widget) -> None: - self[key] = widget + self._widgets[key] = widget def get(self, key: str) -> _Widget: - return self[key] + return self._widgets[key] def parse_layout(self, layout: list[list[str | _Widget]]) -> list[list[_Widget]]: for i in range(len(layout)): @@ -301,21 +359,6 @@ def layout(self, layout: list[list[str | _Widget]]) -> None: def window(self) -> Window: return self._window - def rd(self) -> T: - return self._window.runtime_data() - - def active(self) -> bool: - return self._window.active() - - def fps(self) -> float: - return self._window.fps() - - def net_delay(self) -> float: - return self._window.net_delay() - - def root(self) -> _CTk: - return self._window.root() - def show(self) -> None: self._window.show() diff --git a/leads_gui/runtime.py b/leads_gui/runtime.py deleted file mode 100644 index 36482700..00000000 --- a/leads_gui/runtime.py +++ /dev/null @@ -1,16 +0,0 @@ -from json import dumps as _dumps -from time import time as _time -from typing import Any as _Any - -from leads import DataContainer as _DataContainer -from leads.comm import Server as _Server - - -class RuntimeData(object): - start_time: int = int(_time()) - lap_time: list[int] = [] - comm: _Server | None = None - - def comm_notify(self, d: _DataContainer | dict[str, _Any]) -> None: - if self.comm: - self.comm.broadcast(d.encode() if isinstance(d, _DataContainer) else _dumps(d).encode()) diff --git a/leads_raspberry_pi/led_group.py b/leads_raspberry_pi/led_group.py index 782f9c20..07886eb1 100644 --- a/leads_raspberry_pi/led_group.py +++ b/leads_raspberry_pi/led_group.py @@ -23,9 +23,9 @@ def do(self, command: LEDCommand, *leds: LED) -> None: class Transition(LEDGroupAnimation): - def __init__(self, direction: _TransitionDirection, interval: float = 1) -> None: + def __init__(self, direction: _TransitionDirection, interval: int = 1000) -> None: self._direction: _TransitionDirection = direction - self._interval: float = interval + self._interval: float = interval * .001 def async_do(self, command: LEDCommand, *leds: LED) -> None: if self._direction == "left2right": diff --git a/leads_vec/cli.py b/leads_vec/cli.py index 3fb57500..6dd728b6 100644 --- a/leads_vec/cli.py +++ b/leads_vec/cli.py @@ -38,14 +38,14 @@ def main() -> int: ctx.plugin(SystemLiteral.EBI, EBI()) ctx.plugin(SystemLiteral.ATBS, ATBS()) ctx.plugin("GPS_SPEED_CORRECTION", GPSSpeedCorrection()) - window = Window(cfg.width, - cfg.height, - cfg.refresh_rate, - CustomRuntimeData(), - fullscreen=cfg.fullscreen, - no_title_bar=cfg.no_title_bar, - theme_mode=cfg.theme_mode) - root = window.root() + w = Window(cfg.width, + cfg.height, + cfg.refresh_rate, + CustomRuntimeData(), + fullscreen=cfg.fullscreen, + no_title_bar=cfg.no_title_bar, + theme_mode=cfg.theme_mode) + root = w.root() root.configure(cursor="dot") m1 = StringVar(root, "") speed = DoubleVar(root, 0) @@ -54,9 +54,17 @@ def main() -> int: g_force = GForceVar(root, 0, 0) esc = StringVar(root, "STANDARD") + class LeftIndicator(FrequencyGenerator): + def do(self) -> None: + uim["left"].configure(image=Left(cfg.font_size_large, Color.RED if self._loops % 2 == 1 else None)) + + class RightIndicator(FrequencyGenerator): + def do(self) -> None: + uim["right"].configure(image=Right(cfg.font_size_large, Color.RED if self._loops % 2 == 1 else None)) + def render(manager: ContextManager) -> None: def switch_m1_mode(_) -> None: - manager.rd().m1_mode = (manager.rd().m1_mode + 1) % 3 + w.runtime_data().m1_mode = (w.runtime_data().m1_mode + 1) % 3 manager["m1"] = Typography(root, theme_key="CTkButton", variable=m1, clickable=True, command=switch_m1_mode, font=("Arial", cfg.font_size_small - 4)).lock_ratio(cfg.m_ratio) @@ -79,7 +87,7 @@ def switch_m1_mode(_) -> None: manager[f"{system_lower}_status"] = CTkLabel(root, text=f"{system} READY", text_color="green", font=("Arial", cfg.font_size_small)) manager[system_lower] = CTkButton(root, text=f"{system} ON", - command=make_system_switch(ctx, SystemLiteral(system), manager.rd()), + command=make_system_switch(ctx, SystemLiteral(system), w.runtime_data()), font=("Arial", cfg.font_size_small)) manager["left"] = CTkButton(root, text="", image=Left(cfg.font_size_large), @@ -93,12 +101,12 @@ def switch_esc_mode(mode) -> None: manager["esc"].configure(selected_color=(c := "green" if (esc_mode := ESCMode[mode]) < 2 else "red"), selected_hover_color=c) ctx.esc_mode(esc_mode) - manager.rd().control_system_switch_changed = True + w.runtime_data().control_system_switch_changed = True manager["esc"] = CTkSegmentedButton(root, values=["STANDARD", "AGGRESSIVE", "SPORT", "OFF"], variable=esc, command=switch_esc_mode, font=("Arial", cfg.font_size_small)) - uim = initialize(window, render, ctx, get_controller(MAIN_CONTROLLER)) + uim = initialize(w, render, ctx, get_controller(MAIN_CONTROLLER)) class CommCallback(Callback): def on_fail(self, service: Service, error: Exception) -> None: @@ -112,7 +120,7 @@ def on_receive(self, service: Service, msg: bytes) -> None: elif msg == b"hazard": ctx.hazard(not ctx.hazard()) - uim.rd().comm = start_server(create_server(cfg.comm_port, CommCallback()), True) + w.runtime_data().comm = start_server(create_server(cfg.comm_port, CommCallback()), True) class CustomListener(EventListener): def pre_push(self, e: DataPushedEvent) -> None: @@ -120,16 +128,16 @@ def pre_push(self, e: DataPushedEvent) -> None: d = e.data.to_dict() d["speed_trend"] = ctx.speed_trend() d["lap_times"] = ctx.lap_time_list() - uim.rd().comm_notify(d) + w.runtime_data().comm_notify(d) def on_update(self, e: UpdateEvent) -> None: self.super(e) d = e.context.data() - if uim.rd().m1_mode == 0: + if w.runtime_data().m1_mode == 0: lap_time_list = ctx.lap_time_list() m1.set(f"LAP TIMES\n\n{"No Lap Timed" if len(lap_time_list) < 1 else "\n".join(map(format_lap_time, lap_time_list))}") - elif uim.rd().m1_mode == 1: + elif w.runtime_data().m1_mode == 1: if has_device(GPS_RECEIVER): gps = get_device(GPS_RECEIVER).read() m1.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - {gps[4]} {gps[5]}\n\n" @@ -139,23 +147,23 @@ def on_update(self, e: UpdateEvent) -> None: m1.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - !NF!\n\n" f"{d.gps_ground_speed:.1f} KM / H\n" f"LAT {d.latitude:.5f}\nLON {d.longitude:.5f}") - elif uim.rd().m1_mode == 2: + elif w.runtime_data().m1_mode == 2: m1.set(f"VeC {__version__.upper()}\n\n" f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n" - f"{(duration := int(time()) - uim.rd().start_time) // 60} MIN {duration % 60} SEC\n" + f"{(duration := int(time()) - w.runtime_data().start_time) // 60} MIN {duration % 60} SEC\n" f"{(m := d.mileage):.1f} KM - {m * 3600 / duration:.1f} KM / H\n\n" - f"{cfg.refresh_rate} - {uim.fps():.2f} FPS - {uim.net_delay() * 1000:.1f} MS\n" - f"{ip[-1] if len(ip := my_ip_addresses()) > 0 else "NOT FOUND"}:{uim.rd().comm.port()}") + f"{cfg.refresh_rate} - {w.fps():.2f} FPS - {w.net_delay() * 1000:.1f} MS\n" + f"{ip[-1] if len(ip := my_ip_addresses()) > 0 else "NOT FOUND"}:{w.runtime_data().comm.port()}") speed.set(d.speed) voltage.set(f"{d.voltage:.1f} V") st = ctx.speed_trend() speed_trend.set(st) g_force.set((d.lateral_acceleration, d.forward_acceleration)) - if uim.rd().comm.num_connections() < 1: + if w.runtime_data().comm.num_connections() < 1: uim["comm_status"].configure(text="COMM OFFLINE", text_color="gray") else: uim["comm_status"].configure(text="COMM ONLINE", text_color=["black", "white"]) - if uim.rd().control_system_switch_changed: + if w.runtime_data().control_system_switch_changed: for system in SystemLiteral: system_lowercase = system.lower() if ctx.plugin(SystemLiteral(system)).enabled(): @@ -163,7 +171,7 @@ def on_update(self, e: UpdateEvent) -> None: else: uim[system_lowercase].configure(text=f"{system} OFF") uim[f"{system_lowercase}_status"].configure(text=f"{system} OFF", text_color=("black", "white")) - uim.rd().control_system_switch_changed = False + w.runtime_data().control_system_switch_changed = False def pre_intervene(self, e: InterventionEvent) -> None: self.super(e) @@ -188,16 +196,24 @@ def post_suspend(self, e: SuspensionEvent) -> None: def left_indicator(self, e: Event, state: bool) -> None: if has_device(LEFT_INDICATOR): get_device(LEFT_INDICATOR).write(LEDGroupCommand( - LEDCommand.BLINK, Transition("left2right", .1) + LEDCommand.BLINK, Transition("left2right", 100) ) if state else LEDGroupCommand(LEDCommand.OFF, Entire())) - uim["left"].configure(image=Left(cfg.font_size_large, Color.RED if state else None)) + if state: + w.add_frequency_generator("left_indicator", LeftIndicator(500)) + else: + w.remove_frequency_generator("left_indicator") + uim["left"].configure(image=Left(cfg.font_size_large, None)) def right_indicator(self, e: Event, state: bool) -> None: if has_device(RIGHT_INDICATOR): get_device(RIGHT_INDICATOR).write(LEDGroupCommand( - LEDCommand.BLINK, Transition("right2left", .1) + LEDCommand.BLINK, Transition("right2left", 100) ) if state else LEDGroupCommand(LEDCommand.OFF, Entire())) - uim["right"].configure(image=Right(cfg.font_size_large, Color.RED if state else None)) + if state: + w.add_frequency_generator("right_indicator", RightIndicator(500)) + else: + w.remove_frequency_generator("right_indicator") + uim["right"].configure(image=Right(cfg.font_size_large, None)) def hazard(self, e: Event, state: bool) -> None: super().hazard(e, state) @@ -250,7 +266,7 @@ def on_recover(_, e: SuspensionEvent) -> None: ["battery_fault", "brake_fault", "esc_fault", "gps_fault", "motor_fault", "wsc_fault"] ] ctx.esc_mode(ESCMode.OFF) - uim.rd().control_system_switch_changed = True + w.runtime_data().control_system_switch_changed = True else: layout = [ ["m1", "m2", "m3"], @@ -266,13 +282,13 @@ def on_recover(_, e: SuspensionEvent) -> None: def on_press(key: _Key | _KeyCode) -> None: if key == _KeyCode.from_char("1"): - make_system_switch(ctx, SystemLiteral.DTCS, uim.rd())() + make_system_switch(ctx, SystemLiteral.DTCS, w.runtime_data())() elif key == _KeyCode.from_char("2"): - make_system_switch(ctx, SystemLiteral.ABS, uim.rd())() + make_system_switch(ctx, SystemLiteral.ABS, w.runtime_data())() elif key == _KeyCode.from_char("3"): - make_system_switch(ctx, SystemLiteral.EBI, uim.rd())() + make_system_switch(ctx, SystemLiteral.EBI, w.runtime_data())() elif key == _KeyCode.from_char("4"): - make_system_switch(ctx, SystemLiteral.ATBS, uim.rd())() + make_system_switch(ctx, SystemLiteral.ATBS, w.runtime_data())() elif key == _KeyCode.from_char("t"): ctx.time_lap() elif key == _Key.esc: