From bc837a798842fa2748c78b3777daa97eed395a7f Mon Sep 17 00:00:00 2001 From: Timo Weiss Date: Fri, 21 Jun 2024 15:30:53 +0200 Subject: [PATCH] switched to tcp socket --- agent/templates/dashboard.html | 25 ++++++++---- agent/video_agent.py | 58 +++++++++++++------------- agent/web_server.py | 75 ++++++++++++++++++++++++++-------- 3 files changed, 101 insertions(+), 57 deletions(-) diff --git a/agent/templates/dashboard.html b/agent/templates/dashboard.html index a35848b..708bf78 100644 --- a/agent/templates/dashboard.html +++ b/agent/templates/dashboard.html @@ -2,13 +2,20 @@ {% block content %}

Dashboard

-{% if stream_url %} - {% include 'partials/video_stream.html' %} -{% else %} -
-
-

No video stream available

-
-
-{% endif %} + + + {% endblock content %} \ No newline at end of file diff --git a/agent/video_agent.py b/agent/video_agent.py index 836ad57..1c2328f 100644 --- a/agent/video_agent.py +++ b/agent/video_agent.py @@ -3,13 +3,12 @@ # pylint: disable=import-error, broad-except import os import threading -import time import socket import logger import base from picamera2 import Picamera2 -from picamera2.encoders import H264Encoder, Quality -from picamera2.outputs import FileOutput +from picamera2.encoders import H264Encoder +from picamera2.outputs import CircularOutput # Initialize logger LOGGER = logger.get_module_logger(__name__) @@ -26,20 +25,11 @@ def __init__(self, mqtt_client): self._record_time = int(os.environ.get("VIDEO_RECORDING_DURATION")) self._picamera = Picamera2() self._encoder = H264Encoder(bitrate=int(os.environ.get("VIDEO_BITRATE"))) - - recording_folder = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "recordings", - ) - - if not os.path.exists(recording_folder): - os.makedirs(recording_folder) - LOGGER.info("Created recording folder: %s", recording_folder) - - self._output = FileOutput(recording_folder + "/video_recording.h264") + self._output = CircularOutput(buffersize=10 * 1024 * 1024) self._stream_thread = None self._sock = None + self._conn = None def run(self): """Subscribe to the mqtt topic and start listening for video messages.""" @@ -57,16 +47,17 @@ def run(self): # pylint: disable=unused-argument def _on_video_message(self, client, userdata, msg): LOGGER.info("Mqtt message received: %s", msg.payload) - self._toggle_video(msg.payload.decode("utf-8")) + payload = msg.payload.decode("utf-8") + self._toggle_video(payload) - def _toggle_video(self, payload): + def _toggle_video(self, command): """Toggle the video stream on or off.""" - if payload == "on": + if command == "on": self._start_video_stream() - elif payload == "off": + elif command == "off": self._stop_video_stream() else: - LOGGER.error("Unknown video command: %s", payload) + LOGGER.error("Unknown video command: %s", command) def _start_video_stream(self): """Start the video stream.""" @@ -88,15 +79,19 @@ def _start_video_stream(self): controls={"FrameRate": int(os.environ.get("VIDEO_FPS"))}, ) ) - self._picamera.start_recording( - self._encoder, self._output, quality=Quality.HIGH - ) - self._streaming = True + self._encoder.output = self._output + self._picamera.start_recording(self._encoder) - self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sock.bind(("0.0.0.0", self._port)) + self._sock.listen(1) + LOGGER.info(f"Waiting for a connection on port {self._port}...") + self._conn, addr = self._sock.accept() + LOGGER.info(f"Connection accepted from {addr}") + + self._streaming = True self._stream_thread = threading.Thread(target=self._stream_video) self._stream_thread.start() except Exception as e: @@ -112,21 +107,24 @@ def _stop_video_stream(self): self._streaming = False if self._stream_thread: self._stream_thread.join() + if self._conn: + self._conn.close() if self._sock: self._sock.close() except Exception as e: LOGGER.error("Failed to stop video stream: %s", e) def _stream_video(self): - """Stream video frames to the UDP socket.""" + """Stream video frames to the TCP client.""" try: - conn, addr = self._sock.accept() - with conn.makefile("wb") as stream: - while self._streaming: - self._picamera.start_recording(self._encoder, FileOutput(stream)) - time.sleep(0.01) + while self._streaming: + data = self._output.read() + if data: + self._conn.sendall(data) except Exception as e: LOGGER.error("Error streaming video: %s", e) + finally: + self._conn.close() def stop(self): """Stop the agent.""" diff --git a/agent/web_server.py b/agent/web_server.py index afc5517..17e6300 100644 --- a/agent/web_server.py +++ b/agent/web_server.py @@ -5,9 +5,11 @@ import dotenv from flask import Flask, render_template, request, redirect, url_for from flask_assets import Environment, Bundle -import waitress +from flask_socketio import SocketIO, emit import base import logger +import threading +import socket # Load environment variables dotenv.load_dotenv() @@ -27,6 +29,7 @@ def __init__(self, mqtt_client) -> None: self._port = int(os.environ.get("WEBSERVER_PORT")) self.app = Flask(__name__) + self.socketio = SocketIO(self.app) # Initialize assets self._assets = Environment(self.app) @@ -39,10 +42,13 @@ def __init__(self, mqtt_client) -> None: self.__css.build() self.__js.build() - # Register routes + # Register routes and socket events self._setup_routes() + self._setup_socket_events() self._web_server = None + self._video_stream_thread = None + self._streaming = False def _setup_routes(self) -> None: """Setup routes for the webserver.""" @@ -51,8 +57,7 @@ def _setup_routes(self) -> None: def dashboard(): LOGGER.info("Dashboard requested by %s", request.remote_addr) # pylint: disable=line-too-long - stream_url = f"http://{os.popen('hostname -I').read().split()[0]}:{os.environ.get('VIDEO_STREAM_PORT')}/video_stream" - return render_template("dashboard.html", stream_url=stream_url) + return render_template("dashboard.html") @self.app.route("/settings", methods=["GET", "POST"]) def settings(): @@ -79,6 +84,17 @@ def logs(): return render_template("partials/log_view.html", logs=logs) return render_template("logs.html", logs=logs, selected_lines=lines) + def _setup_socket_events(self): + @self.socketio.on("connect") + def handle_connect(): + LOGGER.info("Client connected") + self._start_video_stream() + + @self.socketio.on("disconnect") + def handle_disconnect(): + LOGGER.info("Client disconnected") + self._stop_video_stream() + def run(self) -> None: """Run the webserver.""" self._mqtt.subscribe(self._location_topic) @@ -87,17 +103,52 @@ def run(self) -> None: self._on_doorbell_message, ) LOGGER.info("Webserver subscribed to MQTT topic: %s", self._location_topic) - self._web_server = waitress.serve(self.app, host="0.0.0.0", port=self._port) + self._web_server = threading.Thread( + target=lambda: self.socketio.run(self.app, host="0.0.0.0", port=self._port) + ) + self._web_server.start() LOGGER.info("Webserver started on port %i.", self._port) def stop(self) -> None: """Stop the webserver.""" - self._web_server.close() + self._stop_video_stream() + self._web_server.join() self._mqtt.unsubscribe(self._location_topic) LOGGER.info("Webserver unsubscribed from MQTT topic: %s", self._location_topic) self._mqtt.message_callback_remove(self._location_topic) LOGGER.info("Webserver stopped.") + def _start_video_stream(self): + """Start video streaming.""" + if not self._streaming: + LOGGER.info("Starting video stream") + self._streaming = True + self._video_stream_thread = threading.Thread(target=self._stream_video) + self._video_stream_thread.start() + + def _stop_video_stream(self): + """Stop video streaming.""" + if self._streaming: + LOGGER.info("Stopping video stream") + self._streaming = False + if self._video_stream_thread: + self._video_stream_thread.join() + + def _stream_video(self): + """Stream video to the WebSocket.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("127.0.0.1", int(os.environ.get("VIDEO_STREAM_PORT")))) + try: + while self._streaming: + data = sock.recv(4096) + if not data: + break + self.socketio.emit("video_frame", data, broadcast=True) + except Exception as e: + LOGGER.error("Error streaming video: %s", e) + finally: + sock.close() + # pylint: disable=unused-argument def _on_doorbell_message(self, client, userdata, msg): """Process the doorbell message. @@ -195,15 +246,3 @@ def tail(file_path, lines=25): with open(file_path, "r", encoding="utf-8") as file: lines = file.readlines()[-lines:] return [line.strip() for line in lines] - - -if __name__ == "__main__": - import mqtt_agent - - web_server = WebServer( - mqtt_agent.MqttAgent( - f"{os.environ.get('AGENT_LOCATION')}_{os.environ.get('AGENT_TYPE')}", - [], - ) - ) - web_server.run()