diff --git a/leads_can/__init__.py b/leads_can/__init__.py new file mode 100644 index 00000000..81381838 --- /dev/null +++ b/leads_can/__init__.py @@ -0,0 +1,7 @@ +from importlib.util import find_spec as _find_spec + +if not _find_spec("can"): + raise ImportError("Please install `python-can` to run this module\n>>>pip install python-can") + +from leads_can.prototype import * +from leads_can.obd import * diff --git a/leads_can/obd.py b/leads_can/obd.py new file mode 100644 index 00000000..85c75c83 --- /dev/null +++ b/leads_can/obd.py @@ -0,0 +1,27 @@ +from typing import override as _override + +from can import Message as _Message + +from leads import DataContainer as _DataContainer +from leads_can.prototype import CANBus + + +class OBD2(CANBus): + @_override + def write(self, payload: _DataContainer) -> None: + t = payload.time_stamp() * .001 + super().write(_Message(t, 0x00, data=str(payload.voltage).encode())) + super().write(_Message(t, 0x01, data=str(payload.speed).encode())) + super().write(_Message(t, 0x10, data=str(payload.front_wheel_speed).encode())) + super().write(_Message(t, 0x11, data=str(payload.rear_wheel_speed).encode())) + super().write(_Message(t, 0x20, data=str(payload.forward_acceleration).encode())) + super().write(_Message(t, 0x21, data=str(payload.lateral_acceleration).encode())) + super().write(_Message(t, 0x30, data=str(payload.mileage).encode())) + super().write(_Message(t, 0x40, data=str(payload.gps_ground_speed).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x41, data=str(payload.latitude).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x42, data=str(payload.longitude).encode(), + is_error_frame=not payload.gps_valid)) + super().write(_Message(t, 0x50, data=str(payload.throttle).encode())) + super().write(_Message(t, 0x51, data=str(payload.brake).encode())) diff --git a/leads_can/prototype.py b/leads_can/prototype.py new file mode 100644 index 00000000..b547acb1 --- /dev/null +++ b/leads_can/prototype.py @@ -0,0 +1,32 @@ +from typing import override as _override + +from can import Bus as _Bus, Notifier as _Notifier, Listener as _Listener, Message as _Message + +from leads import Controller as _Controller + + +class CANBus(_Controller, _Listener): + def __init__(self) -> None: + _Controller.__init__(self) + _Listener.__init__(self) + self._bus: _Bus | None = None + self._notifier: _Notifier | None = None + + @_override + def on_message_received(self, msg: _Message) -> None: + for device in self.devices(): + device.update(msg) + + @_override + def initialize(self, *parent_tags: str) -> None: + super().initialize(*parent_tags) + self._bus = _Bus() + self._notifier = _Notifier(self._bus, (self,)) + + @_override + def write(self, payload: _Message) -> None: + self._bus.send(payload) + + @_override + def close(self) -> None: + self._bus.close()