diff --git a/core/observers/subject/eye_subject.py b/core/observers/subject/eye_subject.py index 0d3169f..7f6b877 100644 --- a/core/observers/subject/eye_subject.py +++ b/core/observers/subject/eye_subject.py @@ -81,7 +81,12 @@ def _run_in_loop(self, wifi_lock = Lock() # Save an initial image. - initial_frame = eye_strategy.get_frame() + try: + initial_frame = eye_strategy.get_frame() + except RuntimeError as error: + logger.error( + "[EyeSubject] An error occurred while capturing the frame.") + raise RuntimeError from error file_location = f"{self._image_path}/initial_frame.jpg" cv2.imwrite(file_location, initial_frame) logger.debug("[EyeSubject] Initial frame has been saved.") @@ -89,10 +94,17 @@ def _run_in_loop(self, while True: # If WiFi subject would give rights to use camera, # Check if any intruders detected. - logger.debug("[EyeSubject] WiFi Lock Status: %s", wifi_lock.locked()) + logger.debug("[EyeSubject] WiFi Lock Status: %s", + wifi_lock.locked()) if not wifi_lock.locked(): - result = eye_strategy.check_if_detected() - logger.debug("[EyeSubject] EyeStrategyResult: %s", str(result.result)) + try: + result = eye_strategy.check_if_detected() + except RuntimeError as error: + logger.error( + "[EyeSubject] An error occurred while checking if detected.") + raise RuntimeError from error + logger.debug("[EyeSubject] EyeStrategyResult: %s", + str(result.result)) if result.result: logger.debug("[EyeSubject] Changing state to DETECTED...") @@ -100,7 +112,8 @@ def _run_in_loop(self, self.set_state(EyeStates.DETECTED) sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED else: - logger.debug("[EyeSubject] Changing state to NOT_DETECTED...") + logger.debug( + "[EyeSubject] Changing state to NOT_DETECTED...") self.set_state(EyeStates.NOT_DETECTED) sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL diff --git a/core/strategies/eye/base_eye_strategy.py b/core/strategies/eye/base_eye_strategy.py index 1c27e44..2b19ea7 100644 --- a/core/strategies/eye/base_eye_strategy.py +++ b/core/strategies/eye/base_eye_strategy.py @@ -36,7 +36,10 @@ def _detect_humans(self, frame: ndarray) -> DetectorResult: def check_if_detected(self) -> EyeStrategyResult: """This method checks if there are any protectors around.""" #  Get the frame from the camera. - frame = self.get_frame() + try: + frame = self.get_frame() + except RuntimeError as error: + raise RuntimeError from error # Detect humans in the frame. result = self._detect_humans(frame) return EyeStrategyResult(image=result.image, result=result.human_found) diff --git a/core/strategies/eye/picamera_strategy.py b/core/strategies/eye/picamera_strategy.py index 4c008e1..05a863c 100644 --- a/core/strategies/eye/picamera_strategy.py +++ b/core/strategies/eye/picamera_strategy.py @@ -17,8 +17,14 @@ class PiCameraStrategy(BaseEyeStrategy): """ The camera strategy for eye strategies. """ + def __init__(self): self._detector = None + self._picam2 = Picamera2() + still_configuration = self._picam2.create_still_configuration( + main={"format": 'XRGB8888'} + ) + self._picam2.configure(still_configuration) # Interface methods. def set_detector(self, detector: BaseDetectorStrategy) -> None: @@ -31,12 +37,13 @@ def get_detector(self) -> BaseDetectorStrategy: def get_frame(self) -> numpy.ndarray: """This method returns the frame from the camera.""" - # Internal attributes - picam2 = Picamera2() - still_configuration = picam2.create_still_configuration(main={"format": 'XRGB8888'}) - picam2.configure(still_configuration) - picam2.start() - frame = picam2.capture_array() - picam2.close() - del picam2 - return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) + try: + self._picam2.start() + frame = self._picam2.capture_array() + self._picam2.stop() + except Exception as error: + logger.error( + "An error occurred while capturing the frame: %s", error) + raise RuntimeError from error + return cv2.rotate(frame, + cv2.ROTATE_90_COUNTERCLOCKWISE) diff --git a/servicer.py b/servicer.py index ca84d6e..535be31 100644 --- a/servicer.py +++ b/servicer.py @@ -37,7 +37,8 @@ def read_configurations() -> tuple[dict[str, Any], dict[str, Any]]: # Definitations MAIN_CONIGS, STRATEGY_CONFIGS = read_configurations() -SERVICER_BOT = AsyncTeleBot(token=STRATEGY_CONFIGS["telegram_strategy"]["bot_key"]) +SERVICER_BOT = AsyncTeleBot( + token=STRATEGY_CONFIGS["telegram_strategy"]["bot_key"]) KNOWN_LOG_LOCATIONS: dict[str, str] = { "hss.service": "/home/raspberry/.home-security-system/logs/hss.log" } @@ -57,6 +58,7 @@ async def info(message): "/logs hss.service:N - provides the latest N logs.\n" "/inhouse - provides if protectors are in house, and whose.\n" "/imageshot - captures an image and sends.\n" + "/meminfo - provides the memory information.\n" "/reboot - reboots the hardware.\n" "/shell echo 'test'- provides a shell access to the hardware.\n" "/info, /help, /hi - this help text.\n") @@ -147,6 +149,19 @@ async def image_shot(message): del frame, encoded_frame +@SERVICER_BOT.message_handler(commands=['meminfo']) +async def mem_info(message): + """ + This method is called when the /meminfo command is sent. + """ + process = await asyncio.create_subprocess_shell( + "cat /proc/meminfo", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + stdout, _ = await process.communicate() + await SERVICER_BOT.reply_to(message, f"Memory Information: \n{stdout.decode()}") + + @SERVICER_BOT.message_handler(commands=['reboot']) async def reboot(message): """