Feat/report (#178)
yaojin3616 committed Dec 5, 2023
2 parents 325c675 + 13007b4 commit 1fc1e08
Showing 14 changed files with 142 additions and 86 deletions.
3 changes: 3 additions & 0 deletions docker/bisheng/config/config.yaml
@@ -4,6 +4,9 @@ database_url:

# Cache configuration: redis://[[username]:[password]]@localhost:6379/0
redis_url: "redis://redis:6379/0"
redis_host: [("redis")]
redis_master:
redis_password:

environment:
env: dev
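The three new keys extend the plain redis_url above so the backend can also reach a Sentinel-managed Redis: redis_host carries the sentinel address list, redis_master the monitored master name, and redis_password the auth secret (settings.py gains a matching redis: Optional[dict] field below). A minimal sketch of how such keys could feed redis-py's Sentinel client; the actual wiring inside bisheng is not part of this diff:

import redis
from redis.sentinel import Sentinel

def build_redis(cfg: dict):
    # cfg mirrors the keys above: redis_url, redis_host, redis_master, redis_password
    if cfg.get('redis_master'):
        # Sentinel mode: redis_host should be a list of (host, port) pairs
        sentinel = Sentinel(cfg['redis_host'], password=cfg.get('redis_password'))
        return sentinel.master_for(cfg['redis_master'])
    return redis.from_url(cfg['redis_url'])  # single-node mode via the plain URL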
7 changes: 5 additions & 2 deletions src/backend/bisheng/api/v1/flows.py
@@ -144,8 +144,11 @@ def update_flow(*,
session.add(db_flow)
session.commit()
session.refresh(db_flow)
if not get_L2_param_from_flow(db_flow.data, db_flow.id):
logger.error(f'flow_id={db_flow.id} extract file_node fail')
try:
if not get_L2_param_from_flow(db_flow.data, db_flow.id):
logger.error(f'flow_id={db_flow.id} extract file_node fail')
except Exception:
pass
return db_flow


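The update path now tolerates failures in get_L2_param_from_flow, so a broken file-node extraction no longer aborts the whole flow update. The bare "except Exception: pass" also discards the traceback, though; a sketch of the same guard that keeps the error visible (a style alternative, not what the commit does):

try:
    if not get_L2_param_from_flow(db_flow.data, db_flow.id):
        logger.error(f'flow_id={db_flow.id} extract file_node fail')
except Exception:
    # logger.exception still swallows the error but records the stack trace
    logger.exception(f'flow_id={db_flow.id} extract file_node error')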
24 changes: 14 additions & 10 deletions src/backend/bisheng/api/v1/knowledge.py
@@ -95,7 +95,7 @@ async def process_knowledge(*,
file_path = data.get('file_path')
auto_p = data.get('auto')
separator = data.get('separator')
chunk_overlap = 0
chunk_overlap = data.get('chunk_overlap')

if auto_p:
separator = ['\n\n', '\n', ' ', '']
@@ -118,9 +118,10 @@ async def process_knowledge(*,
filepath, file_name = file_download(path)
md5_ = filepath.rsplit('/', 1)[1].split('.')[0]
# check whether this file duplicates an existing one
repeat = session.exec(select(KnowledgeFile
).where(KnowledgeFile.md5 == md5_, KnowledgeFile.status == 2,
KnowledgeFile.knowledge_id == knowledge_id)).all()
repeat = session.exec(select(KnowledgeFile)
.where(KnowledgeFile.md5 == md5_,
KnowledgeFile.status == 2,
KnowledgeFile.knowledge_id == knowledge_id)).all()
status = 3 if repeat else 1
remark = 'file repeat' if repeat else ''
db_file = KnowledgeFile(knowledge_id=knowledge_id, file_name=file_name,
@@ -133,7 +134,7 @@ async def process_knowledge(*,
files.append(db_file)
file_paths.append(filepath)
logger.info(f'fileName={file_name} col={collection_name}')
result.append(db_file)
result.append(db_file.copy())

if not repeat:
asyncio.create_task(
@@ -368,7 +369,8 @@ async def addEmbedding(collection_name, model: str, chunk_size: int, separator:
session.add(db_file)
session.flush()
# original file
minio_client.MinioClient().upload_minio(knowledge_file.file_name, path)
object_name_original = f'original/{db_file.id}'
minio_client.MinioClient().upload_minio(object_name_original, path)

texts, metadatas = _read_chunk_text(path, knowledge_file.file_name, chunk_size,
chunk_overlap, separator)
@@ -416,11 +418,13 @@ def _read_chunk_text(input_file, file_name, size, chunk_overlap, separator):
b64_data = base64.b64encode(open(input_file, 'rb').read()).decode()
inp = dict(filename=file_name, b64_data=[b64_data], mode='topdf')
resp = requests.post(settings.get_knowledge().get('unstructured_api_url'),
json=inp).json()
if not resp or resp['status_code'] != 200:
logger.error(f'file_pdf=not_success resp={resp}')
json=inp)
if not resp or resp.status_code != 200:
logger.error(f'file_pdf=not_success resp={resp.text}')
raise Exception(f"当前文件无法解析, {resp['status_message']}")
b64_data = resp['b64_pdf']
if len(resp.text) < 200:
logger.error(f'file_pdf=not_success resp={resp.text}')
b64_data = resp.json()['b64_pdf']
# replace the old file with the converted one
with open(input_file, 'wb') as fout:
fout.write(base64.b64decode(b64_data))
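Three things change in knowledge.py: chunk_overlap now comes from the request instead of a hard-coded 0, source files are stored under original/{file_id} in minio, and _read_chunk_text keeps the raw requests.Response so the error branch can log resp.text. One caveat: the raise line still subscripts resp['status_message'], but resp is now a Response object, so that branch would itself raise a TypeError. A sketch of the branch with the message taken from the parsed body instead (an observation about the new code, not part of the commit):

resp = requests.post(settings.get_knowledge().get('unstructured_api_url'), json=inp)
if resp.status_code != 200:
    logger.error(f'file_pdf=not_success resp={resp.text}')
    # resp itself is not subscriptable; read the message from the JSON body
    raise Exception(f"file cannot be parsed: {resp.json().get('status_message')}")
b64_data = resp.json()['b64_pdf']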
3 changes: 2 additions & 1 deletion src/backend/bisheng/api/v1/report.py
@@ -24,6 +24,7 @@ async def callback(data: dict, session: Session = Depends(get_session)):
logger.debug(f'calback={data}')
if status in {2, 6}:
# save the callback result
logger.info(f'office_callback url={file_url}')
file = Requests().get(url=file_url)
object_name = mino_prefix+key+'.docx'
minio_client.MinioClient().upload_minio_data(object_name, file._content,
@@ -55,7 +56,7 @@ async def get_template(*, flow_id: str, session: Session = Depends(get_session))
db_report = Report(flow_id=flow_id)
elif db_report.object_name:
file_url = minio_client.MinioClient().get_share_link(db_report.object_name)
if not db_report.newversion_key:
if not db_report.newversion_key or not db_report.object_name:
version_key = uuid4().hex
db_report.newversion_key = version_key
session.add(db_report)
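In report.py the callback now logs the document-server URL before downloading the edited file, and get_template mints a fresh newversion_key whenever either the key or the stored object is missing, so a report that was never saved cannot hand a stale key back to the editor. The statuses {2, 6} match the OnlyOffice-style callback convention (2 = document ready for saving, 6 = force-save), assuming that is the editor behind this endpoint. A condensed sketch of the version-key rule, using the names from the diff:

needs_fresh_key = not db_report.newversion_key or not db_report.object_name
if needs_fresh_key:
    db_report.newversion_key = uuid4().hex  # a new key makes the editor open a new document version
    session.add(db_report)
    session.commit()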
5 changes: 4 additions & 1 deletion src/backend/bisheng/chat/handlers.py
@@ -191,6 +191,7 @@ async def process_file(self, session: ChatManager,
input_key = langchain_object.input_keys[0]

report = ''
logger.info(f'process_file batch_question={batch_question}')
for question in batch_question:
if not question:
continue
@@ -252,11 +253,13 @@ async def intermediate_logs(self, session: ChatManager,
# autogen produce multi dialog
for message in intermediate_steps:
content = message.get('message')
log = message.get('log', '')
sender = message.get('sender')
receiver = message.get('receiver')
is_bot = False if receiver and receiver.get('is_bot') else True
category = message.get('category', 'processing')
msg = ChatResponse(message=content, sender=sender, receiver=receiver,
msg = ChatResponse(message=content, intermediate_steps=log,
sender=sender, receiver=receiver,
type='end', user_id=user_id, is_bot=is_bot,
category=category)
steps.append(msg)
19 changes: 13 additions & 6 deletions src/backend/bisheng/chat/manager.py
@@ -63,6 +63,7 @@ def __init__(self):
self.cache_manager = cache_manager
self.cache_manager.attach(self.update)
self.in_memory_cache = InMemoryCache()
self.task_manager: List[asyncio.Task] = []

# def on_chat_history_update(self):
# """Send the last chat message to the client."""
@@ -150,11 +151,13 @@ async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSoc

try:
while True:

json_payload = await websocket.receive_json()
try:
payload = json.loads(json_payload)
except TypeError:
payload = json_payload

if 'clear_history' in payload:
self.chat_history.history[client_id] = []
continue
@@ -178,8 +181,9 @@ async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSoc
# should input data
langchain_obj_key = get_cache_key(client_id, chat_id)
has_file = False
if 'inputs' in payload and 'data' in payload['inputs']:
node_data = payload['inputs']['data']
if 'inputs' in payload and ('data' in payload['inputs']
or 'file_path' in payload['inputs']):
node_data = payload['inputs'].get('data') or [payload['inputs']]
gragh_data = self.refresh_graph_data(gragh_data, node_data)
self.set_cache(langchain_obj_key, None) # rebuild object
has_file = any(['InputFile' in nd.get('id') for nd in node_data])
@@ -215,7 +219,8 @@ async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSoc
langchain_obj = self.in_memory_cache.get(langchain_obj_key)
if isinstance(langchain_obj, Report):
action = 'report'
elif action != 'autogen' and 'data' in payload['inputs']:
elif action != 'autogen' and ('data' in payload['inputs'] or
'file_path' in payload['inputs']):
action = 'auto_file' # has input data, default is file process
# default not set, for autogen set before

@@ -231,10 +236,12 @@ async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSoc
step_resp.intermediate_steps = 'File parsing complete. Analysis starting'
await self.send_json(client_id, chat_id, step_resp, add=False)
if action == 'auto_file':
payload['inputs']['questions'] = [question for question in batch_question]
question = []
[question.extend(q) for q in batch_question]
payload['inputs']['questions'] = question

asyncio.create_task(Handler().dispatch_task(self, client_id, chat_id, action,
payload, user_id))
asyncio.create_task(Handler().dispatch_task(self, client_id, chat_id,
action, payload, user_id))

except Exception as e:
# Handle any exceptions that might occur
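For auto_file runs the manager now flattens batch_question, a list of question lists, into a single list before dispatch. The comprehension used purely for its extend side effect builds a throwaway list of None values; a behavior-preserving sketch with itertools:

from itertools import chain

# batch_question is a list of lists; flatten it into one question list
payload['inputs']['questions'] = list(chain.from_iterable(batch_question))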
17 changes: 14 additions & 3 deletions src/backend/bisheng/graph/vertex/base.py
@@ -114,8 +114,12 @@ def _build_params(self):
schema = value['value'].split('|')
if PRESET_QUESTION not in params:
params[PRESET_QUESTION] = {}
params[PRESET_QUESTION].update({inner_edge.target.id:
(inner_edge.source.id, schema)})
if inner_edge.target.id in params[PRESET_QUESTION]:
params[PRESET_QUESTION][inner_edge.target.id].append(
(inner_edge.source.id, schema))
else:
params[PRESET_QUESTION].update({inner_edge.target.id:
[(inner_edge.source.id, schema)]})

for key, value in template_dict.items():
if key == '_type' or (not value.get('show') and not value.get('value')):
@@ -216,7 +220,14 @@ def _build_list_of_nodes_and_update_params(self, key, nodes):
def _build_dict_of_nodes_and_update_params(self, key, dicts):
self.params[key] = {}
for k, v in dicts.items():
if self._is_node(v[1]):
if isinstance(v, list):
# loaderOutput
for k1, v1 in v:
if self._is_node(v1):
self.params[key][k] = (k1, v1.build())
else:
self.params[key][k] = (k1, v1)
elif self._is_node(v[1]):
self.params[key][k] = (v[0], v[1].build())
else:
self.params[key][k] = (v[0], v[1])
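PRESET_QUESTION entries become lists, so several source nodes can preset questions for the same target, and _build_dict_of_nodes_and_update_params learns to unpack that shape (as written, each loop iteration overwrites self.params[key][k], so only the last built pair survives per target). The append-or-create branching in _build_params can be collapsed with dict.setdefault; a sketch with identical behavior:

params[PRESET_QUESTION].setdefault(inner_edge.target.id, []).append(
    (inner_edge.source.id, schema))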
22 changes: 11 additions & 11 deletions src/backend/bisheng/initdb_config.yaml
@@ -11,18 +11,18 @@ knowledges: # knowledge-base related configuration
vectorstores:
# Milvus needs at least 4 CPU cores / 8 GB RAM; 4 cores / 16 GB recommended
Milvus: # to switch to another vector DB, make sure that service is already running, then fill in the matching parameters
connection_args: {'host': '110.16.193.170', 'port': '50032', 'user': '', 'password': '', 'secure': False}
connection_args: {'host': 'milvus', 'port': '19530', 'user': '', 'password': '', 'secure': False}
# optional: Elasticsearch improves recall in some scenarios
# ElasticKeywordsSearch:
# elasticsearch_url: ''
# ssl_verify: "{'ca_certs': False, 'basic_auth': ('elastic', 'password'), 'verify_certs': False}"
# minio: # required for source tracing; tracing displays the original file, so OSS storage must be configured
# SCHEMA: true
# CERT_CHECK: false
# MINIO_ENDPOINT: ""
# MINIO_SHAREPOIN: ""
# MINIO_ACCESS_KEY: ""
# MINIO_SECRET_KEY: ""
ElasticKeywordsSearch:
elasticsearch_url: 'http://elasticsearch:9200'
ssl_verify: "{'basic_auth': ('elastic', 'password')}"
minio: # required for source tracing; tracing displays the original file, so OSS storage must be configured
SCHEMA: true
CERT_CHECK: false
MINIO_ENDPOINT: "milvus:9001"
MINIO_SHAREPOIN: "milvus:9001"
MINIO_ACCESS_KEY: "minioadmin"
MINIO_SECRET_KEY: "minioadmin"
#

# 全局配置大模型
15 changes: 7 additions & 8 deletions src/backend/bisheng/interface/initialize/loading.py
@@ -1,6 +1,7 @@
import contextlib
import json
from typing import Any, Callable, Dict, List, Sequence, Type
from venv import logger

from bisheng.cache.utils import file_download
from bisheng.chat.config import ChatConfig
@@ -128,22 +129,20 @@ def instantiate_input_output(node_type, class_object, params, id_dict):
chain_obj['object'] = chains[index]
if id in preset_question:
chain_obj['node_id'] = preset_question[id][0]
elif 'LoaderOutputChain' in id:
chain_obj['node_id'] = id
if id in preset_question:
chain_obj['input'] = {chains[index].input_keys[0]: preset_question[id][1]}
else:
# give a default input
logger.error(f'Report has no question id={id}')
chain_obj['input'] = {chains[index].input_keys[0]: 'start'}
chain_list.append(chain_obj)
params['chains'] = chain_list
# variable
variable = params.get('variables')
variable_node_id = id_dict.get('variables')
if variable:
params['variables'] = [{'node_id': variable_node_id[0],
'input': variable}]

params['chains'] = chain_list
params['variables'] = []
for index, id in enumerate(variable_node_id):
params['variables'].append({'node_id': id,
'input': variable[index]})
return class_object(**params)
if node_type == 'InputFileNode':
file_path = class_object(**params).text()
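instantiate_input_output now pairs every variable with its own node id by position instead of attaching the whole list to the first id. Incidentally, the new "from venv import logger" at the top of this file looks like an IDE auto-import slip: the stdlib venv module does happen to export a logger, but the rest of the codebase logs through loguru. The index-based loop can be written with zip; a sketch assuming variable and variable_node_id have the same length:

params['variables'] = [{'node_id': node_id, 'input': value}
                       for node_id, value in zip(variable_node_id, variable)]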
1 change: 1 addition & 0 deletions src/backend/bisheng/settings.py
@@ -31,6 +31,7 @@ class Settings(BaseSettings):
environment: Union[dict, str] = 'dev'
database_url: Optional[str] = None
redis_url: Optional[str] = None
redis: Optional[dict] = None
admin: dict = {}
cache: str = 'InMemoryCache'
remove_api_keys: bool = False
3 changes: 3 additions & 0 deletions src/backend/bisheng/template/frontend_node/chains.py
@@ -75,6 +75,9 @@ def format_field(field: TemplateField, name: Optional[str] = None) -> None:
if name == 'RuleBasedRouter' and field.name == 'rule_function':
field.field_type = 'function'

if name == 'LoaderOutputChain' and field.name == 'documents':
field.is_list = False

if name == 'RetrievalQA' and field.name == 'memory':
field.show = False
field.required = False
9 changes: 9 additions & 0 deletions src/backend/bisheng/template/frontend_node/documentloaders.py
@@ -224,6 +224,15 @@ def add_extra_fields(self) -> None:
name='task_type',
advanced=False,
))
self.template.add_field(
TemplateField(
field_type='int',
required=True,
show=True,
name='request_timeout',
advanced=True,
value=10
))
self.template.add_field(self.file_path_templates[self.template.type_name])
elif self.template.type_name in self.file_path_templates:
self.template.add_field(self.file_path_templates[self.template.type_name])
(file header not captured; the hunks below are from the custom_kv document loader)
@@ -14,7 +14,7 @@
import cv2
import fitz
import numpy as np
import requests
from bisheng_langchain.utils.requests import Requests
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from PIL import Image
@@ -58,7 +58,7 @@ def __init__(self, file_path:str,
schemas: str,
elem_server_id: str,
task_type: str,
request_timeout: Optional[Union[float, Tuple[float, float]]] = 10) -> None:
request_timeout: Optional[Union[float, Tuple[float, float]]] = 30) -> None:
"""Initialize with a file path."""
self.file_path = file_path
self.elm_api_base_url = elm_api_base_url
@@ -67,13 +67,14 @@ def __init__(self, file_path:str,
self.task_type = task_type
self.schemas = set(schemas.split('|'))
self.headers = {'Authorization': f'Bearer {elm_api_key}'}
self.timeout = request_timeout
self.requests = Requests(headers=self.headers,
request_timeout=request_timeout)
if '~' in self.file_path:
self.file_path = os.path.expanduser(self.file_path)

# If the file is a web path, download it to a temporary file, and use that
if not os.path.isfile(self.file_path) and self._is_valid_url(self.file_path):
r = requests.get(self.file_path)
r = self.requests.get(self.file_path)

if r.status_code != 200:
raise ValueError(
@@ -117,7 +118,7 @@ def load(self) -> List[Document]:
url = self.elm_api_base_url + '/logic-job'
body = {'logic_service_id': self.elem_server_id}

resp = requests.post(url=url, data=body, files=file, headers=self.headers)
resp = self.requests.post(url=url, json={}, data=body, files=file)
if resp.status_code == 200:
task_id = resp.json().get('data').get('task_id')
if not task_id:
@@ -127,22 +128,26 @@ def load(self) -> List[Document]:
status_url = url + f'/status?task_id={task_id}'
count = 0
while True:
status = requests.get(status_url, headers=self.headers).json()
if 1 == status.get('data').get('status') and count <10:
status = self.requests.get(status_url).json()
if 1 == status.get('data').get('status'):
count += 1
sleep(2)
elif 3 == status.get('data').get('status'):
# failed
logger.error(f'custom_kv type={self.task_type} resp={status}')
return []
else:
break
# get result
job_id = 'job_id' if self.task_type == 'logic-job' else 'task_id'
match = re.match(r'^(?:https?:\/\/)?(?:www\.)?([^\/\n]+)', self.elm_api_base_url)
detail_url = quote_plus(match.group()+f'/logic-job-detail/{task_id}')
result_url = url + f'/result?{job_id}={task_id}&detail_url={detail_url}'
result = requests.get(result_url, headers=self.headers).json()
result = requests.get(result_url, headers=self.headers).json()
result = self.requests.get(result_url).json()
# only for independent key
document_result = {}
try:
result = self.requests.get(result_url).json()
file_reuslt = result.get('data')
for result in file_reuslt:
independent = result.get('result').get('independent_list')
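This loader now funnels every HTTP call through bisheng_langchain's Requests wrapper, so the Authorization header and timeout are applied uniformly, and the polling loop no longer gives up after ten checks: it sleeps two seconds while the task reports status 1, returns [] on status 3, and otherwise fetches the result. Dropping the count < 10 cap means a task stuck in status 1 now polls forever; a sketch of the same loop with an overall deadline (max_wait is an illustrative parameter, not in the diff):

from time import sleep

max_wait, waited = 600, 0  # give up after ten minutes
while True:
    status = self.requests.get(status_url).json()
    state = status.get('data').get('status')
    if state == 1 and waited < max_wait:
        waited += 2
        sleep(2)
    elif state == 3 or waited >= max_wait:  # task failed, or deadline passed
        logger.error(f'custom_kv type={self.task_type} resp={status}')
        return []
    else:
        break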
(1 more changed file not shown)
