Skip to content

Commit cd62521

Browse files
committed
perf: defer user sync serialization from request path
1 parent 8a0e0df commit cd62521

File tree

5 files changed

+169
-68
lines changed

5 files changed

+169
-68
lines changed

app/db/crud/group.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,20 @@ async def get_inbounds_by_tags(db: AsyncSession, tags: list[str]) -> list[ProxyI
2828
return [inbounds_map[tag] for tag in tags]
2929

3030

31-
async def load_group_attrs(group: Group):
32-
await group.awaitable_attrs.users
33-
await group.awaitable_attrs.inbounds
31+
async def load_group_attrs(group: Group, *, load_users: bool = True, load_inbounds: bool = True):
32+
if load_users:
33+
await group.awaitable_attrs.users
34+
if load_inbounds:
35+
await group.awaitable_attrs.inbounds
3436

3537

36-
async def get_group_by_id(db: AsyncSession, group_id: int) -> Group | None:
38+
async def get_group_by_id(
39+
db: AsyncSession,
40+
group_id: int,
41+
*,
42+
load_users: bool = True,
43+
load_inbounds: bool = True,
44+
) -> Group | None:
3745
"""
3846
Retrieves a group by its ID.
3947
@@ -46,7 +54,7 @@ async def get_group_by_id(db: AsyncSession, group_id: int) -> Group | None:
4654
"""
4755
group = (await db.execute(select(Group).where(Group.id == group_id))).unique().scalar_one_or_none()
4856
if group:
49-
await load_group_attrs(group)
57+
await load_group_attrs(group, load_users=load_users, load_inbounds=load_inbounds)
5058
return group
5159

5260

@@ -162,7 +170,13 @@ async def get_groups_simple(
162170
return rows, total
163171

164172

165-
async def get_groups_by_ids(db: AsyncSession, group_ids: list[int]) -> list[Group]:
173+
async def get_groups_by_ids(
174+
db: AsyncSession,
175+
group_ids: list[int],
176+
*,
177+
load_users: bool = True,
178+
load_inbounds: bool = True,
179+
) -> list[Group]:
166180
"""
167181
Retrieves a list of groups by their IDs.
168182
@@ -176,7 +190,7 @@ async def get_groups_by_ids(db: AsyncSession, group_ids: list[int]) -> list[Grou
176190
groups = (await db.execute(select(Group).where(Group.id.in_(group_ids)))).scalars().all()
177191

178192
for group in groups:
179-
await load_group_attrs(group)
193+
await load_group_attrs(group, load_users=load_users, load_inbounds=load_inbounds)
180194

181195
return groups
182196

app/db/crud/user.py

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,33 @@
4141
_USER_AGENT_MAX_LEN = UserSubscriptionUpdate.__table__.columns.user_agent.type.length or 512
4242

4343

44-
async def load_user_attrs(user: User):
45-
await user.awaitable_attrs.admin
46-
await user.awaitable_attrs.next_plan
47-
await user.awaitable_attrs.usage_logs
48-
await user.awaitable_attrs.groups
44+
async def load_user_attrs(
45+
user: User,
46+
*,
47+
load_admin: bool = True,
48+
load_next_plan: bool = True,
49+
load_usage_logs: bool = True,
50+
load_groups: bool = True,
51+
):
52+
if load_admin:
53+
await user.awaitable_attrs.admin
54+
if load_next_plan:
55+
await user.awaitable_attrs.next_plan
56+
if load_usage_logs:
57+
await user.awaitable_attrs.usage_logs
58+
if load_groups:
59+
await user.awaitable_attrs.groups
4960

5061

51-
async def get_user(db: AsyncSession, username: str) -> Optional[User]:
62+
async def get_user(
63+
db: AsyncSession,
64+
username: str,
65+
*,
66+
load_admin: bool = True,
67+
load_next_plan: bool = True,
68+
load_usage_logs: bool = True,
69+
load_groups: bool = True,
70+
) -> Optional[User]:
5271
"""
5372
Retrieves a user by username.
5473
@@ -63,11 +82,25 @@ async def get_user(db: AsyncSession, username: str) -> Optional[User]:
6382

6483
user = (await db.execute(stmt)).unique().scalar_one_or_none()
6584
if user:
66-
await load_user_attrs(user)
85+
await load_user_attrs(
86+
user,
87+
load_admin=load_admin,
88+
load_next_plan=load_next_plan,
89+
load_usage_logs=load_usage_logs,
90+
load_groups=load_groups,
91+
)
6792
return user
6893

6994

70-
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
95+
async def get_user_by_id(
96+
db: AsyncSession,
97+
user_id: int,
98+
*,
99+
load_admin: bool = True,
100+
load_next_plan: bool = True,
101+
load_usage_logs: bool = True,
102+
load_groups: bool = True,
103+
) -> User | None:
71104
"""
72105
Retrieves a user by user ID.
73106
@@ -82,10 +115,29 @@ async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
82115

83116
user = (await db.execute(stmt)).unique().scalar_one_or_none()
84117
if user:
85-
await load_user_attrs(user)
118+
await load_user_attrs(
119+
user,
120+
load_admin=load_admin,
121+
load_next_plan=load_next_plan,
122+
load_usage_logs=load_usage_logs,
123+
load_groups=load_groups,
124+
)
86125
return user
87126

88127

128+
async def get_user_lifetime_used_traffic(db: AsyncSession, user_id: int) -> int:
129+
stmt = (
130+
select(func.coalesce(func.sum(UserUsageResetLogs.used_traffic_at_reset), 0) + func.coalesce(User.used_traffic, 0))
131+
.select_from(User)
132+
.outerjoin(UserUsageResetLogs, UserUsageResetLogs.user_id == User.id)
133+
.where(User.id == user_id)
134+
.group_by(User.id)
135+
)
136+
result = await db.execute(stmt)
137+
value = result.scalar_one_or_none()
138+
return int(value or 0)
139+
140+
89141
async def get_existing_usernames(db: AsyncSession, usernames: Sequence[str]) -> set[str]:
90142
"""
91143
Returns the set of usernames that already exist in the database.
@@ -594,7 +646,7 @@ async def create_user(db: AsyncSession, new_user: UserCreate, groups: list[Group
594646
await db.commit()
595647
await db.refresh(db_user)
596648

597-
await load_user_attrs(db_user)
649+
await load_user_attrs(db_user, load_usage_logs=False)
598650
return db_user
599651

600652

@@ -634,7 +686,7 @@ async def create_users_bulk(
634686
await db.commit()
635687

636688
for user in db_users:
637-
await load_user_attrs(user)
689+
await load_user_attrs(user, load_usage_logs=False)
638690

639691
return db_users
640692

@@ -780,7 +832,7 @@ async def modify_user(db: AsyncSession, db_user: User, modify: UserModify) -> Us
780832

781833
await db.commit()
782834
await db.refresh(db_user)
783-
await load_user_attrs(db_user)
835+
await load_user_attrs(db_user, load_usage_logs=False)
784836
return db_user
785837

786838

@@ -819,7 +871,7 @@ async def reset_user_data_usage(db: AsyncSession, db_user: User) -> User:
819871

820872
await db.commit()
821873
await db.refresh(db_user)
822-
await load_user_attrs(db_user)
874+
await load_user_attrs(db_user, load_usage_logs=False)
823875
return db_user
824876

825877

@@ -841,7 +893,7 @@ async def bulk_reset_user_data_usage(db: AsyncSession, users: list[User]) -> lis
841893
await db.commit()
842894
for user in users:
843895
await db.refresh(user)
844-
await load_user_attrs(user)
896+
await load_user_attrs(user, load_usage_logs=False)
845897
return users
846898

847899

@@ -903,7 +955,7 @@ async def reset_user_by_next(db: AsyncSession, db_user: User) -> User:
903955

904956
await db.commit()
905957
await db.refresh(db_user)
906-
await load_user_attrs(db_user)
958+
await load_user_attrs(db_user, load_usage_logs=False)
907959
return db_user
908960

909961

@@ -926,7 +978,7 @@ async def revoke_user_sub(db: AsyncSession, db_user: User) -> User:
926978
await db.execute(stmt)
927979
await db.commit()
928980
await db.refresh(db_user)
929-
await load_user_attrs(db_user)
981+
await load_user_attrs(db_user, load_usage_logs=False)
930982
return db_user
931983

932984

@@ -1144,7 +1196,7 @@ async def update_users_status(db: AsyncSession, users: list[User], status: UserS
11441196
await db.commit()
11451197
for user in users:
11461198
await db.refresh(user)
1147-
await load_user_attrs(user)
1199+
await load_user_attrs(user, load_usage_logs=False)
11481200
return users
11491201

11501202

@@ -1164,7 +1216,7 @@ async def set_owner(db: AsyncSession, db_user: User, admin: Admin) -> User:
11641216
await db.execute(stmt)
11651217
await db.commit()
11661218
await db.refresh(db_user)
1167-
await load_user_attrs(db_user)
1219+
await load_user_attrs(db_user, load_usage_logs=False)
11681220
return db_user
11691221

11701222

@@ -1192,7 +1244,7 @@ async def start_users_expire(db: AsyncSession, users: list[User]) -> list[User]:
11921244
await db.commit()
11931245
for user in users:
11941246
await db.refresh(user)
1195-
await load_user_attrs(user)
1247+
await load_user_attrs(user, load_usage_logs=False)
11961248
return users
11971249

11981250

app/operation/__init__.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
get_user_template,
1616
)
1717
from app.db.crud.admin import get_admin_by_id
18+
from app.db.crud.group import get_groups_by_ids
1819
from app.db.crud.user import get_user_by_id
1920
from app.db.models import Admin as DBAdmin, CoreConfig, Group, Node, ProxyHost, User, UserTemplate
2021
from app.models.admin import AdminDetails
@@ -98,7 +99,7 @@ async def get_validated_sub(self, db: AsyncSession, token: str) -> User:
9899
if not sub:
99100
await self.raise_error(message="Not Found", code=404)
100101

101-
db_user = await get_user(db, sub["username"])
102+
db_user = await get_user(db, sub["username"], load_usage_logs=False)
102103
if not db_user or db_user.created_at.astimezone(tz.utc) > sub["created_at"]:
103104
await self.raise_error(message="Not Found", code=404)
104105

@@ -108,7 +109,7 @@ async def get_validated_sub(self, db: AsyncSession, token: str) -> User:
108109
return db_user
109110

110111
async def get_validated_user(self, db: AsyncSession, username: str, admin: AdminDetails) -> User:
111-
db_user = await get_user(db, username)
112+
db_user = await get_user(db, username, load_usage_logs=False)
112113
if not db_user:
113114
await self.raise_error(message="User not found", code=404)
114115

@@ -118,7 +119,7 @@ async def get_validated_user(self, db: AsyncSession, username: str, admin: Admin
118119
return db_user
119120

120121
async def get_validated_user_by_id(self, db: AsyncSession, user_id: int, admin: AdminDetails) -> User:
121-
db_user = await get_user_by_id(db, user_id)
122+
db_user = await get_user_by_id(db, user_id, load_usage_logs=False)
122123
if not db_user:
123124
await self.raise_error(message="User not found", code=404)
124125

@@ -140,22 +141,31 @@ async def get_validated_admin_by_id(self, db: AsyncSession, id: int) -> DBAdmin:
140141
return db_admin
141142

142143
async def get_validated_group(self, db: AsyncSession, group_id: int) -> Group:
143-
db_group = await get_group_by_id(db, group_id)
144+
db_group = await get_group_by_id(db, group_id, load_users=False, load_inbounds=True)
144145
if not db_group:
145146
await self.raise_error("Group not found", 404)
146147
return db_group
147148

148149
async def validate_all_groups(self, db, model: UserCreate | UserModify | UserTemplate | BulkGroup) -> list[Group]:
149-
all_groups: list[Group] = []
150+
requested_group_ids: list[int] = []
150151
if model.group_ids:
151-
for group_id in model.group_ids:
152-
db_group = await self.get_validated_group(db, group_id)
153-
all_groups.append(db_group)
152+
requested_group_ids.extend(model.group_ids)
154153
if hasattr(model, "has_group_ids") and model.has_group_ids:
155-
for group_id in model.has_group_ids:
156-
db_group = await self.get_validated_group(db, group_id)
157-
all_groups.append(db_group)
158-
return all_groups
154+
requested_group_ids.extend(model.has_group_ids)
155+
156+
if not requested_group_ids:
157+
return []
158+
159+
unique_ids = list(dict.fromkeys(requested_group_ids))
160+
groups = await get_groups_by_ids(db, unique_ids, load_users=False, load_inbounds=True)
161+
groups_by_id = {group.id: group for group in groups}
162+
163+
missing_ids = [group_id for group_id in unique_ids if group_id not in groups_by_id]
164+
if missing_ids:
165+
await self.raise_error("Group not found", 404)
166+
167+
# Preserve the requested order and duplicate semantics.
168+
return [groups_by_id[group_id] for group_id in requested_group_ids]
159169

160170
async def get_validated_user_template(self, db: AsyncSession, template_id: int) -> UserTemplate:
161171
dbuser_template = await get_user_template(db, template_id)

app/operation/admin.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
from app.db.crud.bulk import activate_all_disabled_users, disable_all_active_users
2121
from app.db.crud.user import get_users, remove_users
2222
from app.models.admin import AdminCreate, AdminDetails, AdminModify, AdminSimple, AdminsResponse, AdminsSimpleResponse
23-
from app.node.sync import schedule_sync_task, sync_remove_users, sync_users, sync_proto_users, remove_user as sync_remove_user
24-
from app.node.user import serialize_users_for_node
23+
from app.node.sync import schedule_sync_task, sync_remove_users, sync_users, remove_user as sync_remove_user
2524
from app.models.stats import Period, UserUsageStatsList
2625
from app.operation import BaseOperation, OperatorType
2726
from app.operation.user import UserOperation
@@ -172,8 +171,7 @@ async def disable_all_active_users(self, db: AsyncSession, username: str, admin:
172171

173172
users = await get_users(db, admin=db_admin)
174173
if self._is_non_blocking_sync_operator(self.operator_type):
175-
proto_users = await serialize_users_for_node(users)
176-
schedule_sync_task(sync_proto_users(proto_users))
174+
schedule_sync_task(sync_users(users))
177175
else:
178176
await sync_users(users)
179177

@@ -190,8 +188,7 @@ async def activate_all_disabled_users(self, db: AsyncSession, username: str, adm
190188

191189
users = await get_users(db, admin=db_admin)
192190
if self._is_non_blocking_sync_operator(self.operator_type):
193-
proto_users = await serialize_users_for_node(users)
194-
schedule_sync_task(sync_proto_users(proto_users))
191+
schedule_sync_task(sync_users(users))
195192
else:
196193
await sync_users(users)
197194

@@ -210,7 +207,9 @@ async def remove_all_users(self, db: AsyncSession, username: str, admin: AdminDe
210207
return 0
211208

212209
user_operation = UserOperation(self.operator_type)
213-
serialized_users = [await user_operation.validate_user(user) for user in users]
210+
serialized_users = [
211+
await user_operation.validate_user(db, user, include_subscription_url=False) for user in users
212+
]
214213

215214
await remove_users(db, users)
216215

0 commit comments

Comments
 (0)