Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion leads/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,24 @@ def __init__(self,
throttle: float = 0,
brake: float = 0,
front_view_base64: str = "",
front_view_latency: int = 0,
left_view_base64: str = "",
left_view_latency: int = 0,
right_view_base64: str = "",
rear_view_base64: str = "") -> None:
right_view_latency: int = 0,
rear_view_base64: str = "",
rear_view_latency: int = 0) -> None:
super().__init__(voltage, speed, front_wheel_speed, rear_wheel_speed, forward_acceleration,
lateral_acceleration, mileage, gps_valid, gps_ground_speed, latitude, longitude, throttle,
brake)
self.front_view_base64: str = front_view_base64
self.front_view_latency: int = front_view_latency
self.left_view_base64: str = left_view_base64
self.left_view_latency: int = left_view_latency
self.right_view_base64: str = right_view_base64
self.right_view_latency: int = right_view_latency
self.rear_view_base64: str = rear_view_base64
self.rear_view_latency: int = rear_view_latency


def dlat2meters(dlat: float) -> float:
Expand Down
11 changes: 1 addition & 10 deletions leads_vec/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
Typography, Speedometer, ProxyCanvas, SpeedTrendMeter, GForceMeter, Stopwatch, Hazard, initialize, Battery, Brake, \
ESC, Satellite, Motor, Speed, Photo, Light, ImageVariable
from leads_vec.__version__ import __version__
from leads_video import Camera
from leads_video import get_camera


class CustomRuntimeData(RuntimeData):
Expand All @@ -41,15 +41,6 @@ def get_proxy_canvas(context_manager: ContextManager, key: str) -> ProxyCanvas:
return r


def get_camera(tag: str) -> Camera | None:
if has_device(tag):
cam = get_device(tag)
if not isinstance(cam, Camera):
raise TypeError(f"Device \"{tag}\" is supposed to be a camera")
return cam
return None


class StreamCallback(Callback):
def __init__(self, context_manager: ContextManager) -> None:
super().__init__()
Expand Down
19 changes: 14 additions & 5 deletions leads_vec/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

from leads import device, controller, MAIN_CONTROLLER, LEFT_FRONT_WHEEL_SPEED_SENSOR, RIGHT_FRONT_WHEEL_SPEED_SENSOR, \
Controller, CENTER_REAR_WHEEL_SPEED_SENSOR, require_config, mark_device, ODOMETER, GPS_RECEIVER, \
ConcurrentOdometer, LEFT_INDICATOR, RIGHT_INDICATOR, VOLTAGE_SENSOR, DataContainer, get_device, has_device, \
ConcurrentOdometer, LEFT_INDICATOR, RIGHT_INDICATOR, VOLTAGE_SENSOR, DataContainer, has_device, \
FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, REAR_VIEW_CAMERA, VisualDataContainer, BRAKE_INDICATOR, \
SFT, read_device_marker, has_controller
from leads_arduino import ArduinoMicro, WheelSpeedSensor, VoltageSensor
from leads_gui import Config
from leads_raspberry_pi import NMEAGPSReceiver, LEDGroup, LED, LEDGroupCommand, LEDCommand, Entire, Transition
from leads_video import Base64Camera, get_camera

config: Config = require_config()
GPS_ONLY: int = config.get("gps_only", False)
Expand Down Expand Up @@ -58,13 +59,21 @@ def read(self) -> DataContainer:
wsc = {"speed": gps[0]} if GPS_ONLY else self.device("wsc").read()
visual = {}
if has_device(FRONT_VIEW_CAMERA):
visual["front_view_base64"] = get_device(FRONT_VIEW_CAMERA).read()
cam = get_camera(FRONT_VIEW_CAMERA, Base64Camera)
visual["front_view_base64"] = cam.read()
visual["front_view_latency"] = int(cam.latency() * 1000)
if has_device(LEFT_VIEW_CAMERA):
visual["left_view_base64"] = get_device(LEFT_VIEW_CAMERA).read()
cam = get_camera(LEFT_VIEW_CAMERA, Base64Camera)
visual["left_view_base64"] = cam.read()
visual["left_view_latency"] = int(cam.latency() * 1000)
if has_device(RIGHT_VIEW_CAMERA):
visual["right_view_base64"] = get_device(RIGHT_VIEW_CAMERA).read()
cam = get_camera(RIGHT_VIEW_CAMERA, Base64Camera)
visual["right_view_base64"] = cam.read()
visual["right_view_latency"] = int(cam.latency() * 1000)
if has_device(REAR_VIEW_CAMERA):
visual["rear_view_base64"] = get_device(REAR_VIEW_CAMERA).read()
cam = get_camera(REAR_VIEW_CAMERA, Base64Camera)
visual["rear_view_base64"] = cam.read()
visual["rear_view_latency"] = int(cam.latency() * 1000)
return DataContainer(**wsc, **general) if len(visual) < 1 else VisualDataContainer(**visual, **wsc, **general)


