Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
echo VIDEO_TAG=$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) >> .env ; \
echo TEST_DRAIN_AFTER_SESSION_COUNT=$(or $(TEST_DRAIN_AFTER_SESSION_COUNT), 0) >> .env ; \
echo TEST_PARALLEL_HARDENING=$(or $(TEST_PARALLEL_HARDENING), "false") >> .env ; \
echo LOG_LEVEL=$(or $(LOG_LEVEL), "FINE") >> .env ; \
echo LOG_LEVEL=$(or $(LOG_LEVEL), "INFO") >> .env ; \
echo REQUEST_TIMEOUT=$(or $(REQUEST_TIMEOUT), 300) >> .env ; \
echo SELENIUM_ENABLE_MANAGED_DOWNLOADS=$(or $(SELENIUM_ENABLE_MANAGED_DOWNLOADS), "false") >> .env ; \
echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \
Expand All @@ -1175,14 +1175,14 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
else \
echo HOST_IP=127.0.0.1 >> .env ; \
fi; \
BASIC_AUTH_USER=admin ; \
BASIC_AUTH_PASSWORD=admin ; \
if [ "$(PLATFORMS)" = "linux/amd64" ]; then \
NODE_EDGE=edge ; \
NODE_CHROME=chrome ; \
else \
NODE_EDGE=chromium ; \
NODE_CHROME=chromium ; \
BASIC_AUTH_USER=admin ; \
BASIC_AUTH_PASSWORD=admin ; \
fi; \
echo BASIC_AUTH_USER=$${BASIC_AUTH_USER} >> .env ; \
echo BASIC_AUTH_PASSWORD=$${BASIC_AUTH_PASSWORD} >> .env ; \
Expand Down
31 changes: 28 additions & 3 deletions Video/video_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def __init__(self):
self.upload_opts = os.environ.get("SE_UPLOAD_OPTS", "-P --cutoff-mode SOFT --metadata --inplace")
self.retain_local = os.environ.get("SE_UPLOAD_RETAIN_LOCAL_FILE", "false").lower() == "true"
self.upload_batch_size = int(os.environ.get("SE_VIDEO_UPLOAD_BATCH_CHECK", "10"))
self.upload_timeout = int(os.environ.get("SE_VIDEO_UPLOAD_TIMEOUT", "300"))
self.upload_failure_only = os.environ.get("SE_UPLOAD_FAILURE_SESSION_ONLY", "false").lower() == "true"
default_failure_events = [":failure", ":failed"]
custom_failure_events = os.environ.get("SE_UPLOAD_FAILURE_SESSION_EVENTS", "").lower()
Expand Down Expand Up @@ -225,6 +226,9 @@ def __init__(self):
self.recorder_done = asyncio.Event()
self.uploader_done = asyncio.Event()

# Tracked delayed-cleanup tasks so they can be cancelled on shutdown
self._cleanup_tasks: List[asyncio.Task] = []

# Rename SE_RCLONE_* env vars
self._rename_rclone_env()

Expand Down Expand Up @@ -588,7 +592,13 @@ async def process_upload(self, task: UploadTask) -> None:
)
self.active_uploads.append(proc)
try:
stdout, stderr = await proc.communicate()
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.upload_timeout)
except asyncio.TimeoutError:
logger.warning(f"Upload timed out after {self.upload_timeout}s: {task.video_file}, killing process")
proc.kill()
await proc.communicate()
return
finally:
try:
self.active_uploads.remove(proc)
Expand Down Expand Up @@ -713,8 +723,11 @@ async def handle_session_closed(self, data: dict) -> None:
await self.stop_recording(session)
await self.queue_upload(session)

# Clean up session after a delay (keep for potential late events)
asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
# Clean up session after a delay (keep for potential late events).
# Tracked so cleanup() can cancel these on shutdown instead of waiting 60s.
t = asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
self._cleanup_tasks.append(t)
t.add_done_callback(lambda fut: self._cleanup_tasks.remove(fut) if fut in self._cleanup_tasks else None)

# Check drain condition
if self.max_sessions > 0 and self.recorded_count >= self.max_sessions:
Expand Down Expand Up @@ -814,6 +827,10 @@ async def subscribe_events(self) -> None:
if await self.subscriber.poll(timeout=1000):
frames = await self.subscriber.recv_multipart()

# Re-check shutdown before spending time processing the event
if self.shutdown_event.is_set():
break

if len(frames) < 4:
continue

Expand Down Expand Up @@ -892,6 +909,14 @@ async def cleanup(self) -> None:
"""Cleanup all resources."""
logger.info("Shutting down...")

# Cancel delayed session-cleanup tasks immediately — they have a 60s
# sleep that would keep the event loop alive long after shutdown.
for t in list(self._cleanup_tasks):
t.cancel()
if self._cleanup_tasks:
await asyncio.gather(*self._cleanup_tasks, return_exceptions=True)
self._cleanup_tasks.clear()

# Snapshot active sessions outside the lock so we don't hold
# sessions_lock across slow awaits (stop_recording can take up to 10s).
async with self.sessions_lock:
Expand Down
Loading