Skip to content

Commit

Permalink
Feat/0.3.1.4 (#649)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaojin3616 committed Jun 17, 2024
2 parents 1ae528b + 84b80c5 commit 4ade957
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
9 changes: 5 additions & 4 deletions src/backend/bisheng/api/v1/qa.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import asyncio
import json
from typing import List
from typing import Annotated, List

from bisheng.api.v1.schemas import UnifiedResponseModel, resp_200
from bisheng.database.base import session_getter
from bisheng.database.models.knowledge_file import KnowledgeFile
from bisheng.database.models.recall_chunk import RecallChunk
from bisheng.utils.minio_client import MinioClient
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, Body, HTTPException
from sqlmodel import select

# build router
Expand Down Expand Up @@ -35,8 +35,9 @@ async def get_answer_keyword(message_id: int):
raise HTTPException(status_code=500, detail='后台处理中,稍后再试')


@router.get('/chunk', status_code=200)
def get_original_file(*, message_id: int, keys: str):
@router.post('/chunk', status_code=200)
def get_original_file(*, message_id: Annotated[int, Body(embed=True)],
keys: Annotated[str, Body(embed=True)]):
# 获取命中的key
with session_getter() as session:
chunks = session.exec(
Expand Down
8 changes: 6 additions & 2 deletions src/backend/bisheng/chat/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,19 @@ async def handle_gpts_message(self, message: Dict[any, any]):
if isinstance(one, AIMessage):
answer += one.content

# todo: 后续优化代码解释器的实现方案,保证输出的文件可以公开访问
# todo: 后续优化代码解释器的实现方案,保证输出的文件可以公开访问 ugly solve
# 获取minio的share地址,把share域名去掉, 为毕昇的部署方案特殊处理下
if gpts_tool_conf := self.gpts_conf.get('tools'):
if bisheng_code_conf := gpts_tool_conf.get("bisheng_code_interpreter"):
answer = answer.replace(f"http://{bisheng_code_conf['minio']['MINIO_SHAREPOIN']}", "")
answer_end_type = 'end'
# 如果是流式的llm则用end_cover结束, 覆盖之前流式的输出
if getattr(self.gpts_agent.llm, 'streaming', False):
answer_end_type = 'end_cover'

res = await self.add_message('bot', answer, 'answer')
await self.send_response('answer', 'start', '')
await self.send_response('answer', 'end_cover', answer, message_id=res.id if res else None)
await self.send_response('answer', answer_end_type, answer, message_id=res.id if res else None)
logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} question:{input_msg}')
logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} answer:{answer}')
except Exception as e:
Expand Down
13 changes: 7 additions & 6 deletions src/backend/bisheng/chat/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,13 @@ async def process_autogen(self, session: ChatManager, client_id: str, chat_id: s
logger.error(f'act=auto_gen act={action}')
else:
# 普通技能的stop
thread_pool.cancel_task([key]) # 将进行中的任务进行cancel
message = payload.get('inputs') or '手动停止'
res = ChatResponse(type='end', message=message)
close = ChatResponse(type='close')
await session.send_json(client_id, chat_id, res)
await session.send_json(client_id, chat_id, close)
res = thread_pool.cancel_task([key]) # 将进行中的任务进行cancel
if res[0]:
message = payload.get('inputs') or '手动停止'
res = ChatResponse(type='end', user_id=user_id, message=message)
close = ChatResponse(type='close')
await session.send_json(client_id, chat_id, res)
await session.send_json(client_id, chat_id, close)

elif action.lower() == 'continue':
# autgen_user 对话的时候,进程 wait() 需要换新
Expand Down
6 changes: 4 additions & 2 deletions src/backend/bisheng/utils/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ async def as_completed(self,

def cancel_task(self, key_list: List[str]):
with self.lock:
res = []
for key in key_list:
if self.async_task.get(key):
logger.info('clean_pending_task key={}', key)
for task in self.async_task.get(key):
task.result().cancel()
res.append(task.result().cancel())
if self.future_dict.get(key):
for task in self.future_dict.get(key):
task.cancel()
res.append(task.cancel())
return res

def tear_down(self):
key_list = list(self.async_task.keys())
Expand Down

0 comments on commit 4ade957

Please sign in to comment.