Commit 9324c16

Merge branch 'feat/0.2.2.6' into main

yaojin3616 committed Feb 23, 2024
2 parents 660f79f + a08de45
Showing 14 changed files with 2,983 additions and 3,265 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -133,7 +133,7 @@ jobs:
run: |
message=""
IFS=$'\n'
-        for line in $(echo "${{ steps.git_message.outputs.message }}"); do
+        for line in $(echo "${{ github.event.head_commit.message }}"); do
message+="$line\n"
done
echo "::set-output name=message::$message"
17 changes: 9 additions & 8 deletions docker/bisheng/config/config.yaml
@@ -5,18 +5,19 @@ database_url:
# Cache configuration: redis://[[username]:[password]]@localhost:6379/0
# Standalone mode:
redis_url: "redis://redis:6379/1"

# Cluster mode or sentinel mode (choose only one):
# redis_url:
#   mode: "cluster"
#   startup_nodes:
#     - {"host": "192.168.106.115", "port": 6002}
#   password: encrypt(gAAAAABlp4b4c59FeVGF_OQRVf6NOUIGdxq8246EBD-b0hdK_jVKRs1x4PoAn0A6C5S6IiFKmWn0Nm5eBUWu-7jxcqw6TiVjQA==)
#   # sentinel
#   mode: "sentinel"
#   sentinel_hosts: [("redis", 6379)]
#   sentinel_master: "mymaster"
#   sentinel_password: encrypt(gAAAAABlp4b4c59FeVGF_OQRVf6NOUIGdxq8246EBD-b0hdK_jVKRs1x4PoAn0A6C5S6IiFKmWn0Nm5eBUWu-7jxcqw6TiVjQA==)
#   db: 1

environment:
env: dev
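For reference, the commented sentinel block maps onto redis-py's Sentinel client roughly as follows. This is a minimal sketch under the assumption that the service connects through redis-py; the host, master name, and db values are the placeholders from the config above, and the password stands in for the decrypted secret:

from redis.sentinel import Sentinel

# Discover the current master via the sentinel at ("redis", 6379).
sentinel = Sentinel([('redis', 6379)], socket_timeout=0.5)
# master_for returns a client that follows failovers to the new master.
client = sentinel.master_for('mymaster', db=1, password='<decrypted password>')
client.ping()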
299 changes: 299 additions & 0 deletions src/backend/bisheng/api/services/knowledge_imp.py
@@ -1,13 +1,44 @@
import base64
import json
import re
import time
from typing import List
from uuid import uuid4

import requests
from bisheng.database.base import session_getter
from bisheng.database.models.knowledge import Knowledge, KnowledgeCreate
from bisheng.database.models.knowledge_file import KnowledgeFile
from bisheng.interface.importing.utils import import_vectorstore
from bisheng.interface.initialize.loading import instantiate_vectorstore
from bisheng.settings import settings
from bisheng.utils.minio_client import MinioClient
from bisheng_langchain.document_loaders import ElemUnstructuredLoader
from bisheng_langchain.embeddings import HostEmbeddings
from bisheng_langchain.text_splitter import ElemCharacterTextSplitter
from fastapi import HTTPException
from langchain.document_loaders import (BSHTMLLoader, PyPDFLoader, TextLoader,
UnstructuredMarkdownLoader, UnstructuredPowerPointLoader,
UnstructuredWordDocumentLoader)
from langchain.embeddings.base import Embeddings
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.schema.document import Document
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.base import VectorStore
from loguru import logger
from sqlmodel import select

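# Map supported file extensions to their LangChain document loaders.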
filetype_load_map = {
'txt': TextLoader,
'pdf': PyPDFLoader,
'html': BSHTMLLoader,
'md': UnstructuredMarkdownLoader,
'doc': UnstructuredWordDocumentLoader,
'docx': UnstructuredWordDocumentLoader,
'ppt': UnstructuredPowerPointLoader,
'pptx': UnstructuredPowerPointLoader,
}


def create_knowledge(knowledge: KnowledgeCreate, user_id: int):
""" 创建知识库. """
Expand Down Expand Up @@ -36,3 +67,271 @@ def create_knowledge(knowledge: KnowledgeCreate, user_id: int):
session.commit()
session.refresh(db_knowldge)
return db_knowldge.copy()


def decide_embeddings(model: str) -> Embeddings:
"""embed method"""
model_list = settings.get_knowledge().get('embeddings')
if model == 'text-embedding-ada-002':
return OpenAIEmbeddings(**model_list.get(model))
else:
return HostEmbeddings(**model_list.get(model))


def decide_vectorstores(collection_name: str, vector_store: str,
embedding: Embeddings) -> VectorStore:
"""vector db"""
vector_config = settings.get_knowledge().get('vectorstores').get(vector_store)
if not vector_config:
        # No matching vector-store configuration
return None

if vector_store == 'ElasticKeywordsSearch':
param = {'index_name': collection_name, 'embedding': embedding}
if isinstance(vector_config['ssl_verify'], str):
vector_config['ssl_verify'] = eval(vector_config['ssl_verify'])
else:
param = {'collection_name': collection_name, 'embedding': embedding}
vector_config.pop('partition_suffix', '')
vector_config.pop('is_partition', '')

param.update(vector_config)
class_obj = import_vectorstore(vector_store)
return instantiate_vectorstore(class_object=class_obj, params=param)


def addEmbedding(collection_name,
index_name,
knowledge_id: int,
model: str,
chunk_size: int,
separator: str,
chunk_overlap: int,
file_paths: List[str],
knowledge_files: List[KnowledgeFile],
callback: str,
extra_meta: str = None):
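    """Chunk and embed files into Milvus and Elasticsearch, recording status in MySQL."""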
    error_msg = ''
    # Initialize up front so the ES block below never hits an unbound name
    # when embedding or Milvus setup fails.
    vectore_client, es_client, embeddings = None, None, None
    try:
        minio_client = MinioClient()
        embeddings = decide_embeddings(model)
        vectore_client = decide_vectorstores(collection_name, 'Milvus', embeddings)
except Exception as e:
error_msg = 'MilvusExcept:' + str(e)
logger.exception(e)

try:
es_client = decide_vectorstores(index_name, 'ElasticKeywordsSearch', embeddings)
except Exception as e:
error_msg = error_msg + 'ESException:' + str(e)
logger.exception(e)

callback_obj = {}
for index, path in enumerate(file_paths):
ts1 = time.time()
knowledge_file = knowledge_files[index]
        logger.info('process_file_begin knowledge_id={} file_name={} file_count={}',
                    knowledge_file.knowledge_id, knowledge_file.file_name, len(file_paths))
        # Persist the original file
        file_type = knowledge_file.file_name.rsplit('.', 1)[-1]
        knowledge_file.object_name = f'original/{knowledge_file.id}.{file_type}'
with session_getter() as session:
session.add(knowledge_file)
session.commit()
session.refresh(knowledge_file)
if not vectore_client and not es_client:
            # Record the error on the file record
with session_getter() as session:
db_file = session.get(KnowledgeFile, knowledge_file.id)
setattr(db_file, 'status', 3)
setattr(db_file, 'remark', error_msg[:500])
session.add(db_file)
callback_obj = db_file.copy()
session.commit()
if callback:
inp = {
'file_name': knowledge_file.file_name,
'file_status': knowledge_file.status,
'file_id': callback_obj.id,
'error_msg': callback_obj.remark
}
logger.error('add_fail callback={} file_name={} status={}', callback,
callback_obj.file_name, callback_obj.status)
requests.post(url=callback, json=inp, timeout=3)
continue
try:
minio_client.upload_minio(knowledge_file.object_name, path)
texts, metadatas = read_chunk_text(path, knowledge_file.file_name, chunk_size,
chunk_overlap, separator)

            if len(texts) == 0:
                raise ValueError('File parsed to empty content')
            # Source tracing relies on MinIO for now; swap in a more generic OSS later
            minio_client.upload_minio(str(knowledge_file.id), path)

logger.info(f'chunk_split file_name={knowledge_file.file_name} size={len(texts)}')
for metadata in metadatas:
metadata.update({
'file_id': knowledge_file.id,
'knowledge_id': f'{knowledge_id}',
'extra': extra_meta or ''
})

if vectore_client:
vectore_client.add_texts(texts=texts, metadatas=metadatas)

            # Store in Elasticsearch
if es_client:
es_client.add_texts(texts=texts, metadatas=metadatas)

            # Store in MySQL
with session_getter() as session:
knowledge_file.status = 2
session.add(knowledge_file)
session.commit()
session.refresh(knowledge_file)
callback_obj = knowledge_file.copy()
logger.info('process_file_done file_name={} file_id={} time_cost={}',
knowledge_file.file_name, knowledge_file.id,
time.time() - ts1)

except Exception as e:
            logger.exception('process_file_fail file_name={} error={}',
                             knowledge_file.file_name, str(e))
with session_getter() as session:
db_file = session.get(KnowledgeFile, knowledge_file.id)
setattr(db_file, 'status', 3)
setattr(db_file, 'remark', str(e)[:500])
session.add(db_file)
callback_obj = db_file.copy()
session.commit()
if callback:
            # notify the callback endpoint
inp = {
'file_name': callback_obj.file_name,
'file_status': callback_obj.status,
'file_id': callback_obj.id,
'error_msg': callback_obj.remark
}
logger.info(
f'add_complete callback={callback} file_name={callback_obj.file_name} status={callback_obj.status}'
)
requests.post(url=callback, json=inp, timeout=3)


def read_chunk_text(input_file, file_name, size, chunk_overlap, separator):
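    """Load a file and split it into chunks; returns (texts, metadatas) for indexing."""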
if not settings.get_knowledge().get('unstructured_api_url'):
file_type = file_name.split('.')[-1]
if file_type not in filetype_load_map:
            raise Exception('Unsupported file type')
loader = filetype_load_map[file_type](file_path=input_file)
separator = separator[0] if separator and isinstance(separator, list) else separator
text_splitter = CharacterTextSplitter(separator=separator,
chunk_size=size,
chunk_overlap=chunk_overlap,
add_start_index=True)
documents = loader.load()
texts = text_splitter.split_documents(documents)
raw_texts = [t.page_content for t in texts]
metadatas = [{
'bbox': json.dumps({'chunk_bboxes': t.metadata.get('chunk_bboxes', '')}),
'page': t.metadata.get('page') or 0,
'source': file_name,
'extra': ''
} for t in texts]
else:
        # If the file is not a PDF, convert it to PDF via the unstructured service
if file_name.rsplit('.', 1)[-1] != 'pdf':
b64_data = base64.b64encode(open(input_file, 'rb').read()).decode()
inp = dict(filename=file_name, b64_data=[b64_data], mode='topdf')
resp = requests.post(settings.get_knowledge().get('unstructured_api_url'), json=inp)
            if resp.status_code != 200:
                logger.error(f'file_pdf=not_success resp={resp.text}')
                raise Exception(f'The current file cannot be parsed: {resp.text}')
if len(resp.text) < 300:
logger.error(f'file_pdf=not_success resp={resp.text}')
b64_data = resp.json()['b64_pdf']
            # Replace the original file with the converted PDF
with open(input_file, 'wb') as fout:
fout.write(base64.b64decode(b64_data))
file_name = file_name.rsplit('.', 1)[0] + '.pdf'

loader = ElemUnstructuredLoader(
file_name,
input_file,
unstructured_api_url=settings.get_knowledge().get('unstructured_api_url'))
documents = loader.load()
text_splitter = ElemCharacterTextSplitter(separators=separator,
chunk_size=size,
chunk_overlap=chunk_overlap)
texts = text_splitter.split_documents(documents)
raw_texts = [t.page_content for t in texts]
metadatas = [{
'bbox': json.dumps({'chunk_bboxes': t.metadata.get('chunk_bboxes', '')}),
        'page': (t.metadata.get('chunk_bboxes') or [{}])[0].get('page', 0),
'source': t.metadata.get('source', ''),
'extra': '',
} for t in texts]
return (raw_texts, metadatas)


def text_knowledge(db_knowledge: Knowledge, db_file: KnowledgeFile, documents: List[Document]):
"""使用text 导入knowledge"""
try:
embeddings = decide_embeddings(db_knowledge.model)
vectore_client = decide_vectorstores(db_knowledge.collection_name, 'Milvus', embeddings)
index_name = db_knowledge.index_name or db_knowledge.collection_name
es_client = decide_vectorstores(index_name, 'ElasticKeywordsSearch', embeddings)
except Exception as e:
logger.exception(e)

separator = '\n\n'
chunk_size = 500
chunk_overlap = 50

text_splitter = CharacterTextSplitter(separator=separator,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
add_start_index=True)

texts = text_splitter.split_documents(documents)

logger.info(f'chunk_split knowledge_id={db_knowledge.id} size={len(texts)}')

    # Store in MySQL
file_name = documents[0].metadata.get('source')
db_file.file_name = file_name
with session_getter() as session:
session.add(db_file)
session.commit()
session.refresh(db_file)
result = db_file.model_dump()
try:
metadata = [{
'file_id': db_file.id,
'knowledge_id': f'{db_knowledge.id}',
'page': doc.metadata.pop('page', 1),
'source': doc.metadata.pop('source', ''),
'bbox': doc.metadata.pop('bbox', ''),
'extra': json.dumps(doc.metadata)
} for doc in documents]
vectore_client.add_texts(texts=[t.page_content for t in texts], metadatas=metadata)

        # Store in Elasticsearch
if es_client:
es_client.add_texts(texts=[t.page_content for t in texts], metadatas=metadata)
db_file.status = 2
result['status'] = 2
with session_getter() as session:
session.add(db_file)
session.commit()
except Exception as e:
logger.error(e)
setattr(db_file, 'status', 3)
setattr(db_file, 'remark', str(e)[:500])
with session_getter() as session:
session.add(db_file)
session.commit()
result['status'] = 3
result['remark'] = str(e)[:500]
return result
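
Taken together, the new service module is used roughly as below. This is a minimal sketch, assuming a configured settings file, a running Milvus/Elasticsearch/MinIO stack, and an existing KnowledgeFile row; the KnowledgeCreate fields and file path are illustrative, not part of the commit:

from bisheng.api.services.knowledge_imp import addEmbedding, create_knowledge
from bisheng.database.models.knowledge import KnowledgeCreate

# Create the knowledge-base record (field values are illustrative).
kb = create_knowledge(KnowledgeCreate(name='demo', model='text-embedding-ada-002'),
                      user_id=1)

# Chunk, embed, and index one uploaded file; status is written back to MySQL.
addEmbedding(collection_name=kb.collection_name,
             index_name=kb.index_name or kb.collection_name,
             knowledge_id=kb.id,
             model=kb.model,
             chunk_size=500,
             separator='\n\n',
             chunk_overlap=50,
             file_paths=['/tmp/demo.pdf'],
             knowledge_files=[knowledge_file],  # an existing KnowledgeFile row
             callback=None)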