Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions leads_vec/devices_jarvis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from leads import device, MAIN_CONTROLLER, mark_device, FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, \
REAR_VIEW_CAMERA, require_config
from leads_gui import Config
from leads_video import Base64Camera
from leads_video import LowLatencyBase64Camera

import_error: ImportError | None = None
try:
Expand All @@ -30,7 +30,7 @@


@device(CAMERA_TAGS, MAIN_CONTROLLER, CAMERA_ARGS)
class Cameras(Base64Camera):
class Cameras(LowLatencyBase64Camera):
@override
def initialize(self, *parent_tags: str) -> None:
mark_device(self, "Jarvis")
Expand Down
51 changes: 45 additions & 6 deletions leads_video/camera.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from threading import Thread as _Thread
from typing import override as _override

from PIL.Image import fromarray as _fromarray, Image as _Image
Expand Down Expand Up @@ -59,23 +60,61 @@ def close(self) -> None:
self._video_capture.release()


class Base64Camera(Camera, _ShadowDevice):
class LowLatencyCamera(Camera, _ShadowDevice):
def __init__(self, port: int, resolution: tuple[int, int] | None = None) -> None:
Camera.__init__(self, port, resolution)
_ShadowDevice.__init__(self, port)
self._original: _ndarray | None = None
self._base64: str = ""
self._frame: _ndarray | None = None

@_override
def loop(self) -> None:
if self._video_capture:
self._original = super().read()
self._base64 = base64_encode(self._original)
self._frame = super().read()

@_override
def read(self) -> _ndarray | None:
return self._frame


class Base64Camera(LowLatencyCamera):
def __init__(self, port: int, resolution: tuple[int, int] | None = None) -> None:
super().__init__(port, resolution)
self._base64: str = ""

@_override
def loop(self) -> None:
super().loop()
if self._frame is not None:
self._base64 = base64_encode(self._frame)

@_override
def read(self) -> str:
return self._base64

@_override
def read_numpy(self) -> _ndarray | None:
return self._original
return self._frame


class LowLatencyBase64Camera(Base64Camera):
def __init__(self, port: int, resolution: tuple[int, int] | None = None) -> None:
super().__init__(port, resolution)
self._shadow_thread2: _Thread | None = None

@_override
def loop(self) -> None:
LowLatencyCamera.loop(self)

def loop2(self) -> None:
if (local_frame := self._frame) is not None:
self._base64 = base64_encode(local_frame)

def run2(self) -> None:
while True:
self.loop2()

@_override
def initialize(self, *parent_tags: str) -> None:
super().initialize(*parent_tags)
self._shadow_thread2 = _Thread(name=f"{id(self)} shadow2", target=self.run2, daemon=True)
self._shadow_thread2.start()