Expand Down
1 change: 1 addition & 0 deletions leads_video/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
if not _find_spec("PIL"):
raise ImportError("Please install `Pillow` to run this module\n>>>pip install Pillow")

from leads_video.base64 import *
from leads_video.camera import *
from leads_video.utils import *
29 changes: 29 additions & 0 deletions leads_video/base64.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from base64 import b64encode as _b64encode, b64decode as _b64decode
from io import BytesIO as _BytesIO
from typing import Literal as _Literal

from PIL.Image import Image as _Image, fromarray as _fromarray, open as _open, \
UnidentifiedImageError as _UnidentifiedImageError
from numpy import ndarray as _ndarray, array as _array


def encode_image(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> _Image | None:
return None if x is None else _fromarray(x.transpose(1, 2, 0), mode)


def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None, quality: int = 25) -> str:
if not (img := encode_image(x, mode)):
return ""
img.save(buffer := _BytesIO(), "JPEG", quality=quality)
return _b64encode(buffer.getvalue()).decode()


def base64_decode_image(x_base64: str) -> _Image | None:
try:
return _open(_BytesIO(_b64decode(x_base64))) if x_base64 else None
except (ValueError, TypeError, _UnidentifiedImageError):
return


def base64_decode(x_base64: str) -> _ndarray | None:
return _array(image).transpose(2, 0, 1) if (image := base64_decode_image(x_base64)) else None
8 changes: 7 additions & 1 deletion leads_video/camera.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from threading import Thread as _Thread
from time import time as _time
from typing import override as _override

from PIL.Image import fromarray as _fromarray, Image as _Image
from cv2 import VideoCapture as _VideoCapture, cvtColor as _cvtColor, COLOR_BGR2RGB as _COLOR_BGR2RGB
from numpy import ndarray as _ndarray, pad as _pad, array as _array

from leads import Device as _Device, ShadowDevice as _ShadowDevice
from leads_video.utils import base64_encode
from leads_video.base64 import base64_encode


class Camera(_Device):
def __init__(self, port: int, resolution: tuple[int, int] | None = None) -> None:
super().__init__(port)
self._resolution: tuple[int, int] | None = resolution
self._video_capture: _VideoCapture | None = None
self._birth: float = 0

@_override
def initialize(self, *parent_tags: str) -> None:
Expand Down Expand Up @@ -46,6 +48,7 @@ def transform(self, x: _ndarray) -> _ndarray:
@_override
def read(self) -> _ndarray | None:
ret, frame = self._video_capture.read()
self._birth = _time()
return _cvtColor(self.transform(frame) if self._resolution else frame, _COLOR_BGR2RGB).transpose(
2, 0, 1) if ret else None

Expand All @@ -55,6 +58,9 @@ def read_numpy(self) -> _ndarray | None:
def read_pil(self) -> _Image | None:
return None if (frame := self.read_numpy()) is None else _fromarray(frame.transpose(1, 2, 0))

def latency(self) -> float:
return _time() - self._birth

@_override
def close(self) -> None:
self._video_capture.release()
Expand Down
36 changes: 9 additions & 27 deletions leads_video/utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,11 @@
from base64 import b64encode as _b64encode, b64decode as _b64decode
from io import BytesIO as _BytesIO
from typing import Literal as _Literal
from leads import has_device as _has_device, get_device as _get_device
from leads_video.camera import Camera

from PIL.Image import Image as _Image, fromarray as _fromarray, open as _open, \
UnidentifiedImageError as _UnidentifiedImageError
from numpy import ndarray as _ndarray, array as _array


def encode_image(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> _Image | None:
return None if x is None else _fromarray(x.transpose(1, 2, 0), mode)


def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None, quality: int = 25) -> str:
if not (img := encode_image(x, mode)):
return ""
img.save(buffer := _BytesIO(), "JPEG", quality=quality)
return _b64encode(buffer.getvalue()).decode()


def base64_decode_image(x_base64: str) -> _Image | None:
try:
return _open(_BytesIO(_b64decode(x_base64))) if x_base64 else None
except (ValueError, TypeError, _UnidentifiedImageError):
return


def base64_decode(x_base64: str) -> _ndarray | None:
return _array(image).transpose(2, 0, 1) if (image := base64_decode_image(x_base64)) else None
def get_camera(tag: str, required_type: type[Camera] = Camera) -> Camera | None:
if not _has_device(tag):
return None
cam = _get_device(tag)
if not isinstance(cam, required_type):
raise TypeError(f"Device \"{tag}\" is supposed to be a camera")
return cam