Skip to content

Commit

Permalink
switched to tcp socket
Browse files Browse the repository at this point in the history
  • Loading branch information
exddc committed Jun 21, 2024
1 parent ac83357 commit bc837a7
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 57 deletions.
25 changes: 16 additions & 9 deletions agent/templates/dashboard.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@

{% block content %}
<h1 class="font-bold text-lg mx-auto w-fit">Dashboard</h1>
{% if stream_url %}
{% include 'partials/video_stream.html' %}
{% else %}
<div class="max-w-7xl mx-auto rounded-lg bg-gray-100 border border-gray-300 p-1 shadow-lg">
<div class="flex items-center justify-center h-96">
<p class="text-lg text-gray-500">No video stream available</p>
</div>
</div>
{% endif %}
<video id="video" width="640" height="480" controls autoplay></video>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.3/socket.io.min.js"></script>
<script>
const video = document.getElementById('video');
const mediaSource = new MediaSource();
video.src = URL.createObjectURL(mediaSource);

mediaSource.addEventListener('sourceopen', () => {
const sourceBuffer = mediaSource.addSourceBuffer('video/mp4; codecs="avc1.42E01E, mp4a.40.2"');
const socket = io();

socket.on('video_frame', (data) => {
sourceBuffer.appendBuffer(new Uint8Array(data));
});
});
</script>
{% endblock content %}
58 changes: 28 additions & 30 deletions agent/video_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
# pylint: disable=import-error, broad-except
import os
import threading
import time
import socket
import logger
import base
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder, Quality
from picamera2.outputs import FileOutput
from picamera2.encoders import H264Encoder
from picamera2.outputs import CircularOutput

# Initialize logger
LOGGER = logger.get_module_logger(__name__)
Expand All @@ -26,20 +25,11 @@ def __init__(self, mqtt_client):
self._record_time = int(os.environ.get("VIDEO_RECORDING_DURATION"))
self._picamera = Picamera2()
self._encoder = H264Encoder(bitrate=int(os.environ.get("VIDEO_BITRATE")))

recording_folder = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"recordings",
)

if not os.path.exists(recording_folder):
os.makedirs(recording_folder)
LOGGER.info("Created recording folder: %s", recording_folder)

self._output = FileOutput(recording_folder + "/video_recording.h264")
self._output = CircularOutput(buffersize=10 * 1024 * 1024)

self._stream_thread = None
self._sock = None
self._conn = None

def run(self):
"""Subscribe to the mqtt topic and start listening for video messages."""
Expand All @@ -57,16 +47,17 @@ def run(self):
# pylint: disable=unused-argument
def _on_video_message(self, client, userdata, msg):
LOGGER.info("Mqtt message received: %s", msg.payload)
self._toggle_video(msg.payload.decode("utf-8"))
payload = msg.payload.decode("utf-8")
self._toggle_video(payload)

def _toggle_video(self, payload):
def _toggle_video(self, command):
"""Toggle the video stream on or off."""
if payload == "on":
if command == "on":
self._start_video_stream()
elif payload == "off":
elif command == "off":
self._stop_video_stream()
else:
LOGGER.error("Unknown video command: %s", payload)
LOGGER.error("Unknown video command: %s", command)

def _start_video_stream(self):
"""Start the video stream."""
Expand All @@ -88,15 +79,19 @@ def _start_video_stream(self):
controls={"FrameRate": int(os.environ.get("VIDEO_FPS"))},
)
)
self._picamera.start_recording(
self._encoder, self._output, quality=Quality.HIGH
)
self._streaming = True
self._encoder.output = self._output
self._picamera.start_recording(self._encoder)

self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(("0.0.0.0", self._port))
self._sock.listen(1)
LOGGER.info(f"Waiting for a connection on port {self._port}...")

self._conn, addr = self._sock.accept()
LOGGER.info(f"Connection accepted from {addr}")

self._streaming = True
self._stream_thread = threading.Thread(target=self._stream_video)
self._stream_thread.start()
except Exception as e:
Expand All @@ -112,21 +107,24 @@ def _stop_video_stream(self):
self._streaming = False
if self._stream_thread:
self._stream_thread.join()
if self._conn:
self._conn.close()
if self._sock:
self._sock.close()
except Exception as e:
LOGGER.error("Failed to stop video stream: %s", e)

def _stream_video(self):
"""Stream video frames to the UDP socket."""
"""Stream video frames to the TCP client."""
try:
conn, addr = self._sock.accept()
with conn.makefile("wb") as stream:
while self._streaming:
self._picamera.start_recording(self._encoder, FileOutput(stream))
time.sleep(0.01)
while self._streaming:
data = self._output.read()
if data:
self._conn.sendall(data)
except Exception as e:
LOGGER.error("Error streaming video: %s", e)
finally:
self._conn.close()

