Skip to content

Commit 54460ba

Browse files
committed
refactor(hosts): legacy storage
1 parent d13f1c7 commit 54460ba

File tree

8 files changed

+137
-73
lines changed

8 files changed

+137
-73
lines changed

app/core/hosts.py

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1+
from asyncio import Lock
2+
from copy import deepcopy
3+
14
from sqlalchemy.ext.asyncio import AsyncSession
5+
from aiocache import cached
26

37
from app import on_startup
48
from app.core.manager import core_manager
59
from app.db import GetDB
6-
from app.db.crud.host import get_host_by_id, get_hosts, get_or_create_inbound
10+
from app.db.crud.host import get_host_by_id, get_hosts, upsert_inbounds
711
from app.db.models import ProxyHost, ProxyHostSecurity
8-
from app.models.host import MuxSettings, TransportSettings
9-
from app.utils.store import DictStorage
12+
from app.models.host import MuxSettings, TransportSettings, BaseHost
1013

1114

1215
def _prepare_host_data(host: ProxyHost) -> dict:
@@ -40,18 +43,23 @@ def _prepare_host_data(host: ProxyHost) -> dict:
4043
}
4144

4245

43-
@DictStorage
44-
async def hosts(storage: dict, db: AsyncSession):
45-
inbounds_list = await core_manager.get_inbounds()
46-
for tag in inbounds_list:
47-
await get_or_create_inbound(db, tag)
46+
class HostManager:
47+
def __init__(self):
48+
self._hosts = {}
49+
self._lock = Lock()
50+
51+
async def setup(self, db: AsyncSession):
52+
db_hosts = await get_hosts(db)
53+
await self.add_hosts(db, db_hosts)
4854

49-
storage.clear()
50-
db_hosts = await get_hosts(db)
55+
async def _reset_cache(self):
56+
await self.get_hosts.cache.clear()
5157

