Skip to content

Commit

Permalink
Clean up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
egeakman committed Apr 5, 2024
1 parent 568ae1d commit 1342a95
Showing 1 changed file with 85 additions and 106 deletions.
191 changes: 85 additions & 106 deletions mjpeg_streamer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,51 @@ def __init__(
name: str,
fps: int = 30,
) -> None:
if type(self) is StreamBase:
raise TypeError(
"StreamBase is an abstract class and cannot be instantiated"
)
self.name = name.lower().casefold().replace(" ", "_")
self.name = name.casefold().replace(" ", "_")
self.fps = fps
self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
self._lock: asyncio.Lock = asyncio.Lock()
self._frame_buffer: Deque[int,] = deque(maxlen=fps)
self._frames_buffer: Deque[int] = deque(maxlen=fps)
self._bandwidth_last_modified_time: float = time.time()
self._deque_background_task: Optional[asyncio.Task] = None
self._active_viewers: Set[str,] = set()
self._active_viewers: Set[str] = set()
self._bandwidth_background_task: Optional[asyncio.Task] = None

def __new__(self, *args, **kwargs):
raise TypeError("Cannot instantiate an abstract class")

async def _ensure_background_tasks(self) -> None:
if (
self._bandwidth_background_task is None
or self._bandwidth_background_task.done()
):
self._bandwidth_background_task = asyncio.create_task(
self.__clear_bandwidth()
)

async def __clear_deque(self) -> None:
async def __clear_bandwidth(self) -> None:
while True:
await asyncio.sleep(1 / self.fps)
await asyncio.sleep(1.0 / self.fps)
if (
len(self._frame_buffer) > 0
len(self._frames_buffer) > 0
and time.time() - self._bandwidth_last_modified_time >= 1
):
self._frame_buffer.clear()
self._frames_buffer.clear()

async def _add_viewer(self, viewer_token: Optional[str] = None) -> str:
viewer_token = viewer_token or str(uuid.uuid4())
self._active_viewers.add(viewer_token)
async with self._lock:
self._active_viewers.add(viewer_token)
return viewer_token

async def _remove_viewer(self, viewer_token: str) -> None:
self._active_viewers.discard(viewer_token)
async with self._lock:
self._active_viewers.discard(viewer_token)

async def _ensure_background_tasks(self) -> None:
if self._deque_background_task is None or self._deque_background_task.done():
self._deque_background_task = asyncio.create_task(self.__clear_deque())
async def _process_current_frame(self) -> np.ndarray:
self._last_processed_frame = self._frame
return self._frame

def _check_encoding(self, frame: np.ndarray) -> str:
async def __check_encoding(self, frame: np.ndarray) -> str:
if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2:
# Check JPEG header (0xFFD8) and footer (0xFFD9)
if (
Expand All @@ -63,6 +73,20 @@ def _check_encoding(self, frame: np.ndarray) -> str:
return "multi-dim"
return "unknown"

async def _resize_and_encode_frame(
self, frame: np.ndarray, size: Tuple[int, int], quality: int
) -> np.ndarray:
resized_frame = cv2.resize(frame, size)
if not await self.__check_encoding(resized_frame) == "jpeg":
val, encoded_frame = cv2.imencode(
".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality]
)
if not val:
raise ValueError(
f"Error encoding frame. Format/shape: {await self.__check_encoding(resized_frame)}"
)
return encoded_frame

def settings(self) -> None:
for key, value in self.__dict__.items():
if key.startswith("_"):
Expand All @@ -76,32 +100,26 @@ def active_viewers(self) -> int:
return len(self._active_viewers)

def get_bandwidth(self) -> float:
return sum(self._frame_buffer)
return sum(self._frames_buffer)

def set_fps(self, fps: int) -> None:
self.fps = fps
self._frames_buffer = deque(maxlen=fps)

# Method for delivering the frame to the StreamHandler
async def _get_frame(self) -> np.ndarray:
# A little hacky, if you have a better way, please let me know
await self._ensure_background_tasks()
# Checking the encoding here instead of set_frame
# to avoid continous polling
if self._check_encoding(self._frame) != "jpeg":
raise ValueError(
"Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG."
)
self._frame_buffer.append(len(self._frame.tobytes()))
self._bandwidth_last_modified_time = time.time()
await self._ensure_background_tasks() # A little hacky
if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps:
return self._last_processed_frame
async with self._lock:
return self._frame
self._frames_buffer.append(len(self._frame.tobytes()))
self._bandwidth_last_modified_time = time.time()
return await self._process_current_frame()

def set_frame(self, frame: np.ndarray) -> None:
self._frame = frame

# Not very useful, but it's here for the sake of completeness
def get_frame(self) -> np.ndarray:
return self._frame

CustomStream = StreamBase


class Stream(StreamBase):
Expand All @@ -112,59 +130,26 @@ def __init__(
size: Optional[Tuple[int, int]] = None,
quality: int = 50,
) -> None:
super().__init__(name, fps)
self.size = size
self.quality = max(1, min(quality, 100))
self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8)
super().__init__(name, fps)

