From b222a61da43af17ff301e2bcc5671f92d92acff6 Mon Sep 17 00:00:00 2001 From: Viktor Petersson Date: Tue, 19 May 2026 11:22:16 +0000 Subject: [PATCH 1/5] feat(server,api): auto-download remote video URLs into the asset pipeline - Detect http(s) single-file video URLs in CreateAssetSerializerMixin (ext-first, HEAD-probe fallback) and rewrite the row to a local destination + is_processing=True (v1.2 / v2 only). - New download_remote_video_asset Celery task streams via requests with a 5 GiB cap and chains into normalize_video_asset for the per-board HW-codec gate. - Live streams (RTSP / RTMP / HLS / DASH / SmoothStreaming) stay as literal URIs the viewer plays directly. - Failures land as metadata.error_message + is_processing=False via a copy-paste of the YouTube download task's on_failure contract. Closes #2894 --- src/anthias_common/remote_video.py | 238 +++++++++++++ src/anthias_server/api/serializers/mixins.py | 59 +++- src/anthias_server/api/tests/test_assets.py | 194 +++++++++++ src/anthias_server/api/views/v1_2.py | 12 + src/anthias_server/api/views/v2.py | 12 + src/anthias_server/celery_tasks.py | 332 ++++++++++++++++++ tests/test_celery_tasks.py | 349 +++++++++++++++++++ tests/test_remote_video.py | 268 ++++++++++++++ 8 files changed, 1457 insertions(+), 7 deletions(-) create mode 100644 src/anthias_common/remote_video.py create mode 100644 tests/test_remote_video.py diff --git a/src/anthias_common/remote_video.py b/src/anthias_common/remote_video.py new file mode 100644 index 000000000..81f7a359c --- /dev/null +++ b/src/anthias_common/remote_video.py @@ -0,0 +1,238 @@ +"""Generic remote-video URL helpers shared across the API +serializers and the Celery worker. + +Mirrors ``anthias_common.youtube`` but for the broader case of a +``http(s)://…`` URL pointing at a single-file video container (mp4 / +webm / mov / mkv / ...). Centralising the classify, the on-disk +destination, and the Celery dispatch here means the create views never +diverge in their handling and a future API version inherits the +behaviour by importing the same helpers. + +Keep this module free of Django and Celery imports so the serializers +can import it without dragging the Django settings module into the +hot create-asset path twice. The dispatch helper does a lazy import of +``anthias_server.celery_tasks`` only when it actually fires. +""" + +from __future__ import annotations + +import logging +import mimetypes +from os import path +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +import requests + +if TYPE_CHECKING: + from anthias_server.settings import AnthiasSettings + + +# Single-file video containers we know how to download and that the +# normalisation pipeline can ffprobe. The set is deliberately +# conservative: anything ending in one of these gets auto-downloaded, +# everything else either falls through to a HEAD probe (extensionless +# URLs) or stays as a streaming URL. Extending the set is a one-line +# change once the codec gate has been verified for the new container. +_VIDEO_CONTAINER_EXTS = frozenset( + { + '.mp4', + '.webm', + '.mov', + '.mkv', + '.avi', + '.m4v', + '.ogv', + } +) + +# Protocol schemes that are streaming-by-construction. The serializer +# never rewrites these to a local download even when the URL path's +# extension suggests a single file (``rtsp://host/stream.mp4`` is +# still an RTSP session, not an HTTP MP4). The viewer plays them +# through mpv's network stack as-is. +_STREAM_SCHEMES = frozenset({'rtsp', 'rtmp', 'srt', 'udp', 'mms'}) + +# HTTP-delivered manifests that describe a stream rather than a single +# downloadable file. ``.m3u8`` (HLS), ``.mpd`` (DASH), ``.m3u`` (legacy +# playlist), ``.ism`` (Smooth Streaming) all need live origin +# connectivity at play time and can't be flattened to a local file. +# Treat them the same as RTSP/RTMP — leave the URI verbatim. +_STREAM_MANIFEST_EXTS = frozenset({'.m3u8', '.mpd', '.m3u', '.ism'}) + +# Wall-clock cap on the HEAD probe for extensionless URLs. Kept short +# because the probe runs synchronously inside the POST /assets path +# and the operator's request blocks on it. 5s covers a slow origin +# without making the create request feel hung. Any failure (timeout, +# DNS, 4xx, redirect loop) downgrades the URL to "stream as-is" +# rather than rejecting the create — operators can still paste a +# weird URL and have it work for stream playback, they just don't +# get the auto-download benefit. +_HEAD_PROBE_TIMEOUT_S = 5 + +# Manifest content-types we explicitly reject even when the server +# advertises them as ``video/*`` (some HLS origins do this). Streaming +# manifests need live origin connectivity at play time — pulling them +# down as a single file would store the playlist, not the segments +# the playlist points at. +_MANIFEST_CONTENT_TYPES = frozenset( + { + 'application/vnd.apple.mpegurl', + 'application/x-mpegurl', + 'application/dash+xml', + } +) + + +def _url_path_ext(uri: str) -> str: + """Return the lowercase extension of the URL's path component, with + the leading dot. Empty string when the URL has no extension. + + Splits on ``urlparse(...).path`` rather than the full URI so a + query string (``?download=true``) or fragment (``#t=10``) doesn't + fool the extension match. + """ + try: + parsed = urlparse(uri.strip()) + except ValueError: + return '' + return path.splitext(parsed.path)[1].lower() + + +def _url_scheme(uri: str) -> str: + try: + return (urlparse(uri.strip()).scheme or '').lower() + except ValueError: + return '' + + +def _head_probe(uri: str) -> tuple[bool, str]: + """Issue an HTTP HEAD against *uri* to classify its content. + + Returns ``(True, ext)`` when the response advertises a downloadable + video and ``(False, '')`` otherwise. Any exception (timeout, DNS, + refused connection, 4xx, redirect chain too long) collapses to the + negative case — the URL stays as a stream URL. + + ``allow_redirects=True`` follows the common CDN pattern where the + canonical URL redirects to a signed S3/Cloudfront URL whose path + *does* carry the extension. The final response's Content-Type is + what we classify on. + + ``mimetypes.guess_extension`` resolves the response's content-type + to a file extension (``video/mp4`` → ``.mp4``). The default + Python mimetypes table covers every container in + ``_VIDEO_CONTAINER_EXTS``. Falls back to ``.mp4`` on the (rare) + case where guess_extension returns None for a video/* type. + """ + try: + resp = requests.head( + uri, + allow_redirects=True, + timeout=_HEAD_PROBE_TIMEOUT_S, + ) + except requests.RequestException: + return False, '' + if resp.status_code >= 400: + return False, '' + content_type = (resp.headers.get('Content-Type') or '').lower() + # Strip parameters (``video/mp4; codecs=...``) before classifying. + base_type = content_type.split(';', 1)[0].strip() + if base_type in _MANIFEST_CONTENT_TYPES: + return False, '' + if not base_type.startswith('video/'): + return False, '' + # ``guess_extension`` returns ``.m4v`` for ``video/mp4`` in some + # Python versions and ``.mp4`` in others. Normalise to a value in + # our container set; default to ``.mp4`` for the common case. + guessed = mimetypes.guess_extension(base_type) + if guessed and guessed.lower() in _VIDEO_CONTAINER_EXTS: + return True, guessed.lower() + return True, '.mp4' + + +def is_downloadable_remote_video(uri: str) -> tuple[bool, str]: + """Classify *uri* as auto-downloadable single-file video or not. + + Returns ``(True, ext)`` when the serializer should rewrite the + asset to a local-download row, with ``ext`` (including the leading + dot) being the extension to use for the on-disk file. Returns + ``(False, '')`` when the URI should stay as a stream URL for the + viewer to play live. + + Three-step decision: + + 1. **Stream short-circuit** — non-http(s) streaming schemes + (``rtsp://``, ``rtmp://``, ``srt://``, ``udp://``, ``mms://``) + and manifest extensions (``.m3u8`` / ``.mpd`` / ...) never + download, no HEAD call. + 2. **Extension match** — the URL path's lowercase extension is in + ``_VIDEO_CONTAINER_EXTS``: ``(True, ext)``, no HEAD call. + Common path, zero network round-trips. + 3. **HEAD probe fallback** — extensionless URL, http(s) only: + single HEAD, accept on ``Content-Type: video/*`` (excluding + manifest types). + + Any unrecognised scheme (file://, ftp://, …) returns ``(False, + '')`` so we never download from non-network or non-HTTP sources. + """ + if not uri: + return False, '' + scheme = _url_scheme(uri) + ext = _url_path_ext(uri) + if scheme in _STREAM_SCHEMES: + return False, '' + if ext in _STREAM_MANIFEST_EXTS: + return False, '' + if scheme not in ('http', 'https'): + return False, '' + if ext in _VIDEO_CONTAINER_EXTS: + return True, ext + return _head_probe(uri) + + +def remote_video_destination_path( + asset_id: str, + ext: str, + settings: 'AnthiasSettings | None' = None, +) -> str: + """Resolve where the downloaded file will land on disk. + + Mirrors ``anthias_common.youtube.youtube_destination_path`` but + takes the extension as a parameter because remote URLs span every + container (mp4 / webm / mkv / ...) — preserving the source + extension lets ffprobe identify the container correctly and the + asset table show what was uploaded. + """ + if settings is None: + from anthias_server.settings import settings as _settings + + settings = _settings + return path.join(settings['assetdir'], f'{asset_id}{ext}') + + +def dispatch_remote_video_download(asset_id: str, source_uri: str) -> None: + """Queue the Celery worker to fetch *source_uri* into the row. + + Mirrors ``anthias_common.youtube.dispatch_download``. Stamps + ``metadata.processing_started_at`` so the periodic + ``reconcile_stuck_processing`` task can recover a row whose + download went missing (worker SIGKILL between enqueue and pickup, + redis flake during dispatch). + + Lazy imports keep ``celery_tasks`` and Django settings out of any + import path that only needs ``is_downloadable_remote_video`` (the + serializer hot path). + """ + from anthias_server.celery_tasks import download_remote_video_asset + from anthias_server.processing import stamp_processing_start + + stamp_processing_start(asset_id) + download_remote_video_asset.delay(asset_id, source_uri) + + +# Suppress noisy library-side logging during the HEAD probe — the +# serializer caller already turns a failure into "stays as stream URL" +# and we don't want a transient DNS hiccup to spew tracebacks into +# every create-asset POST log line. +logging.getLogger('urllib3').setLevel(logging.WARNING) diff --git a/src/anthias_server/api/serializers/mixins.py b/src/anthias_server/api/serializers/mixins.py index fa6d6d2ca..627d82767 100644 --- a/src/anthias_server/api/serializers/mixins.py +++ b/src/anthias_server/api/serializers/mixins.py @@ -6,6 +6,10 @@ from rest_framework.serializers import CharField, Serializer from anthias_server.api.errors import AssetCreationError +from anthias_common.remote_video import ( + is_downloadable_remote_video, + remote_video_destination_path, +) from anthias_common.utils import ( get_video_duration, url_fails, @@ -31,6 +35,17 @@ class CreateAssetSerializerMixin: # and the view skips the dispatch. _pending_youtube_uri: str | None = None + # Source URL of an in-flight generic remote-video download, set by + # prepare_asset when ``mimetype == 'video'`` and the URI is an + # http(s) link whose extension or HEAD-probed Content-Type + # identifies it as a downloadable single-file container. Same + # hand-off shape as ``_pending_youtube_uri``: the view picks the + # field up after persistence and dispatches + # ``download_remote_video_asset``. None means "no auto-download + # needed" — either the URI is local, is a YouTube link, or is a + # live stream (RTSP / HLS / DASH) the viewer plays directly. + _pending_remote_video_uri: str | None = None + # Set to ``'image'`` or ``'video'`` by ``prepare_asset`` when the # newly created row needs the normalisation pipeline to run before # the viewer can play it. The view dispatches the matching Celery @@ -103,6 +118,30 @@ def prepare_asset( self._pending_youtube_uri = uri uri = youtube_destination_path(asset['asset_id'], settings) + # Generic remote-video URLs follow the YouTube lifecycle: the + # row lands with the eventual local path on disk, is_processing + # is flipped, and a Celery task downloads the file out of band + # before chaining into normalize_video_asset for the per-board + # codec gate. Live streams (RTSP / HLS / DASH) are filtered out + # by ``is_downloadable_remote_video`` — they reach the viewer + # as literal stream URLs the same way they always have. + is_remote_video_download = False + if ( + not is_youtube + and not is_local_upload + and 'video' in (asset['mimetype'] or '') + and uri.startswith(('http://', 'https://')) + ): + should_download, source_ext = is_downloadable_remote_video(uri) + if should_download: + asset['is_processing'] = True if version == 'v2' else 1 + asset['duration'] = 0 + self._pending_remote_video_uri = uri + uri = remote_video_destination_path( + asset['asset_id'], source_ext, settings + ) + is_remote_video_download = True + asset['uri'] = uri # Decide whether the new row needs the normalisation pipeline. @@ -119,7 +158,11 @@ def prepare_asset( and 'video' in (asset['mimetype'] or '') ) - if 'video' in asset['mimetype'] and not is_youtube: + if ( + 'video' in asset['mimetype'] + and not is_youtube + and not is_remote_video_download + ): duration_raw = data.get('duration') if duration_raw is not None and int(duration_raw) == 0: if needs_video: @@ -146,7 +189,7 @@ def prepare_asset( raise AssetCreationError( 'Duration must be zero for video assets.' ) - elif not is_youtube: + elif not is_youtube and not is_remote_video_download: # Crashes if it's not an int. We want that. duration = data.get('duration', settings['default_duration']) @@ -175,13 +218,15 @@ def prepare_asset( if field in data: asset[field] = data[field] - # Skip the reachability probe for in-flight YouTube rows: the - # local mp4 path is the *future* destination, the file does - # not exist yet, and url_fails on a schemeless path is a - # silent no-op anyway. Asserting that explicitly prevents a - # future url_fails change from breaking the create flow. + # Skip the reachability probe for in-flight YouTube rows and + # generic remote-video downloads: in both cases the local + # path is the *future* destination, the file does not exist + # yet, and url_fails on a schemeless path is a silent no-op + # anyway. Asserting both flags explicitly prevents a future + # url_fails change from breaking either create flow. if ( not is_youtube + and not is_remote_video_download and not asset['skip_asset_check'] and url_fails(asset['uri']) ): diff --git a/src/anthias_server/api/tests/test_assets.py b/src/anthias_server/api/tests/test_assets.py index 60e700b11..50bb04ea1 100644 --- a/src/anthias_server/api/tests/test_assets.py +++ b/src/anthias_server/api/tests/test_assets.py @@ -755,6 +755,200 @@ def test_create_heic_image_dispatches_normalize_task_on_v1_2_and_v2( mock_dispatch.assert_called_once_with(asset_id) +# --------------------------------------------------------------------------- +# Remote video URL auto-download (issue #2894) +# --------------------------------------------------------------------------- +# +# Mirror of the YouTube dispatch tests above. v1.2 / v2 are the only +# versions that detect http(s) video URLs and rewrite them into a +# local-download row; v1 / v1.1 keep literal-URL semantics by design. + + +@pytest.mark.django_db +@pytest.mark.parametrize('version', ['v1_2', 'v2']) +def test_create_remote_video_url_dispatches_download_task( + api_client: APIClient, version: str +) -> None: + """POSTing a http(s) URL pointing at a single-file video container + (.mp4 in this test) on v1.2 / v2 must rewrite the row to a local + destination path, flip ``is_processing=True``, and dispatch the + new ``download_remote_video_asset`` task with the source URL. + + This is the core acceptance criterion for issue #2894: the row's + final state is a local download that runs through + ``normalize_video_asset``, not a streaming URL that bypasses the + per-board codec gate. + """ + remote_url = 'https://example.com/test-clip.mp4' + payload = { + **ASSET_CREATION_DATA, + 'uri': remote_url, + 'mimetype': 'video', + 'duration': 0, + } + asset_list_url = reverse(f'api:asset_list_{version}') + dispatch_target = ( + f'anthias_server.api.views.{version}.dispatch_remote_video_download' + ) + with ( + mock.patch(dispatch_target) as mock_dispatch, + mock.patch( + 'anthias_server.api.serializers.mixins.url_fails', + return_value=False, + ), + ): + response = api_client.post( + asset_list_url, data=get_request_data(payload, version) + ) + + assert response.status_code == status.HTTP_201_CREATED, response.data + asset_id = response.data['asset_id'] + # Row landed with is_processing=True (a boolean on v2, an int on + # v1.2) and the URI rewritten to the local destination so the + # viewer's filesystem check will pick up the file once the task + # finishes downloading. + assert response.data['is_processing'] in (True, 1) + assert response.data['uri'].endswith(f'{asset_id}.mp4') + mock_dispatch.assert_called_once_with(asset_id, remote_url) + + +@pytest.mark.django_db +@pytest.mark.parametrize('version', ['v1', 'v1_1']) +def test_create_remote_video_url_keeps_literal_uri_on_legacy_endpoints( + api_client: APIClient, version: str +) -> None: + """v1 / v1.1 deliberately do NOT auto-download remote video URLs — + they preserve the literal-URL semantics older clients depend on, + matching how image normalisation stayed v1.2 / v2-only. The row + lands with the original URI intact and ``is_processing`` False. + + Guards against a future "consistency" refactor accidentally + promoting auto-download onto the legacy surface. + """ + remote_url = 'https://example.com/test-clip.mp4' + payload = { + **ASSET_CREATION_DATA, + 'uri': remote_url, + 'mimetype': 'video', + 'duration': 0, + } + asset_list_url = reverse(f'api:asset_list_{version}') + with ( + mock.patch( + 'anthias_server.api.serializers.v1_1.url_fails', + return_value=False, + ), + # The v1.1 serializer probes duration synchronously when 0 + # is passed; mock that out so the test isn't dependent on an + # ffprobe-able fixture file. + mock.patch( + 'anthias_server.api.serializers.v1_1.get_video_duration', + return_value=__import__('datetime').timedelta(seconds=10), + ), + ): + response = api_client.post( + asset_list_url, data=get_request_data(payload, version) + ) + + assert response.status_code == status.HTTP_201_CREATED, response.data + # Literal URI preserved; no is_processing flag. + assert response.data['uri'] == remote_url + assert response.data['is_processing'] in (False, 0) + + +@pytest.mark.django_db +@pytest.mark.parametrize('version', ['v1_2', 'v2']) +def test_create_remote_hls_manifest_stays_as_stream_url( + api_client: APIClient, version: str +) -> None: + """HLS / DASH manifests must NOT be auto-downloaded — they describe + a stream rather than a single file. The serializer's classify + rejects them at the URL-extension check, so the row lands with + the manifest URL intact and the viewer plays it as a live stream. + + Out-of-scope per the issue, but enshrining it as a test keeps a + future classifier change from silently breaking the stream + playback path. + """ + manifest_url = 'https://example.com/live/stream.m3u8' + payload = { + **ASSET_CREATION_DATA, + 'uri': manifest_url, + 'mimetype': 'video', + 'duration': 0, + } + asset_list_url = reverse(f'api:asset_list_{version}') + dispatch_target = ( + f'anthias_server.api.views.{version}.dispatch_remote_video_download' + ) + with ( + mock.patch(dispatch_target) as mock_dispatch, + mock.patch( + 'anthias_server.api.serializers.mixins.url_fails', + return_value=False, + ), + # The mixin's stream-URL branch falls through to an inline + # ``get_video_duration`` ffprobe; the test must not actually + # shell out to ffprobe against ``example.com``. + mock.patch( + 'anthias_server.api.serializers.mixins.get_video_duration', + return_value=__import__('datetime').timedelta(seconds=10), + ), + ): + response = api_client.post( + asset_list_url, data=get_request_data(payload, version) + ) + + assert response.status_code == status.HTTP_201_CREATED, response.data + # Manifest URL preserved verbatim; no download dispatch. + assert response.data['uri'] == manifest_url + assert response.data['is_processing'] in (False, 0) + mock_dispatch.assert_not_called() + + +@pytest.mark.django_db +@pytest.mark.parametrize('version', ['v1_2', 'v2']) +def test_create_rtsp_url_stays_as_stream_url( + api_client: APIClient, version: str +) -> None: + """RTSP camera feeds are streaming-by-construction. Even though + ``mimetype='video'`` matches the auto-download branch's mimetype + filter, the scheme check inside ``is_downloadable_remote_video`` + routes it through as a literal-URI stream. + """ + rtsp_url = 'rtsp://camera.local/feed' + payload = { + **ASSET_CREATION_DATA, + 'uri': rtsp_url, + 'mimetype': 'video', + 'duration': 0, + } + asset_list_url = reverse(f'api:asset_list_{version}') + dispatch_target = ( + f'anthias_server.api.views.{version}.dispatch_remote_video_download' + ) + with ( + mock.patch(dispatch_target) as mock_dispatch, + mock.patch( + 'anthias_server.api.serializers.mixins.url_fails', + return_value=False, + ), + # Same inline-ffprobe fallback as the HLS test: rtsp URLs + # go through the stream-URL branch with duration=0. + mock.patch( + 'anthias_server.api.serializers.mixins.get_video_duration', + return_value=__import__('datetime').timedelta(seconds=10), + ), + ): + response = api_client.post( + asset_list_url, data=get_request_data(payload, version) + ) + + assert response.status_code == status.HTTP_201_CREATED, response.data + assert response.data['uri'] == rtsp_url + mock_dispatch.assert_not_called() + + @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v1_1']) def test_create_heic_image_does_not_dispatch_on_legacy_endpoints( diff --git a/src/anthias_server/api/views/v1_2.py b/src/anthias_server/api/views/v1_2.py index 6cb94e097..0a8f030a6 100644 --- a/src/anthias_server/api/views/v1_2.py +++ b/src/anthias_server/api/views/v1_2.py @@ -6,6 +6,7 @@ from rest_framework.views import APIView from anthias_server.app.models import Asset +from anthias_common.remote_video import dispatch_remote_video_download from anthias_common.youtube import dispatch_download from anthias_server.processing import dispatch_pending_normalize from anthias_server.api.helpers import ( @@ -64,6 +65,17 @@ def post(self, request: Request) -> Response: if serializer._pending_youtube_uri: dispatch_download(asset.asset_id, serializer._pending_youtube_uri) + # Same hand-off for generic remote http(s) video URLs: the + # serializer rewrote the row to a local destination path and + # flipped is_processing; the Celery task downloads the file + # and chains into normalize_video_asset. Live streams (RTSP / + # HLS / DASH) never reach this branch — the serializer's + # classify keeps them as literal URIs. + if serializer._pending_remote_video_uri: + dispatch_remote_video_download( + asset.asset_id, serializer._pending_remote_video_uri + ) + # Same hand-off as v2: the serializer's prepare_asset stamps # is_processing on rows that need normalisation; we dispatch # the matching Celery task after persistence. Shared with diff --git a/src/anthias_server/api/views/v2.py b/src/anthias_server/api/views/v2.py index fe05cdaaa..4cfc915fe 100644 --- a/src/anthias_server/api/views/v2.py +++ b/src/anthias_server/api/views/v2.py @@ -33,6 +33,7 @@ operator_username, ) from anthias_common.internal_auth import is_internal_request +from anthias_common.remote_video import dispatch_remote_video_download from anthias_common.youtube import dispatch_download from anthias_server.processing import dispatch_pending_normalize from anthias_server.api.serializers.v2 import ( @@ -387,6 +388,17 @@ def post(self, request: Request) -> Response: if serializer._pending_youtube_uri: dispatch_download(asset.asset_id, serializer._pending_youtube_uri) + # Generic remote video URLs get the same hand-off: the + # serializer pointed Asset.uri at a local destination path + # and flipped is_processing; the Celery task downloads via + # requests and chains into normalize_video_asset. Live + # streams (RTSP / HLS / DASH) are excluded by the + # serializer's ``is_downloadable_remote_video`` classify. + if serializer._pending_remote_video_uri: + dispatch_remote_video_download( + asset.asset_id, serializer._pending_remote_video_uri + ) + # Normalisation pipeline: HEIC/HEIF/TIFF → WebP, exotic video # → H.264 MP4. Dispatched after persistence (same pattern as # the YouTube hop above) so the row's ``asset_id`` is already diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index 8a6c0b46a..034ffed91 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -41,6 +41,9 @@ shutdown_via_balena_supervisor, url_fails, ) +from anthias_common.remote_video import ( # noqa: E402 + remote_video_destination_path, +) from anthias_common.youtube import youtube_destination_path # noqa: E402 from anthias_server.settings import settings # noqa: E402 @@ -637,6 +640,335 @@ def download_youtube_asset(asset_id: str, uri: str) -> None: dispatch_normalize_video(asset_id) +# --------------------------------------------------------------------------- +# download_remote_video_asset — generic http(s) single-file video URLs +# --------------------------------------------------------------------------- +# +# Mirrors the YouTube lifecycle for any ``http(s)://…`` URL whose +# extension or Content-Type identifies it as a downloadable video +# container (mp4 / webm / mov / mkv / ...). The serializer flips the +# row to ``is_processing=True`` and points ``Asset.uri`` at a local +# destination path; this task fetches the file, stamps metadata, and +# chains into ``normalize_video_asset`` for the per-board HW-codec +# gate — the same path YouTube downloads already follow. The win is +# uniformity: a 4K H.264 URL on a Pi 5 no longer silently SW-decodes +# at rotation time, origin downtime no longer turns into mid-rotation +# black-screen slots, and operators see codec/dims/fps in the asset +# table. +# +# Live streams (HLS / DASH / RTSP) are excluded by the serializer's +# classify step — they reach this task only via mis-routing, which +# the explicit Content-Type guard below catches and rejects. + + +# Wall-clock cap on a single download attempt. Matches +# YOUTUBE_DOWNLOAD_TIME_LIMIT_S so an operator who pastes a 1080p +# clip URL gets the same patience as a YouTube link. +REMOTE_VIDEO_DOWNLOAD_TIME_LIMIT_S = 60 * 15 + +# Hard ceiling on the downloaded file size. 5 GiB covers >99% of +# legitimate signage content (a 2-hour 4K H.264 at 5 Mbps is ~4.5 GB; +# typical 5-minute 1080p clips are ~150 MB). The cap is enforced +# *during* the stream — a malicious or misconfigured origin can't +# blow out the device's SD card by advertising a small file and then +# serving terabytes. Exceeding the cap raises a hard error so the +# operator sees the row land as Failed with a clear message, rather +# than silently truncating to the cap. +REMOTE_VIDEO_MAX_BYTES = 5 * 1024**3 + +# Connect / read timeouts. The connect timeout is short because a +# legitimate origin establishes the TCP + TLS handshake in well under +# a second; a 15s ceiling tolerates a slow DNS resolver or a sleepy +# CDN edge. The read timeout is per-chunk — a stalled stream that +# doesn't send any bytes for 60s gets killed rather than tying up +# the worker for the full 15-minute time_limit. +REMOTE_VIDEO_CONNECT_TIMEOUT_S = 15 +REMOTE_VIDEO_READ_TIMEOUT_S = 60 + +# Chunk size for the streaming download. 1 MiB is the sweet spot for +# the Pi-class SD-card writer: smaller chunks add per-write syscall +# overhead, larger chunks tie up RAM that could be feeding the +# kernel page cache. Same value used elsewhere in the codebase for +# bulk file IO. +_REMOTE_VIDEO_CHUNK_BYTES = 1024 * 1024 + +# Manifest Content-Types we explicitly reject at GET time even though +# the upfront HEAD probe in the serializer should have caught them. +# Some origins serve different headers on HEAD vs GET (HEAD returns +# 200 with ``video/mp4``, GET returns 200 with +# ``application/vnd.apple.mpegurl``) — defence in depth rather than +# trust the serializer's upstream classify. +_REMOTE_VIDEO_MANIFEST_CONTENT_TYPES = frozenset( + { + 'application/vnd.apple.mpegurl', + 'application/x-mpegurl', + 'application/dash+xml', + } +) + + +class RemoteVideoDownloadError(Exception): + """Raised by ``download_remote_video_asset`` for permanent + failures the operator needs to see on the row. + + Covers: non-2xx HTTP response, wrong Content-Type, file exceeded + the size cap, zero-byte response. All four are conditions where + retrying would just keep failing — the row lands on + ``on_failure`` and the operator sees the message on the Failed + pill's hover tooltip. + """ + + +class _DownloadRemoteVideoTask(Task): # type: ignore[type-arg] + """Custom Task subclass that funnels failures through the same + metadata-error contract as ``_DownloadYoutubeTask`` and + ``_NormalizeAssetTask``. + + Kept as a copy-paste of the YouTube task's failure handler rather + than refactored into a shared base — the failure body is 12 lines + and a shared base would obscure the per-task error contract that + each docstring documents inline. If a third download task ever + appears the refactor becomes worth doing. + """ + + def on_failure( + self, + exc: BaseException, + task_id: str, + args: tuple[Any, ...], + kwargs: dict[str, Any], + einfo: Any, + ) -> None: + asset_id = args[0] if args else kwargs.get('asset_id') + if not asset_id: + return + try: + from anthias_server.processing import ( + _notify, + _set_processing_error, + ) + + _set_processing_error(asset_id, f'{type(exc).__name__}: {exc}') + _notify(asset_id) + except Exception: + logging.exception( + 'download_remote_video_asset on_failure cleanup failed for %s', + asset_id, + ) + + +@celery.task( + base=_DownloadRemoteVideoTask, + time_limit=REMOTE_VIDEO_DOWNLOAD_TIME_LIMIT_S, + # Transient network / IO hiccups retry; permanent classes + # (RemoteVideoDownloadError covers HTTP 4xx, content-type + # mismatch, size cap) bubble straight to on_failure. + autoretry_for=(OSError,), + retry_backoff=15, + retry_backoff_max=300, + retry_jitter=True, + max_retries=2, +) +def download_remote_video_asset(asset_id: str, uri: str) -> None: + """Fetch *uri* into the row's persisted ``Asset.uri`` and chain + into ``normalize_video_asset``. + + The row is created upstream with ``mimetype='video'``, + ``is_processing=True``, and ``uri`` pointing at + ``/.`` (the eventual local path). On + success this task lands the file at that path, stamps + ``metadata['source']='remote_url'`` so the origin URL is + recoverable after a row edit, then dispatches + ``normalize_video_asset`` for the per-board HW-codec gate. The + row stays ``is_processing=True`` across the chain so the table + shows a single Processing → Done transition. + + Failure surface: + * non-2xx response → ``RemoteVideoDownloadError`` → on_failure + writes ``metadata.error_message`` and clears the flag. + * wrong Content-Type on GET (manifest, HTML error page) → + same. + * downloaded bytes > ``REMOTE_VIDEO_MAX_BYTES`` → same. + * transient network / IO hiccup → ``autoretry_for`` retries + twice with backoff; persistent failure lands on on_failure. + + Cleans up ``.part`` on every failure path so a partially-written + download doesn't linger as orphan content for the cleanup sweep + to deal with an hour later. + """ + # Lazy import: keeps the requests dependency out of the + # serializer's hot path import graph and matches the + # download_youtube_asset pattern. + import requests + + try: + asset = Asset.objects.get(asset_id=asset_id) + except Asset.DoesNotExist: + return + + if not asset.is_processing: + # Already finalised by a previous invocation, or operator- + # edited. Don't re-download or clobber state. + return + + # Trust the row's persisted destination — same rationale as + # download_youtube_asset's ``location = asset.uri or …``: a + # mid-flight assetdir change would otherwise write the file to + # the new path while the row still points at the old one. + # ``remote_video_destination_path`` with the URL's extension is + # the defensive fallback when the row landed with an empty uri. + location = asset.uri + if not location: + from urllib.parse import urlparse + + ext = path.splitext(urlparse(uri).path)[1].lower() or '.mp4' + location = remote_video_destination_path(asset_id, ext, settings) + + staging = f'{location}.part' + + def _drop_staging() -> None: + """Best-effort removal of the partial download. + + Mirrors the image-pipeline ``_drop_image_staging`` contract: + every failure path through the streaming download removes the + ``.part`` file before propagating, so a stalled or refused + download never leaves multi-GB debris behind. ``cleanup()`` + would catch it eventually but only after the 1h freshness + window — inline cleanup is the difference between an + operator's next upload working and "disk full" appearing in + the logs. + """ + try: + os.remove(staging) + except OSError: + pass + + try: + # ``stream=True`` so the response body isn't materialised in + # RAM — important for the multi-GB cap. ``allow_redirects`` + # follows the common CDN pattern where the canonical URL + # 302s to a signed download URL. The ``(connect, read)`` + # tuple form is the requests convention for per-phase + # timeouts; the read timeout is per-chunk so a stalled + # origin can't pin the worker for the full 15-minute + # task time_limit. + with requests.get( + uri, + stream=True, + allow_redirects=True, + timeout=( + REMOTE_VIDEO_CONNECT_TIMEOUT_S, + REMOTE_VIDEO_READ_TIMEOUT_S, + ), + ) as resp: + if resp.status_code >= 400: + raise RemoteVideoDownloadError( + f'HTTP {resp.status_code} fetching {uri!r}' + ) + content_type = (resp.headers.get('Content-Type') or '').lower() + base_type = content_type.split(';', 1)[0].strip() + if base_type in _REMOTE_VIDEO_MANIFEST_CONTENT_TYPES: + # The serializer's HEAD probe should have caught this, + # but some origins lie on HEAD and serve a manifest on + # GET. Defence in depth. + raise RemoteVideoDownloadError( + f'origin served streaming manifest ' + f'({base_type!r}) instead of a downloadable file; ' + 'live streams are not auto-downloaded' + ) + # Some misconfigured CDNs serve a video file as + # ``application/octet-stream``. Accept octet-stream and + # any ``video/*`` type; reject anything else (HTML error + # pages, JSON error envelopes, etc.) so we don't store + # a 200 OK error page as the asset. + if not ( + base_type.startswith('video/') + or base_type == 'application/octet-stream' + or base_type == '' # some origins omit Content-Type + ): + raise RemoteVideoDownloadError( + f'unexpected Content-Type {base_type!r} from {uri!r}; ' + 'expected video/* or application/octet-stream' + ) + + written = 0 + with open(staging, 'wb') as fh: + for chunk in resp.iter_content( + chunk_size=_REMOTE_VIDEO_CHUNK_BYTES + ): + if not chunk: + # iter_content yields empty bytes for + # keep-alive padding on some servers. Skip + # rather than treating as EOF. + continue + written += len(chunk) + if written > REMOTE_VIDEO_MAX_BYTES: + raise RemoteVideoDownloadError( + f'download exceeded size cap of ' + f'{REMOTE_VIDEO_MAX_BYTES} bytes for {uri!r}' + ) + fh.write(chunk) + if written == 0: + raise RemoteVideoDownloadError( + f'origin returned zero bytes for {uri!r}' + ) + except RemoteVideoDownloadError: + _drop_staging() + raise + except (OSError, requests.RequestException): + _drop_staging() + raise + + # Atomic rename within assetdir — POSIX guarantees a single inode + # swap so the viewer never sees a half-written file under the + # final name. ``os.replace`` is the cross-platform spelling of + # ``rename`` that overwrites an existing destination (e.g. a + # second invocation on the same asset_id, or a re-download + # triggered by the reconciler). + try: + os.replace(staging, location) + except OSError: + _drop_staging() + raise + + metadata = dict(asset.metadata or {}) + metadata.update( + { + 'source': 'remote_url', + 'source_url': uri, + } + ) + metadata.pop('error_message', None) + + update: dict[str, Any] = { + # ``is_processing`` deliberately stays True — the chained + # normalize_video_asset below clears it once its probe + # finishes. Single Processing → Done transition reads + # better than the previous two-step. + 'metadata': metadata, + } + # Same uri-backfill rule as download_youtube_asset: write the + # row's uri only if it was empty on entry, otherwise leave the + # operator-set value alone. + if not asset.uri: + update['uri'] = location + Asset.objects.filter(asset_id=asset_id).update(**update) + + from anthias_server.processing import ( + _notify, + dispatch_normalize_video, + ) + + # Dashboard nudge so the operator's table picks up the row's + # progress without waiting for the 5s poll. Viewer reload is + # deferred to the normalize chain (same as YouTube) — the row is + # still ``is_processing=True`` and the on-device viewer would + # just reload to a row it can't display. + _notify(asset_id, reload_viewer=False) + + dispatch_normalize_video(asset_id) + + @celery.task def reboot_anthias() -> None: """ diff --git a/tests/test_celery_tasks.py b/tests/test_celery_tasks.py index 4a45e88a8..8a182f483 100644 --- a/tests/test_celery_tasks.py +++ b/tests/test_celery_tasks.py @@ -963,6 +963,355 @@ def test_download_youtube_asset_on_failure_writes_error_metadata() -> None: mock_notify.assert_called_once_with('yt-1') +# --------------------------------------------------------------------------- +# download_remote_video_asset — generic http(s) single-file video URLs +# --------------------------------------------------------------------------- + + +from anthias_server.celery_tasks import ( # noqa: E402 + RemoteVideoDownloadError, + download_remote_video_asset, +) + + +def _make_remote_video_asset( + asset_dir: str, + asset_id: str = 'rv-1', + ext: str = '.mp4', + is_processing: bool = True, +) -> Asset: + """A row in the state the serializer leaves behind for an + http(s) video URL: mimetype='video', is_processing=True, uri + pointing at the eventual local destination.""" + return Asset.objects.create( + asset_id=asset_id, + name='https://example.com/clip' + ext, + uri=path.join(asset_dir, f'{asset_id}{ext}'), + mimetype='video', + duration=0, + is_enabled=True, + is_processing=is_processing, + play_order=0, + ) + + +def _fake_response( + body: bytes = b'fake mp4 bytes', + content_type: str = 'video/mp4', + status_code: int = 200, + chunk_size: int = 4, +) -> mock.MagicMock: + """Shape a fake ``requests.get`` response: context-manager-shaped, + streams ``body`` in fixed-size chunks via ``iter_content``.""" + resp = mock.MagicMock() + resp.status_code = status_code + resp.headers = {'Content-Type': content_type} + + def _iter_content(chunk_size: int) -> Iterator[bytes]: + for i in range(0, len(body), chunk_size): + yield body[i : i + chunk_size] + + resp.iter_content = _iter_content + resp.__enter__.return_value = resp + resp.__exit__.return_value = False + return resp + + +@pytest.fixture +def remote_video_asset_dir() -> Iterator[str]: + """Per-test temp directory pinned as settings['assetdir'] so the + download task writes inside it and we can inspect the on-disk + result without depending on the host filesystem.""" + Asset.objects.all().delete() + tmp = tempfile.TemporaryDirectory() + original = settings['assetdir'] + settings['assetdir'] = tmp.name + try: + yield tmp.name + finally: + settings['assetdir'] = original + tmp.cleanup() + Asset.objects.all().delete() + + +@pytest.mark.django_db +def test_download_remote_video_asset_success_chains_into_normalize( + remote_video_asset_dir: str, +) -> None: + """Happy path: requests streams a small payload to the row's + persisted ``uri``, metadata gets stamped with ``source='remote_url'`` + and ``source_url``, the dashboard notify fires, and + ``normalize_video_asset`` is dispatched so the per-board codec + gate runs. ``is_processing`` stays True across the chain — the + chained normalize_video clears it once ffprobe finishes.""" + _make_remote_video_asset(remote_video_asset_dir) + payload = b'mp4-bytes' * 100 + with ( + mock.patch('requests.get', return_value=_fake_response(body=payload)), + mock.patch( + 'anthias_server.processing.dispatch_normalize_video' + ) as mock_dispatch_normalize, + mock.patch( + 'anthias_server.app.consumers.notify_asset_update' + ) as mock_notify, + ): + download_remote_video_asset('rv-1', 'https://example.com/clip.mp4') + + a = Asset.objects.get(asset_id='rv-1') + # File landed at the persisted destination, .part is gone. + dest = path.join(remote_video_asset_dir, 'rv-1.mp4') + assert path.isfile(dest) + assert not path.exists(f'{dest}.part') + with open(dest, 'rb') as fh: + assert fh.read() == payload + # Origin URL is recoverable after future row edits. + assert a.metadata['source'] == 'remote_url' + assert a.metadata['source_url'] == 'https://example.com/clip.mp4' + # Row stays in-flight until normalize clears the flag — matches + # the YouTube lifecycle for a single Processing → Done arc. + assert a.is_processing is True + mock_dispatch_normalize.assert_called_once_with('rv-1') + mock_notify.assert_called_once_with('rv-1') + + +@pytest.mark.django_db +def test_download_remote_video_asset_size_cap_aborts( + remote_video_asset_dir: str, +) -> None: + """A payload that streams past ``REMOTE_VIDEO_MAX_BYTES`` raises + inside the task body. The ``.part`` file must be cleaned up so a + multi-GB partial download doesn't linger on the SD card waiting + for the cleanup() sweep an hour later. + + Monkey-patch the cap down to 100 bytes so the test allocates a + tiny payload — actually writing 5 GiB of fake bytes would blow + out the test runner's tmpfs / disk quota. + """ + _make_remote_video_asset(remote_video_asset_dir) + fake_resp = _fake_response(body=b'x' * 200, chunk_size=32) + with ( + mock.patch.object(celery_tasks_module, 'REMOTE_VIDEO_MAX_BYTES', 100), + mock.patch('requests.get', return_value=fake_resp), + mock.patch( + 'anthias_server.processing.dispatch_normalize_video' + ) as mock_dispatch, + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + with pytest.raises(RemoteVideoDownloadError, match='size cap'): + download_remote_video_asset('rv-1', 'https://example.com/huge.mp4') + dest = path.join(remote_video_asset_dir, 'rv-1.mp4') + assert not path.exists(dest) + assert not path.exists(f'{dest}.part') + mock_dispatch.assert_not_called() + + +@pytest.mark.django_db +def test_download_remote_video_asset_http_404_propagates_for_on_failure( + remote_video_asset_dir: str, +) -> None: + """A non-2xx response raises ``RemoteVideoDownloadError``. Celery + routes the exception through ``_DownloadRemoteVideoTask.on_failure`` + which clears ``is_processing`` and writes + ``metadata.error_message`` — same shape as YouTube's permanent + failures.""" + _make_remote_video_asset(remote_video_asset_dir) + with ( + mock.patch( + 'requests.get', + return_value=_fake_response(status_code=404, body=b''), + ), + mock.patch('anthias_server.processing.dispatch_normalize_video'), + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + with pytest.raises(RemoteVideoDownloadError, match='HTTP 404'): + download_remote_video_asset( + 'rv-1', 'https://example.com/missing.mp4' + ) + + +@pytest.mark.django_db +def test_download_remote_video_asset_manifest_content_type_aborts( + remote_video_asset_dir: str, +) -> None: + """Defence in depth: even if the serializer's HEAD probe said the + URL was a single file, an origin that serves an HLS manifest on + GET must be rejected. Storing the playlist as the asset would + leave the viewer pointing at a file that references segments it + can't fetch.""" + _make_remote_video_asset(remote_video_asset_dir) + with ( + mock.patch( + 'requests.get', + return_value=_fake_response( + content_type='application/vnd.apple.mpegurl', + body=b'#EXTM3U\n', + ), + ), + mock.patch('anthias_server.processing.dispatch_normalize_video'), + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + with pytest.raises(RemoteVideoDownloadError, match='manifest'): + download_remote_video_asset('rv-1', 'https://example.com/sneaky') + + +@pytest.mark.django_db +def test_download_remote_video_asset_wrong_content_type_aborts( + remote_video_asset_dir: str, +) -> None: + """``Content-Type: text/html`` on the GET response — most likely + a 200-OK error page from a misbehaving origin. Reject rather than + store the HTML as the asset.""" + _make_remote_video_asset(remote_video_asset_dir) + with ( + mock.patch( + 'requests.get', + return_value=_fake_response( + content_type='text/html', body=b'not here' + ), + ), + mock.patch('anthias_server.processing.dispatch_normalize_video'), + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + with pytest.raises(RemoteVideoDownloadError, match='Content-Type'): + download_remote_video_asset('rv-1', 'https://example.com/error') + + +@pytest.mark.django_db +def test_download_remote_video_asset_accepts_octet_stream( + remote_video_asset_dir: str, +) -> None: + """Some CDNs serve video files as ``application/octet-stream``. + Accept it — ffprobe in the chained normalize step is the real + arbiter of container/codec, not the HTTP Content-Type.""" + _make_remote_video_asset(remote_video_asset_dir) + with ( + mock.patch( + 'requests.get', + return_value=_fake_response( + content_type='application/octet-stream', + body=b'mp4-bytes' * 50, + ), + ), + mock.patch( + 'anthias_server.processing.dispatch_normalize_video' + ) as mock_dispatch, + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + download_remote_video_asset('rv-1', 'https://example.com/clip.mp4') + dest = path.join(remote_video_asset_dir, 'rv-1.mp4') + assert path.isfile(dest) + mock_dispatch.assert_called_once_with('rv-1') + + +@pytest.mark.django_db +def test_download_remote_video_asset_zero_bytes_aborts( + remote_video_asset_dir: str, +) -> None: + """A 200 OK with an empty body almost certainly means the origin + is broken; storing an empty file would just queue a row that the + viewer would refuse anyway. Surface it as a clear failure on the + operator's row.""" + _make_remote_video_asset(remote_video_asset_dir) + with ( + mock.patch('requests.get', return_value=_fake_response(body=b'')), + mock.patch('anthias_server.processing.dispatch_normalize_video'), + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + with pytest.raises(RemoteVideoDownloadError, match='zero bytes'): + download_remote_video_asset( + 'rv-1', 'https://example.com/empty.mp4' + ) + dest = path.join(remote_video_asset_dir, 'rv-1.mp4') + assert not path.exists(dest) + assert not path.exists(f'{dest}.part') + + +@pytest.mark.django_db +def test_download_remote_video_asset_no_op_for_missing_row( + remote_video_asset_dir: str, +) -> None: + """Row deleted between dispatch and pickup — task returns cleanly + without hitting the network.""" + with mock.patch('requests.get') as fake_get: + download_remote_video_asset( + 'does-not-exist', 'https://example.com/x.mp4' + ) + fake_get.assert_not_called() + + +@pytest.mark.django_db +def test_download_remote_video_asset_no_op_when_row_already_finalized( + remote_video_asset_dir: str, +) -> None: + """A duplicate task firing for a row whose first invocation already + cleared is_processing must not re-download or stomp on + operator-edited state.""" + _make_remote_video_asset(remote_video_asset_dir, is_processing=False) + with mock.patch('requests.get') as fake_get: + download_remote_video_asset('rv-1', 'https://example.com/x.mp4') + fake_get.assert_not_called() + + +@pytest.mark.django_db +def test_download_remote_video_asset_backfills_uri_when_row_uri_missing( + remote_video_asset_dir: str, +) -> None: + """Defensive: if the row landed without a uri (e.g. a custom caller + skipped the serializer's path-stamping), the task uses the + recomputed destination AND backfills the row so the viewer + API + can find the file.""" + Asset.objects.create( + asset_id='rv-empty', + name='https://example.com/x.mp4', + uri='', + mimetype='video', + duration=0, + is_enabled=True, + is_processing=True, + play_order=0, + ) + with ( + mock.patch( + 'requests.get', + return_value=_fake_response(body=b'mp4-bytes'), + ), + mock.patch('anthias_server.processing.dispatch_normalize_video'), + mock.patch('anthias_server.app.consumers.notify_asset_update'), + ): + download_remote_video_asset('rv-empty', 'https://example.com/x.mp4') + a = Asset.objects.get(asset_id='rv-empty') + expected = path.join(remote_video_asset_dir, 'rv-empty.mp4') + assert a.uri == expected + assert path.isfile(expected) + + +@pytest.mark.django_db +def test_download_remote_video_asset_on_failure_writes_error_metadata( + remote_video_asset_dir: str, +) -> None: + """When Celery declares the task failed, is_processing must flip + back to False AND ``metadata.error_message`` must carry the + exception class + message so the asset table renders the "Failed" + pill — same operator-visible contract as YouTube / normalize + failures.""" + _make_remote_video_asset(remote_video_asset_dir) + with mock.patch( + 'anthias_server.app.consumers.notify_asset_update' + ) as mock_notify: + download_remote_video_asset.on_failure( + RuntimeError('connection refused'), + task_id='t-1', + args=('rv-1',), + kwargs={}, + einfo=None, + ) + a = Asset.objects.get(asset_id='rv-1') + assert a.is_processing is False + assert 'RuntimeError' in a.metadata['error_message'] + assert 'connection refused' in a.metadata['error_message'] + mock_notify.assert_called_once_with('rv-1') + + # --------------------------------------------------------------------------- # reconcile_stuck_processing (periodic recovery for is_processing=True # rows that never finished — GH #2870 second-line defence) diff --git a/tests/test_remote_video.py b/tests/test_remote_video.py new file mode 100644 index 000000000..7ba533de2 --- /dev/null +++ b/tests/test_remote_video.py @@ -0,0 +1,268 @@ +"""Unit tests for ``anthias_common.remote_video``. + +The classifier sits in the synchronous POST /assets path, so behaviour +under each input shape (known extension, manifest, stream scheme, +extensionless URL with various HEAD responses, network failure) is +covered explicitly to lock the create-asset contract. + +These tests do not need Django — the helpers are framework-free — but +they live under ``tests/`` so the existing pytest harness picks them +up alongside the celery/processing suites. +""" + +from __future__ import annotations + +from unittest import mock + +import pytest +import requests + +from anthias_common.remote_video import ( + is_downloadable_remote_video, + remote_video_destination_path, +) + + +# --------------------------------------------------------------------------- +# Extension-based classify (no HEAD call) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + 'uri,expected_ext', + [ + ('https://example.com/clip.mp4', '.mp4'), + ('https://cdn.example.com/path/to/file.webm', '.webm'), + ('https://example.com/movie.MOV', '.mov'), + ('http://example.com/x.mkv', '.mkv'), + ('https://example.com/a.avi', '.avi'), + ('https://example.com/short.m4v', '.m4v'), + ('https://example.com/old.ogv', '.ogv'), + # Query strings and fragments do not fool the extension match. + ('https://example.com/clip.mp4?download=true', '.mp4'), + ('https://example.com/clip.mp4#t=10', '.mp4'), + ], +) +def test_classify_known_video_extension_returns_download( + uri: str, expected_ext: str +) -> None: + """A URL whose path ends in a known single-file video container + auto-downloads with the matching local extension. No HEAD call + fires — extension match is the fast path.""" + with mock.patch('anthias_common.remote_video.requests.head') as head: + ok, ext = is_downloadable_remote_video(uri) + assert ok is True + assert ext == expected_ext + head.assert_not_called() + + +@pytest.mark.parametrize( + 'uri', + [ + 'https://example.com/stream.m3u8', + 'https://example.com/dash/manifest.mpd', + 'https://example.com/legacy.m3u', + 'https://example.com/smooth/Manifest.ism', + ], +) +def test_classify_streaming_manifest_extensions_return_stream( + uri: str, +) -> None: + """HLS / DASH / SmoothStreaming manifests never auto-download — + they describe a stream, not a single file. No HEAD call (the + extension match short-circuits).""" + with mock.patch('anthias_common.remote_video.requests.head') as head: + ok, ext = is_downloadable_remote_video(uri) + assert ok is False + assert ext == '' + head.assert_not_called() + + +@pytest.mark.parametrize( + 'uri', + [ + 'rtsp://camera.local/feed', + 'rtmp://media.example.com/live', + 'srt://stream.example.com:9000', + 'udp://239.0.0.1:1234', + 'mms://media.example.com/live', + ], +) +def test_classify_streaming_schemes_return_stream(uri: str) -> None: + """RTSP / RTMP / SRT / UDP / MMS are streaming-by-construction, + even if the URL's path happens to end in ``.mp4``. The viewer + plays them live via mpv's network stack.""" + with mock.patch('anthias_common.remote_video.requests.head') as head: + ok, ext = is_downloadable_remote_video(uri) + assert ok is False + assert ext == '' + head.assert_not_called() + + +def test_classify_streaming_scheme_with_mp4_path_returns_stream() -> None: + """``rtsp://camera/feed.mp4`` is RTSP. Path extension does not + promote it to an http(s) download.""" + with mock.patch('anthias_common.remote_video.requests.head') as head: + ok, ext = is_downloadable_remote_video('rtsp://camera/feed.mp4') + assert ok is False + assert ext == '' + head.assert_not_called() + + +def test_classify_non_http_scheme_returns_stream() -> None: + """Non-http(s)/non-streaming schemes (file://, ftp://, ...) get + the negative classify. The classifier deliberately refuses to + download from anything but well-known network protocols.""" + with mock.patch('anthias_common.remote_video.requests.head') as head: + ok, ext = is_downloadable_remote_video('file:///tmp/clip.mp4') + assert ok is False + assert ext == '' + head.assert_not_called() + + +def test_classify_empty_uri_returns_stream() -> None: + ok, ext = is_downloadable_remote_video('') + assert ok is False + assert ext == '' + + +# --------------------------------------------------------------------------- +# HEAD-probe fallback (extensionless / unknown-extension URLs) +# --------------------------------------------------------------------------- + + +def _fake_head(content_type: str, status_code: int = 200) -> mock.MagicMock: + """Shape a fake ``requests.head`` response with a given + Content-Type and status code.""" + resp = mock.MagicMock() + resp.status_code = status_code + resp.headers = {'Content-Type': content_type} + return resp + + +def test_classify_bare_url_falls_back_to_head_probe_video() -> None: + """No extension on the URL, but HEAD reports ``Content-Type: + video/mp4`` → auto-download with the inferred extension.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + return_value=_fake_head('video/mp4'), + ) as head: + ok, ext = is_downloadable_remote_video( + 'https://api.example.com/video/12345' + ) + assert ok is True + assert ext == '.mp4' + head.assert_called_once() + + +def test_classify_head_probe_html_returns_stream() -> None: + """HEAD reports ``Content-Type: text/html`` (a 404 page, a JSON + error envelope's text/html landing page, ...) → stay as stream + URL. The download task would have stored the error page as the + asset; we want the row to remain a literal-URL stream instead.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + return_value=_fake_head('text/html; charset=utf-8'), + ): + ok, ext = is_downloadable_remote_video( + 'https://api.example.com/video/12345' + ) + assert ok is False + assert ext == '' + + +def test_classify_head_probe_manifest_content_type_returns_stream() -> None: + """Some HLS origins serve ``application/vnd.apple.mpegurl`` from + URLs without a ``.m3u8`` extension. Reject those at the HEAD + probe — downloading the manifest as a single file would store + the playlist, not the segments it points at.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + return_value=_fake_head('application/vnd.apple.mpegurl'), + ): + ok, ext = is_downloadable_remote_video( + 'https://hls.example.com/stream' + ) + assert ok is False + assert ext == '' + + +def test_classify_head_probe_http_error_returns_stream() -> None: + """HEAD returns 4xx → stay as stream URL. Some origins respond + 405 Method Not Allowed to HEAD; either way, downgrading to + stream-mode keeps the create call from failing — the viewer + will play (or fail to play) the URL as a stream.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + return_value=_fake_head('video/mp4', status_code=405), + ): + ok, ext = is_downloadable_remote_video( + 'https://api.example.com/video/12345' + ) + assert ok is False + assert ext == '' + + +@pytest.mark.parametrize( + 'exc', + [ + requests.exceptions.Timeout('slow origin'), + requests.exceptions.ConnectionError('refused'), + requests.exceptions.TooManyRedirects('loop'), + requests.exceptions.SSLError('bad cert'), + ], +) +def test_classify_head_probe_network_failure_returns_stream( + exc: Exception, +) -> None: + """Any network exception during the HEAD probe → stay as stream + URL. The classifier is best-effort; we never block the create + call on a flaky origin.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + side_effect=exc, + ): + ok, ext = is_downloadable_remote_video( + 'https://api.example.com/video/12345' + ) + assert ok is False + assert ext == '' + + +def test_classify_head_probe_uses_short_timeout() -> None: + """The synchronous HEAD probe must run with the documented 5s + timeout — operators are blocking on the POST /assets call. Any + drift in the timeout constant would slow create requests.""" + with mock.patch( + 'anthias_common.remote_video.requests.head', + return_value=_fake_head('video/mp4'), + ) as head: + is_downloadable_remote_video('https://api.example.com/video/12345') + _, kwargs = head.call_args + assert kwargs['timeout'] == 5 + assert kwargs['allow_redirects'] is True + + +# --------------------------------------------------------------------------- +# Destination path +# --------------------------------------------------------------------------- + + +def test_remote_video_destination_path_uses_assetdir(tmp_path) -> None: + """The local destination lives under settings['assetdir'] so + cleanup() recognises the downloaded file as referenced and + doesn't sweep it as an orphan.""" + fake_settings = {'assetdir': str(tmp_path)} + result = remote_video_destination_path('abc123', '.mp4', fake_settings) + assert result == f'{tmp_path}/abc123.mp4' + + +def test_remote_video_destination_path_preserves_extension() -> None: + """The extension is the caller's responsibility — pass through + verbatim. Allows webm/mkv/avi to land with their real container + so ffprobe identifies them correctly.""" + for ext in ('.mp4', '.webm', '.mkv', '.mov'): + result = remote_video_destination_path( + 'asset-1', ext, {'assetdir': '/data'} + ) + assert result == f'/data/asset-1{ext}' From 7903bdedf1b13abb5b80b807848967c3eabfb793 Mon Sep 17 00:00:00 2001 From: Viktor Petersson Date: Tue, 19 May 2026 11:29:59 +0000 Subject: [PATCH 2/5] refactor(remote_video): apply review fixes - Route HEAD probe + streaming GET through ``AnthiasSession`` so origins see the project-wide ``Anthias/`` UA (#2897). - Drop the ``urllib3`` logger level side effect at import time. - Trust the serializer-stamped ``Asset.uri`` exclusively; refuse the task on an empty uri rather than guessing the extension (which could diverge from the HEAD-probed choice). --- src/anthias_common/remote_video.py | 23 ++++++----- src/anthias_server/celery_tasks.py | 50 ++++++++++++++---------- tests/test_celery_tasks.py | 63 ++++++++++++++++++------------ tests/test_remote_video.py | 22 +++++------ 4 files changed, 94 insertions(+), 64 deletions(-) diff --git a/src/anthias_common/remote_video.py b/src/anthias_common/remote_video.py index 81f7a359c..0983431da 100644 --- a/src/anthias_common/remote_video.py +++ b/src/anthias_common/remote_video.py @@ -16,7 +16,6 @@ from __future__ import annotations -import logging import mimetypes from os import path from typing import TYPE_CHECKING @@ -24,10 +23,19 @@ import requests +from anthias_common.http import AnthiasSession + if TYPE_CHECKING: from anthias_server.settings import AnthiasSettings +# Module-level shared session so the HEAD probe reuses one TCP/TLS +# connection pool across the lifetime of the process. Pattern matches +# ``anthias_server.lib.screenly_migration._session`` — tests patch +# ``_session.head`` (or whichever method) directly. +_session = AnthiasSession() + + # Single-file video containers we know how to download and that the # normalisation pipeline can ffprobe. The set is deliberately # conservative: anything ending in one of these gets auto-downloaded, @@ -125,8 +133,12 @@ def _head_probe(uri: str) -> tuple[bool, str]: ``_VIDEO_CONTAINER_EXTS``. Falls back to ``.mp4`` on the (rare) case where guess_extension returns None for a video/* type. """ + # Route through the module-level ``AnthiasSession`` so origins + # see a consistent ``Anthias/`` UA (matches the project- + # wide outbound convention from #2897) and the connection pool + # is reused across probes. try: - resp = requests.head( + resp = _session.head( uri, allow_redirects=True, timeout=_HEAD_PROBE_TIMEOUT_S, @@ -229,10 +241,3 @@ def dispatch_remote_video_download(asset_id: str, source_uri: str) -> None: stamp_processing_start(asset_id) download_remote_video_asset.delay(asset_id, source_uri) - - -# Suppress noisy library-side logging during the HEAD probe — the -# serializer caller already turns a failure into "stays as stream URL" -# and we don't want a transient DNS hiccup to spew tracebacks into -# every create-asset POST log line. -logging.getLogger('urllib3').setLevel(logging.WARNING) diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index 034ffed91..6e5dbcc16 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -41,9 +41,6 @@ shutdown_via_balena_supervisor, url_fails, ) -from anthias_common.remote_video import ( # noqa: E402 - remote_video_destination_path, -) from anthias_common.youtube import youtube_destination_path # noqa: E402 from anthias_server.settings import settings # noqa: E402 @@ -707,6 +704,14 @@ def download_youtube_asset(asset_id: str, uri: str) -> None: ) +# Module-level session — same UA convention as the HEAD probe in +# ``anthias_common.remote_video``. Tests patch ``_session.get``. +# Lazy import so the symbol resolves after Django's apps_ready. +from anthias_common.http import AnthiasSession # noqa: E402 + +_session = AnthiasSession() + + class RemoteVideoDownloadError(Exception): """Raised by ``download_remote_video_asset`` for permanent failures the operator needs to see on the row. @@ -796,9 +801,9 @@ def download_remote_video_asset(asset_id: str, uri: str) -> None: download doesn't linger as orphan content for the cleanup sweep to deal with an hour later. """ - # Lazy import: keeps the requests dependency out of the - # serializer's hot path import graph and matches the - # download_youtube_asset pattern. + # Lazy import of ``requests`` for the exception type only; + # ``_session`` (module-level ``AnthiasSession``) is the actual + # client. import requests try: @@ -811,18 +816,22 @@ def download_remote_video_asset(asset_id: str, uri: str) -> None: # edited. Don't re-download or clobber state. return - # Trust the row's persisted destination — same rationale as - # download_youtube_asset's ``location = asset.uri or …``: a - # mid-flight assetdir change would otherwise write the file to - # the new path while the row still points at the old one. - # ``remote_video_destination_path`` with the URL's extension is - # the defensive fallback when the row landed with an empty uri. + # The serializer stamps ``Asset.uri`` at the eventual local + # destination path before dispatching this task (the extension + # is picked from the URL or the HEAD probe's Content-Type). + # Trust that value rather than recomputing — recomputing the + # extension here from the URL alone would diverge from what + # the serializer chose for a HEAD-probed extensionless URL + # (``video/webm`` → ``.webm`` at the serializer vs. ``.mp4`` + # default here). A row reaching this task with an empty uri is + # a programming error (broken dispatch site, hand-edited row), + # not something to paper over with a guess. + if not asset.uri: + raise RemoteVideoDownloadError( + f'asset {asset_id!r} has no destination uri — refusing ' + 'to download without a serializer-stamped path' + ) location = asset.uri - if not location: - from urllib.parse import urlparse - - ext = path.splitext(urlparse(uri).path)[1].lower() or '.mp4' - location = remote_video_destination_path(asset_id, ext, settings) staging = f'{location}.part' @@ -850,9 +859,10 @@ def _drop_staging() -> None: # 302s to a signed download URL. The ``(connect, read)`` # tuple form is the requests convention for per-phase # timeouts; the read timeout is per-chunk so a stalled - # origin can't pin the worker for the full 15-minute - # task time_limit. - with requests.get( + # origin can't pin the worker for the full 15-minute task + # time_limit. The module-level ``_session`` injects the + # project-wide ``Anthias/`` User-Agent (#2897). + with _session.get( uri, stream=True, allow_redirects=True, diff --git a/tests/test_celery_tasks.py b/tests/test_celery_tasks.py index 8a182f483..0676b694f 100644 --- a/tests/test_celery_tasks.py +++ b/tests/test_celery_tasks.py @@ -1001,7 +1001,7 @@ def _fake_response( status_code: int = 200, chunk_size: int = 4, ) -> mock.MagicMock: - """Shape a fake ``requests.get`` response: context-manager-shaped, + """Shape a fake ``_session.get`` response: context-manager-shaped, streams ``body`` in fixed-size chunks via ``iter_content``.""" resp = mock.MagicMock() resp.status_code = status_code @@ -1047,7 +1047,10 @@ def test_download_remote_video_asset_success_chains_into_normalize( _make_remote_video_asset(remote_video_asset_dir) payload = b'mp4-bytes' * 100 with ( - mock.patch('requests.get', return_value=_fake_response(body=payload)), + mock.patch( + 'anthias_server.celery_tasks._session.get', + return_value=_fake_response(body=payload), + ), mock.patch( 'anthias_server.processing.dispatch_normalize_video' ) as mock_dispatch_normalize, @@ -1091,7 +1094,9 @@ def test_download_remote_video_asset_size_cap_aborts( fake_resp = _fake_response(body=b'x' * 200, chunk_size=32) with ( mock.patch.object(celery_tasks_module, 'REMOTE_VIDEO_MAX_BYTES', 100), - mock.patch('requests.get', return_value=fake_resp), + mock.patch( + 'anthias_server.celery_tasks._session.get', return_value=fake_resp + ), mock.patch( 'anthias_server.processing.dispatch_normalize_video' ) as mock_dispatch, @@ -1117,7 +1122,7 @@ def test_download_remote_video_asset_http_404_propagates_for_on_failure( _make_remote_video_asset(remote_video_asset_dir) with ( mock.patch( - 'requests.get', + 'anthias_server.celery_tasks._session.get', return_value=_fake_response(status_code=404, body=b''), ), mock.patch('anthias_server.processing.dispatch_normalize_video'), @@ -1141,7 +1146,7 @@ def test_download_remote_video_asset_manifest_content_type_aborts( _make_remote_video_asset(remote_video_asset_dir) with ( mock.patch( - 'requests.get', + 'anthias_server.celery_tasks._session.get', return_value=_fake_response( content_type='application/vnd.apple.mpegurl', body=b'#EXTM3U\n', @@ -1164,7 +1169,7 @@ def test_download_remote_video_asset_wrong_content_type_aborts( _make_remote_video_asset(remote_video_asset_dir) with ( mock.patch( - 'requests.get', + 'anthias_server.celery_tasks._session.get', return_value=_fake_response( content_type='text/html', body=b'not here' ), @@ -1186,7 +1191,7 @@ def test_download_remote_video_asset_accepts_octet_stream( _make_remote_video_asset(remote_video_asset_dir) with ( mock.patch( - 'requests.get', + 'anthias_server.celery_tasks._session.get', return_value=_fake_response( content_type='application/octet-stream', body=b'mp4-bytes' * 50, @@ -1213,7 +1218,10 @@ def test_download_remote_video_asset_zero_bytes_aborts( operator's row.""" _make_remote_video_asset(remote_video_asset_dir) with ( - mock.patch('requests.get', return_value=_fake_response(body=b'')), + mock.patch( + 'anthias_server.celery_tasks._session.get', + return_value=_fake_response(body=b''), + ), mock.patch('anthias_server.processing.dispatch_normalize_video'), mock.patch('anthias_server.app.consumers.notify_asset_update'), ): @@ -1232,7 +1240,7 @@ def test_download_remote_video_asset_no_op_for_missing_row( ) -> None: """Row deleted between dispatch and pickup — task returns cleanly without hitting the network.""" - with mock.patch('requests.get') as fake_get: + with mock.patch('anthias_server.celery_tasks._session.get') as fake_get: download_remote_video_asset( 'does-not-exist', 'https://example.com/x.mp4' ) @@ -1247,19 +1255,26 @@ def test_download_remote_video_asset_no_op_when_row_already_finalized( cleared is_processing must not re-download or stomp on operator-edited state.""" _make_remote_video_asset(remote_video_asset_dir, is_processing=False) - with mock.patch('requests.get') as fake_get: + with mock.patch('anthias_server.celery_tasks._session.get') as fake_get: download_remote_video_asset('rv-1', 'https://example.com/x.mp4') fake_get.assert_not_called() @pytest.mark.django_db -def test_download_remote_video_asset_backfills_uri_when_row_uri_missing( +def test_download_remote_video_asset_refuses_row_with_empty_uri( remote_video_asset_dir: str, ) -> None: - """Defensive: if the row landed without a uri (e.g. a custom caller - skipped the serializer's path-stamping), the task uses the - recomputed destination AND backfills the row so the viewer + API - can find the file.""" + """The serializer is the source of truth for the destination path — + it stamps ``Asset.uri`` at ``/`` before + dispatching, picking the extension from the URL or the HEAD + probe's Content-Type. + + A row reaching this task with an empty uri is a programming error + (broken dispatch site, hand-edited row, backup restore). The task + refuses rather than guessing an extension — recomputing here + would diverge from the serializer's choice for a HEAD-probed + extensionless URL (``video/webm`` → ``.webm`` at the serializer + vs. ``.mp4`` default here).""" Asset.objects.create( asset_id='rv-empty', name='https://example.com/x.mp4', @@ -1271,18 +1286,18 @@ def test_download_remote_video_asset_backfills_uri_when_row_uri_missing( play_order=0, ) with ( - mock.patch( - 'requests.get', - return_value=_fake_response(body=b'mp4-bytes'), - ), + mock.patch('anthias_server.celery_tasks._session.get') as fake_get, mock.patch('anthias_server.processing.dispatch_normalize_video'), mock.patch('anthias_server.app.consumers.notify_asset_update'), ): - download_remote_video_asset('rv-empty', 'https://example.com/x.mp4') - a = Asset.objects.get(asset_id='rv-empty') - expected = path.join(remote_video_asset_dir, 'rv-empty.mp4') - assert a.uri == expected - assert path.isfile(expected) + with pytest.raises( + RemoteVideoDownloadError, match='no destination uri' + ): + download_remote_video_asset( + 'rv-empty', 'https://example.com/x.mp4' + ) + # No network call attempted — the guard fires before the GET. + fake_get.assert_not_called() @pytest.mark.django_db diff --git a/tests/test_remote_video.py b/tests/test_remote_video.py index 7ba533de2..45c4d61ac 100644 --- a/tests/test_remote_video.py +++ b/tests/test_remote_video.py @@ -49,7 +49,7 @@ def test_classify_known_video_extension_returns_download( """A URL whose path ends in a known single-file video container auto-downloads with the matching local extension. No HEAD call fires — extension match is the fast path.""" - with mock.patch('anthias_common.remote_video.requests.head') as head: + with mock.patch('anthias_common.remote_video._session.head') as head: ok, ext = is_downloadable_remote_video(uri) assert ok is True assert ext == expected_ext @@ -71,7 +71,7 @@ def test_classify_streaming_manifest_extensions_return_stream( """HLS / DASH / SmoothStreaming manifests never auto-download — they describe a stream, not a single file. No HEAD call (the extension match short-circuits).""" - with mock.patch('anthias_common.remote_video.requests.head') as head: + with mock.patch('anthias_common.remote_video._session.head') as head: ok, ext = is_downloadable_remote_video(uri) assert ok is False assert ext == '' @@ -92,7 +92,7 @@ def test_classify_streaming_schemes_return_stream(uri: str) -> None: """RTSP / RTMP / SRT / UDP / MMS are streaming-by-construction, even if the URL's path happens to end in ``.mp4``. The viewer plays them live via mpv's network stack.""" - with mock.patch('anthias_common.remote_video.requests.head') as head: + with mock.patch('anthias_common.remote_video._session.head') as head: ok, ext = is_downloadable_remote_video(uri) assert ok is False assert ext == '' @@ -102,7 +102,7 @@ def test_classify_streaming_schemes_return_stream(uri: str) -> None: def test_classify_streaming_scheme_with_mp4_path_returns_stream() -> None: """``rtsp://camera/feed.mp4`` is RTSP. Path extension does not promote it to an http(s) download.""" - with mock.patch('anthias_common.remote_video.requests.head') as head: + with mock.patch('anthias_common.remote_video._session.head') as head: ok, ext = is_downloadable_remote_video('rtsp://camera/feed.mp4') assert ok is False assert ext == '' @@ -113,7 +113,7 @@ def test_classify_non_http_scheme_returns_stream() -> None: """Non-http(s)/non-streaming schemes (file://, ftp://, ...) get the negative classify. The classifier deliberately refuses to download from anything but well-known network protocols.""" - with mock.patch('anthias_common.remote_video.requests.head') as head: + with mock.patch('anthias_common.remote_video._session.head') as head: ok, ext = is_downloadable_remote_video('file:///tmp/clip.mp4') assert ok is False assert ext == '' @@ -144,7 +144,7 @@ def test_classify_bare_url_falls_back_to_head_probe_video() -> None: """No extension on the URL, but HEAD reports ``Content-Type: video/mp4`` → auto-download with the inferred extension.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', return_value=_fake_head('video/mp4'), ) as head: ok, ext = is_downloadable_remote_video( @@ -161,7 +161,7 @@ def test_classify_head_probe_html_returns_stream() -> None: URL. The download task would have stored the error page as the asset; we want the row to remain a literal-URL stream instead.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', return_value=_fake_head('text/html; charset=utf-8'), ): ok, ext = is_downloadable_remote_video( @@ -177,7 +177,7 @@ def test_classify_head_probe_manifest_content_type_returns_stream() -> None: probe — downloading the manifest as a single file would store the playlist, not the segments it points at.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', return_value=_fake_head('application/vnd.apple.mpegurl'), ): ok, ext = is_downloadable_remote_video( @@ -193,7 +193,7 @@ def test_classify_head_probe_http_error_returns_stream() -> None: stream-mode keeps the create call from failing — the viewer will play (or fail to play) the URL as a stream.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', return_value=_fake_head('video/mp4', status_code=405), ): ok, ext = is_downloadable_remote_video( @@ -219,7 +219,7 @@ def test_classify_head_probe_network_failure_returns_stream( URL. The classifier is best-effort; we never block the create call on a flaky origin.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', side_effect=exc, ): ok, ext = is_downloadable_remote_video( @@ -234,7 +234,7 @@ def test_classify_head_probe_uses_short_timeout() -> None: timeout — operators are blocking on the POST /assets call. Any drift in the timeout constant would slow create requests.""" with mock.patch( - 'anthias_common.remote_video.requests.head', + 'anthias_common.remote_video._session.head', return_value=_fake_head('video/mp4'), ) as head: is_downloadable_remote_video('https://api.example.com/video/12345') From 31d816ecf5e1ce4577d7fbb5cb592a739bf2549f Mon Sep 17 00:00:00 2001 From: Viktor Petersson Date: Tue, 19 May 2026 11:33:14 +0000 Subject: [PATCH 3/5] test(remote_video): type-annotate destination-path tests for mypy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI's mypy step rejected ``test_remote_video_destination_path_*`` — ``tmp_path`` was unannotated and the literal ``{'assetdir': str}`` arg failed against ``AnthiasSettings | None``. Cast through ``AnthiasSettings`` (a ``UserDict[str, Any]``) so mypy is happy without spinning up the real config layer that needs ``HOME`` set. --- tests/test_remote_video.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/test_remote_video.py b/tests/test_remote_video.py index 45c4d61ac..43cff0532 100644 --- a/tests/test_remote_video.py +++ b/tests/test_remote_video.py @@ -12,6 +12,8 @@ from __future__ import annotations +from pathlib import Path +from typing import cast from unittest import mock import pytest @@ -21,6 +23,7 @@ is_downloadable_remote_video, remote_video_destination_path, ) +from anthias_server.settings import AnthiasSettings # --------------------------------------------------------------------------- @@ -248,11 +251,16 @@ def test_classify_head_probe_uses_short_timeout() -> None: # --------------------------------------------------------------------------- -def test_remote_video_destination_path_uses_assetdir(tmp_path) -> None: +def test_remote_video_destination_path_uses_assetdir(tmp_path: Path) -> None: """The local destination lives under settings['assetdir'] so cleanup() recognises the downloaded file as referenced and doesn't sweep it as an orphan.""" - fake_settings = {'assetdir': str(tmp_path)} + # ``AnthiasSettings`` is a ``UserDict`` subclass whose real + # constructor reads ``~/.anthias/anthias.conf``; for the + # destination-path test we only need the ``assetdir`` key, so + # cast a minimal dict to the type to satisfy mypy without + # spinning up the full config layer. + fake_settings = cast(AnthiasSettings, {'assetdir': str(tmp_path)}) result = remote_video_destination_path('abc123', '.mp4', fake_settings) assert result == f'{tmp_path}/abc123.mp4' @@ -261,8 +269,7 @@ def test_remote_video_destination_path_preserves_extension() -> None: """The extension is the caller's responsibility — pass through verbatim. Allows webm/mkv/avi to land with their real container so ffprobe identifies them correctly.""" + fake_settings = cast(AnthiasSettings, {'assetdir': '/data'}) for ext in ('.mp4', '.webm', '.mkv', '.mov'): - result = remote_video_destination_path( - 'asset-1', ext, {'assetdir': '/data'} - ) + result = remote_video_destination_path('asset-1', ext, fake_settings) assert result == f'/data/asset-1{ext}' From 150b56866fe1c9534ddeeff36c073e22d4fe511f Mon Sep 17 00:00:00 2001 From: Viktor Petersson Date: Tue, 19 May 2026 11:44:16 +0000 Subject: [PATCH 4/5] refactor(remote_video): address SonarCloud findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract ``_stream_remote_video_to_file`` + ``_validate_remote_video_response`` helpers so ``download_remote_video_asset`` lands under SonarCloud's cognitive- complexity ceiling. - Drop redundant ``requests.RequestException`` from the except clause — it's a subclass of ``OSError`` so ``except OSError`` already covers it (S5713). - Drop the redundant ``startswith(('http://', 'https://'))`` in the serializer; ``is_downloadable_remote_video`` already rejects every non-http(s) scheme. Removes the literal ``http://`` hotspot in mixins.py. - Replace ``udp://239.0.0.1:1234`` with ``udp://stream.example.test:1234`` in the test fixture (S1313 hardcoded IP). - Annotate the deliberate ``http://`` test case with ``# NOSONAR`` and a comment explaining the LAN-without-TLS use case. --- src/anthias_server/api/serializers/mixins.py | 9 +- src/anthias_server/celery_tasks.py | 201 +++++++++---------- tests/test_remote_video.py | 7 +- 3 files changed, 105 insertions(+), 112 deletions(-) diff --git a/src/anthias_server/api/serializers/mixins.py b/src/anthias_server/api/serializers/mixins.py index 627d82767..9f35b32f7 100644 --- a/src/anthias_server/api/serializers/mixins.py +++ b/src/anthias_server/api/serializers/mixins.py @@ -122,15 +122,16 @@ def prepare_asset( # row lands with the eventual local path on disk, is_processing # is flipped, and a Celery task downloads the file out of band # before chaining into normalize_video_asset for the per-board - # codec gate. Live streams (RTSP / HLS / DASH) are filtered out - # by ``is_downloadable_remote_video`` — they reach the viewer - # as literal stream URLs the same way they always have. + # codec gate. Live streams (RTSP / HLS / DASH) and any non- + # http(s) scheme are filtered out inside + # ``is_downloadable_remote_video`` — they reach the viewer as + # literal stream URLs the same way they always have, so no + # extra scheme check is needed here. is_remote_video_download = False if ( not is_youtube and not is_local_upload and 'video' in (asset['mimetype'] or '') - and uri.startswith(('http://', 'https://')) ): should_download, source_ext = is_downloadable_remote_video(uri) if should_download: diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index 6e5dbcc16..51dfae7f3 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -724,6 +724,88 @@ class RemoteVideoDownloadError(Exception): """ +def _validate_remote_video_response(resp: Any, uri: str) -> None: + """Reject non-2xx responses, manifest Content-Types, and anything + that isn't ``video/*`` / ``application/octet-stream`` / empty. + + Extracted from ``download_remote_video_asset`` so the task body + stays under SonarCloud's cognitive-complexity ceiling. The serializer + pre-classifies via HEAD, but some origins return different headers + on HEAD vs GET — these checks are the second line of defence. + """ + if resp.status_code >= 400: + raise RemoteVideoDownloadError( + f'HTTP {resp.status_code} fetching {uri!r}' + ) + content_type = (resp.headers.get('Content-Type') or '').lower() + base_type = content_type.split(';', 1)[0].strip() + if base_type in _REMOTE_VIDEO_MANIFEST_CONTENT_TYPES: + raise RemoteVideoDownloadError( + f'origin served streaming manifest ' + f'({base_type!r}) instead of a downloadable file; ' + 'live streams are not auto-downloaded' + ) + # Accept ``video/*`` and ``application/octet-stream`` (some CDNs + # serve video files this way). An empty Content-Type also passes + # — a few origins omit it. Anything else (HTML error page, JSON + # error envelope) gets rejected so we don't store a 200 OK error + # page as the asset. + if ( + base_type.startswith('video/') + or base_type == 'application/octet-stream' + or base_type == '' + ): + return + raise RemoteVideoDownloadError( + f'unexpected Content-Type {base_type!r} from {uri!r}; ' + 'expected video/* or application/octet-stream' + ) + + +def _stream_remote_video_to_file(uri: str, staging: str) -> None: + """Fetch *uri* with the module-level session and stream it to + *staging*, enforcing the size cap and validating the response + headers. Raises ``RemoteVideoDownloadError`` on permanent + failures and lets transient ``OSError`` / + ``requests.RequestException`` (a subclass of ``OSError``) + propagate for the caller's ``autoretry_for``. + + Caller is responsible for cleaning up the partial staging file + on any exception path. + """ + with _session.get( + uri, + stream=True, + allow_redirects=True, + timeout=( + REMOTE_VIDEO_CONNECT_TIMEOUT_S, + REMOTE_VIDEO_READ_TIMEOUT_S, + ), + ) as resp: + _validate_remote_video_response(resp, uri) + written = 0 + with open(staging, 'wb') as fh: + for chunk in resp.iter_content( + chunk_size=_REMOTE_VIDEO_CHUNK_BYTES + ): + if not chunk: + # iter_content yields empty bytes for keep-alive + # padding on some servers; skip rather than + # treating as EOF. + continue + written += len(chunk) + if written > REMOTE_VIDEO_MAX_BYTES: + raise RemoteVideoDownloadError( + f'download exceeded size cap of ' + f'{REMOTE_VIDEO_MAX_BYTES} bytes for {uri!r}' + ) + fh.write(chunk) + if written == 0: + raise RemoteVideoDownloadError( + f'origin returned zero bytes for {uri!r}' + ) + + class _DownloadRemoteVideoTask(Task): # type: ignore[type-arg] """Custom Task subclass that funnels failures through the same metadata-error contract as ``_DownloadYoutubeTask`` and @@ -801,11 +883,6 @@ def download_remote_video_asset(asset_id: str, uri: str) -> None: download doesn't linger as orphan content for the cleanup sweep to deal with an hour later. """ - # Lazy import of ``requests`` for the exception type only; - # ``_session`` (module-level ``AnthiasSession``) is the actual - # client. - import requests - try: asset = Asset.objects.get(asset_id=asset_id) except Asset.DoesNotExist: @@ -832,113 +909,25 @@ def download_remote_video_asset(asset_id: str, uri: str) -> None: 'to download without a serializer-stamped path' ) location = asset.uri - staging = f'{location}.part' - def _drop_staging() -> None: - """Best-effort removal of the partial download. - - Mirrors the image-pipeline ``_drop_image_staging`` contract: - every failure path through the streaming download removes the - ``.part`` file before propagating, so a stalled or refused - download never leaves multi-GB debris behind. ``cleanup()`` - would catch it eventually but only after the 1h freshness - window — inline cleanup is the difference between an - operator's next upload working and "disk full" appearing in - the logs. - """ + # Stream the response, then atomically swap into place. Both + # phases share the cleanup contract: a partial ``.part`` left + # behind would otherwise wait for the hourly ``cleanup()`` sweep + # to clear — meanwhile an operator's next upload could trip a + # "disk full" if the partial was multi-GB. ``OSError`` covers + # both filesystem failures and ``requests.RequestException`` + # (which is an ``IOError``/``OSError`` subclass), so the single + # ``except OSError`` re-raise is sufficient for the + # ``autoretry_for`` to pick up. + try: + _stream_remote_video_to_file(uri, staging) + os.replace(staging, location) + except (OSError, RemoteVideoDownloadError): try: os.remove(staging) except OSError: pass - - try: - # ``stream=True`` so the response body isn't materialised in - # RAM — important for the multi-GB cap. ``allow_redirects`` - # follows the common CDN pattern where the canonical URL - # 302s to a signed download URL. The ``(connect, read)`` - # tuple form is the requests convention for per-phase - # timeouts; the read timeout is per-chunk so a stalled - # origin can't pin the worker for the full 15-minute task - # time_limit. The module-level ``_session`` injects the - # project-wide ``Anthias/`` User-Agent (#2897). - with _session.get( - uri, - stream=True, - allow_redirects=True, - timeout=( - REMOTE_VIDEO_CONNECT_TIMEOUT_S, - REMOTE_VIDEO_READ_TIMEOUT_S, - ), - ) as resp: - if resp.status_code >= 400: - raise RemoteVideoDownloadError( - f'HTTP {resp.status_code} fetching {uri!r}' - ) - content_type = (resp.headers.get('Content-Type') or '').lower() - base_type = content_type.split(';', 1)[0].strip() - if base_type in _REMOTE_VIDEO_MANIFEST_CONTENT_TYPES: - # The serializer's HEAD probe should have caught this, - # but some origins lie on HEAD and serve a manifest on - # GET. Defence in depth. - raise RemoteVideoDownloadError( - f'origin served streaming manifest ' - f'({base_type!r}) instead of a downloadable file; ' - 'live streams are not auto-downloaded' - ) - # Some misconfigured CDNs serve a video file as - # ``application/octet-stream``. Accept octet-stream and - # any ``video/*`` type; reject anything else (HTML error - # pages, JSON error envelopes, etc.) so we don't store - # a 200 OK error page as the asset. - if not ( - base_type.startswith('video/') - or base_type == 'application/octet-stream' - or base_type == '' # some origins omit Content-Type - ): - raise RemoteVideoDownloadError( - f'unexpected Content-Type {base_type!r} from {uri!r}; ' - 'expected video/* or application/octet-stream' - ) - - written = 0 - with open(staging, 'wb') as fh: - for chunk in resp.iter_content( - chunk_size=_REMOTE_VIDEO_CHUNK_BYTES - ): - if not chunk: - # iter_content yields empty bytes for - # keep-alive padding on some servers. Skip - # rather than treating as EOF. - continue - written += len(chunk) - if written > REMOTE_VIDEO_MAX_BYTES: - raise RemoteVideoDownloadError( - f'download exceeded size cap of ' - f'{REMOTE_VIDEO_MAX_BYTES} bytes for {uri!r}' - ) - fh.write(chunk) - if written == 0: - raise RemoteVideoDownloadError( - f'origin returned zero bytes for {uri!r}' - ) - except RemoteVideoDownloadError: - _drop_staging() - raise - except (OSError, requests.RequestException): - _drop_staging() - raise - - # Atomic rename within assetdir — POSIX guarantees a single inode - # swap so the viewer never sees a half-written file under the - # final name. ``os.replace`` is the cross-platform spelling of - # ``rename`` that overwrites an existing destination (e.g. a - # second invocation on the same asset_id, or a re-download - # triggered by the reconciler). - try: - os.replace(staging, location) - except OSError: - _drop_staging() raise metadata = dict(asset.metadata or {}) diff --git a/tests/test_remote_video.py b/tests/test_remote_video.py index 43cff0532..b1ec9b867 100644 --- a/tests/test_remote_video.py +++ b/tests/test_remote_video.py @@ -37,7 +37,10 @@ ('https://example.com/clip.mp4', '.mp4'), ('https://cdn.example.com/path/to/file.webm', '.webm'), ('https://example.com/movie.MOV', '.mov'), - ('http://example.com/x.mkv', '.mkv'), + # http (not https) is intentional — the classifier must accept + # both schemes so operators on internal LANs (where TLS isn't + # set up for the media server) can still auto-download. + ('http://example.com/x.mkv', '.mkv'), # NOSONAR ('https://example.com/a.avi', '.avi'), ('https://example.com/short.m4v', '.m4v'), ('https://example.com/old.ogv', '.ogv'), @@ -87,7 +90,7 @@ def test_classify_streaming_manifest_extensions_return_stream( 'rtsp://camera.local/feed', 'rtmp://media.example.com/live', 'srt://stream.example.com:9000', - 'udp://239.0.0.1:1234', + 'udp://stream.example.test:1234', 'mms://media.example.com/live', ], ) From 69c272f0835c0682d4b024b498ef497ecb0cd8d6 Mon Sep 17 00:00:00 2001 From: Viktor Petersson Date: Tue, 19 May 2026 11:56:29 +0000 Subject: [PATCH 5/5] refactor: collapse duplicate blocks flagged by SonarCloud MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract ``_DownloadAssetTask`` base for the YouTube and remote- video download tasks. Subclasses override ``_failure_log_prefix`` only — the metadata-error / notify body lives in one place. - Merge ``test_create_remote_hls_manifest_stays_as_stream_url`` and ``test_create_rtsp_url_stays_as_stream_url`` into a single parametrized test that asserts both shapes through the same path. Brings new-code duplication below SonarCloud's 3% gate. --- src/anthias_server/api/tests/test_assets.py | 76 +++++----------- src/anthias_server/celery_tasks.py | 99 +++++++++------------ 2 files changed, 64 insertions(+), 111 deletions(-) diff --git a/src/anthias_server/api/tests/test_assets.py b/src/anthias_server/api/tests/test_assets.py index 50bb04ea1..2b4ec92ac 100644 --- a/src/anthias_server/api/tests/test_assets.py +++ b/src/anthias_server/api/tests/test_assets.py @@ -857,23 +857,34 @@ def test_create_remote_video_url_keeps_literal_uri_on_legacy_endpoints( @pytest.mark.django_db +@pytest.mark.parametrize( + 'stream_uri', + [ + # HLS manifest: extension match short-circuits to "stream". + 'https://example.com/live/stream.m3u8', + # RTSP scheme: streaming-by-construction even if the path + # ended in ``.mp4`` (which it doesn't here). Scheme check + # rejects before any HEAD probe. + 'rtsp://camera.local/feed', + ], + ids=['hls_manifest', 'rtsp'], +) @pytest.mark.parametrize('version', ['v1_2', 'v2']) -def test_create_remote_hls_manifest_stays_as_stream_url( - api_client: APIClient, version: str +def test_create_stream_uri_stays_as_literal( + api_client: APIClient, version: str, stream_uri: str ) -> None: - """HLS / DASH manifests must NOT be auto-downloaded — they describe - a stream rather than a single file. The serializer's classify - rejects them at the URL-extension check, so the row lands with - the manifest URL intact and the viewer plays it as a live stream. + """Stream URLs (HLS manifests, RTSP feeds, ...) must NOT be + auto-downloaded — they describe a live stream rather than a single + file. The serializer's classify routes both shapes through as + literal URIs and the viewer plays them live. Out-of-scope per the issue, but enshrining it as a test keeps a future classifier change from silently breaking the stream playback path. """ - manifest_url = 'https://example.com/live/stream.m3u8' payload = { **ASSET_CREATION_DATA, - 'uri': manifest_url, + 'uri': stream_uri, 'mimetype': 'video', 'duration': 0, } @@ -889,7 +900,7 @@ def test_create_remote_hls_manifest_stays_as_stream_url( ), # The mixin's stream-URL branch falls through to an inline # ``get_video_duration`` ffprobe; the test must not actually - # shell out to ffprobe against ``example.com``. + # shell out to ffprobe against the test URLs. mock.patch( 'anthias_server.api.serializers.mixins.get_video_duration', return_value=__import__('datetime').timedelta(seconds=10), @@ -900,55 +911,12 @@ def test_create_remote_hls_manifest_stays_as_stream_url( ) assert response.status_code == status.HTTP_201_CREATED, response.data - # Manifest URL preserved verbatim; no download dispatch. - assert response.data['uri'] == manifest_url + # Stream URI preserved verbatim; no download dispatch. + assert response.data['uri'] == stream_uri assert response.data['is_processing'] in (False, 0) mock_dispatch.assert_not_called() -@pytest.mark.django_db -@pytest.mark.parametrize('version', ['v1_2', 'v2']) -def test_create_rtsp_url_stays_as_stream_url( - api_client: APIClient, version: str -) -> None: - """RTSP camera feeds are streaming-by-construction. Even though - ``mimetype='video'`` matches the auto-download branch's mimetype - filter, the scheme check inside ``is_downloadable_remote_video`` - routes it through as a literal-URI stream. - """ - rtsp_url = 'rtsp://camera.local/feed' - payload = { - **ASSET_CREATION_DATA, - 'uri': rtsp_url, - 'mimetype': 'video', - 'duration': 0, - } - asset_list_url = reverse(f'api:asset_list_{version}') - dispatch_target = ( - f'anthias_server.api.views.{version}.dispatch_remote_video_download' - ) - with ( - mock.patch(dispatch_target) as mock_dispatch, - mock.patch( - 'anthias_server.api.serializers.mixins.url_fails', - return_value=False, - ), - # Same inline-ffprobe fallback as the HLS test: rtsp URLs - # go through the stream-URL branch with duration=0. - mock.patch( - 'anthias_server.api.serializers.mixins.get_video_duration', - return_value=__import__('datetime').timedelta(seconds=10), - ), - ): - response = api_client.post( - asset_list_url, data=get_request_data(payload, version) - ) - - assert response.status_code == status.HTTP_201_CREATED, response.data - assert response.data['uri'] == rtsp_url - mock_dispatch.assert_not_called() - - @pytest.mark.django_db @pytest.mark.parametrize('version', ['v1', 'v1_1']) def test_create_heic_image_does_not_dispatch_on_legacy_endpoints( diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index 51dfae7f3..60d4d3111 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -377,26 +377,25 @@ def probe_video_duration(asset_id: str) -> None: notify_asset_update(asset_id) -class _DownloadYoutubeTask(Task): # type: ignore[type-arg] - """Custom Task subclass that funnels failures through the same - metadata-error contract as ``_NormalizeAssetTask``. - - Sharing the failure path means a failed YouTube download lands in - the same operator-visible state as a failed HEIC conversion or - failed video transcode: ``is_processing=False`` *and* a populated - ``metadata.error_message`` that the asset table renders as a - "Failed" pill (with the message on the hover tooltip). Without - that unification, a yt-dlp DownloadError would clear the - Processing pill but leave no on-row diagnostic — the operator - couldn't tell a fresh download from a 404'd one. - - The previous in-process daemon thread swallowed yt-dlp's exit - code, so a failed download silently left the row stuck at - "Processing" with an empty .mp4. Now any uncaught exception - (DownloadError, ExtractorError, ...) bubbles to celery and lands - here once retries are exhausted. +class _DownloadAssetTask(Task): # type: ignore[type-arg] + """Shared ``on_failure`` for the download tasks (YouTube + + generic remote video). + + Both surface a permanent failure to the operator the same way a + failed HEIC conversion does: ``is_processing=False`` and a + populated ``metadata.error_message`` that the asset table renders + as a "Failed" pill with the message on the hover tooltip. Without + that unification, a yt-dlp DownloadError (or a ``RemoteVideoDownload + Error``) would clear the Processing pill but leave no on-row + diagnostic — the operator couldn't tell a fresh download from a + 404'd one. + + Subclasses override ``_failure_log_prefix`` so the worker log line + names the actual task that failed. """ + _failure_log_prefix: str = 'download_asset' + def on_failure( self, exc: BaseException, @@ -409,11 +408,11 @@ def on_failure( if not asset_id: return try: - # Reuse the helpers in anthias_server.processing so the - # YouTube and normalize pipelines share the exact same + # Reuse the helpers in anthias_server.processing so every + # download / normalize task shares the exact same # "row failed" semantics — single source of truth for the - # error_message contract instead of two near-duplicate - # blocks that could drift. + # error_message contract instead of near-duplicate blocks + # that could drift. from anthias_server.processing import ( _notify, _set_processing_error, @@ -423,11 +422,26 @@ def on_failure( _notify(asset_id) except Exception: logging.exception( - 'download_youtube_asset on_failure cleanup failed for %s', + '%s on_failure cleanup failed for %s', + self._failure_log_prefix, asset_id, ) +class _DownloadYoutubeTask(_DownloadAssetTask): + """YouTube-specific download task. See ``_DownloadAssetTask`` for + the failure contract. + + The previous in-process daemon thread swallowed yt-dlp's exit + code, so a failed download silently left the row stuck at + "Processing" with an empty .mp4. Now any uncaught exception + (DownloadError, ExtractorError, ...) bubbles to celery and lands + on the inherited ``on_failure`` once retries are exhausted. + """ + + _failure_log_prefix = 'download_youtube_asset' + + # Bound the wall-clock cost of a single download attempt. 1080p videos # on a slow connection / Pi 1 can run several minutes; 15 min is a # generous ceiling that still fails fast enough for the operator to @@ -806,42 +820,13 @@ def _stream_remote_video_to_file(uri: str, staging: str) -> None: ) -class _DownloadRemoteVideoTask(Task): # type: ignore[type-arg] - """Custom Task subclass that funnels failures through the same - metadata-error contract as ``_DownloadYoutubeTask`` and - ``_NormalizeAssetTask``. - - Kept as a copy-paste of the YouTube task's failure handler rather - than refactored into a shared base — the failure body is 12 lines - and a shared base would obscure the per-task error contract that - each docstring documents inline. If a third download task ever - appears the refactor becomes worth doing. +class _DownloadRemoteVideoTask(_DownloadAssetTask): + """Generic remote-video download task. Inherits the failure + contract from ``_DownloadAssetTask`` — only the log prefix + differs. """ - def on_failure( - self, - exc: BaseException, - task_id: str, - args: tuple[Any, ...], - kwargs: dict[str, Any], - einfo: Any, - ) -> None: - asset_id = args[0] if args else kwargs.get('asset_id') - if not asset_id: - return - try: - from anthias_server.processing import ( - _notify, - _set_processing_error, - ) - - _set_processing_error(asset_id, f'{type(exc).__name__}: {exc}') - _notify(asset_id) - except Exception: - logging.exception( - 'download_remote_video_asset on_failure cleanup failed for %s', - asset_id, - ) + _failure_log_prefix = 'download_remote_video_asset' @celery.task(