52-
for host in db_hosts:
58+
@staticmethod
59+
async def _prepare_host_entry(db: AsyncSession, host: BaseHost, inbounds_list: list[str]) -> tuple[int, dict] | None:
5360
if host.is_disabled or (host.inbound_tag not in inbounds_list):
54-
continue
61+
return None
62+
5563
downstream = None
5664
if (
5765
host.transport_settings
@@ -67,10 +75,47 @@ async def hosts(storage: dict, db: AsyncSession):
6775
else:
6876
host_data["downloadSettings"] = None
6977

70-
storage[host.id] = host_data
78+
return (host.id, host_data)
79+
80+
async def add_host(self, db: AsyncSession, host: BaseHost):
81+
await self.add_hosts(db, [host])
82+
83+
async def add_hosts(self, db: AsyncSession, hosts: list[BaseHost]):
84+
serialized_hosts = [BaseHost.model_validate(host) for host in hosts]
85+
inbounds_list = await core_manager.get_inbounds()
86+
await upsert_inbounds(db, inbounds_list)
87+
await db.commit()
88+
89+
prepared_hosts = []
90+
for host in serialized_hosts:
91+
result = await self._prepare_host_entry(db, host, inbounds_list)
92+
if result:
93+
prepared_hosts.append(result)
94+
95+
# Acquire lock only for updating the dict and cache
96+
async with self._lock:
97+
for host_id, host_data in prepared_hosts:
98+
self._hosts[host_id] = host_data
99+
await self._reset_cache()
100+
101+
async def remove_host(self, id: int):
102+
async with self._lock:
103+
self._hosts.pop(id, None)
104+
await self._reset_cache()
105+
106+
async def get_host(self, id: int) -> dict | None:
107+
async with self._lock:
108+
return deepcopy(self._hosts.get(id))
109+
110+
@cached()
111+
async def get_hosts(self) -> dict[int, dict]:
112+
async with self._lock:
113+
return deepcopy(self._hosts)
114+
71115

116+
host_manager: HostManager = HostManager()
72117

73118
@on_startup
74119
async def initialize_hosts():
75120
async with GetDB() as db:
76-
await hosts.update(db)
121+
await host_manager.setup(db)

app/db/crud/group.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
from app.db.models import ProxyInbound, Group
55
from app.models.group import GroupCreate, GroupModify
66

7-
from .host import get_or_create_inbound
7+
from .host import upsert_inbounds
88

99

1010
async def get_inbounds_by_tags(db: AsyncSession, tags: list[str]) -> list[ProxyInbound]:
1111
"""
12-
Retrieves inbounds by their tags.
12+
Retrieves or creates inbounds by their tags using efficient bulk upsert.
1313
"""
14-
return [(await get_or_create_inbound(db, tag)) for tag in tags]
14+
inbounds_map = await upsert_inbounds(db, tags)
15+
# Return in the same order as input tags
16+
return [inbounds_map[tag] for tag in tags]
1517

1618

1719
async def load_group_attrs(group: Group):
@@ -124,13 +126,14 @@ async def modify_group(db: AsyncSession, db_group: Group, modified_group: GroupM
124126
Group: The updated Group object.
125127
"""
126128

129+
if modified_group.inbound_tags:
130+
inbounds = await get_inbounds_by_tags(db, modified_group.inbound_tags)
131+
db_group.inbounds = inbounds
127132
if db_group.name != modified_group.name:
128133
db_group.name = modified_group.name
129134
if modified_group.is_disabled is not None:
130135
db_group.is_disabled = modified_group.is_disabled
131-
if modified_group.inbound_tags:
132-
inbounds = await get_inbounds_by_tags(db, modified_group.inbound_tags)
133-
db_group.inbounds = inbounds
136+
134137
await db.commit()
135138
await db.refresh(db_group)
136139
await load_group_attrs(db_group)

app/db/crud/host.py

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,79 @@
22
from enum import Enum
33
from typing import List, Optional
44

5-
from sqlalchemy import select
5+
from sqlalchemy import bindparam, select
6+
from sqlalchemy.dialects.postgresql import insert as pg_insert
7+
from sqlalchemy.dialects.mysql import insert as mysql_insert
8+
from sqlalchemy import insert
69
from sqlalchemy.ext.asyncio import AsyncSession
710

811
from app.db.models import ProxyHost, ProxyInbound
912
from app.models.host import CreateHost
1013

1114

15+
async def upsert_inbounds(db: AsyncSession, inbound_tags: list[str]) -> dict[str, ProxyInbound]:
16+
"""
17+
Efficiently upserts multiple proxy inbounds and returns them.
18+
Uses INSERT ... ON CONFLICT DO NOTHING pattern to avoid unnecessary SELECT queries.
19+
20+
Args:
21+
db (AsyncSession): Database session.
22+
inbound_tags (List[str]): List of inbound tags to upsert.
23+
24+
Returns:
25+
dict[str, ProxyInbound]: Mapping of tag to ProxyInbound object.
26+
27+
Note:
28+
This function does not commit the transaction. The caller is responsible for committing.
29+
"""
30+
if not inbound_tags:
31+
return {}
32+
33+
# Remove duplicates while preserving order
34+
unique_tags = list(dict.fromkeys(inbound_tags))
35+
36+
dialect = db.bind.dialect.name
37+
38+
# Build upsert statement based on dialect
39+
if dialect == "postgresql":
40+
stmt = pg_insert(ProxyInbound).values(tag=bindparam("tag"))
41+
stmt = stmt.on_conflict_do_nothing(index_elements=["tag"])
42+
elif dialect == "mysql":
43+
stmt = mysql_insert(ProxyInbound).values(tag=bindparam("tag"))
44+
stmt = stmt.on_duplicate_key_update(tag=ProxyInbound.tag)
45+
else: # SQLite
46+
stmt = insert(ProxyInbound).values(tag=bindparam("tag")).prefix_with("OR IGNORE")
47+
48+
# Execute upsert for all tags
49+
params = [{"tag": tag} for tag in unique_tags]
50+
await db.execute(stmt, params)
51+
await db.flush() # Flush to make inserted rows visible in this transaction
52+
53+
# Now select all the inbounds we just upserted
54+
select_stmt = select(ProxyInbound).where(ProxyInbound.tag.in_(unique_tags))
55+
result = await db.execute(select_stmt)
56+
inbounds = result.scalars().all()
57+
58+
# Return as a mapping
59+
return {inbound.tag: inbound for inbound in inbounds}
60+
61+
1262
async def get_or_create_inbound(db: AsyncSession, inbound_tag: str) -> ProxyInbound:
1363
"""
1464
Retrieves or creates a proxy inbound based on the given tag.
1565
66+
Note: This function is deprecated. Use upsert_inbounds() for better performance,
67+
especially when dealing with multiple inbounds.
68+
1669
Args:
1770
db (AsyncSession): Database session.
1871
inbound_tag (str): The tag of the inbound.
1972
2073
Returns:
2174
ProxyInbound: The retrieved or newly created proxy inbound.
2275
"""
23-
stmt = select(ProxyInbound).where(ProxyInbound.tag == inbound_tag)
24-
result = await db.execute(stmt)
25-
inbound = result.scalar_one_or_none()
26-
27-
if not inbound:
28-
inbound = ProxyInbound(tag=inbound_tag)
29-
db.add(inbound)
30-
await db.commit()
31-
await db.refresh(inbound)
32-
33-
return inbound
76+
result = await upsert_inbounds(db, [inbound_tag])
77+
return result[inbound_tag]
3478

3579

3680
async def get_inbounds_not_in_tags(db: AsyncSession, excluded_tags: List[str]) -> List[ProxyInbound]:

app/node/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async def update_node(self, node: Node) -> PasarGuardNode:
2828
except Exception:
2929
pass
3030
finally:
31-
del self._nodes[node.id]
31+
self._nodes.pop(node.id, None)
3232

3333
new_node = create_node(
3434
connection=type_map[node.connection_type],
@@ -56,7 +56,7 @@ async def remove_node(self, id: int) -> None:
5656
except Exception:
5757
pass
5858
finally:
59-
del self._nodes[id]
59+
self._nodes.pop(id, None)
6060

6161
async def get_node(self, id: int) -> PasarGuardNode | None:
6262
async with self._lock.reader_lock:

app/operation/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from app.core.manager import core_manager
88
from app.operation import BaseOperation
99
from app import notification
10-
from app.core.hosts import hosts as hosts_storage
10+
from app.core.hosts import host_manager
1111
from app.utils.logger import get_logger
1212

1313

@@ -28,7 +28,7 @@ async def create_core(self, db: AsyncSession, new_core: CoreCreate, admin: Admin
2828
core = CoreResponse.model_validate(db_core)
2929
asyncio.create_task(notification.create_core(core, admin.username))
3030

31-
await hosts_storage.update(db)
31+
await host_manager.setup(db)
3232

3333
return core
3434

@@ -55,7 +55,7 @@ async def modify_core(
5555
core = CoreResponse.model_validate(db_core)
5656
asyncio.create_task(notification.modify_core(core, admin.username))
5757

58-
await hosts_storage.update(db)
58+
await host_manager.setup(db)
5959

6060
return core
6161

@@ -72,4 +72,4 @@ async def delete_core(self, db: AsyncSession, core_id: int, admin: AdminDetails)
7272

7373
logger.info(f'core config "{db_core.name}" deleted by admin "{admin.username}"')
7474

75-
await hosts_storage.update(db)
75+
await host_manager.setup(db)

app/operation/host.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from app.models.admin import AdminDetails
77
from app.operation import BaseOperation
88
from app.db.crud.host import create_host, get_host_by_id, remove_host, get_hosts, modify_host
9-
from app.core.hosts import hosts as hosts_storage
9+
from app.core.hosts import host_manager
1010
from app.utils.logger import get_logger
1111

1212
from app import notification
@@ -49,7 +49,7 @@ async def create_host(self, db: AsyncSession, new_host: CreateHost, admin: Admin
4949
host = BaseHost.model_validate(db_host)
5050
asyncio.create_task(notification.create_host(host, admin.username))
5151

52-
await hosts_storage.update(db)
52+
await host_manager.add_host(db, db_host)
5353

5454
return host
5555

@@ -70,7 +70,7 @@ async def modify_host(
7070
host = BaseHost.model_validate(db_host)
7171
asyncio.create_task(notification.modify_host(host, admin.username))
7272

73-
await hosts_storage.update(db)
73+
await host_manager.add_host(db, db_host)
7474

7575
return host
7676

@@ -83,7 +83,7 @@ async def remove_host(self, db: AsyncSession, host_id: int, admin: AdminDetails)
8383

8484
asyncio.create_task(notification.remove_host(host, admin.username))
8585

86-
await hosts_storage.update(db)
86+
await host_manager.remove_host(host.id)
8787

8888
async def modify_hosts(
8989
self, db: AsyncSession, modified_hosts: list[CreateHost], admin: AdminDetails
@@ -100,7 +100,7 @@ async def modify_hosts(
100100
else:
101101
await modify_host(db, old_host, host)
102102

103-
await hosts_storage.update(db)
103+
await host_manager.add_hosts(db, modified_hosts)
104104

105105
logger.info(f'Host\'s has been modified by admin "{admin.username}"')
106106

app/subscription/share.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from jdatetime import date as jd
88

9-
from app.core.hosts import hosts as hosts_storage
9+
from app.core.hosts import host_manager
1010
from app.core.manager import core_manager
1111
from app.db.models import UserStatus
1212
from app.models.user import UsersResponseWithInbounds
@@ -271,7 +271,7 @@ async def process_inbounds_and_tags(
271271
reverse=False,
272272
) -> list | str:
273273
proxy_settings = user.proxy_settings.dict()
274-
for host in await filter_hosts(hosts_storage.values(), user.status):
274+
for host in await filter_hosts((await host_manager.get_hosts()).values(), user.status):
275275
host_data = await process_host(host, format_variables, user.inbounds, proxy_settings, conf)
276276
if not host_data:
277277
continue

app/utils/store.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)