diff --git a/Makefile b/Makefile
index f035c2369..a7d72ea29 100644
--- a/Makefile
+++ b/Makefile
@@ -1158,7 +1158,7 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
 	echo VIDEO_TAG=$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) >> .env ; \
 	echo TEST_DRAIN_AFTER_SESSION_COUNT=$(or $(TEST_DRAIN_AFTER_SESSION_COUNT), 0) >> .env ; \
 	echo TEST_PARALLEL_HARDENING=$(or $(TEST_PARALLEL_HARDENING), "false") >> .env ; \
-	echo LOG_LEVEL=$(or $(LOG_LEVEL), "FINE") >> .env ; \
+	echo LOG_LEVEL=$(or $(LOG_LEVEL), "INFO") >> .env ; \
 	echo REQUEST_TIMEOUT=$(or $(REQUEST_TIMEOUT), 300) >> .env ; \
 	echo SELENIUM_ENABLE_MANAGED_DOWNLOADS=$(or $(SELENIUM_ENABLE_MANAGED_DOWNLOADS), "false") >> .env ; \
 	echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \
@@ -1175,14 +1175,14 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
 	else \
 		echo HOST_IP=127.0.0.1 >> .env ; \
 	fi; \
+	BASIC_AUTH_USER=admin ; \
+	BASIC_AUTH_PASSWORD=admin ; \
 	if [ "$(PLATFORMS)" = "linux/amd64" ]; then \
 		NODE_EDGE=edge ; \
 		NODE_CHROME=chrome ; \
 	else \
 		NODE_EDGE=chromium ; \
 		NODE_CHROME=chromium ; \
-		BASIC_AUTH_USER=admin ; \
-		BASIC_AUTH_PASSWORD=admin ; \
 	fi; \
 	echo BASIC_AUTH_USER=$${BASIC_AUTH_USER} >> .env ; \
 	echo BASIC_AUTH_PASSWORD=$${BASIC_AUTH_PASSWORD} >> .env ; \
diff --git a/Video/video_service.py b/Video/video_service.py
index abaeb1b03..8c0de78f9 100755
--- a/Video/video_service.py
+++ b/Video/video_service.py
@@ -163,6 +163,7 @@ def __init__(self):
         self.upload_opts = os.environ.get("SE_UPLOAD_OPTS", "-P --cutoff-mode SOFT --metadata --inplace")
         self.retain_local = os.environ.get("SE_UPLOAD_RETAIN_LOCAL_FILE", "false").lower() == "true"
         self.upload_batch_size = int(os.environ.get("SE_VIDEO_UPLOAD_BATCH_CHECK", "10"))
+        self.upload_timeout = int(os.environ.get("SE_VIDEO_UPLOAD_TIMEOUT", "300"))
         self.upload_failure_only = os.environ.get("SE_UPLOAD_FAILURE_SESSION_ONLY", "false").lower() == "true"
         default_failure_events = [":failure", ":failed"]
         custom_failure_events = os.environ.get("SE_UPLOAD_FAILURE_SESSION_EVENTS", "").lower()
@@ -225,6 +226,9 @@ def __init__(self):
         self.recorder_done = asyncio.Event()
         self.uploader_done = asyncio.Event()
 
+        # Tracked delayed-cleanup tasks so they can be cancelled on shutdown
+        self._cleanup_tasks: List[asyncio.Task] = []
+
         # Rename SE_RCLONE_* env vars
         self._rename_rclone_env()
 
@@ -588,7 +592,13 @@ async def process_upload(self, task: UploadTask) -> None:
         )
         self.active_uploads.append(proc)
         try:
-            stdout, stderr = await proc.communicate()
+            try:
+                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.upload_timeout)
+            except asyncio.TimeoutError:
+                logger.warning(f"Upload timed out after {self.upload_timeout}s: {task.video_file}, killing process")
+                proc.kill()
+                await proc.communicate()
+                return
         finally:
             try:
                 self.active_uploads.remove(proc)
@@ -713,8 +723,11 @@ async def handle_session_closed(self, data: dict) -> None:
             await self.stop_recording(session)
             await self.queue_upload(session)
 
-        # Clean up session after a delay (keep for potential late events)
-        asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
+        # Clean up session after a delay (keep for potential late events).
+        # Tracked so cleanup() can cancel these on shutdown instead of waiting 60s.
+        t = asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
+        self._cleanup_tasks.append(t)
+        t.add_done_callback(lambda fut: self._cleanup_tasks.remove(fut) if fut in self._cleanup_tasks else None)
 
         # Check drain condition
         if self.max_sessions > 0 and self.recorded_count >= self.max_sessions:
@@ -814,6 +827,10 @@ async def subscribe_events(self) -> None:
             if await self.subscriber.poll(timeout=1000):
                 frames = await self.subscriber.recv_multipart()
 
+                # Re-check shutdown before spending time processing the event
+                if self.shutdown_event.is_set():
+                    break
+
                 if len(frames) < 4:
                     continue
 
@@ -892,6 +909,14 @@ async def cleanup(self) -> None:
         """Cleanup all resources."""
         logger.info("Shutting down...")
 
+        # Cancel delayed session-cleanup tasks immediately — they have a 60s
+        # sleep that would keep the event loop alive long after shutdown.
+        for t in list(self._cleanup_tasks):
+            t.cancel()
+        if self._cleanup_tasks:
+            await asyncio.gather(*self._cleanup_tasks, return_exceptions=True)
+        self._cleanup_tasks.clear()
+
         # Snapshot active sessions outside the lock so we don't hold
         # sessions_lock across slow awaits (stop_recording can take up to 10s).
         async with self.sessions_lock: