From ec8fe04559ee09c83e065f981c6dd694f2b0035d Mon Sep 17 00:00:00 2001 From: ATATC Date: Wed, 24 Jul 2024 12:20:15 +0800 Subject: [PATCH 1/2] Added CAN bus support. (#208) --- leads_can/__init__.py | 6 ++++++ leads_can/prototype.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 leads_can/__init__.py create mode 100644 leads_can/prototype.py diff --git a/leads_can/__init__.py b/leads_can/__init__.py new file mode 100644 index 00000000..63489d5e --- /dev/null +++ b/leads_can/__init__.py @@ -0,0 +1,6 @@ +from importlib.util import find_spec as _find_spec + +if not _find_spec("can"): + raise ImportError("Please install `python-can` to run this module\n>>>pip install python-can") + +from leads_can.prototype import * diff --git a/leads_can/prototype.py b/leads_can/prototype.py new file mode 100644 index 00000000..b547acb1 --- /dev/null +++ b/leads_can/prototype.py @@ -0,0 +1,32 @@ +from typing import override as _override + +from can import Bus as _Bus, Notifier as _Notifier, Listener as _Listener, Message as _Message + +from leads import Controller as _Controller + + +class CANBus(_Controller, _Listener): + def __init__(self) -> None: + _Controller.__init__(self) + _Listener.__init__(self) + self._bus: _Bus | None = None + self._notifier: _Notifier | None = None + + @_override + def on_message_received(self, msg: _Message) -> None: + for device in self.devices(): + device.update(msg) + + @_override + def initialize(self, *parent_tags: str) -> None: + super().initialize(*parent_tags) + self._bus = _Bus() + self._notifier = _Notifier(self._bus, (self,)) + + @_override + def write(self, payload: _Message) -> None: + self._bus.send(payload) + + @_override + def close(self) -> None: + self._bus.close() From bf7b7e31a6d64000442ea2050446e16f5a79a702 Mon Sep 17 00:00:00 2001 From: ATATC Date: Wed, 24 Jul 2024 19:33:25 +0800 Subject: [PATCH 2/2] Added OBD2 support. (#208) --- leads_can/__init__.py | 1 + leads_can/obd.py | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 leads_can/obd.py diff --git a/leads_can/__init__.py b/leads_can/__init__.py index 63489d5e..81381838 100644 --- a/leads_can/__init__.py +++ b/leads_can/__init__.py @@ -4,3 +4,4 @@ raise ImportError("Please install `python-can` to run this module\n>>>pip install python-can") from leads_can.prototype import * +from leads_can.obd import * diff --git a/leads_can/obd.py b/leads_can/obd.py new file mode 100644 index 00000000..85c75c83 --- /dev/null +++ b/leads_can/obd.py @@ -0,0 +1,27 @@ +from typing import override as _override + +from can import Message as _Message + +from leads import DataContainer as _DataContainer +from leads_can.prototype import CANBus + + +class OBD2(CANBus): + @_override + def write(self, payload: _DataContainer) -> None: + t = payload.time_stamp() * .001 + super().write(_Message(t, 0x00, data=str(payload.voltage).encode())) + super().write(_Message(t, 0x01, data=str(payload.speed).encode())) + super().write(_Message(t, 0x10, data=str(payload.front_wheel_speed).encode())) + super().write(_Message(t, 0x11, data=str(payload.rear_wheel_speed).encode())) + super().write(_Message(t, 0x20, data=str(payload.forward_acceleration).encode())) + super().write(_Message(t, 0x21, data=str(payload.lateral_acceleration).encode())) + super().write(_Message(t, 0x30, data=str(payload.mileage).encode())) + super().write(_Message(t, 0x40, data=str(payload.gps_ground_speed).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x41, data=str(payload.latitude).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x42, data=str(payload.longitude).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x50, data=str(payload.throttle).encode())) + super().write(_Message(t, 0x51, data=str(payload.brake).encode()))