Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/backend/bisheng/common/errcode/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,10 @@ class KnowledgeMetadataFieldNotExistError(BaseErrorCode):
# 不能修改内置元数据字段
class KnowledgeMetadataFieldImmutableError(BaseErrorCode):
Code: int = 10983
Msg: str = '内置元数据字段 {field_name} 不能修改'
Msg: str = '内置元数据字段 {field_name} 不能修改'


# 元数据值类型转换错误
class KnowledgeMetadataValueTypeConvertError(BaseErrorCode):
Code: int = 10984
Msg: str = '元数据字段 {field_name} 值类型转换错误: {error_msg}'
43 changes: 22 additions & 21 deletions src/backend/bisheng/knowledge/domain/models/knowledge_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,13 @@ def update_status_bulk(cls, file_ids: List[int], status: KnowledgeFileStatus, re

@classmethod
def filter_file_by_metadata_fields(cls, knowledge_id: int, logical: Literal["and", "or"],
metadata_filters: Dict[str, Dict[str, Any]]) -> List[int]:
metadata_filters: List[Dict[str, Dict[str, Any]]]) -> List[int]:
"""
根据用户自定义元数据字段过滤知识文件
:param knowledge_id: 知识库ID
:param logical: 逻辑操作符,支持 "AND" 或 "OR"
:param metadata_filters: 用户自定义元数据字段及其对应的值
{
[{
field_a: {
'comparison': '=',
'value': 'some_value',
Expand All @@ -370,7 +370,7 @@ def filter_file_by_metadata_fields(cls, knowledge_id: int, logical: Literal["and
}
]
}
}
}]
:return: 符合条件的知识文件ID列表
"""

