Skip to content

Commit a82c96d

Browse files
fix: update calculate_admin_usage to return tuple and filter valid user IDs in record_user_usages
1 parent e7ed9cb commit a82c96d

File tree

1 file changed

+33
-17
lines changed

1 file changed

+33
-17
lines changed

app/jobs/record_usages.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,9 @@ async def get_outbounds_stats(node: PasarGuardNode):
354354
return []
355355

356356

357-
async def calculate_admin_usage(users_usage: list) -> dict:
357+
async def calculate_admin_usage(users_usage: list) -> tuple[dict, set[int]]:
358358
if not users_usage:
359-
return {}
359+
return {}, set()
360360

361361
# Get unique user IDs from users_usage
362362
uids = {int(user_usage["uid"]) for user_usage in users_usage}
@@ -375,7 +375,7 @@ async def calculate_admin_usage(users_usage: list) -> dict:
375375
if admin_id:
376376
admin_usage[admin_id] += user_usage["value"]
377377

378-
return admin_usage
378+
return admin_usage, set(user_admin_map.keys())
379379

380380

381381
async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> list:
@@ -408,15 +408,21 @@ async def record_user_usages():
408408
if not users_usage:
409409
return
410410

411-
user_stmt = (
412-
update(User)
413-
.where(User.id == bindparam("uid"))
414-
.values(used_traffic=User.used_traffic + bindparam("value"), online_at=dt.now(tz.utc))
415-
.execution_options(synchronize_session=False)
416-
)
417-
await safe_execute(user_stmt, users_usage)
411+
admin_usage, valid_user_ids = await calculate_admin_usage(users_usage)
412+
if not valid_user_ids:
413+
logger.warning("Skipping user usage recording; no matching users found for received stats")
414+
return
415+
416+
valid_users_usage = [usage for usage in users_usage if int(usage["uid"]) in valid_user_ids]
417+
if valid_users_usage:
418+
user_stmt = (
419+
update(User)
420+
.where(User.id == bindparam("uid"))
421+
.values(used_traffic=User.used_traffic + bindparam("value"), online_at=dt.now(tz.utc))
422+
.execution_options(synchronize_session=False)
423+
)
424+
await safe_execute(user_stmt, valid_users_usage)
418425

419-
admin_usage = await calculate_admin_usage(users_usage)
420426
if admin_usage:
421427
admin_data = [{"admin_id": aid, "value": val} for aid, val in admin_usage.items()]
422428
admin_stmt = (
@@ -430,13 +436,23 @@ async def record_user_usages():
430436
if DISABLE_RECORDING_NODE_USAGE:
431437
return
432438

433-
record_tasks = [
434-
asyncio.create_task(
435-
record_user_stats(params=api_params[node_id], node_id=node_id, usage_coefficient=usage_coefficient[node_id])
439+
record_tasks = []
440+
for node_id, params in api_params.items():
441+
filtered_params = [param for param in params if int(param["uid"]) in valid_user_ids]
442+
if not filtered_params:
443+
continue
444+
record_tasks.append(
445+
asyncio.create_task(
446+
record_user_stats(
447+
params=filtered_params,
448+
node_id=node_id,
449+
usage_coefficient=usage_coefficient[node_id],
450+
)
451+
)
436452
)
437-
for node_id in api_params
438-
]
439-
await asyncio.gather(*record_tasks)
453+
454+
if record_tasks:
455+
await asyncio.gather(*record_tasks)
440456

441457

442458
async def record_node_usages():

0 commit comments

Comments
 (0)