Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/app/api/v1/module_common/file/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ async def download_controller(
"""
result = await FileService.download_service(file_path=file_path)
if delete:
background_tasks.add_task(UploadUtil.delete_file, Path(file_path))
background_tasks.add_task(UploadUtil.delete_file, Path(result.file_path))
log.info("下载文件成功")
return UploadFileResponse(file_path=result.file_path, filename=result.file_name)
47 changes: 42 additions & 5 deletions backend/app/api/v1/module_common/file/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os

from fastapi import UploadFile

from app.config.setting import settings
from app.core.base_schema import DownloadFileSchema, UploadResponseSchema
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.upload_util import UploadUtil


Expand Down Expand Up @@ -42,6 +46,38 @@ async def upload_service(
file_url=f"{file_url}",
).model_dump()

@staticmethod
def _validate_download_path(file_path: str) -> str:
"""
验证下载路径是否安全。

参数:
- file_path (str): 文件路径。

返回:
- str: 安全的绝对路径。

异常:
- CustomException: 当路径不安全时抛出。
"""
if not file_path:
raise CustomException(msg="请选择要下载的文件")

dangerous_patterns = ["../", "..\\", "\0"]
for pattern in dangerous_patterns:
if pattern in file_path:
log.error(f"检测到路径穿越攻击: {file_path}")
raise CustomException(msg="非法的文件路径")

upload_root = settings.UPLOAD_FILE_PATH.resolve()
abs_path = os.path.normpath(os.path.abspath(file_path))

if not abs_path.startswith(str(upload_root)):
log.error(f"路径不在上传目录内: {file_path}")
raise CustomException(msg="非法的文件路径")

return abs_path

@classmethod
async def download_service(cls, file_path: str) -> DownloadFileSchema:
"""
Expand All @@ -56,13 +92,14 @@ async def download_service(cls, file_path: str) -> DownloadFileSchema:
异常:
- CustomException: 当未选择文件或文件不存在时抛出。
"""
if not file_path:
raise CustomException(msg="请选择要下载的文件")
if not UploadUtil.check_file_exists(file_path):
safe_path = cls._validate_download_path(file_path)

if not UploadUtil.check_file_exists(safe_path):
raise CustomException(msg="文件不存在")
file_name = UploadUtil.download_file(file_path)

file_name = UploadUtil.download_file(safe_path)

return DownloadFileSchema(
file_path=file_path,
file_path=safe_path,
file_name=str(file_name),
)
110 changes: 91 additions & 19 deletions backend/app/api/v1/module_monitor/resource/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
import shutil
from datetime import datetime
from pathlib import Path
Expand All @@ -11,6 +12,7 @@
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.excel_util import ExcelUtil
from app.utils.upload_util import DANGEROUS_EXTENSIONS, MIME_TYPE_MAPPING

from .schema import (
ResourceCopySchema,
Expand Down Expand Up @@ -124,6 +126,54 @@ def _path_exists(cls, path: str) -> bool:
except Exception as e:
raise CustomException(msg=f"检查路径是否存在失败: {e!s}")

@staticmethod
def _sanitize_filename(filename: str) -> str:
"""
清理文件名,移除危险字符和路径穿越。

参数:
- filename (str): 原始文件名。

返回:
- str: 安全的文件名。
"""
if not filename:
return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
filename = os.path.basename(filename)
filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", filename)
filename = re.sub(r"\.{2,}", ".", filename)
filename = filename.strip(". ")
if not filename:
filename = f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
return filename

@staticmethod
def _detect_file_type(content: bytes) -> str | None:
"""
通过文件内容检测真实文件类型。

参数:
- content (bytes): 文件内容(前几字节即可)。

返回:
- str | None: 检测到的 MIME 类型,无法识别返回 None。
"""
if content.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if content.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if content.startswith(b"GIF87a") or content.startswith(b"GIF89a"):
return "image/gif"
if content.startswith(b"PK\x03\x04"):
if b"[Content_Types].xml" in content[:1000]:
return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
return "application/zip"
if content.startswith(b"%PDF"):
return "application/pdf"
if content.startswith(b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"):
return "application/msword"
return None

@classmethod
def _generate_http_url(cls, file_path: str, base_url: str | None = None) -> str:
"""
Expand Down Expand Up @@ -512,59 +562,81 @@ async def upload_file_service(
if not file or not file.filename:
raise CustomException(msg="请选择要上传的文件")

# 文件名安全检查
if ".." in file.filename or "/" in file.filename or "\\" in file.filename:
raise CustomException(msg="文件名包含不安全字符")
original_filename = file.filename

dangerous_patterns = ["../", "..\\", "/", "\\", "\0"]
for pattern in dangerous_patterns:
if pattern in original_filename:
log.error(f"检测到路径穿越攻击: {original_filename}")
raise CustomException(msg="文件名包含非法字符")

if "." not in original_filename:
raise CustomException(msg="无法识别文件类型")

ext = os.path.splitext(original_filename)[1].lower()
if not ext:
raise CustomException(msg="无法识别文件类型")

if ext in DANGEROUS_EXTENSIONS:
log.error(f"尝试上传危险文件类型: {ext}")
raise CustomException(msg=f"不允许上传此类型的文件: {ext}")

try:
# 检查文件大小
content = await file.read()
if len(content) > cls.MAX_UPLOAD_SIZE:
raise CustomException(
msg=f"文件太大,最大支持{cls.MAX_UPLOAD_SIZE // (1024 * 1024)}MB"
)

# 确定上传目录,如果没有指定目标路径,使用静态文件根目录
detected_type = cls._detect_file_type(content)
if detected_type:
expected_ext = MIME_TYPE_MAPPING.get(detected_type, "")
if expected_ext and expected_ext != ext:
log.warning(
f"文件类型不匹配: 声明扩展名={ext}, 检测类型={detected_type}"
)

safe_dir = (
cls._get_resource_root() if target_path is None else cls._get_safe_path(target_path)
)

# 创建目录(如果不存在)
os.makedirs(safe_dir, exist_ok=True)

# 生成文件路径
filename = file.filename
file_path = os.path.join(safe_dir, filename)
safe_filename = cls._sanitize_filename(original_filename)
file_path = os.path.join(safe_dir, safe_filename)

file_path_abs = os.path.normpath(os.path.abspath(file_path))
safe_dir_abs = os.path.normpath(os.path.abspath(safe_dir))
if not file_path_abs.startswith(safe_dir_abs):
log.error(f"检测到路径穿越攻击,目标路径: {file_path}")
raise CustomException(msg="非法的文件路径")

# 检查文件是否已存在
if os.path.exists(file_path):
# 生成唯一文件名
base_name, ext = os.path.splitext(filename)
base_name, extension = os.path.splitext(safe_filename)
counter = 1
while os.path.exists(file_path):
new_filename = f"{base_name}_{counter}{ext}"
new_filename = f"{base_name}_{counter}{extension}"
file_path = os.path.join(safe_dir, new_filename)
counter += 1
filename = os.path.basename(file_path)
safe_filename = os.path.basename(file_path)

# 保存文件(使用已读取的内容)
Path(file_path).write_bytes(content)

# 获取文件信息
file_info = cls._get_file_info(file_path, base_url)

# 生成文件URL
file_url = cls._generate_http_url(file_path, base_url)

log.info(f"文件上传成功: {filename}")
log.info(f"文件上传成功: {safe_filename}")

return ResourceUploadSchema(
filename=filename,
filename=safe_filename,
file_url=file_url,
file_size=file_info.get("size", 0),
upload_time=datetime.now(),
).model_dump(mode="json")

except CustomException:
raise
except Exception as e:
log.error(f"文件上传失败: {e!s}")
raise CustomException(msg=f"文件上传失败: {e!s}")
Expand Down
6 changes: 6 additions & 0 deletions backend/app/api/v1/module_system/notice/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from app.common.enums import QueueEnum
from app.core.base_schema import BaseSchema, UserBySchema
from app.core.validator import DateTimeStr
from app.utils.xss_util import sanitize_html


class NoticeCreateSchema(BaseModel):
Expand All @@ -28,6 +29,11 @@ def _validate_notice_type(cls, value: str):
raise ValueError("公告类型仅支持 '1'(通知) 或 '2'(公告)")
return value

@field_validator("notice_content")
@classmethod
def _sanitize_notice_content(cls, value: str) -> str:
return sanitize_html(value)

@model_validator(mode="after")
def _validate_after(self):
if not self.notice_title.strip():
Expand Down
22 changes: 22 additions & 0 deletions backend/app/plugin/module_task/job/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@
from .tools.ap_scheduler import SchedulerUtil


def validate_job_func(func: str) -> None:
"""
校验任务函数格式是否有效。

参数:
- func (str): 任务函数字符串,格式应为 "module.function"

异常:
- CustomException: 当 func 格式无效时抛出
"""
if not func or "." not in func:
raise CustomException(msg=f"任务函数格式无效: {func},必须包含模块名和函数名(如: module.function)")
parts = func.rsplit(".", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
raise CustomException(msg=f"任务函数格式无效: {func},模块名和函数名不能为空")


class JobService:
"""
定时任务管理模块服务层
Expand Down Expand Up @@ -72,6 +89,8 @@ async def create_job_service(cls, auth: AuthSchema, data: JobCreateSchema) -> di
if exist_obj:
raise CustomException(msg="创建失败,该定时任务已存在")

validate_job_func(data.func)

obj = await JobCRUD(auth).create_obj_crud(data=data)
if not obj:
raise CustomException(msg="创建失败,该数据定时任务不存在")
Expand Down Expand Up @@ -100,6 +119,9 @@ async def update_job_service(cls, auth: AuthSchema, id: int, data: JobUpdateSche
and not CronUtil.validate_cron_expression(data.trigger_args)
):
raise CustomException(msg=f"新增定时任务{data.name}失败, Cron表达式不正确")

validate_job_func(data.func)

obj = await JobCRUD(auth).update_obj_crud(id=id, data=data)
if not obj:
raise CustomException(msg="更新失败,该数据定时任务不存在")
Expand Down
12 changes: 11 additions & 1 deletion backend/app/plugin/module_task/job/tools/ap_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,17 @@ def add_job(cls, job_info: JobModel) -> Job:
"""
# 动态导入模块
# 1. 解析调用目标
module_path, func_name = str(job_info.func).rsplit(".", 1)
func_str = str(job_info.func)
if "." not in func_str:
log.error(f"任务 {job_info.id} 的 func 格式无效: {func_str},必须包含模块名和函数名(如: module.function)")
raise CustomException(msg=f"任务函数格式无效: {func_str},必须包含模块名和函数名(如: module.function)")

try:
module_path, func_name = func_str.rsplit(".", 1)
except ValueError as e:
log.error(f"任务 {job_info.id} 的 func 解析失败: {func_str}, 错误: {e}")
raise CustomException(msg=f"任务函数格式无效: {func_str}") from e

module_path = "app.plugin.module_task.job.function_task." + module_path
try:
module = importlib.import_module(module_path)
Expand Down
Loading