def stop(self):
"""Stop the agent."""
Expand Down
75 changes: 57 additions & 18 deletions agent/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import dotenv
from flask import Flask, render_template, request, redirect, url_for
from flask_assets import Environment, Bundle
import waitress
from flask_socketio import SocketIO, emit
import base
import logger
import threading
import socket

# Load environment variables
dotenv.load_dotenv()
Expand All @@ -27,6 +29,7 @@ def __init__(self, mqtt_client) -> None:

self._port = int(os.environ.get("WEBSERVER_PORT"))
self.app = Flask(__name__)
self.socketio = SocketIO(self.app)

# Initialize assets
self._assets = Environment(self.app)
Expand All @@ -39,10 +42,13 @@ def __init__(self, mqtt_client) -> None:
self.__css.build()
self.__js.build()

# Register routes
# Register routes and socket events
self._setup_routes()
self._setup_socket_events()

self._web_server = None
self._video_stream_thread = None
self._streaming = False

def _setup_routes(self) -> None:
"""Setup routes for the webserver."""
Expand All @@ -51,8 +57,7 @@ def _setup_routes(self) -> None:
def dashboard():
LOGGER.info("Dashboard requested by %s", request.remote_addr)
# pylint: disable=line-too-long
stream_url = f"http://{os.popen('hostname -I').read().split()[0]}:{os.environ.get('VIDEO_STREAM_PORT')}/video_stream"
return render_template("dashboard.html", stream_url=stream_url)
return render_template("dashboard.html")

@self.app.route("/settings", methods=["GET", "POST"])
def settings():
Expand All @@ -79,6 +84,17 @@ def logs():
return render_template("partials/log_view.html", logs=logs)
return render_template("logs.html", logs=logs, selected_lines=lines)

def _setup_socket_events(self):
@self.socketio.on("connect")
def handle_connect():
LOGGER.info("Client connected")
self._start_video_stream()

@self.socketio.on("disconnect")
def handle_disconnect():
LOGGER.info("Client disconnected")
self._stop_video_stream()

def run(self) -> None:
"""Run the webserver."""
self._mqtt.subscribe(self._location_topic)
Expand All @@ -87,17 +103,52 @@ def run(self) -> None:
self._on_doorbell_message,
)
LOGGER.info("Webserver subscribed to MQTT topic: %s", self._location_topic)
self._web_server = waitress.serve(self.app, host="0.0.0.0", port=self._port)
self._web_server = threading.Thread(
target=lambda: self.socketio.run(self.app, host="0.0.0.0", port=self._port)
)
self._web_server.start()
LOGGER.info("Webserver started on port %i.", self._port)

def stop(self) -> None:
"""Stop the webserver."""
self._web_server.close()
self._stop_video_stream()
self._web_server.join()
self._mqtt.unsubscribe(self._location_topic)
LOGGER.info("Webserver unsubscribed from MQTT topic: %s", self._location_topic)
self._mqtt.message_callback_remove(self._location_topic)
LOGGER.info("Webserver stopped.")

def _start_video_stream(self):
"""Start video streaming."""
if not self._streaming:
LOGGER.info("Starting video stream")
self._streaming = True
self._video_stream_thread = threading.Thread(target=self._stream_video)
self._video_stream_thread.start()

def _stop_video_stream(self):
"""Stop video streaming."""
if self._streaming:
LOGGER.info("Stopping video stream")
self._streaming = False
if self._video_stream_thread:
self._video_stream_thread.join()

def _stream_video(self):
"""Stream video to the WebSocket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", int(os.environ.get("VIDEO_STREAM_PORT"))))
try:
while self._streaming:
data = sock.recv(4096)
if not data:
break
self.socketio.emit("video_frame", data, broadcast=True)
except Exception as e:
LOGGER.error("Error streaming video: %s", e)
finally:
sock.close()

# pylint: disable=unused-argument
def _on_doorbell_message(self, client, userdata, msg):
"""Process the doorbell message.
Expand Down Expand Up @@ -195,15 +246,3 @@ def tail(file_path, lines=25):
with open(file_path, "r", encoding="utf-8") as file:
lines = file.readlines()[-lines:]
return [line.strip() for line in lines]


if __name__ == "__main__":
import mqtt_agent

web_server = WebServer(
mqtt_agent.MqttAgent(
f"{os.environ.get('AGENT_LOCATION')}_{os.environ.get('AGENT_TYPE')}",
[],
)
)
web_server.run()

0 comments on commit bc837a7

Please sign in to comment.