Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion leads_gui/performance_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def record_frame(self, interval: float) -> None:
self._net_delay_seq.append(delay - interval)
mark = len(self._net_delay_seq)
self._predicted_offset = max(min(_poly1d(_polyfit(range(mark), self._net_delay_seq, 5))(mark + 1)
if mark > self._refresh_rate else 0, self._interval), 0)
if mark > self._refresh_rate else 0, self._interval - .001), 0)
self._last_frame = t

def next_interval(self) -> float:
Expand Down
2 changes: 1 addition & 1 deletion leads_gui/prototype.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def wrapper() -> None:
self._root.after(int((ni := self._performance_checker.next_interval()) * 1000), wrapper)
self._last_interval = ni

self._root.after(0, wrapper)
self._root.after(1, wrapper)
self._root.mainloop()

def kill(self) -> None:
Expand Down
9 changes: 4 additions & 5 deletions leads_vec/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from leads_arduino import ArduinoMicro, WheelSpeedSensor, VoltageSensor
from leads_gui import Config
from leads_raspberry_pi import NMEAGPSReceiver, LEDGroup, LED, LEDGroupCommand, LEDCommand, Entire
from leads_video import base64_encode

config: Config = require_config()
GPS_ONLY: int = config.get("gps_only", False)
Expand Down Expand Up @@ -38,13 +37,13 @@ def read(self) -> DataContainer:
wsc = {"speed": gps[0]} if GPS_ONLY else self.device("wsc").read()
visual = {}
if has_device(FRONT_VIEW_CAMERA):
visual["front_view_base64"] = base64_encode(get_device(FRONT_VIEW_CAMERA).read())
visual["front_view_base64"] = get_device(FRONT_VIEW_CAMERA).read()
if has_device(LEFT_VIEW_CAMERA):
visual["left_view_base64"] = base64_encode(get_device(LEFT_VIEW_CAMERA).read())
visual["left_view_base64"] = get_device(LEFT_VIEW_CAMERA).read()
if has_device(RIGHT_VIEW_CAMERA):
visual["right_view_base64"] = base64_encode(get_device(RIGHT_VIEW_CAMERA).read())
visual["right_view_base64"] = get_device(RIGHT_VIEW_CAMERA).read()
if has_device(REAR_VIEW_CAMERA):
visual["rear_view_base64"] = base64_encode(get_device(REAR_VIEW_CAMERA).read())
visual["rear_view_base64"] = get_device(REAR_VIEW_CAMERA).read()
return DataContainer(**wsc, **general) if len(visual) < 1 else VisualDataContainer(**visual, **wsc, **general)


Expand Down
10 changes: 2 additions & 8 deletions leads_vec/devices_jarvis.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import override

from numpy import ndarray as _ndarray

from leads import device, MAIN_CONTROLLER, mark_device, FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, \
REAR_VIEW_CAMERA, require_config
from leads_gui import Config
from leads_video import Camera
from leads_video import Base64Camera

import_error: ImportError | None = None
try:
Expand All @@ -32,16 +30,12 @@


@device(CAMERA_TAGS, MAIN_CONTROLLER, CAMERA_ARGS)
class Cameras(Camera):
class Cameras(Base64Camera):
@override
def initialize(self, *parent_tags: str) -> None:
mark_device(self, "Jarvis")
super().initialize(*parent_tags)

@override
def read(self) -> _ndarray:
return super().read()


if import_error:
raise import_error
26 changes: 22 additions & 4 deletions leads_video/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from cv2 import VideoCapture as _VideoCapture, cvtColor as _cvtColor, COLOR_BGR2RGB as _COLOR_BGR2RGB
from numpy import ndarray as _ndarray, pad as _pad, array as _array

from leads import Device as _Device
from leads import Device as _Device, ShadowDevice as _ShadowDevice
from leads_video.utils import base64_encode


class Camera(_Device):
Expand Down Expand Up @@ -41,10 +42,27 @@ def transform(self, x: _ndarray) -> _ndarray:
return _array(_fromarray(x).resize(self._resolution))

@_override
def read(self) -> _ndarray:
_, frame = self._video_capture.read()
return _cvtColor(self.transform(frame) if self._resolution else frame, _COLOR_BGR2RGB).transpose(2, 0, 1)
def read(self) -> _ndarray | None:
ret, frame = self._video_capture.read()
return _cvtColor(self.transform(frame) if self._resolution else frame, _COLOR_BGR2RGB).transpose(
2, 0, 1) if ret else None

@_override
def close(self) -> None:
self._video_capture.release()


class Base64Camera(Camera, _ShadowDevice):
def __init__(self, port: int, resolution: tuple[int, int] | None = None) -> None:
Camera.__init__(self, port, resolution)
_ShadowDevice.__init__(self, port)
self._base64: str = ""

@_override
def loop(self) -> None:
if self._video_capture:
self._base64 = base64_encode(super().read())

@_override
def read(self) -> str:
return self._base64
26 changes: 17 additions & 9 deletions leads_video/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,28 @@
from io import BytesIO as _BytesIO
from typing import Literal as _Literal

from PIL.Image import Image as _Image, fromarray as _fromarray, open as _open
from PIL.Image import Image as _Image, fromarray as _fromarray, open as _open, \
UnidentifiedImageError as _UnidentifiedImageError
from numpy import ndarray as _ndarray, array as _array


def base64_encode(x: _ndarray, mode: _Literal["L", "RGB"] | None = None) -> str:
img = _fromarray(x.transpose(1, 2, 0), mode)
buffer = _BytesIO()
img.save(buffer, "PNG")
def encode_image(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> _Image | None:
return None if x is None else _fromarray(x.transpose(1, 2, 0), mode)


def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> str:
if not (img := encode_image(x, mode)):
return ""
img.save(buffer := _BytesIO(), "JPEG")
return _b64encode(buffer.getvalue()).decode()


def base64_decode_image(x_base64: str) -> _Image:
return _open(_BytesIO(_b64decode(x_base64)))
def base64_decode_image(x_base64: str) -> _Image | None:
try:
return _open(_BytesIO(_b64decode(x_base64))) if x_base64 else None
except (ValueError, TypeError, _UnidentifiedImageError):
return None


def base64_decode(x_base64: str) -> _ndarray:
return _array(base64_decode_image(x_base64)).transpose(2, 0, 1)
def base64_decode(x_base64: str) -> _ndarray | None:
return _array(image).transpose(2, 0, 1) if (image := base64_decode_image(x_base64)) else None