Skip to content

Commit

Permalink
feat: 更新循环任务调度器,修复启动后服务器停不掉的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
helloplhm-qwq committed Dec 23, 2023
1 parent d4056fc commit d9a13c5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
32 changes: 17 additions & 15 deletions common/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# 一个简单的循环任务调度器

import time
import threading
from .variable import running
import asyncio
import traceback
from .utils import timestamp_format
from . import log

logger = log.log("scheduler")

running_event = asyncio.Event()
global tasks
tasks = []

Expand All @@ -29,31 +30,32 @@ def __init__(self, name, function, interval = 86400, latest_execute = 0):
def check_available(self):
return (time.time() - self.latest_execute) >= self.interval

def run(self):
async def run(self):
try:
logger.info(f"task {self.name} run start")
self.function()
await self.function()
logger.info(f'task {self.name} run success, next execute: {timestamp_format(self.interval + self.latest_execute)}')
except Exception as e:
logger.error(f"task {self.name} run failed, waiting for next execute...")
logger.error(traceback.format_exc())

def append(name, task, interval = 86400):
global tasks
logger.debug(f"new task ({name}) registered")
wrapper = taskWrapper(name, task, interval)
return tasks.append(wrapper)

def thread_runner():
global tasks
while True:
if not running:
return
# 在 thread_runner 函数中修改循环逻辑
async def thread_runner():
global tasks, running_event
while not running_event.is_set():
for t in tasks:
if t.check_available():
if t.check_available() and not running_event.is_set():
t.latest_execute = int(time.time())
threading.Thread(target = t.run).start()
time.sleep(1)
await t.run() # 等待异步任务完成
await asyncio.sleep(1)

def run():
async def run():
logger.debug("scheduler thread starting...")
threading.Thread(target = thread_runner).start()
task = asyncio.create_task(thread_runner())
logger.debug("schedluer thread load success")
8 changes: 7 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from common import log
from common import Httpx
from common import variable
from common import scheduler
from aiohttp.web import Response
import ujson as json
import threading
Expand Down Expand Up @@ -130,6 +131,7 @@ async def run_app():
logger.info(f"监听 -> http://{host}:{port}")

async def initMain():
await scheduler.run()
variable.aioSession = aiohttp.ClientSession()
try:
await run_app()
Expand All @@ -147,7 +149,11 @@ async def initMain():
logger.error("遇到未知错误,请查看日志")
logger.error(traceback.format_exc())
finally:
await variable.aioSession.close()
logger.info('wating for sessions to complete...')
if variable.aioSession:
await variable.aioSession.close()

variable.running = False
logger.info("Server stopped")

if __name__ == "__main__":
Expand Down

0 comments on commit d9a13c5

Please sign in to comment.