Skip to content
Merged
1 change: 0 additions & 1 deletion leads_gui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from leads import LEADS as _LEADS, Controller as _Controller
from leads_gui.prototype import *
from leads_gui.runtime import *
from leads_gui.config import *
from leads_gui.icons import *
from leads_gui.accelerometer import *
Expand Down
87 changes: 65 additions & 22 deletions leads_gui/prototype.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod
from json import dumps as _dumps
from time import time as _time
from tkinter import Misc as _Misc, Event as _Event
from typing import Callable as _Callable, Self as _Self, TypeVar as _TypeVar, Generic as _Generic, Any as _Any, \
Literal as _Literal
Expand All @@ -8,9 +11,9 @@
set_appearance_mode as _set_appearance_mode
from numpy import lcm as _lcm

from leads import require_config as _require_config
from leads import require_config as _require_config, DataContainer as _DataContainer
from leads.comm import Server as _Server
from leads_gui.performance_checker import PerformanceChecker
from leads_gui.runtime import RuntimeData
from leads_gui.system import _ASSETS_PATH
from leads_gui.types import Widget as _Widget, Color as _Color, Font as _Font

Expand Down Expand Up @@ -180,6 +183,44 @@ def detach(self) -> None:
self._trace_cb_name = None


class FrequencyGenerator(object, metaclass=_ABCMeta):
def __init__(self, period: int, loops: int = -1) -> None:
"""
:param period: the period in milliseconds
:param loops: the number of loops or -1 to indicate infinite loops
"""
self._period: int = period
self._loops: int = loops
self._last_run: float = 0

@_abstractmethod
def do(self) -> None:
raise NotImplementedError

def attempt(self) -> bool:
"""
Attempt to run.
:return: `True`: active; `False`: deprecated
"""
if self._loops == 0:
return False
if (t := _time()) - self._last_run >= self._period * .001:
self.do()
self._loops -= 1
self._last_run = t
return True


class RuntimeData(object):
start_time: int = int(_time())
lap_time: list[int] = []
comm: _Server | None = None

def comm_notify(self, d: _DataContainer | dict[str, _Any]) -> None:
if self.comm:
self.comm.broadcast(d.encode() if isinstance(d, _DataContainer) else _dumps(d).encode())


T = _TypeVar("T", bound=RuntimeData)


Expand All @@ -204,10 +245,12 @@ def __init__(self,
self._width: int = sw if fullscreen else width
self._height: int = sh if fullscreen else height
self._root.geometry(
f"{self._width}x{self._height}+{int((sw - self._width) / 2)}+{int((sh - self._height) / 2)}")
f"{self._width}x{self._height}+{int((sw - self._width) / 2)}+{int((sh - self._height) / 2)}"
)
self._refresh_rate: int = refresh_rate
self._runtime_data: T = runtime_data
self._on_refresh: _Callable[[Window], None] = on_refresh
self._frequency_generators: dict[str, FrequencyGenerator] = {}

self._active: bool = False
self._performance_checker: PerformanceChecker = PerformanceChecker()
Expand Down Expand Up @@ -237,6 +280,18 @@ def runtime_data(self) -> T:
def set_on_refresh(self, on_refresh: _Callable[[_Self], None]) -> None:
self._on_refresh = on_refresh

def add_frequency_generator(self, tag: str, frequency_generator: FrequencyGenerator) -> None:
self._frequency_generators[tag] = frequency_generator

def remove_frequency_generator(self, tag: str) -> None:
try:
self._frequency_generators.pop(tag)
except KeyError:
pass

def clear_frequency_generators(self) -> None:
self._frequency_generators.clear()

def active(self) -> bool:
return self._active

Expand All @@ -245,6 +300,9 @@ def show(self) -> None:

def wrapper() -> None:
self._on_refresh(self)
for tag, fg in self._frequency_generators.items():
if not fg.attempt():
self.remove_frequency_generator(tag)
self._performance_checker.record_frame(self._last_interval)
if self._active:
self._root.after(int((ni := self._performance_checker.next_interval()) * 1000), wrapper)
Expand All @@ -264,16 +322,16 @@ def __init__(self, window: Window) -> None:
self._widgets: dict[str, _Widget] = {}

