Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions core/observers/subject/eye_subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,39 @@ def _run_in_loop(self,
wifi_lock = Lock()

# Save an initial image.
initial_frame = eye_strategy.get_frame()
try:
initial_frame = eye_strategy.get_frame()
except RuntimeError as error:
logger.error(
"[EyeSubject] An error occurred while capturing the frame.")
raise RuntimeError from error
file_location = f"{self._image_path}/initial_frame.jpg"
cv2.imwrite(file_location, initial_frame)
logger.debug("[EyeSubject] Initial frame has been saved.")

while True:
# If WiFi subject would give rights to use camera,
# Check if any intruders detected.
logger.debug("[EyeSubject] WiFi Lock Status: %s", wifi_lock.locked())
logger.debug("[EyeSubject] WiFi Lock Status: %s",
wifi_lock.locked())
if not wifi_lock.locked():
result = eye_strategy.check_if_detected()
logger.debug("[EyeSubject] EyeStrategyResult: %s", str(result.result))
try:
result = eye_strategy.check_if_detected()
except RuntimeError as error:
logger.error(
"[EyeSubject] An error occurred while checking if detected.")
raise RuntimeError from error
logger.debug("[EyeSubject] EyeStrategyResult: %s",
str(result.result))

if result.result:
logger.debug("[EyeSubject] Changing state to DETECTED...")
self._save_image(result)
self.set_state(EyeStates.DETECTED)
sleep_interval = EyeSubject.SLEEP_INTERVAL_DETECTED
else:
logger.debug("[EyeSubject] Changing state to NOT_DETECTED...")
logger.debug(
"[EyeSubject] Changing state to NOT_DETECTED...")
self.set_state(EyeStates.NOT_DETECTED)
sleep_interval = EyeSubject.DEFAULT_SLEEP_INTERVAL

Expand Down
5 changes: 4 additions & 1 deletion core/strategies/eye/base_eye_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ def _detect_humans(self, frame: ndarray) -> DetectorResult:
def check_if_detected(self) -> EyeStrategyResult:
"""This method checks if there are any protectors around."""
#  Get the frame from the camera.
frame = self.get_frame()
try:
frame = self.get_frame()
except RuntimeError as error:
raise RuntimeError from error
# Detect humans in the frame.
result = self._detect_humans(frame)
return EyeStrategyResult(image=result.image, result=result.human_found)
25 changes: 16 additions & 9 deletions core/strategies/eye/picamera_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ class PiCameraStrategy(BaseEyeStrategy):
"""
The camera strategy for eye strategies.
"""

def __init__(self):
self._detector = None
self._picam2 = Picamera2()
still_configuration = self._picam2.create_still_configuration(
main={"format": 'XRGB8888'}
)
self._picam2.configure(still_configuration)

# Interface methods.
def set_detector(self, detector: BaseDetectorStrategy) -> None:
Expand All @@ -31,12 +37,13 @@ def get_detector(self) -> BaseDetectorStrategy:

def get_frame(self) -> numpy.ndarray:
"""This method returns the frame from the camera."""
# Internal attributes
picam2 = Picamera2()
still_configuration = picam2.create_still_configuration(main={"format": 'XRGB8888'})
picam2.configure(still_configuration)
picam2.start()
frame = picam2.capture_array()
picam2.close()
del picam2
return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
try:
self._picam2.start()
frame = self._picam2.capture_array()
self._picam2.stop()
except Exception as error:
logger.error(
"An error occurred while capturing the frame: %s", error)
raise RuntimeError from error
return cv2.rotate(frame,
cv2.ROTATE_90_COUNTERCLOCKWISE)
17 changes: 16 additions & 1 deletion servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def read_configurations() -> tuple[dict[str, Any], dict[str, Any]]:

# Definitations
MAIN_CONIGS, STRATEGY_CONFIGS = read_configurations()
SERVICER_BOT = AsyncTeleBot(token=STRATEGY_CONFIGS["telegram_strategy"]["bot_key"])
SERVICER_BOT = AsyncTeleBot(
token=STRATEGY_CONFIGS["telegram_strategy"]["bot_key"])
KNOWN_LOG_LOCATIONS: dict[str, str] = {
"hss.service": "/home/raspberry/.home-security-system/logs/hss.log"
}
Expand All @@ -57,6 +58,7 @@ async def info(message):
"/logs hss.service:N - provides the latest N logs.\n"
"/inhouse - provides if protectors are in house, and whose.\n"
"/imageshot - captures an image and sends.\n"
"/meminfo - provides the memory information.\n"
"/reboot - reboots the hardware.\n"
"/shell echo 'test'- provides a shell access to the hardware.\n"
"/info, /help, /hi - this help text.\n")
Expand Down Expand Up @@ -147,6 +149,19 @@ async def image_shot(message):
del frame, encoded_frame


@SERVICER_BOT.message_handler(commands=['meminfo'])
async def mem_info(message):
"""
This method is called when the /meminfo command is sent.
"""
process = await asyncio.create_subprocess_shell(
"cat /proc/meminfo",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, _ = await process.communicate()
await SERVICER_BOT.reply_to(message, f"Memory Information: \n{stdout.decode()}")


@SERVICER_BOT.message_handler(commands=['reboot'])
async def reboot(message):
"""
Expand Down