Skip to content

Commit

Permalink
changed to socket udp stream
Browse files Browse the repository at this point in the history
  • Loading branch information
exddc committed Jun 21, 2024
1 parent 2cc2b12 commit ac83357
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 70 deletions.
2 changes: 1 addition & 1 deletion agent/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.0
0.2.1
93 changes: 24 additions & 69 deletions agent/video_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,17 @@
import os
import threading
import time
import io
from http.server import BaseHTTPRequestHandler, HTTPServer
import socket
import logger
import base
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder, Quality
from picamera2.outputs import FileOutput
from PIL import Image

# Initialize logger
LOGGER = logger.get_module_logger(__name__)


class VideoStreamHandler(BaseHTTPRequestHandler):
"""HTTP request handler for video streaming."""

# pylint: disable=invalid-name
def do_GET(self):
"""Handle GET requests."""
if self.path == "/video_stream":
LOGGER.info("Received video stream request")
self.send_response(200)
self.send_header(
"Content-type", "multipart/x-mixed-replace; boundary=frame"
)
self.end_headers()
while True:
try:
frame = self.server.video_stream.get_frame()
if frame is not None:
img = Image.fromarray(frame).convert("RGB")
with io.BytesIO() as output:
img.save(output, format="JPEG")
frame_bytes = output.getvalue()
self.wfile.write(b"--frame\r\n")
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", len(frame_bytes))
self.end_headers()
self.wfile.write(frame_bytes)
self.wfile.write(b"\r\n")
time.sleep(0.001)
except Exception as e:
LOGGER.error("Error streaming video: %s", e)
break


class VideoStreamServer(threading.Thread):
"""HTTP server for video streaming."""

def __init__(self, port, video_stream):
super().__init__()
self.port = port
self.video_stream = video_stream
self.server = HTTPServer(("0.0.0.0", self.port), VideoStreamHandler)
self.server.video_stream = video_stream

def run(self):
LOGGER.info("Starting HTTP server on port %d", self.port)
self.server.serve_forever()

def stop(self):
"""Stop the HTTP server."""
LOGGER.info("Stopping HTTP server")
self.server.shutdown()
self.server.server_close()


class VideoAgent(base.BaseAgent):
"""Video Stream Agent class."""

Expand All @@ -81,7 +25,6 @@ def __init__(self, mqtt_client):
self._port = int(os.environ.get("VIDEO_STREAM_PORT"))
self._record_time = int(os.environ.get("VIDEO_RECORDING_DURATION"))
self._picamera = Picamera2()
self._video_server = None
self._encoder = H264Encoder(bitrate=int(os.environ.get("VIDEO_BITRATE")))

recording_folder = os.path.join(
Expand All @@ -95,6 +38,9 @@ def __init__(self, mqtt_client):

self._output = FileOutput(recording_folder + "/video_recording.h264")

self._stream_thread = None
self._sock = None

def run(self):
"""Subscribe to the mqtt topic and start listening for video messages."""
self._mqtt.subscribe(f"video/{self._agent_location}")
Expand Down Expand Up @@ -126,6 +72,7 @@ def _start_video_stream(self):
"""Start the video stream."""
if self._streaming:
LOGGER.warning("Video stream already running.")
return

LOGGER.info("Starting video stream")
try:
Expand All @@ -145,8 +92,13 @@ def _start_video_stream(self):
self._encoder, self._output, quality=Quality.HIGH
)
self._streaming = True
self._video_server = VideoStreamServer(self._port, self)
self._video_server.start()

self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(("0.0.0.0", self._port))

self._stream_thread = threading.Thread(target=self._stream_video)
self._stream_thread.start()
except Exception as e:
LOGGER.error("Failed to start video stream: %s", e)
self._streaming = False
Expand All @@ -158,20 +110,23 @@ def _stop_video_stream(self):
try:
self._picamera.stop_recording()
self._streaming = False
if self._video_server:
self._video_server.stop()
self._video_server.join()
if self._stream_thread:
self._stream_thread.join()
if self._sock:
self._sock.close()
except Exception as e:
LOGGER.error("Failed to stop video stream: %s", e)

def get_frame(self):
"""Get the current frame from the video stream."""
def _stream_video(self):
"""Stream video frames to the UDP socket."""
try:
frame = self._picamera.capture_array()
return frame
conn, addr = self._sock.accept()
with conn.makefile("wb") as stream:
while self._streaming:
self._picamera.start_recording(self._encoder, FileOutput(stream))
time.sleep(0.01)
except Exception as e:
LOGGER.error("Failed to capture frame: %s", e)
return None
LOGGER.error("Error streaming video: %s", e)

def stop(self):
"""Stop the agent."""
Expand Down

0 comments on commit ac83357

Please sign in to comment.