Expand All @@ -379,25 +379,26 @@ def filter_file_by_metadata_fields(cls, knowledge_id: int, logical: Literal["and

params_index = 1
field_statement = []
for key, key_info in metadata_filters.items():
key_comparison = key_info['comparison']
key_value = key_info['value']
extra_filter = key_info.get('extra_filter')
if key_value is not None:
params_key = f"tmp_params_{params_index}"
params[params_key] = key_value
sub_statement = f"{key} {key_comparison} :{params_key}"
else:
sub_statement = f"{key} {key_comparison}"
if extra_filter:
for sub_info in extra_filter:
params_index += 1
for metadata_filter in metadata_filters:
for key, key_info in metadata_filter.items():
key_comparison = key_info['comparison']
key_value = key_info['value']
extra_filter = key_info.get('extra_filter')
if key_value is not None:
params_key = f"tmp_params_{params_index}"
params[params_key] = sub_info['value']
sub_statement += f" AND {key} {sub_info['comparison']} :{params_key}"
sub_statement = f"({sub_statement})"
field_statement.append(sub_statement)
params_index += 1
params[params_key] = key_value
sub_statement = f"{key} {key_comparison} :{params_key}"
else:
sub_statement = f"{key} {key_comparison}"
if extra_filter:
for sub_info in extra_filter:
params_index += 1
params_key = f"tmp_params_{params_index}"
params[params_key] = sub_info['value']
sub_statement += f" AND {key} {sub_info['comparison']} :{params_key}"
sub_statement = f"({sub_statement})"
field_statement.append(sub_statement)
params_index += 1
field_statement = f" {logical} ".join(field_statement)
statement += f"({field_statement})"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from bisheng.common.constants.vectorstore_metadata import KNOWLEDGE_RAG_METADATA_SCHEMA
from bisheng.common.dependencies.user_deps import UserPayload
from bisheng.common.errcode.http_error import UnAuthorizedError
from bisheng.common.errcode.knowledge import KnowledgeFileNotExistError, KnowledgeMetadataFieldNotExistError
from bisheng.common.errcode.knowledge import KnowledgeFileNotExistError, KnowledgeMetadataFieldNotExistError, \
KnowledgeMetadataFieldExistError, KnowledgeMetadataValueTypeConvertError
from bisheng.database.models.role_access import AccessType
from bisheng.database.models.user import UserDao
from bisheng.knowledge.domain import utils
Expand Down Expand Up @@ -188,59 +189,60 @@ async def add_file_user_metadata(self, login_user: 'UserPayload', knowledge_id:
metadata_field_dict = {item['field_name']: MetadataField(**item) for item in
knowledge_model.metadata_fields or []}

knowledge_file_ids = [req.knowledge_file_id for req in add_file_metadata_req]
existing_files = await self.knowledge_file_repository.find_by_ids(knowledge_file_ids)
existing_files = await self.knowledge_file_repository.find_by_ids(
[req.knowledge_file_id for req in add_file_metadata_req])

# Map existing file IDs for quick lookup
existing_file_ids = [file.id for file in existing_files]

for knowledge_file_id in knowledge_file_ids:
if knowledge_file_id not in existing_file_ids:
raise KnowledgeFileNotExistError(msg="知识库文件ID:{knowledge_file_id} 不存在",
knowledge_file_id=knowledge_file_id)

add_field_name_set = set([item.field_name for req in add_file_metadata_req for item in req.user_metadata_list])
for field_name in add_field_name_set:
if field_name not in metadata_field_dict.keys():
raise KnowledgeMetadataFieldNotExistError(field_name=field_name)
existing_files_dict = {file.id: file for file in existing_files}

updated_knowledge_files = []

for modify_file_metadata_req in add_file_metadata_req:
knowledge_file_model = await self.knowledge_file_repository.find_by_id(
entity_id=modify_file_metadata_req.knowledge_file_id)
knowledge_file_model = existing_files_dict.get(modify_file_metadata_req.knowledge_file_id)

# Check if knowledge file exists
if not knowledge_file_model:
logger.warning(f"Knowledge file ID {modify_file_metadata_req.knowledge_file_id} does not exist.")
continue
raise KnowledgeFileNotExistError(
msg=f"知识库文件ID:{modify_file_metadata_req.knowledge_file_id} 不存在")

# Initialize metadata if it's None
if knowledge_file_model.user_metadata is None:
knowledge_file_model.user_metadata = {}

current_user_metadata = copy.deepcopy(knowledge_file_model.user_metadata)

# 添加新的元数据字段,避免重复添加相同字段
for item in modify_file_metadata_req.user_metadata_list:
if item.field_name in metadata_field_dict.keys() and item.field_name not in current_user_metadata.keys():
item_dict = item.model_dump()
# 数据类型转换
try:
field_type = metadata_field_dict[item.field_name].field_type
field_value = utils.metadata_value_type_convert(
value=item_dict['field_value'], target_type=field_type)
item_dict['field_value'] = field_value
except Exception as e:
logger.error(f"Metadata value type conversion error: {e}")
continue
item_dict['field_type'] = metadata_field_dict[item.field_name].field_type
item_dict.pop('field_name')
current_user_metadata[item.field_name] = item_dict
if item.field_name not in metadata_field_dict.keys():
raise KnowledgeMetadataFieldNotExistError(field_name=item.field_name)

elif item.field_name in current_user_metadata.keys():
raise KnowledgeMetadataFieldExistError(
field_name=item.field_name,
msg=f"知识库文件ID:{modify_file_metadata_req.knowledge_file_id} 已存在元数据字段:{item.field_name}"
)

item_dict = item.model_dump()
# 数据类型转换
try:
field_type = metadata_field_dict[item.field_name].field_type
field_value = utils.metadata_value_type_convert(
value=item_dict['field_value'], target_type=field_type)
item_dict['field_value'] = field_value
except Exception as e:
raise KnowledgeMetadataValueTypeConvertError(
msg=f"元数据字段 {item.field_name} 值类型转换错误: {e}")

item_dict['field_type'] = metadata_field_dict[item.field_name].field_type
item_dict.pop('field_name')
current_user_metadata[item.field_name] = item_dict

# 更新知识文件的用户元数据
knowledge_file_model.user_metadata = current_user_metadata
knowledge_file_model.updater_id = login_user.user_id

updated_knowledge_files.append(knowledge_file_model)

# 批量更新知识文件
for knowledge_file_model in updated_knowledge_files:
knowledge_file_model = await self.knowledge_file_repository.update(knowledge_file_model)

user_metadata = {key: value.get('field_value') for key, value in knowledge_file_model.user_metadata.items()}
Expand All @@ -258,8 +260,6 @@ async def add_file_user_metadata(self, login_user: 'UserPayload', knowledge_id:
user_metadata=user_metadata
)

updated_knowledge_files.append(knowledge_file_model)

return updated_knowledge_files

# Batch modify file user metadata
Expand All @@ -282,15 +282,19 @@ async def batch_modify_file_user_metadata(self, login_user: 'UserPayload',
metadata_field_dict = {item['field_name']: MetadataField(**item) for item in
knowledge_model.metadata_fields or []}

existing_files = await self.knowledge_file_repository.find_by_ids(
[req.knowledge_file_id for req in modify_file_metadata_reqs])

existing_files_dict = {file.id: file for file in existing_files}

updated_knowledge_files = []

for modify_file_metadata_req in modify_file_metadata_reqs:
knowledge_file_model = await self.knowledge_file_repository.find_by_id(
entity_id=modify_file_metadata_req.knowledge_file_id)
knowledge_file_model = existing_files_dict.get(modify_file_metadata_req.knowledge_file_id)

if not knowledge_file_model:
logger.warning(f"Knowledge file ID {modify_file_metadata_req.knowledge_file_id} does not exist.")
continue
raise KnowledgeFileNotExistError(
msg=f"知识库文件ID:{modify_file_metadata_req.knowledge_file_id} 不存在")

# Initialize metadata if it's None
if knowledge_file_model.user_metadata is None:
Expand All @@ -300,25 +304,37 @@ async def batch_modify_file_user_metadata(self, login_user: 'UserPayload',

# 更新知识文件的用户元数据
for item in modify_file_metadata_req.user_metadata_list:
if item.field_name in metadata_field_dict.keys():
# 查找是否已存在该字段
existing_item = next(
(v for k, v in current_user_metadata.items() if k == item.field_name), None)
if existing_item:
try:
# 数据类型
field_type = metadata_field_dict[item.field_name].field_type
# 更新已有字段的值和更新时间
field_value = utils.metadata_value_type_convert(
value=existing_item['field_value'], target_type=field_type)
existing_item['field_value'] = field_value
existing_item['updated_at'] = item.updated_at
except Exception as e:
logger.error(f"Metadata value type conversion error: {e}")

if item.field_name not in metadata_field_dict.keys():
raise KnowledgeMetadataFieldNotExistError(field_name=item.field_name)

if item.field_name not in current_user_metadata.keys():
raise KnowledgeMetadataFieldNotExistError(
field_name=item.field_name,
msg=f"知识库文件ID:{modify_file_metadata_req.knowledge_file_id} 不存在元数据字段:{item.field_name}"
)

existing_item = current_user_metadata.get(item.field_name)
try:
# 数据类型
field_type = metadata_field_dict[item.field_name].field_type
# 更新已有字段的值和更新时间
field_value = utils.metadata_value_type_convert(
value=item.field_value, target_type=field_type)
existing_item['field_value'] = field_value
existing_item['updated_at'] = item.updated_at
current_user_metadata[item.field_name] = existing_item
except Exception as e:
raise KnowledgeMetadataValueTypeConvertError(
msg=f"元数据字段 {item.field_name} 值类型转换错误: {e}")

knowledge_file_model.user_metadata = current_user_metadata
knowledge_file_model.updater_id = login_user.user_id

updated_knowledge_files.append(knowledge_file_model)

# 批量更新知识文件
for knowledge_file_model in updated_knowledge_files:
knowledge_file_model = await self.knowledge_file_repository.update(knowledge_file_model)

user_metadata = {key: value.get('field_value') for key, value in knowledge_file_model.user_metadata.items()}
Expand All @@ -335,8 +351,6 @@ async def batch_modify_file_user_metadata(self, login_user: 'UserPayload',
user_metadata=user_metadata
)

updated_knowledge_files.append(knowledge_file_model)

return updated_knowledge_files

async def batch_delete_file_user_metadata(self, login_user: 'UserPayload',
Expand Down Expand Up @@ -365,28 +379,39 @@ async def batch_delete_file_user_metadata(self, login_user: 'UserPayload',
):
raise UnAuthorizedError()

existing_files = await self.knowledge_file_repository.find_by_ids(
[req.knowledge_file_id for req in delete_user_metadata_req])

existing_files_dict = {file.id: file for file in existing_files}

updated_knowledge_files = []
for delete_metadata_req in delete_user_metadata_req:
knowledge_file_model = await self.knowledge_file_repository.find_by_id(
entity_id=delete_metadata_req.knowledge_file_id)
knowledge_file_model = existing_files_dict.get(delete_metadata_req.knowledge_file_id)

if not knowledge_file_model:
logger.warning(f"Knowledge file ID {delete_metadata_req.knowledge_file_id} does not exist.")
continue

if knowledge_file_model.user_metadata is None:
knowledge_file_model.user_metadata = {}
raise KnowledgeFileNotExistError(
msg=f"知识库文件ID:{delete_metadata_req.knowledge_file_id} 不存在")

current_user_metadata = copy.deepcopy(knowledge_file_model.user_metadata)
current_user_metadata = copy.deepcopy(knowledge_file_model.user_metadata) or {}

# 删除指定的元数据字段
for field_name in delete_metadata_req.field_names:
if field_name in current_user_metadata.keys():
current_user_metadata.pop(field_name)

if field_name not in current_user_metadata.keys():
raise KnowledgeMetadataFieldNotExistError(
field_name=field_name,
msg=f"知识库文件ID:{delete_metadata_req.knowledge_file_id} 不存在元数据字段:{field_name}"
)

current_user_metadata.pop(field_name)

knowledge_file_model.user_metadata = current_user_metadata
knowledge_file_model.updater_id = login_user.user_id

updated_knowledge_files.append(knowledge_file_model)

# 批量更新知识文件
for knowledge_file_model in updated_knowledge_files:
knowledge_file_model = await self.knowledge_file_repository.update(knowledge_file_model)

user_metadata = {key: value.get('field_value') for key, value in knowledge_file_model.user_metadata.items()}
Expand All @@ -404,8 +429,6 @@ async def batch_delete_file_user_metadata(self, login_user: 'UserPayload',
user_metadata=user_metadata
)

updated_knowledge_files.append(knowledge_file_model)

return updated_knowledge_files

async def list_knowledge_file_user_metadata(self, login_user: 'UserPayload',
Expand Down
2 changes: 1 addition & 1 deletion src/backend/bisheng/run_celery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from bisheng.worker import bisheng_celery
from bisheng.worker.main import bisheng_celery

if __name__ == '__main__':
bisheng_celery.start(
Expand Down
2 changes: 1 addition & 1 deletion src/backend/bisheng/worker/knowledge/file_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
QAKnowledge,
)
from bisheng.utils import generate_uuid
from bisheng.worker import bisheng_celery
from bisheng.worker.main import bisheng_celery
from bisheng_langchain.vectorstores import ElasticKeywordsSearch, Milvus


Expand Down
2 changes: 1 addition & 1 deletion src/backend/bisheng/worker/knowledge/qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from bisheng.interface.embeddings.custom import FakeEmbedding
from bisheng.llm.domain import LLMService
from bisheng.worker import bisheng_celery
from bisheng.worker.main import bisheng_celery
from bisheng_langchain.vectorstores import Milvus, ElasticKeywordsSearch


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
KnowledgeFileDao,
KnowledgeFileStatus
)
from bisheng.worker import bisheng_celery
from bisheng.worker.main import bisheng_celery


@bisheng_celery.task(acks_late=True)
Expand Down
4 changes: 2 additions & 2 deletions src/backend/bisheng/workflow/common/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def get_knowledge_filter(self, knowledge: Knowledge, parent_node: BaseNode) -> (
one.field_name: one.model_dump() for one in KNOWLEDGE_RAG_METADATA_SCHEMA if
one.field_name != "user_metadata"
}
all_filter_field = {}
all_filter_field = []
for condition in self.conditions:
if int(condition.knowledge_id) != knowledge.id:
continue
Expand All @@ -158,7 +158,7 @@ def get_knowledge_filter(self, knowledge: Knowledge, parent_node: BaseNode) -> (
else:
logger.warning(f"condition field {condition.metadata_field} not in knowledge metadata fields")
raise ValueError(f"field {condition.metadata_field} not in knowledge metadata fields")
all_filter_field.update(filter_field_info)
all_filter_field.append(filter_field_info)
if not all_filter_field:
return "", {}

Expand Down
Loading