def __setitem__(self, key: str, widget: _Widget) -> None:
self._widgets[key] = widget
self.set(key, widget)

def __getitem__(self, key: str) -> _Widget:
return self._widgets[key]
return self.get(key)

def set(self, key: str, widget: _Widget) -> None:
self[key] = widget
self._widgets[key] = widget

def get(self, key: str) -> _Widget:
return self[key]
return self._widgets[key]

def parse_layout(self, layout: list[list[str | _Widget]]) -> list[list[_Widget]]:
for i in range(len(layout)):
Expand Down Expand Up @@ -301,21 +359,6 @@ def layout(self, layout: list[list[str | _Widget]]) -> None:
def window(self) -> Window:
return self._window

def rd(self) -> T:
return self._window.runtime_data()

def active(self) -> bool:
return self._window.active()

def fps(self) -> float:
return self._window.fps()

def net_delay(self) -> float:
return self._window.net_delay()

def root(self) -> _CTk:
return self._window.root()

def show(self) -> None:
self._window.show()

Expand Down
16 changes: 0 additions & 16 deletions leads_gui/runtime.py

This file was deleted.

4 changes: 2 additions & 2 deletions leads_raspberry_pi/led_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def do(self, command: LEDCommand, *leds: LED) -> None:


class Transition(LEDGroupAnimation):
def __init__(self, direction: _TransitionDirection, interval: float = 1) -> None:
def __init__(self, direction: _TransitionDirection, interval: int = 1000) -> None:
self._direction: _TransitionDirection = direction
self._interval: float = interval
self._interval: float = interval * .001

def async_do(self, command: LEDCommand, *leds: LED) -> None:
if self._direction == "left2right":
Expand Down
80 changes: 48 additions & 32 deletions leads_vec/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def main() -> int:
ctx.plugin(SystemLiteral.EBI, EBI())
ctx.plugin(SystemLiteral.ATBS, ATBS())
ctx.plugin("GPS_SPEED_CORRECTION", GPSSpeedCorrection())
window = Window(cfg.width,
cfg.height,
cfg.refresh_rate,
CustomRuntimeData(),
fullscreen=cfg.fullscreen,
no_title_bar=cfg.no_title_bar,
theme_mode=cfg.theme_mode)
root = window.root()
w = Window(cfg.width,
cfg.height,
cfg.refresh_rate,
CustomRuntimeData(),
fullscreen=cfg.fullscreen,
no_title_bar=cfg.no_title_bar,
theme_mode=cfg.theme_mode)
root = w.root()
root.configure(cursor="dot")
m1 = StringVar(root, "")
speed = DoubleVar(root, 0)
Expand All @@ -54,9 +54,17 @@ def main() -> int:
g_force = GForceVar(root, 0, 0)
esc = StringVar(root, "STANDARD")

class LeftIndicator(FrequencyGenerator):
def do(self) -> None:
uim["left"].configure(image=Left(cfg.font_size_large, Color.RED if self._loops % 2 == 1 else None))

class RightIndicator(FrequencyGenerator):
def do(self) -> None:
uim["right"].configure(image=Right(cfg.font_size_large, Color.RED if self._loops % 2 == 1 else None))

def render(manager: ContextManager) -> None:
def switch_m1_mode(_) -> None:
manager.rd().m1_mode = (manager.rd().m1_mode + 1) % 3
w.runtime_data().m1_mode = (w.runtime_data().m1_mode + 1) % 3

manager["m1"] = Typography(root, theme_key="CTkButton", variable=m1, clickable=True, command=switch_m1_mode,
font=("Arial", cfg.font_size_small - 4)).lock_ratio(cfg.m_ratio)
Expand All @@ -79,7 +87,7 @@ def switch_m1_mode(_) -> None:
manager[f"{system_lower}_status"] = CTkLabel(root, text=f"{system} READY", text_color="green",
font=("Arial", cfg.font_size_small))
manager[system_lower] = CTkButton(root, text=f"{system} ON",
command=make_system_switch(ctx, SystemLiteral(system), manager.rd()),
command=make_system_switch(ctx, SystemLiteral(system), w.runtime_data()),
font=("Arial", cfg.font_size_small))

