From 4317dade25453ea1b17c9c2fcae256ff31026333 Mon Sep 17 00:00:00 2001 From: ATATC Date: Sun, 7 Jul 2024 15:04:22 +0800 Subject: [PATCH 1/3] Put the Jarvis processor of the in a separate thread. (#271) --- leads_vec_rc/cli.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/leads_vec_rc/cli.py b/leads_vec_rc/cli.py index f61fd36d..89958267 100644 --- a/leads_vec_rc/cli.py +++ b/leads_vec_rc/cli.py @@ -3,6 +3,7 @@ from json import loads, JSONDecodeError from os import makedirs from os.path import abspath, exists +from threading import Thread from time import sleep from typing import Any, override @@ -16,16 +17,25 @@ try: from leads_vec_rc.jarvis import process + + + def processor() -> None: + while True: + callback.previous_processed_data = process(data_record[-1]) + + + processor_thread: Thread = Thread(name="Jarvis Processor", target=processor, daemon=True) except ImportError: - def process(d: dict[str, Any]) -> dict[str, Any]: - return d + process: None = None + processor_thread: None = None config: Config = require_config() if not exists(config.data_dir): L.debug(f"Data directory not found. Creating \"{abspath(config.data_dir)}\"...") makedirs(config.data_dir) -data_record: DataPersistence[DataContainer] = DataPersistence(1, compressor=lambda o, s: o[-s:]) +data_record: DataPersistence[dict[str, Any]] = DataPersistence(2, compressor=lambda o, s: o[-s:]) +data_record.append(DataContainer().to_dict()) time_stamp_record: DataPersistence[int] = DataPersistence(2000) speed_record: DataPersistence[float] = DataPersistence(2000) acceleration_record: DataPersistence[float] = DataPersistence(2000) @@ -44,6 +54,7 @@ class CommCallback(Callback): def __init__(self) -> None: super().__init__() self.client: Client = start_client(config.comm_addr, create_client(config.comm_port, self), True) + self.previous_processed_data: dict[str, Any] | None = None @override def on_connect(self, service: Service, connection: Connection) -> None: @@ -84,6 +95,9 @@ def on_disconnect(self, service: Service, connection: ConnectionBase) -> None: callback: CommCallback = CommCallback() +if processor_thread: + processor_thread.start() + app: FastAPI = FastAPI(title="LEADS VeC Remote Analyst") app.add_middleware( @@ -102,7 +116,7 @@ async def index() -> str: @app.get("/current") async def current() -> dict[str, Any]: - return process(data_record[-1] if len(data_record) > 0 else DataContainer().to_dict()) + return callback.previous_processed_data if callback.previous_processed_data else data_record[-1] @app.get("/time_stamp") From a4ecb12dd5117e2a0482c96ffbcb9159fd00ec56 Mon Sep 17 00:00:00 2001 From: ATATC Date: Sun, 7 Jul 2024 15:08:57 +0800 Subject: [PATCH 2/3] Removed the unnecessary data persistence. (#271) --- leads_vec_rc/cli.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/leads_vec_rc/cli.py b/leads_vec_rc/cli.py index 89958267..93f47e5f 100644 --- a/leads_vec_rc/cli.py +++ b/leads_vec_rc/cli.py @@ -21,7 +21,7 @@ def processor() -> None: while True: - callback.previous_processed_data = process(data_record[-1]) + callback.processed_data = process(callback.current_data) processor_thread: Thread = Thread(name="Jarvis Processor", target=processor, daemon=True) @@ -34,8 +34,6 @@ def processor() -> None: L.debug(f"Data directory not found. Creating \"{abspath(config.data_dir)}\"...") makedirs(config.data_dir) -data_record: DataPersistence[dict[str, Any]] = DataPersistence(2, compressor=lambda o, s: o[-s:]) -data_record.append(DataContainer().to_dict()) time_stamp_record: DataPersistence[int] = DataPersistence(2000) speed_record: DataPersistence[float] = DataPersistence(2000) acceleration_record: DataPersistence[float] = DataPersistence(2000) @@ -54,7 +52,8 @@ class CommCallback(Callback): def __init__(self) -> None: super().__init__() self.client: Client = start_client(config.comm_addr, create_client(config.comm_port, self), True) - self.previous_processed_data: dict[str, Any] | None = None + self.current_data: dict[str, Any] = DataContainer().to_dict() + self.processed_data: dict[str, Any] | None = None @override def on_connect(self, service: Service, connection: Connection) -> None: @@ -72,8 +71,7 @@ def on_fail(self, service: Service, error: Exception) -> None: def on_receive(self, service: Service, msg: bytes) -> None: self.super(service=service, msg=msg) try: - d = loads(msg.decode()) - data_record.append(d) + self.current_data = d = loads(msg.decode()) acceleration_record.append(Vector(d["forward_acceleration"], d["lateral_acceleration"])) gps_record.append(Vector(d["latitude"], d["longitude"])) if config.save_data: @@ -116,7 +114,7 @@ async def index() -> str: @app.get("/current") async def current() -> dict[str, Any]: - return callback.previous_processed_data if callback.previous_processed_data else data_record[-1] + return callback.processed_data if callback.processed_data else callback.current_data @app.get("/time_stamp") From a34b7b1daf256ba350cd79a51b08acd1162c2a05 Mon Sep 17 00:00:00 2001 From: ATATC Date: Sun, 7 Jul 2024 15:16:06 +0800 Subject: [PATCH 3/3] Added accepted class person. (#271) --- leads_vec_rc/jarvis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/leads_vec_rc/jarvis.py b/leads_vec_rc/jarvis.py index 73c14564..5971993e 100644 --- a/leads_vec_rc/jarvis.py +++ b/leads_vec_rc/jarvis.py @@ -11,5 +11,5 @@ def process(d: dict[str, _Any]) -> dict[str, _Any]: for key in "front_view_base64", "left_view_base64", "right_view_base64", "rear_view_base64": if key in d.keys(): img = base64_decode(d[key]) - d[key] = "" if img is None else base64_encode(_model.mark(img, filter_type=("car",))) + d[key] = "" if img is None else base64_encode(_model.mark(img, None, ("car", "person"))) return d