
Commit 76707d2

fix: fixed a few things
1 parent ace3517 commit 76707d2

File tree

6 files changed: +221 -42 lines changed


core/__init__.py

Lines changed: 27 additions & 7 deletions
@@ -13,12 +13,27 @@
 _WAITLOCK = utils.CountLock()
 _START_RUNTIME = time.monotonic()

+async def call(module, func: str):
+    try:
+        init = getattr(module, func)
+        if asyncio.iscoroutinefunction(init):
+            await init()
+        else:
+            await asyncio.get_event_loop().run_in_executor(None, init)
+    except:
+        logger.traceback()
+
 async def main():
     start = time.monotonic_ns()
-    await scheduler.init()
-    await web.init()
-    await dashboard.init()
-    await cluster.init()
+    await asyncio.gather(*[
+        call(m, "init") for m in (
+            scheduler,
+            web,
+            database,
+            dashboard,
+            cluster,
+        )
+    ])
     _WAITLOCK.acquire()
     end = time.monotonic_ns()
     logger.tsuccess("main.success.start_service_done", time=f"{((end-start) / 1e9):.2f}")
@@ -27,9 +42,14 @@ async def main():
     except:
         logger.tdebug("main.debug.service_unfinish")
     finally:
-        await cluster.unload()
-        await web.unload()
-        await scheduler.unload()
+        await asyncio.gather(*[
+            call(m, "unload") for m in (
+                scheduler,
+                web,
+                cluster,
+                database
+            )
+        ])

 def read_version():
     with open("VERSION", "r") as f:
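
The new call helper lets main() bring every subsystem up and down concurrently through asyncio.gather, whether a module exposes a coroutine or a plain blocking function. Below is a minimal, self-contained sketch of the same dispatch pattern; the module objects are illustrative stand-ins, not the repo's.

import asyncio
import types

# Hypothetical stand-ins for the project's subsystem modules.
def _sync_init():
    print("sync init ran in the executor")

async def _async_init():
    print("async init awaited directly")

sync_mod = types.SimpleNamespace(init=_sync_init)
async_mod = types.SimpleNamespace(init=_async_init)

async def call(module, func: str):
    # Await coroutine functions; push plain functions to the default
    # executor so a blocking init cannot stall the event loop.
    fn = getattr(module, func)
    if asyncio.iscoroutinefunction(fn):
        await fn()
    else:
        await asyncio.get_running_loop().run_in_executor(None, fn)

async def main():
    # All subsystems initialize concurrently, as in the commit's main().
    await asyncio.gather(*[call(m, "init") for m in (sync_mod, async_mod)])

asyncio.run(main())

One caveat worth noting: a bare except: like the one in the commit's call also catches asyncio.CancelledError, so a cancelled startup task is logged as a traceback rather than propagated.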

core/cluster.py

Lines changed: 32 additions & 24 deletions
@@ -49,7 +49,7 @@ def __init__(self, clusters: 'ClusterManager'):
         self.cache_filelist: defaultdict[storages.iStorage, defaultdict[str, storages.File]] = defaultdict(defaultdict) # type: ignore

     def init(self):
-        scheduler.run_repeat(self._check_available, 120)
+        scheduler.run_repeat_later(self._check_available, 1, 120)

     async def _check_available(self):
         for storage in self.storages:
@@ -61,6 +61,7 @@ async def _check_available(self):
             )
             if await storage.get_size(CHECK_FILE_MD5) == len(CHECK_FILE_CONTENT) and storage not in self.available_storages:
                 self.available_storages.append(storage)
+        logger.debug(f"Available storages: {len(self.available_storages)}")
         if len(self.available_storages) > 0:
             self.check_available.release()
         else:
@@ -88,6 +89,7 @@ async def get_missing_files(self, files: set[File]) -> set[File | Any]:
             logger.twarning("cluster.warning.no_check_function")
             function = self._check_exists

+        await self.available()
         with tqdm(
             total=len(self.available_storages) * 256,
             desc="List Files",
@@ -108,7 +110,7 @@ async def get_missing_files(self, files: set[File]) -> set[File | Any]:
                 waiting_files.put_nowait(file)

         await asyncio.gather(*(self._get_missing_file_storage(function, missing_files, waiting_files, storage, pbar) for storage in self.available_storages))
-        return missing_files or set()
+        return missing_files

     async def get_storage_filelist(self, storage: storages.iStorage, pbar: tqdm):
         result = await storage.list_files(pbar)
@@ -736,28 +738,28 @@ async def keepalive(self):
         ping = (time.time() - timestamp) // 0.0002
         logger.tsuccess("cluster.success.keepalive", cluster=self.id, hits=units.format_number(total_counter.hits), bytes=units.format_bytes(total_counter.bytes), ping=ping)

-        storage_data = []
-        for storage, counter in self.counter.items():
-            storage_data.append(
-                db.StorageStatistics(
-                    storage=storage.unique_id,
-                    hits=counter.hits,
-                    bytes=counter.bytes,
-                )
-            )
-        storage_data.append(
-            db.StorageStatistics(
-                None,
-                commit_no_storage_counter.hits,
-                commit_no_storage_counter.bytes
-            )
-        )
-        db.add_statistics(
-            db.Statistics(
-                self.id,
-                storage_data
-            )
-        )
+        #storage_data = []
+        #for storage, counter in self.counter.items():
+        #    storage_data.append(
+        #        db.StorageStatistics(
+        #            storage=storage.unique_id,
+        #            hits=counter.hits,
+        #            bytes=counter.bytes,
+        #        )
+        #    )
+        #storage_data.append(
+        #    db.StorageStatistics(
+        #        None,
+        #        commit_no_storage_counter.hits,
+        #        commit_no_storage_counter.bytes
+        #    )
+        #)
+        #db.add_statistics(
+        #    db.Statistics(
+        #        self.id,
+        #        storage_data
+        #    )
+        #)

     async def disable(self, exit: bool = False):
         scheduler.cancel(self.keepalive_task)
@@ -1055,6 +1057,12 @@ async def _(request: aweb.Request):
         start = request.http_range.start or 0
         end = request.http_range.stop or file.size
         size = end - start + 1
+        storage_name = file.storage.unique_id if file.storage is not None else None
+        db.add_file(
+            cluster.id,
+            storage_name,
+            size
+        )

         cluster.hit(file.storage, size)
         # add database
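
With the keepalive batch write commented out, per-request accounting moves into the download handler: the served size is derived from the request's Range bounds and recorded through db.add_file. A rough sketch of that arithmetic under the same assumptions, with db.add_file stubbed out and the helper name record_range_hit being hypothetical:

from typing import Optional

def add_file(cluster: str, storage: Optional[str], bytes: int) -> None:
    # Stand-in for core.database.add_file: one hit plus a byte count per request.
    print(f"hit: cluster={cluster} storage={storage} bytes={bytes}")

def record_range_hit(cluster_id: str, storage_id: Optional[str],
                     range_start: Optional[int], range_stop: Optional[int],
                     file_size: int) -> int:
    # Mirrors the handler: missing bounds fall back to the whole file,
    # and the served size is counted as end - start + 1.
    start = range_start or 0
    end = range_stop or file_size
    size = end - start + 1
    add_file(cluster_id, storage_id, size)
    return size

# A request for bytes 0-1023 of a 4096-byte file accounts for 1024 bytes.
assert record_range_hit("cluster-1", "storage-a", 0, 1023, 4096) == 1024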

core/config.py

Lines changed: 2 additions & 3 deletions
@@ -4,10 +4,9 @@
 import os

 defaults = {
-    "advanced.api_version": "1.11.0",
     "advanced.lang": "zh_cn",
     "advanced.debug": False,
-    "advanced.sync_interval": 60,
+    "advanced.sync_interval": 600,
     "advanced.base_url": "https://openbmclapi.bangbang93.com",
     "advanced.threads": 128,
     "advanced.ssl_dir": ".ssl",
@@ -137,6 +136,6 @@ def check_type(self):

     @property
     def sync_interval(self):
-        return Config.get("advanced.sync_interval", 600)
+        return max(Config.get("advanced.sync_interval", 600), 600)

 const = Const()
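
Both config changes push the sync interval toward a 600-second floor: the default rises from 60 to 600, and the sync_interval property clamps whatever the user configured. A small sketch of the clamping, with config_get standing in for Config.get:

def config_get(configured, default):
    # Stand-in for Config.get: the configured value if present, else the default.
    return configured if configured is not None else default

def sync_interval(configured=None) -> int:
    # As in the commit: never sync more often than every 600 seconds,
    # even if the config file asks for less.
    return max(config_get(configured, 600), 600)

assert sync_interval() == 600       # default
assert sync_interval(60) == 600     # too-small values are clamped up
assert sync_interval(3600) == 3600  # larger values pass through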

core/database.py

Lines changed: 114 additions & 3 deletions
@@ -1,10 +1,13 @@
+from collections import defaultdict
 from dataclasses import dataclass
 import time
 from typing import Optional
-from sqlalchemy import create_engine, Column, Integer, String
+from sqlalchemy import create_engine, Column, Integer, String, LargeBinary
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker

+from core import logger, scheduler
+
 @dataclass
 class StorageStatistics:
     storage: Optional[str] = None
@@ -16,6 +19,22 @@ class Statistics:
     cluster_id: str
     storages: list[StorageStatistics]

+@dataclass
+class FileStatisticsKey:
+    hour: int
+    cluster_id: str
+    storage_id: Optional[str]
+
+    def __hash__(self):
+        return hash((self.hour, self.cluster_id, self.storage_id))
+
+@dataclass
+class FileStatistics:
+    hits: int = 0
+    bytes: int = 0
+
+
+
 engine = create_engine('sqlite:///database.db')
 Base = declarative_base()

@@ -35,6 +54,18 @@ class StorageStatisticsTable(Base):
     hits = Column(String, nullable=False)
     bytes = Column(String, nullable=False)

+class ResponseTable(Base):
+    __tablename__ = 'Responses'
+    id = Column(Integer, primary_key=True)
+    hour = Column(Integer, nullable=False)
+    cluster = Column(String, nullable=False)
+    storage = Column(String, nullable=False)
+    success = Column(String, nullable=False)
+    not_found = Column(String, nullable=False)
+    error = Column(String, nullable=False)
+    redirect = Column(String, nullable=False)
+    ip_tables = Column(LargeBinary, nullable=False)
+
 class Session:
     def __init__(self):
         self.session = sessionmaker(bind=engine)()
@@ -47,8 +78,11 @@ def get_session(self):
         return self.session

 SESSION = Session()
+FILE_CACHE: defaultdict[FileStatisticsKey, FileStatistics] = defaultdict(lambda: FileStatistics())

+@DeprecationWarning
 def add_statistics(data: Statistics):
+    return
     session = SESSION.get_session()
     hits, bytes = 0, 0
     hour = get_hour()
@@ -78,10 +112,87 @@ def add_statistics(data: Statistics):
         }
     )
     session.commit()
-

+def add_file(cluster: str, storage: Optional[str], bytes: int):
+    global FILE_CACHE
+    try:
+        key = FileStatisticsKey(get_hour(), cluster, storage)
+        FILE_CACHE[key].bytes += bytes
+        FILE_CACHE[key].hits += 1
+    except:
+        logger.traceback()

 def get_hour():
     return int(time.time() // 3600)

-Base.metadata.create_all(engine)
+def _commit_storage(hour: int, storage: Optional[str], hits: int, bytes: int):
+    if hits == bytes == 0:
+        return False
+    session = SESSION.get_session()
+    q = session.query(StorageStatisticsTable).filter_by(hour=hour, storage=storage)
+    r = q.first() or StorageStatisticsTable(hour=hour, storage=storage, hits=str(0), bytes=str(0))
+    if q.count() == 0:
+        session.add(r)
+    q.update(
+        {
+            'hits': str(int(r.hits) + hits), # type: ignore
+            'bytes': str(int(r.bytes) + bytes) # type: ignore
+        }
+    )
+    return True
+
+def _commit_cluster(hour: int, cluster: str, hits: int, bytes: int):
+    if hits == bytes == 0:
+        return False
+    session = SESSION.get_session()
+    q = session.query(ClusterStatisticsTable).filter_by(hour=hour, cluster=cluster)
+    r = q.first() or ClusterStatisticsTable(hour=hour, cluster=cluster, hits=str(0), bytes=str(0))
+    if q.count() == 0:
+        session.add(r)
+    q.update(
+        {
+            'hits': str(int(r.hits) + hits), # type: ignore
+            'bytes': str(int(r.bytes) + bytes) # type: ignore
+        }
+    )
+    return True
+
+def commit():
+    global FILE_CACHE
+    total_hits = 0
+    total_bytes = 0
+    total_storages = 0
+    cache = FILE_CACHE.copy()
+    session = SESSION.get_session()
+    clusters: defaultdict[tuple[int, str], FileStatistics] = defaultdict(lambda: FileStatistics(0, 0))
+    for key, value in FILE_CACHE.items():
+        hour = key.hour
+        cluster = key.cluster_id
+        storage = key.storage_id
+        hits = value.hits
+        bytes = value.bytes
+        if _commit_storage(hour, storage, hits, bytes):
+            total_hits += hits
+            total_bytes += bytes
+            total_storages += 1
+        clusters[(hour, cluster)].hits += hits
+        clusters[(hour, cluster)].bytes += bytes
+    for cluster, value in clusters.items():
+        _commit_cluster(cluster[0], cluster[1], value.hits, value.bytes)
+
+    logger.success(f'Committing {total_hits} hits and {total_bytes} bytes to database. {total_storages} storages updated')
+
+    session.commit()
+    for key, value in cache.items():
+        FILE_CACHE[key].hits -= value.hits
+        FILE_CACHE[key].bytes -= value.bytes
+        if FILE_CACHE[key].hits == FILE_CACHE[key].bytes == 0:
+            del FILE_CACHE[key]
+    ...
+
+async def init():
+    Base.metadata.create_all(engine)
+    scheduler.run_repeat_later(commit, 5, 10)
+
+async def unload():
+    commit()
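
The new statistics path buffers hits in the in-memory FILE_CACHE keyed by (hour, cluster, storage), and the scheduled commit() drains it into SQLite. A self-contained sketch of that accumulate-then-drain cycle without the SQLAlchemy layer; note that this sketch snapshots each FileStatistics entry, while the diff's commit() takes a shallow FILE_CACHE.copy() whose values are the same counter objects.

from collections import defaultdict
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class FileStatisticsKey:
    hour: int
    cluster_id: str
    storage_id: Optional[str]

    def __hash__(self):
        return hash((self.hour, self.cluster_id, self.storage_id))

@dataclass
class FileStatistics:
    hits: int = 0
    bytes: int = 0

FILE_CACHE: defaultdict[FileStatisticsKey, FileStatistics] = defaultdict(FileStatistics)

def get_hour() -> int:
    return int(time.time() // 3600)

def add_file(cluster: str, storage: Optional[str], nbytes: int) -> None:
    # Hot path: only bump in-memory counters; no database I/O per request.
    key = FileStatisticsKey(get_hour(), cluster, storage)
    FILE_CACHE[key].bytes += nbytes
    FILE_CACHE[key].hits += 1

def commit() -> None:
    # Snapshot per-entry copies, flush them, then subtract the snapshot so
    # hits recorded while committing survive into the next cycle.
    cache = {k: FileStatistics(v.hits, v.bytes) for k, v in FILE_CACHE.items()}
    for key, value in cache.items():
        print(f"flush {key}: {value.hits} hits, {value.bytes} bytes")  # DB write goes here
    for key, value in cache.items():
        FILE_CACHE[key].hits -= value.hits
        FILE_CACHE[key].bytes -= value.bytes
        if FILE_CACHE[key].hits == FILE_CACHE[key].bytes == 0:
            del FILE_CACHE[key]

add_file("cluster-1", "storage-a", 4096)
add_file("cluster-1", None, 512)
commit()
assert not FILE_CACHE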
