Commit e5202ed

fix: allow run multiple record usage job

1 parent 488711f
File tree: 1 file changed (+14 −62)

app/jobs/record_usages.py
Lines changed: 14 additions & 62 deletions
@@ -34,31 +34,13 @@
 # Start with 2-4, adjust based on DB performance
 JOB_SEM = asyncio.Semaphore(3)  # Max 3 concurrent DB write operations
 
-# Track running jobs for adaptive scheduling
-_running_jobs = {}
-
-# Process pool executor for CPU-bound operations
-# Use number of CPU cores, but cap at reasonable limit to avoid overhead
-_process_pool = None
-_process_pool_lock = asyncio.Lock()
 
 # Thread pool executor for I/O-bound node API calls
 # Distributes workload across threads/cores for data collection
 _thread_pool = None
 _thread_pool_lock = asyncio.Lock()
 
 
-async def _get_process_pool():
-    """Get or create the process pool executor (thread-safe)."""
-    global _process_pool
-    async with _process_pool_lock:
-        if _process_pool is None:
-            num_workers = min(multiprocessing.cpu_count(), 8)  # Cap at 8 workers
-            _process_pool = ProcessPoolExecutor(max_workers=num_workers)
-            logger.info(f"Initialized ProcessPoolExecutor with {num_workers} workers")
-        return _process_pool
-
-
 async def _get_thread_pool():
     """Get or create the thread pool executor (thread-safe)."""
     global _thread_pool
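The surviving `_get_thread_pool` helper follows a lock-guarded lazy-initialization pattern; the deleted `_get_process_pool` duplicated it for a process pool. A minimal standalone sketch of that pattern, assuming a worker count and module scaffolding, since the diff truncates the function body:

import asyncio
from concurrent.futures import ThreadPoolExecutor

_thread_pool = None                    # created on first use, then reused
_thread_pool_lock = asyncio.Lock()

async def _get_thread_pool():
    """Get or create the thread pool executor (thread-safe)."""
    global _thread_pool
    async with _thread_pool_lock:      # serialize first-time creation
        if _thread_pool is None:
            # max_workers=8 is an assumption; the diff cuts off before the body
            _thread_pool = ThreadPoolExecutor(max_workers=8)
    return _thread_pool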
@@ -71,18 +53,6 @@ async def _get_thread_pool():
         return _thread_pool
 
 
-@on_shutdown
-async def _cleanup_process_pool():
-    """Cleanup process pool on shutdown (thread-safe)."""
-    global _process_pool
-    async with _process_pool_lock:
-        if _process_pool is not None:
-            logger.info("Shutting down ProcessPoolExecutor...")
-            _process_pool.shutdown(wait=True)
-            _process_pool = None
-            logger.info("ProcessPoolExecutor shut down successfully")
-
-
 @on_shutdown
 async def _cleanup_thread_pool():
     """Cleanup thread pool on shutdown (thread-safe)."""
@@ -739,24 +709,15 @@ async def _record_user_usages_impl():
 
 async def record_user_usages():
     """
-    Record user usages with hard timeout to prevent scheduler backlog.
-    Wraps the actual implementation with timeout protection.
+    Record user usages with hard timeout.
+    Jobs running longer than 2 minutes are forcefully cancelled.
    """
-    # Check if previous run is still active (adaptive scheduling)
-    if _running_jobs.get("record_user_usages"):
-        logger.warning("Previous record_user_usages run still active; skipping this cycle")
-        return
-
-    _running_jobs["record_user_usages"] = True
     try:
-        # Hard timeout: prevent job from running longer than interval
-        # This prevents scheduler backlog → spike → crash
-        try:
-            await asyncio.wait_for(_record_user_usages_impl(), timeout=30)
-        except asyncio.TimeoutError:
-            logger.warning("record_user_usages timed out after 30s; skipping cycle to prevent backlog")
-    finally:
-        _running_jobs["record_user_usages"] = False
+        await asyncio.wait_for(_record_user_usages_impl(), timeout=120)
+    except asyncio.TimeoutError:
+        logger.warning("record_user_usages killed after 120s timeout")
+    except asyncio.CancelledError:
+        logger.warning("record_user_usages was cancelled")
 
 
 async def _record_node_usages_impl():
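The rewritten wrapper relies on `asyncio.wait_for` alone: when the 120-second deadline passes, `wait_for` cancels the inner task and raises `TimeoutError` in the wrapper, which is what the new docstring means by "forcefully cancelled". A runnable sketch with a hypothetical slow coroutine standing in for `_record_user_usages_impl`, and the timeout shortened so the demo finishes quickly:

import asyncio
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("record_usages")

async def _slow_impl():
    await asyncio.sleep(300)           # stand-in for a slow implementation

async def record_user_usages():
    try:
        # 2 s here instead of the commit's 120 s, so the demo is quick
        await asyncio.wait_for(_slow_impl(), timeout=2)
    except asyncio.TimeoutError:
        logger.warning("record_user_usages killed after timeout")
    except asyncio.CancelledError:
        # mirrors the diff: cancellation is logged and swallowed
        logger.warning("record_user_usages was cancelled")

asyncio.run(record_user_usages())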
@@ -847,24 +808,15 @@ async def _record_node_usages_impl():
 
 async def record_node_usages():
     """
-    Record node usages with hard timeout to prevent scheduler backlog.
-    Wraps the actual implementation with timeout protection.
+    Record node usages with hard timeout.
+    Jobs running longer than 2 minutes are forcefully cancelled.
    """
-    # Check if previous run is still active (adaptive scheduling)
-    if _running_jobs.get("record_node_usages"):
-        logger.warning("Previous record_node_usages run still active; skipping this cycle")
-        return
-
-    _running_jobs["record_node_usages"] = True
     try:
-        # Hard timeout: prevent job from running longer than interval
-        # This prevents scheduler backlog → spike → crash
-        try:
-            await asyncio.wait_for(_record_node_usages_impl(), timeout=30)
-        except asyncio.TimeoutError:
-            logger.warning("record_node_usages timed out after 30s; skipping cycle to prevent backlog")
-    finally:
-        _running_jobs["record_node_usages"] = False
+        await asyncio.wait_for(_record_node_usages_impl(), timeout=120)
+    except asyncio.TimeoutError:
+        logger.warning("record_node_usages killed after 120s timeout")
+    except asyncio.CancelledError:
+        logger.warning("record_node_usages was cancelled")
 
 
 if ROLE.runs_node:
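The net effect matches the commit message: the scheduler may now start a new record run while the previous one is still finishing, because the `_running_jobs` skip-guard is gone, and the module-level `JOB_SEM = asyncio.Semaphore(3)` kept in the first hunk still caps concurrent DB writes across all overlapping runs. A sketch of that interaction, with `_write_batch` as a hypothetical stand-in for the real write:

import asyncio

JOB_SEM = asyncio.Semaphore(3)         # max 3 concurrent DB write operations

async def _write_batch(batch_id: int) -> None:
    async with JOB_SEM:                # overlapping runs queue here
        print(f"writing batch {batch_id}")
        await asyncio.sleep(0.1)       # placeholder for the real DB write

async def main() -> None:
    # Two runs started back to back, as the scheduler may now do:
    run1 = asyncio.gather(*(_write_batch(i) for i in range(5)))
    run2 = asyncio.gather(*(_write_batch(i) for i in range(5, 10)))
    await asyncio.gather(run1, run2)   # never more than 3 writes in flight

asyncio.run(main())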
