Skip to content

Commit

Permalink
Feat/0.2.2.6 (#350)
Browse files Browse the repository at this point in the history
1. 增加浪潮YUAN 模型私有化部署和客户端应用
2. 增加知识库重试机制
3. 优化api接口
4. 解决首次注册失败问题
  • Loading branch information
yaojin3616 authored Feb 23, 2024
2 parents 9324c16 + a05b283 commit 40e5280
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 112 deletions.
3 changes: 2 additions & 1 deletion docker/bisheng/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 数据库配置
# 数据库配置, 当前加密串的密码是1234,
# 密码加密参考 https://dataelem.feishu.cn/wiki/BSCcwKd4Yiot3IkOEC8cxGW7nPc#Gxitd1xEeof1TzxdhINcGS6JnXd
database_url:
"mysql+pymysql://root:gAAAAABlp4b4c59FeVGF_OQRVf6NOUIGdxq8246EBD-b0hdK_jVKRs1x4PoAn0A6C5S6IiFKmWn0Nm5eBUWu-7jxcqw6TiVjQA==@mysql:3306/bisheng?charset=utf8mb4"

Expand Down
52 changes: 26 additions & 26 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
start_period: 30s
interval: 20s
timeout: 10s
retries: 3
retries: 4
restart: on-failure

redis:
Expand Down Expand Up @@ -144,28 +144,28 @@ services:
timeout: 20s
retries: 3

# milvus:
# container_name: milvus-standalone
# image: milvusdb/milvus:v2.3.3
# command: ["milvus", "run", "standalone"]
# security_opt:
# - seccomp:unconfined
# environment:
# ETCD_ENDPOINTS: etcd:2379
# MINIO_ADDRESS: minio:9000
# volumes:
# - /etc/localtime:/etc/localtime:ro
# - ${DOCKER_VOLUME_DIRECTORY:-.}/data/milvus:/var/lib/milvus
# restart: on-failure
# healthcheck:
# test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
# start_period: 90s
# interval: 30s
# timeout: 20s
# retries: 3
# ports:
# - "19530:19530"
# - "9091:9091"
# depends_on:
# - etcd
# - minio
milvus:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
security_opt:
- seccomp:unconfined
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- /etc/localtime:/etc/localtime:ro
- ${DOCKER_VOLUME_DIRECTORY:-.}/data/milvus:/var/lib/milvus
restart: on-failure
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
start_period: 90s
interval: 30s
timeout: 20s
retries: 3
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- etcd
- minio
92 changes: 89 additions & 3 deletions src/backend/bisheng/api/services/knowledge_imp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from bisheng.database.base import session_getter
from bisheng.database.models.knowledge import Knowledge, KnowledgeCreate
from bisheng.database.models.knowledge_file import KnowledgeFile
from bisheng.interface.embeddings.custom import FakeEmbedding
from bisheng.interface.importing.utils import import_vectorstore
from bisheng.interface.initialize.loading import instantiate_vectorstore
from bisheng.settings import settings
from bisheng.utils.minio_client import MinioClient
from bisheng_langchain.document_loaders import ElemUnstructuredLoader
from bisheng_langchain.embeddings import HostEmbeddings
from bisheng_langchain.text_splitter import ElemCharacterTextSplitter
from bisheng_langchain.vectorstores import ElasticKeywordsSearch, Milvus
from fastapi import HTTPException
from langchain.document_loaders import (BSHTMLLoader, PyPDFLoader, TextLoader,
UnstructuredMarkdownLoader, UnstructuredPowerPointLoader,
Expand All @@ -26,6 +28,8 @@
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.base import VectorStore
from loguru import logger
from pymilvus import Collection
from sqlalchemy import delete
from sqlmodel import select

filetype_load_map = {
Expand Down Expand Up @@ -69,6 +73,85 @@ def create_knowledge(knowledge: KnowledgeCreate, user_id: int):
return db_knowldge.copy()


def delete_knowledge_by(knowledge: Knowledge):
# 处理vector
knowledge_id = knowledge.id
embeddings = FakeEmbedding()
vectore_client = decide_vectorstores(knowledge.collection_name, 'Milvus', embeddings)
if isinstance(vectore_client.col, Collection):
logger.info(f'delete_vectore col={knowledge.collection_name}')
if knowledge.collection_name.startswith('col'):
vectore_client.col.drop()
else:
pk = vectore_client.col.query(expr=f'knowledge_id=="{knowledge.id}"',
output_fields=['pk'])
vectore_client.col.delete(f"pk in {[p['pk'] for p in pk]}")
# 判断milvus 是否还有entity
if vectore_client.col.is_empty:
vectore_client.col.drop()

# 处理 es
# elastic
esvectore_client: 'ElasticKeywordsSearch' = decide_vectorstores(knowledge.index_name,
'ElasticKeywordsSearch',
embeddings)
if esvectore_client:
index_name = knowledge.index_name or knowledge.collection_name # 兼容老版本
res = esvectore_client.client.indices.delete(index=index_name, ignore=[400, 404])
logger.info(f'act=delete_es index={index_name} res={res}')
# 处理knowledgefile
with session_getter() as session:
session.exec(delete(KnowledgeFile).where(KnowledgeFile.knowledge_id == knowledge_id))
session.delete(knowledge)
session.commit()
return True


def delete_knowledge_file_batch(file_ids: List[int]):
""" 删除知识文件信息 """
with session_getter() as session:
knowledge_files = session.exec(
select(KnowledgeFile).where(KnowledgeFile.id.in_(file_ids))).all()
if not knowledge_files:
raise ValueError('文件ID不存在')

knowledge_ids = [file.knowledge_id for file in knowledge_files]
with session_getter() as session:
knowledges = session.exec(select(Knowledge).where(Knowledge.id.in_(knowledge_ids))).all()
knowledgeid_dict = {knowledge.id: knowledge for knowledge in knowledges}
# 处理vectordb
for file in knowledge_files:
knowledge = knowledgeid_dict.get(file.knowledge_id)
collection_name = knowledge.collection_name
embeddings = FakeEmbedding()
vectore_client = decide_vectorstores(collection_name, 'Milvus', embeddings)
if isinstance(vectore_client, Milvus) and vectore_client.col:
pk = vectore_client.col.query(expr=f'file_id == {file.id}', output_fields=['pk'])
res = vectore_client.col.delete(f"pk in {[p['pk'] for p in pk]}")
logger.info(f'act=delete_vector file_id={file.id} res={res}')

# minio
minio_client = MinioClient()
minio_client.delete_minio(str(file.id))
if file.object_name:
minio_client.delete_minio(str(file.object_name))
# elastic
index_name = knowledge.index_name or collection_name
esvectore_client = decide_vectorstores(index_name, 'ElasticKeywordsSearch', embeddings)

if esvectore_client:
res = esvectore_client.client.delete_by_query(
index=index_name, query={'match': {
'metadata.file_id': file.id
}})
logger.info(f'act=delete_es file_id={file.id} res={res}')

with session_getter() as session:
session.delete(file)
session.commit()
return True


def decide_embeddings(model: str) -> Embeddings:
"""embed method"""
model_list = settings.get_knowledge().get('embeddings')
Expand Down Expand Up @@ -130,7 +213,8 @@ def addEmbedding(collection_name,
callback_obj = {}
for index, path in enumerate(file_paths):
ts1 = time.time()
knowledge_file = knowledge_files[index]
with session_getter() as session:
knowledge_file = session.get(KnowledgeFile, knowledge_files[index].id)
logger.info('process_file_begin knowledge_id={} file_name={} file_size={} ',
knowledge_files[0].knowledge_id, knowledge_file.file_name, len(file_paths))
# 原始文件保存
Expand All @@ -142,6 +226,7 @@ def addEmbedding(collection_name,
session.refresh(knowledge_file)
if not vectore_client and not es_client:
# 设置错误
logger.error(f'no_vector_db_found err={error_msg}')
with session_getter() as session:
db_file = session.get(KnowledgeFile, knowledge_file.id)
setattr(db_file, 'status', 3)
Expand All @@ -161,7 +246,8 @@ def addEmbedding(collection_name,
requests.post(url=callback, json=inp, timeout=3)
continue
try:
minio_client.upload_minio(knowledge_file.object_name, path)
res = minio_client.upload_minio(knowledge_file.object_name, path)
logger.info('upload_original_file path={} res={}', knowledge_file.object_name, res)
texts, metadatas = read_chunk_text(path, knowledge_file.file_name, chunk_size,
chunk_overlap, separator)

Expand Down Expand Up @@ -197,7 +283,7 @@ def addEmbedding(collection_name,
time.time() - ts1)

except Exception as e:
logger.error('insert_metadata={} ', metadatas, e)
logger.error('add_vectordb {}', e)
with session_getter() as session:
db_file = session.get(KnowledgeFile, knowledge_file.id)
setattr(db_file, 'status', 3)
Expand Down
43 changes: 36 additions & 7 deletions src/backend/bisheng/api/v1/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,33 +306,62 @@ def get_filelist(*,


@router.post('/retry', status_code=200)
def retry(file_ids: List[int]):
def retry(data: dict, background_tasks: BackgroundTasks, Authorize: AuthJWT = Depends()):
"""失败重试"""
file_ids = data.get('file_ids')
with session_getter() as session:
db_files = session.exec(select(KnowledgeFile).where(KnowledgeFile.id.in_(file_ids))).all()
db_files = session.exec(
select(KnowledgeFile).where(KnowledgeFile.id.in_(file_ids),
KnowledgeFile.status == 3)).all()

separator = ['\n\n', '\n', ' ', '']
chunk_size = 500
chunk_overlap = 50
if db_files:
minio = MinioClient()
for file in db_files:
if file.remark == 'file repeat':
# 重复的文件不能重复解析
continue
# file exist
with session_getter() as session:
db_knowledge = session.get(Knowledge, file.knowledge_id)
file.status = 1 # 解析中
session.add(file)
session.commit()
session.refresh(file)
session.refresh(db_knowledge)

index_name = db_knowledge.index_name or db_knowledge.collection_name
original_file = file.object_name
file_path = file_download(minio.get_share_link(original_file))
file_url = minio.get_share_link(original_file)
if file_url:
file_path, _ = file_download(file_url)
else:
with session_getter() as session:
db_knowledge = session.get(Knowledge, file.knowledge_id)
file.status = 3 # 解析中
file.remark = '原始文件丢失'
session.commit()
continue

try:
addEmbedding(db_knowledge.collection_name, index_name, db_knowledge.id,
db_knowledge.model, chunk_size, separator, chunk_overlap, [file_path],
[file], None)
background_tasks.add_task(addEmbedding,
collection_name=db_knowledge.collection_name,
index_name=index_name,
knowledge_id=db_knowledge.id,
model=db_knowledge.model,
chunk_size=chunk_size,
separator=separator,
chunk_overlap=chunk_overlap,
file_paths=[file_path],
knowledge_files=[file],
callback=None,
extra_meta=file.extra_meta)
except Exception as e:
logger.error(e)

pass
return resp_200()


@router.delete('/{knowledge_id}', status_code=200)
Expand Down
6 changes: 4 additions & 2 deletions src/backend/bisheng/api/v1/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ def resp_200(data: Union[list, dict, str, Any] = None,
# return data


def resp_500(data: str = None, message: str = 'BAD REQUEST') -> UnifiedResponseModel:
def resp_500(code: int = 500,
data: Union[list, dict, str, Any] = None,
message: str = 'BAD REQUEST') -> UnifiedResponseModel:
"""错误的逻辑回复"""
return UnifiedResponseModel(status_code=500, status_message=message, data=data)
return UnifiedResponseModel(status_code=code, status_message=message, data=data)


class ProcessResponse(BaseModel):
Expand Down
3 changes: 3 additions & 0 deletions src/backend/bisheng/api/v1/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ async def regist(*, user: UserCreate):
if not admin:
db_user.user_id = 1
db_user_role = UserRole(user_id=db_user.user_id, role_id=1)
with session_getter() as session:
session.add(db_user_role)
session.commit()

# check if user already exist
with session_getter() as session:
Expand Down
Loading

0 comments on commit 40e5280

Please sign in to comment.