Skip to content

Commit adfedef

Browse files
test(utils): cover queue enqueue forwarding
1 parent 50b7dae commit adfedef

2 files changed

Lines changed: 61 additions & 0 deletions

File tree

tests/test_analysis/test_analysis_csv.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,49 @@ async def test_analyse_csv_send_udata_webhook(
171171
assert webhook.get(f"analysis:parsing:{k}", False) is None
172172

173173

174+
async def test_analyse_csv_enqueues_export_jobs_on_low_queue(
175+
mocker, setup_catalog, rmock, catalog_content, db, fake_check, produce_mock
176+
):
177+
"""Parquet export is scheduled on the low RQ queue (CSV geo export disabled)."""
178+
import asyncio
179+
180+
recorded: list[tuple[object, str | None]] = []
181+
182+
async def tracking_parquet_export(*args, **kwargs):
183+
pass
184+
185+
mocker.patch("udata_hydra.analysis.csv.export_parquet", tracking_parquet_export)
186+
187+
def capture_enqueue(fn, *args, **kwargs):
188+
recorded.append((fn, kwargs.get("_priority")))
189+
kwargs = dict(kwargs)
190+
kwargs.pop("_priority", None)
191+
kwargs.pop("_exception", None)
192+
result = fn(*args, **kwargs)
193+
if asyncio.iscoroutine(result):
194+
loop = asyncio.get_running_loop()
195+
return loop.run_until_complete(result)
196+
return result
197+
198+
mocker.patch("udata_hydra.utils.queue.enqueue", capture_enqueue)
199+
200+
check = await fake_check()
201+
url = check["url"]
202+
rmock.get(url, status=200, body=catalog_content)
203+
with (
204+
patch("udata_hydra.config.DB_TO_PARQUET", True),
205+
patch("udata_hydra.config.MIN_LINES_FOR_PARQUET", 1),
206+
patch("udata_hydra.config.DB_TO_GEOJSON", False),
207+
):
208+
await analyse_csv(check=check)
209+
210+
assert any(f is tracking_parquet_export and p == "low" for f, p in recorded)
211+
212+
from udata_hydra.analysis.exports import export_geojson_pmtiles as exp_geo
213+
214+
assert not any(f is exp_geo for f, _ in recorded)
215+
216+
174217
@pytest.mark.parametrize(
175218
"forced_analysis",
176219
(

tests/test_utils/test_queue.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from unittest.mock import MagicMock
2+
3+
from udata_hydra import config
4+
from udata_hydra.utils.queue import enqueue
5+
6+
7+
def test_enqueue_forwards_priority_exception_and_failure_ttl(mocker) -> None:
8+
"""Check that enqueue passes priority, exception flag, and TTL to the RQ queue.
9+
Hydra uses custom _priority/_exception kwargs; this test ensures they reach Redis/RQ."""
10+
rq_queue = MagicMock()
11+
context_queue = mocker.patch("udata_hydra.utils.queue.context.queue", return_value=rq_queue)
12+
13+
enqueue(lambda: None, key="val", _priority="low", _exception=True)
14+
15+
context_queue.assert_called_once_with("low", True)
16+
rq_queue.enqueue.assert_called_once_with(
17+
mocker.ANY, key="val", failure_ttl=config.RQ_DEFAULT_FAILURE_TTL
18+
)

0 commit comments

Comments
 (0)