manager["left"] = CTkButton(root, text="", image=Left(cfg.font_size_large),
Expand All @@ -93,12 +101,12 @@ def switch_esc_mode(mode) -> None:
manager["esc"].configure(selected_color=(c := "green" if (esc_mode := ESCMode[mode]) < 2 else "red"),
selected_hover_color=c)
ctx.esc_mode(esc_mode)
manager.rd().control_system_switch_changed = True
w.runtime_data().control_system_switch_changed = True

manager["esc"] = CTkSegmentedButton(root, values=["STANDARD", "AGGRESSIVE", "SPORT", "OFF"], variable=esc,
command=switch_esc_mode, font=("Arial", cfg.font_size_small))

uim = initialize(window, render, ctx, get_controller(MAIN_CONTROLLER))
uim = initialize(w, render, ctx, get_controller(MAIN_CONTROLLER))

class CommCallback(Callback):
def on_fail(self, service: Service, error: Exception) -> None:
Expand All @@ -112,24 +120,24 @@ def on_receive(self, service: Service, msg: bytes) -> None:
elif msg == b"hazard":
ctx.hazard(not ctx.hazard())

uim.rd().comm = start_server(create_server(cfg.comm_port, CommCallback()), True)
w.runtime_data().comm = start_server(create_server(cfg.comm_port, CommCallback()), True)

class CustomListener(EventListener):
def pre_push(self, e: DataPushedEvent) -> None:
self.super(e)
d = e.data.to_dict()
d["speed_trend"] = ctx.speed_trend()
d["lap_times"] = ctx.lap_time_list()
uim.rd().comm_notify(d)
w.runtime_data().comm_notify(d)

def on_update(self, e: UpdateEvent) -> None:
self.super(e)
d = e.context.data()
if uim.rd().m1_mode == 0:
if w.runtime_data().m1_mode == 0:
lap_time_list = ctx.lap_time_list()
m1.set(f"LAP TIMES\n\n{"No Lap Timed" if len(lap_time_list) < 1 else "\n".join(map(format_lap_time,
lap_time_list))}")
elif uim.rd().m1_mode == 1:
elif w.runtime_data().m1_mode == 1:
if has_device(GPS_RECEIVER):
gps = get_device(GPS_RECEIVER).read()
m1.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - {gps[4]} {gps[5]}\n\n"
Expand All @@ -139,31 +147,31 @@ def on_update(self, e: UpdateEvent) -> None:
m1.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - !NF!\n\n"
f"{d.gps_ground_speed:.1f} KM / H\n"
f"LAT {d.latitude:.5f}\nLON {d.longitude:.5f}")
elif uim.rd().m1_mode == 2:
elif w.runtime_data().m1_mode == 2:
m1.set(f"VeC {__version__.upper()}\n\n"
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"
f"{(duration := int(time()) - uim.rd().start_time) // 60} MIN {duration % 60} SEC\n"
f"{(duration := int(time()) - w.runtime_data().start_time) // 60} MIN {duration % 60} SEC\n"
f"{(m := d.mileage):.1f} KM - {m * 3600 / duration:.1f} KM / H\n\n"
f"{cfg.refresh_rate} - {uim.fps():.2f} FPS - {uim.net_delay() * 1000:.1f} MS\n"
f"{ip[-1] if len(ip := my_ip_addresses()) > 0 else "NOT FOUND"}:{uim.rd().comm.port()}")
f"{cfg.refresh_rate} - {w.fps():.2f} FPS - {w.net_delay() * 1000:.1f} MS\n"
f"{ip[-1] if len(ip := my_ip_addresses()) > 0 else "NOT FOUND"}:{w.runtime_data().comm.port()}")
speed.set(d.speed)
voltage.set(f"{d.voltage:.1f} V")
st = ctx.speed_trend()
speed_trend.set(st)
g_force.set((d.lateral_acceleration, d.forward_acceleration))
if uim.rd().comm.num_connections() < 1:
if w.runtime_data().comm.num_connections() < 1:
uim["comm_status"].configure(text="COMM OFFLINE", text_color="gray")
else:
uim["comm_status"].configure(text="COMM ONLINE", text_color=["black", "white"])
if uim.rd().control_system_switch_changed:
if w.runtime_data().control_system_switch_changed:
for system in SystemLiteral:
system_lowercase = system.lower()
if ctx.plugin(SystemLiteral(system)).enabled():
uim[system_lowercase].configure(text=f"{system} ON")
else:
uim[system_lowercase].configure(text=f"{system} OFF")
uim[f"{system_lowercase}_status"].configure(text=f"{system} OFF", text_color=("black", "white"))
uim.rd().control_system_switch_changed = False
w.runtime_data().control_system_switch_changed = False