async def __process_current_frame(self) -> np.ndarray:
frame = self._frame
if not self._check_encoding(frame) == "jpeg":
frame = cv2.resize(frame, self.size or (frame.shape[1], frame.shape[0]))
val, frame = cv2.imencode(
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
)
if not val:
raise ValueError("Error encoding frame")
else:
print(
"The frame is already encoded, I will not encode nor resize it again. \
Consider using CustomStream if you want to handle the processing yourself."
)
self._frame_buffer.append(len(frame.tobytes()))
self._bandwidth_last_modified_time = time.time()
async def _process_current_frame(self) -> np.ndarray:
frame = await self._resize_and_encode_frame(
self._frame,
self.size or (self._frame.shape[1], self._frame.shape[0]),
self.quality,
)
self._last_processed_frame = frame
return frame

async def _get_frame(self) -> np.ndarray:
if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps:
return self._last_processed_frame
await self._ensure_background_tasks()
async with self._lock:
return await self.__process_current_frame()

def set_size(self, size: Tuple[int, int]) -> None:
self.size = size

def set_quality(self, quality: int) -> None:
self.quality = max(1, min(quality, 100))

def set_frame(self, frame: np.ndarray) -> None:
self._frame = frame

def get_frame(self) -> np.ndarray:
return super().get_frame()


class CustomStream(StreamBase):
# Same as StreamBase, but with a friendly name
def __init__(
self,
name: str,
fps: int = 30,
) -> None:
super().__init__(name, fps)


class ManagedStream(StreamBase):
def __init__(
Expand All @@ -184,7 +169,7 @@ def __init__(
raise ValueError(f"Invalid mode. Available modes: {self._available_modes}")
self.size = size
self.quality = max(1, min(quality, 100))
self.poll_delay_seconds = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps
self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps
self._cap_is_open: bool = False
self._cap: cv2.VideoCapture = None
self._cap_background_task: Optional[asyncio.Task] = None
Expand All @@ -201,20 +186,23 @@ async def __manage_cap_state(self) -> None:
await asyncio.sleep(self.poll_delay_seconds)
if self.mode == "full-on-demand":
if self.has_demand() and not self._cap_is_open:
self.__open_cap()
async with self._lock:
await self.__open_cap()
elif not self.has_demand() and self._cap_is_open:
self.__close_cap()
async with self._lock:
await self.__close_cap()
elif not self._cap_is_open:
self.__open_cap()
async with self._lock:
await self.__open_cap()

def __open_cap(self) -> None:
async def __open_cap(self) -> None:
if not self._cap_is_open and self._is_running:
self._cap = cv2.VideoCapture(self.source)
if not self._cap.isOpened():
raise ValueError("Cannot open the capture device")
self._cap_is_open = True

def __close_cap(self) -> None:
async def __close_cap(self) -> None:
if self._cap_is_open and self._is_running:
self._cap.release()
self._cap_is_open = False
Expand All @@ -223,42 +211,33 @@ async def __read_frame(self) -> None:
if self._cap_is_open and self._is_running:
val, frame = self._cap.read()
if not val:
raise ValueError("Error reading frame")
async with self._lock:
val, frame = self._cap.read()
if not val:
raise RuntimeError("Error reading frame")
self._frame = frame
else:
self.__open_cap()
await self.__open_cap()

async def __process_current_frame(self) -> np.ndarray:
async def _process_current_frame(self) -> np.ndarray:
if not self.has_demand():
return self._frame
return self._last_processed_frame
print("reading frame")
await self.__read_frame()
frame = cv2.resize(
self._frame, self.size or (self._frame.shape[1], self._frame.shape[0])
)
val, frame = cv2.imencode(
".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality]
frame = await self._resize_and_encode_frame(
self._frame,
self.size or (self._frame.shape[1], self._frame.shape[0]),
self.quality,
)
if not val:
if self._cap.getBackendName() == "FFMPEG":
raise ValueError(
"Seems like you are using a video file as the source. \
The media might have ended."
)
raise ValueError("Error encoding frame")
self._frame_buffer.append(len(frame.tobytes()))
self._bandwidth_last_modified_time = time.time()
self._last_processed_frame = frame
return frame

async def _get_frame(self) -> np.ndarray:
if not self._is_running:
print("Stream is not running, please call the start method first.")
return self._frame
if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps:
return self._last_processed_frame
await self._ensure_background_tasks()
async with self._lock:
return await self.__process_current_frame()
raise RuntimeError(
"Stream is not running, please call the start method first."
)
return await super()._get_frame()

def set_size(self, size: Tuple[int, int]) -> None:
self.size = size
Expand All @@ -282,7 +261,7 @@ def change_source(self, source: Union[int, str]) -> None:
self.__open_cap()

def set_poll_delay_ms(self, poll_delay_ms: float) -> None:
self.poll_delay_seconds = poll_delay_ms / 1000
self.poll_delay_seconds = poll_delay_ms / 1000.0

def start(self) -> None:
if not self._is_running:
Expand Down

0 comments on commit 1342a95

Please sign in to comment.