Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/app/api/v1/module_system/auth/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def authenticate_user_service(cls, request: Request, redis: Redis, login_f
if not PwdUtil.verify_password(plain_password=login_form.password, password_hash=user.password):
raise CustomException(msg="账号或密码错误")

if not user.status:
if user.status == "1":
raise CustomException(msg="用户已被停用")

# 更新最后登录时间
Expand Down
2 changes: 1 addition & 1 deletion backend/app/api/v1/module_system/dept/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async def batch_set_available_service(cls, auth: AuthSchema, data: BatchSetAvail
dept_list = await DeptCRUD(auth).get_list_crud()
total_ids = []

if data.status:
if data.status == "0":
id_map = get_parent_id_map(model_list=dept_list)
for dept_id in data.ids:
enable_ids = get_parent_recursion(id=dept_id, id_map=id_map)
Expand Down
2 changes: 1 addition & 1 deletion backend/app/api/v1/module_system/menu/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def set_menu_available_service(cls, auth: AuthSchema, data: BatchSetAvaila
menu_list = await MenuCRUD(auth).get_list_crud()
total_ids = []

if data.status:
if data.status == "0":
# 激活,则需要把所有父级菜单都激活
id_map = get_parent_id_map(model_list=menu_list)
for menu_id in data.ids:
Expand Down
10 changes: 5 additions & 5 deletions backend/app/api/v1/module_system/user/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def update_user_service(cls, id: int, data: UserUpdateSchema, auth: AuthSc
dept = await DeptCRUD(auth).get_by_id_crud(id=data.dept_id)
if not dept:
raise CustomException(msg='部门不存在')
if not dept.status:
if dept.status == "1":
raise CustomException(msg='部门已被禁用')

# 更新用户 - 排除不应被修改的字段, 更新不更新密码
Expand Down Expand Up @@ -218,7 +218,7 @@ async def delete_user_service(cls, auth: AuthSchema, ids: list[int]) -> None:
raise CustomException(msg="用户不存在")
if user.is_superuser:
raise CustomException(msg="超级管理员不能删除")
if user.status:
if user.status == "0":
raise CustomException(msg="用户已启用,不能删除")
if auth.user and auth.user.id == id:
raise CustomException(msg="不能删除当前登陆用户")
Expand Down Expand Up @@ -263,7 +263,7 @@ async def get_current_user_info_service(cls, auth: AuthSchema) -> dict:
menu.id
for role in auth.user.roles or []
for menu in role.menus
if menu.status and menu.type in [1, 2, 4]
if menu.status == "0" and menu.type in [1, 2, 4]
}

# 使用树形结构查询,预加载children关系
Expand Down Expand Up @@ -451,7 +451,7 @@ async def forget_password_service(cls, auth: AuthSchema, data: UserForgetPasswor
user = await UserCRUD(auth).get_by_username_crud(username=data.username)
if not user:
raise CustomException(msg="用户不存在")
if not user.status:
if user.status == "1":
raise CustomException(msg="用户已停用")

# 检查是否是超级管理员
Expand Down Expand Up @@ -526,7 +526,7 @@ async def batch_import_user_service(cls, auth: AuthSchema, file: UploadFile, upd
count = count + 1
# 数据转换
gender = 1 if row['gender'] == '男' else (2 if row['gender'] == '女' else 1)
status = True if row['status'] == '正常' else False
status = "0" if row['status'] == '正常' else "1"

# 构建用户数据
user_data = {
Expand Down
4 changes: 2 additions & 2 deletions backend/app/core/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def get_current_user(
)
if not user:
raise CustomException(msg="用户不存在", code=10401, status_code=401)
if not user.status:
if user.status == "1":
raise CustomException(msg="用户已被停用", code=10401, status_code=401)

# 设置请求上下文
Expand Down Expand Up @@ -163,7 +163,7 @@ async def __call__(self, auth: AuthSchema = Depends(get_current_user)) -> AuthSc
menu.permission
for role in auth.user.roles
for menu in role.menus
if role.status and menu.permission and menu.status
if role.status == "0" and menu.permission and menu.status == "0"
}

# 权限验证 - 满足任一权限即可
Expand Down
2 changes: 1 addition & 1 deletion backend/app/plugin/module_application/job/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async def option_job_service(cls, auth: AuthSchema, id: int, option: int) -> Non
updated_job = await JobCRUD(auth).get_obj_by_id_crud(id=id)
if updated_job:
# 重新添加任务
SchedulerUtil.add_job(job_info=updated_job)
SchedulerUtil().add_job(job_info=updated_job)
# 设置状态为运行中
await JobCRUD(auth).set_obj_field_crud(ids=[id], status='0')

Expand Down
166 changes: 121 additions & 45 deletions backend/app/plugin/module_application/job/tools/ap_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,59 +67,134 @@ class SchedulerUtil:
@classmethod
def scheduler_event_listener(cls, event: JobEvent | JobExecutionEvent) -> None:
"""
监听任务执行事件
监听任务执行事件并记录详细执行信息

参数:
- event (JobEvent | JobExecutionEvent): 任务事件对象。

返回:
- None
"""
# 只处理任务执行相关事件,不处理任务添加、删除等事件
if not isinstance(event, JobExecutionEvent):
return
try:
# 只处理任务执行相关事件,不处理任务添加、删除等事件
if not isinstance(event, JobExecutionEvent):
return

# 延迟导入避免循环导入
from app.plugin.module_application.job.model import JobLogModel

# 延迟导入避免循环导入
from app.plugin.module_application.job.model import JobLogModel

# 获取事件类型和任务ID
event_type = event.__class__.__name__
# 初始化任务状态
status = "0"
exception_info = ''
if isinstance(event, JobExecutionEvent) and event.exception:
exception_info = str(event.exception)
status = "1"
if hasattr(event, 'job_id'):
job_id = event.job_id
query_job = cls.get_job(job_id=job_id)
if query_job:
job_message = (f"事件类型: {event_type}, 任务ID: {job_id}, "
f"状态: {status}, 错误详情: {exception_info}, "
f"执行于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

job_log = JobLogModel(
job_name=query_job.name,
job_group=query_job._jobstore_alias,
job_executor=query_job.executor,
invoke_target=query_job.func.__module__ + '.' + query_job.func.__qualname__,
job_args=str(query_job.args),
job_kwargs=str(query_job.kwargs),
job_trigger=query_job.trigger,
job_message=job_message,
status=status,
exception_info=exception_info,
created_time=datetime.now(),
updated_time=datetime.now(),
job_id=job_id,
)
# 获取事件类型和任务ID
event_type = event.__class__.__name__

# 初始化任务状态
status = "0"
exception_info = ''
if hasattr(event, 'exception') and event.exception:
exception_info = str(event.exception)
status = "1"

if hasattr(event, 'job_id'):
job_id = event.job_id
query_job = cls.get_job(job_id=job_id)

if query_job:
# 解析任务的实际执行函数和参数
actual_func = None
actual_args = []
actual_kwargs = {}

try:
if hasattr(query_job, 'args') and len(query_job.args) >= 2:
actual_func = query_job.args[0]
actual_args = query_job.args[2:]

if hasattr(query_job, 'kwargs'):
actual_kwargs = query_job.kwargs
except Exception as e:
log.error(f"解析任务 {job_id} 参数失败: {str(e)}")

# 格式化参数显示
formatted_args = str(actual_args) if actual_args else "()"
formatted_kwargs = str(actual_kwargs) if actual_kwargs else "{}"

# 获取实际的执行函数信息
actual_func_module = ''
actual_func_name = ''
try:
if actual_func:
actual_func_module = getattr(actual_func, '__module__', '')
actual_func_name = getattr(actual_func, '__name__', '')
except Exception as e:
log.error(f"获取任务 {job_id} 函数信息失败: {str(e)}")

# 构建详细的任务消息
scheduled_time_str = "未知"
try:
if hasattr(event, 'scheduled_run_time') and event.scheduled_run_time:
scheduled_time_str = event.scheduled_run_time.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
try:
scheduled_time_str = str(event.scheduled_run_time)
except Exception:
pass

try:
event_type = event_type
func_info = f"{actual_func_module}.{actual_func_name}" if actual_func else "未知"
job_message = f"任务 {job_id} ({query_job.name}) 执行完成: "
job_message += f"状态={'成功' if status == '0' else '失败'}, "
job_message += f"执行函数={func_info}, "
job_message += f"参数={formatted_args}, "
job_message += f"关键字参数={formatted_kwargs}, "
job_message += f"计划时间={scheduled_time_str}, "
job_message += f"实际执行时间={datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
if exception_info:
job_message += f", 错误={exception_info[:500]}..."
except Exception as e:
job_message = f"任务 {job_id} 执行事件,状态={'成功' if status == '0' else '失败'}"
log.error(f"构建任务 {job_id} 消息失败: {str(e)}")

with db_session.begin() as session:
# 创建日志记录
try:
session.add(job_log)
session.commit()
# 获取执行函数信息
invoke_target = func_info
if not invoke_target:
try:
invoke_target = f"{getattr(query_job.func, '__module__', '')}.{getattr(query_job.func, '__name__', '')}"
except Exception:
invoke_target = "未知"

job_log = JobLogModel(
job_name=query_job.name,
job_group=query_job._jobstore_alias,
job_executor=query_job.executor,
invoke_target=invoke_target,
job_args=formatted_args,
job_kwargs=formatted_kwargs,
job_trigger=str(query_job.trigger),
job_message=job_message,
status=status,
exception_info=exception_info,
created_time=datetime.now(),
updated_time=datetime.now(),
job_id=job_id,
)

# 保存到数据库
with db_session.begin() as session:
try:
session.add(job_log)
session.commit()
log.info(f"任务 {job_id} 执行日志已保存")
except Exception as e:
session.rollback()
log.error(f"保存任务 {job_id} 执行日志失败: {str(e)}")
except Exception as e:
session.rollback()
log.error(f"创建任务 {job_id} 日志记录失败: {str(e)}")
except Exception as e:
log.error(f"处理任务执行事件失败: {str(e)}")
import traceback
traceback.print_exc()

@classmethod
async def init_system_scheduler(cls, redis: Redis) -> None:
Expand Down Expand Up @@ -299,7 +374,7 @@ def add_job(cls, job_info: JobModel) -> Job:
# 动态导入模块
# 1. 解析调用目标
module_path, func_name = str(job_info.func).rsplit('.', 1)
module_path = "app.api.v1.module_application.job.function_task." + module_path
module_path = "app.plugin.module_application.job.function_task." + module_path
try:
module = importlib.import_module(module_path)
job_func = getattr(module, func_name)
Expand Down Expand Up @@ -356,9 +431,10 @@ def add_job(cls, job_info: JobModel) -> Job:
if not CronUtil.validate_cron_expression(job_info.trigger_args):
raise ValueError(f'定时任务{job_info.name}, Cron表达式不正确')

parsed_fields = [None if field in ('*', '?') else field for field in fields]
# 将Cron表达式中的"?"替换为"*"以兼容APScheduler
parsed_fields = [field if field != '?' else '*' for field in fields]
if len(fields) == 6:
parsed_fields.append(None)
parsed_fields.append('*') # 如果没有年份字段,添加None

second, minute, hour, day, month, day_of_week, year = tuple(parsed_fields)
trigger = CronTrigger(
Expand Down
2 changes: 1 addition & 1 deletion backend/app/plugin/module_example/demo/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def batch_import_service(cls, auth: AuthSchema, file: UploadFile, update_s
try:
# 数据转换前的类型检查
try:
status = True if row['status'] == '正常' else False
status = "0" if row['status'] == '正常' else "1"
except ValueError:
error_msgs.append(f"第{count}行: 状态必须是'正常'或'停用'")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
<el-table-column label="任务组名" prop="job_group" min-width="120" show-overflow-tooltip />
<el-table-column label="执行状态" prop="status" min-width="100" show-overflow-tooltip>
<template #default="scope">
<el-tag :type="scope.row.status === '0' ? 'primary' : 'success'">
{{ scope.row.status === "0" ? "运行中" : "完成" }}
<el-tag :type="scope.row.status === '0' ? 'success' : 'danger'">
{{ scope.row.status === "0" ? "成功" : "失败" }}
</el-tag>
</template>
</el-table-column>
Expand Down Expand Up @@ -243,8 +243,8 @@
{{ detailFormData.job_group }}
</el-descriptions-item>
<el-descriptions-item label="执行状态" :span="2">
<el-tag :type="detailFormData.status === '0' ? 'primary' : 'success'">
{{ detailFormData.status === "0" ? "运行中" : "完成" }}
<el-tag :type="detailFormData.status === '0' ? 'success' : 'danger'">
{{ detailFormData.status === "0" ? "成功" : "失败" }}
</el-tag>
</el-descriptions-item>
<el-descriptions-item label="执行信息" :span="2">
Expand Down