def pre_intervene(self, e: InterventionEvent) -> None:
self.super(e)
Expand All @@ -188,16 +196,24 @@ def post_suspend(self, e: SuspensionEvent) -> None:
def left_indicator(self, e: Event, state: bool) -> None:
if has_device(LEFT_INDICATOR):
get_device(LEFT_INDICATOR).write(LEDGroupCommand(
LEDCommand.BLINK, Transition("left2right", .1)
LEDCommand.BLINK, Transition("left2right", 100)
) if state else LEDGroupCommand(LEDCommand.OFF, Entire()))
uim["left"].configure(image=Left(cfg.font_size_large, Color.RED if state else None))
if state:
w.add_frequency_generator("left_indicator", LeftIndicator(500))
else:
w.remove_frequency_generator("left_indicator")
uim["left"].configure(image=Left(cfg.font_size_large, None))

def right_indicator(self, e: Event, state: bool) -> None:
if has_device(RIGHT_INDICATOR):
get_device(RIGHT_INDICATOR).write(LEDGroupCommand(
LEDCommand.BLINK, Transition("right2left", .1)
LEDCommand.BLINK, Transition("right2left", 100)
) if state else LEDGroupCommand(LEDCommand.OFF, Entire()))
uim["right"].configure(image=Right(cfg.font_size_large, Color.RED if state else None))
if state:
w.add_frequency_generator("right_indicator", RightIndicator(500))
else:
w.remove_frequency_generator("right_indicator")
uim["right"].configure(image=Right(cfg.font_size_large, None))

def hazard(self, e: Event, state: bool) -> None:
super().hazard(e, state)
Expand Down Expand Up @@ -250,7 +266,7 @@ def on_recover(_, e: SuspensionEvent) -> None:
["battery_fault", "brake_fault", "esc_fault", "gps_fault", "motor_fault", "wsc_fault"]
]
ctx.esc_mode(ESCMode.OFF)
uim.rd().control_system_switch_changed = True
w.runtime_data().control_system_switch_changed = True
else:
layout = [
["m1", "m2", "m3"],
Expand All @@ -266,13 +282,13 @@ def on_recover(_, e: SuspensionEvent) -> None:

def on_press(key: _Key | _KeyCode) -> None:
if key == _KeyCode.from_char("1"):
make_system_switch(ctx, SystemLiteral.DTCS, uim.rd())()
make_system_switch(ctx, SystemLiteral.DTCS, w.runtime_data())()
elif key == _KeyCode.from_char("2"):
make_system_switch(ctx, SystemLiteral.ABS, uim.rd())()
make_system_switch(ctx, SystemLiteral.ABS, w.runtime_data())()
elif key == _KeyCode.from_char("3"):
make_system_switch(ctx, SystemLiteral.EBI, uim.rd())()
make_system_switch(ctx, SystemLiteral.EBI, w.runtime_data())()
elif key == _KeyCode.from_char("4"):
make_system_switch(ctx, SystemLiteral.ATBS, uim.rd())()
make_system_switch(ctx, SystemLiteral.ATBS, w.runtime_data())()
elif key == _KeyCode.from_char("t"):
ctx.time_lap()
elif key == _Key.esc:
Expand Down