Feat/0.2.3 (#351)
Submit version 0.2.3 changes for testing
zgqgit committed Feb 23, 2024
2 parents e5f255c + 7010514 commit b91eebb
Showing 44 changed files with 837 additions and 288 deletions.
5 changes: 5 additions & 0 deletions src/backend/bisheng/api/errcode/finetune.py
@@ -55,3 +55,8 @@ class InvalidExtraParamsError(BaseErrorCode):
class TrainFileNotExistError(BaseErrorCode):
Code: int = 10120
Msg: str = '训练文件不存在'


class GetGPUInfoError(BaseErrorCode):
Code: int = 10125
Msg: str = '获取GPU信息失败'
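
A minimal sketch (illustrative, not part of this commit) of how these error-code classes are surfaced, assuming BaseErrorCode.return_resp() wraps Code and Msg into a UnifiedResponseModel, as it does elsewhere in this diff:

from bisheng.api.errcode.finetune import GetGPUInfoError

def gpu_query_failed_response():
    # Returns a unified error payload carrying code 10125 and its message,
    # the same pattern FinetuneService.get_gpu_info() uses later in this commit.
    return GetGPUInfoError.return_resp()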
4 changes: 2 additions & 2 deletions src/backend/bisheng/api/errcode/server.py
@@ -2,6 +2,6 @@


# Error codes related to the RT service; function module code: 100
class NotFoundServerError(BaseErrorCode):
class NoSftServerError(BaseErrorCode):
Code: int = 10001
Msg: str = '未找到RT服务'
Msg: str = '未找到SFT服务'
124 changes: 81 additions & 43 deletions src/backend/bisheng/api/services/finetune.py
@@ -6,20 +6,21 @@
from uuid import UUID

from bisheng.api.errcode.finetune import (CancelJobError, ChangeModelNameError, CreateFinetuneError,
DeleteJobError, ExportJobError, InvalidExtraParamsError,
JobStatusError, NotFoundJobError, TrainDataNoneError,
UnExportJobError)
DeleteJobError, ExportJobError, GetGPUInfoError,
InvalidExtraParamsError, JobStatusError, NotFoundJobError,
TrainDataNoneError, UnExportJobError)
from bisheng.api.errcode.model_deploy import NotFoundModelError
from bisheng.api.errcode.server import NotFoundServerError
from bisheng.api.errcode.server import NoSftServerError
from bisheng.api.services.rt_backend import RTBackend
from bisheng.api.services.sft_backend import SFTBackend
from bisheng.api.utils import parse_server_host
from bisheng.api.utils import parse_gpus, parse_server_host
from bisheng.api.v1.schemas import FinetuneInfoResponse, UnifiedResponseModel, resp_200
from bisheng.cache import InMemoryCache
from bisheng.database.models.finetune import (Finetune, FinetuneChangeModelName, FinetuneDao,
FinetuneExtraParams, FinetuneList, FinetuneStatus)
from bisheng.database.models.model_deploy import ModelDeploy, ModelDeployDao
from bisheng.database.models.server import ServerDao
from bisheng.database.models.server import Server, ServerDao
from bisheng.database.models.sft_model import SftModelDao
from bisheng.utils.logger import logger
from bisheng.utils.minio_client import MinioClient
from pydantic import ValidationError
@@ -103,17 +104,28 @@ def validate_status(cls, finetune: Finetune, new_status: int) -> UnifiedResponse
return JobStatusError.return_resp('发布完成只能变为训练成功')
return None

@classmethod
def get_sft_server(cls, server_id: int) -> Server | None:
server = cls.get_server_by_cache(server_id)
if not server:
logger.warning('not found rt server data by id: %s', server_id)
return None
if not server.sft_endpoint:
logger.warning('not found sft endpoint by id: %s', server_id)
return None
return server

@classmethod
def create_job(cls, finetune: Finetune) -> UnifiedResponseModel[Finetune]:
# Validate the extra params
validate_ret = cls.validate_params(finetune)
if validate_ret is not None:
return validate_ret

# Check whether the RT service exists
server = ServerDao.find_server(finetune.server)
# Check whether the SFT service exists
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

# Check whether the base model exists
base_model = ModelDeployDao.find_model(finetune.base_model)
@@ -124,7 +136,7 @@ def create_job(cls, finetune: Finetune) -> UnifiedResponseModel[Finetune]:
logger.info(f'start create sft job: {finetune.id.hex}')
# Assemble the command params required by the instruction
command_params = cls.parse_command_params(finetune, base_model)
sft_ret = SFTBackend.create_job(host=parse_server_host(server.endpoint),
sft_ret = SFTBackend.create_job(host=parse_server_host(server.sft_endpoint),
job_id=finetune.id.hex, params=command_params)
if not sft_ret[0]:
logger.error(f'create sft job error: job_id: {finetune.id.hex}, err: {sft_ret[1]}')
@@ -147,14 +159,14 @@ def cancel_job(cls, job_id: UUID, user: Any) -> UnifiedResponseModel[Finetune]:
if validate_ret is not None:
return validate_ret

# Check whether the RT service exists
server = ServerDao.find_server(finetune.server)
# Check whether the SFT service exists
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

# Call the SFT-backend API to cancel the job
logger.info(f'start cancel job_id: {job_id}, user: {user.get("user_name")}')
sft_ret = SFTBackend.cancel_job(host=parse_server_host(server.endpoint), job_id=job_id.hex)
sft_ret = SFTBackend.cancel_job(host=parse_server_host(server.sft_endpoint), job_id=job_id.hex)
if not sft_ret[0]:
logger.error(f'cancel sft job error: job_id: {job_id}, err: {sft_ret[1]}')
return CancelJobError.return_resp()
@@ -170,16 +182,16 @@ def delete_job(cls, job_id: UUID, user: Any) -> UnifiedResponseModel[Finetune]:
finetune = FinetuneDao.find_job(job_id)
if not finetune:
return NotFoundJobError.return_resp()
# Check whether the RT service exists
server = ServerDao.find_server(finetune.server)
# Check whether the SFT service exists
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

model_name = cls.delete_published_model(finetune, server.endpoint)
model_name = cls.delete_published_model(finetune, server.sft_endpoint)

# Call the API to delete the training job
logger.info(f'start delete sft job: {job_id}, user: {user.get("user_name")}')
sft_ret = SFTBackend.delete_job(host=parse_server_host(server.endpoint), job_id=job_id.hex,
sft_ret = SFTBackend.delete_job(host=parse_server_host(server.sft_endpoint), job_id=job_id.hex,
model_name=model_name)
if not sft_ret[0]:
logger.error(f'delete sft job error: job_id: {job_id}, err: {sft_ret[1]}')
@@ -250,14 +262,14 @@ def publish_job(cls, job_id: UUID, user: Any) -> UnifiedResponseModel[Finetune]:
if validate_ret is not None:
return validate_ret

# Check whether the RT service exists
server = ServerDao.find_server(finetune.server)
# Check whether the SFT service exists
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

# Call the SFT-backend API
logger.info(f'start export sft job: {job_id}, user: {user.get("user_name")}')
sft_ret = SFTBackend.publish_job(host=parse_server_host(server.endpoint), job_id=job_id.hex,
sft_ret = SFTBackend.publish_job(host=parse_server_host(server.sft_endpoint), job_id=job_id.hex,
model_name=finetune.model_name)
if not sft_ret[0]:
logger.error(f'export sft job error: job_id: {job_id}, err: {sft_ret[1]}')
@@ -268,6 +280,10 @@ def publish_job(cls, job_id: UUID, user: Any) -> UnifiedResponseModel[Finetune]:
server=str(server.id),
endpoint=f'http://{server.endpoint}/v2.1/models')
published_model = ModelDeployDao.insert_one(published_model)

# Record the model name as available for training
SftModelDao.insert_sft_model(published_model.model)

# Update the training job status
logger.info('update sft job data')
finetune.status = new_status
@@ -288,17 +304,18 @@ def cancel_publish_job(cls, job_id: UUID, user: Any) -> UnifiedResponseModel[Fin
return validate_ret

# Check whether the RT service exists
server = ServerDao.find_server(finetune.server)
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

# Call the SFT-backend API
logger.info(f'start cancel export sft job: {job_id}, user: {user.get("user_name")}')
sft_ret = SFTBackend.publish_job(host=parse_server_host(server.endpoint), job_id=job_id.hex,
model_name=finetune.model_name)
sft_ret = SFTBackend.cancel_publish_job(host=parse_server_host(server.sft_endpoint), job_id=job_id.hex,
model_name=finetune.model_name)
if not sft_ret[0]:
logger.error(f'cancel export sft job error: job_id: {job_id}, err: {sft_ret[1]}')
return UnExportJobError.return_resp()
SftModelDao.delete_sft_model(finetune.model_name)
# Delete the published model info
logger.info(f'delete published model: {finetune.model_id}')
ModelDeployDao.delete_model_by_id(finetune.model_id)
@@ -323,7 +340,7 @@ def get_server_by_cache(cls, server_id: int):

@classmethod
def get_all_job(cls, req_data: FinetuneList) -> UnifiedResponseModel[List[FinetuneInfoResponse]]:
job_list = FinetuneDao.find_jobs(req_data)
job_list, total = FinetuneDao.find_jobs(req_data)
ret = []
for job in job_list:
tmp = FinetuneInfoResponse(**job.dict())
@@ -333,22 +350,17 @@ def get_all_job(cls, req_data: FinetuneList) -> UnifiedResponseModel[List[Finetu
ret.append(tmp)
# Update job statuses in an async thread
asyncio.get_event_loop().run_in_executor(sync_job_thread_pool, cls.sync_all_job_status, job_list)
return resp_200(data=ret)
return resp_200(data={'data': ret, 'total': total})

@classmethod
def sync_all_job_status(cls, job_list: List[Finetune]) -> None:
# Update the status of the batch of jobs in an async thread
server_cache = {}
for finetune in job_list:
if finetune.server in server_cache.keys():
server = server_cache.get(finetune.server)
else:
server = ServerDao.find_server(finetune.server)
server_cache[finetune.server] = server
server = cls.get_server_by_cache(finetune.server)
if not server:
logger.error(f'server not found: {finetune.server}')
continue
cls.sync_job_status(finetune, server.endpoint)
cls.sync_job_status(finetune, server.sft_endpoint)

@classmethod
def get_job_info(cls, job_id: UUID) -> UnifiedResponseModel:
@@ -357,18 +369,19 @@ def get_job_info(cls, job_id: UUID) -> UnifiedResponseModel:
finetune = FinetuneDao.find_job(job_id)
if not finetune:
return NotFoundJobError.return_resp()
# Find the corresponding RT service
server = ServerDao.find_server(finetune.server)
# Find the corresponding SFT service
server = cls.get_sft_server(finetune.server)
if not server:
return NotFoundServerError.return_resp()
return NoSftServerError.return_resp()

base_model_name = ''
if finetune.base_model != 0:
base_model = ModelDeployDao.find_model(finetune.base_model)
if base_model:
base_model_name = base_model.model

# Sync the job's execution status
cls.sync_job_status(finetune, server.endpoint)
cls.sync_job_status(finetune, server.sft_endpoint)

# Fetch the log file
log_data = None
Expand All @@ -379,9 +392,9 @@ def get_job_info(cls, job_id: UUID) -> UnifiedResponseModel:

return resp_200(data={
'finetune': FinetuneInfoResponse(**finetune.dict(), base_model_name=base_model_name),
'log': log_data,
'log': log_data if finetune.status != FinetuneStatus.FAILED.value else finetune.reason,
'loss_data': res_data, # like [{"step": 10, "loss": 0.5}, {"step": 20, "loss": 0.3}]
'report': finetune.report,
'report': finetune.report if finetune.report else None,
})

@classmethod
@@ -477,14 +490,39 @@ def change_published_model_name(cls, finetune: Finetune, model_name: str) -> boo
logger.error(f'published model not found, job_id: {finetune.id.hex}, model_id: {finetune.model_id}')
return False

server = cls.get_sft_server(finetune.server)
if not server:
logger.error(f'change model server not found, job_id: {finetune.id.hex}, server_id: {finetune.server}')
return False
# Call the API to rename the published model
sft_ret = SFTBackend.change_model_name(parse_server_host(published_model.endpoint), finetune.id.hex,
sft_ret = SFTBackend.change_model_name(parse_server_host(server.sft_endpoint), finetune.id.hex,
published_model.model, model_name)
if not sft_ret[0]:
logger.error(f'change model name error: job_id: {finetune.id.hex}, err: {sft_ret[1]}')
return False

# Update the name in the list of models available for training
SftModelDao.change_sft_model(published_model.model, model_name)

# Update the published model's model_name
published_model.model = model_name
ModelDeployDao.update_model(published_model)
return True

@classmethod
def get_gpu_info(cls) -> UnifiedResponseModel:
""" 获取GPU信息 """
all_server = ServerDao.find_all_server()
res = []
for server in all_server:
if not server.sft_endpoint:
continue
sft_ret = SFTBackend.get_gpu_info(parse_server_host(server.sft_endpoint))
if not sft_ret[0]:
logger.error(f'get gpu info error: server_id: {server.id}, err: {sft_ret[1]}')
return GetGPUInfoError.return_resp()
gpu_info = parse_gpus(sft_ret[1])
for one in gpu_info:
one['server'] = server.server
res.append(one)
return resp_200(data=res)
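
A rough usage sketch (illustrative, not part of this commit) for the new get_gpu_info service method; it assumes the resp_200 wrapper exposes its payload via a data attribute, and the dict keys come from parse_gpus() in api/utils.py plus the server field added above:

from bisheng.api.services.finetune import FinetuneService

resp = FinetuneService.get_gpu_info()
# resp.data is expected to be a list of per-GPU dicts, e.g.
# {'gpu_uuid': ..., 'gpu_id': '0', 'gpu_total_mem': '79.35 G',
#  'gpu_used_mem': ..., 'gpu_utility': ..., 'server': <server name>}
for gpu in resp.data:  # attribute access is an assumption about UnifiedResponseModel
    print(gpu['server'], gpu['gpu_id'], gpu['gpu_utility'])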
9 changes: 8 additions & 1 deletion src/backend/bisheng/api/services/sft_backend.py
@@ -54,7 +54,7 @@ def publish_job(cls, host: str, job_id: str, model_name: str) -> (bool, str | Di
""" 发布训练任务 从训练路径到处到正式路径"""
uri = '/v2.1/sft/job/publish'
url = '/v2.1/models/sft_elem/infer'
res = requests.post(url, json={'uri': uri, 'job_id': job_id, 'model_name': model_name})
res = requests.post(f'{host}{url}', json={'uri': uri, 'job_id': job_id, 'model_name': model_name})
return cls.handle_response(res)

@classmethod
@@ -118,3 +118,10 @@ def change_model_name(cls, host, job_id: str, old_model_name: str, model_name: s
json={'uri': uri, 'job_id': job_id, 'old_model_name': old_model_name,
'model_name': model_name})
return cls.handle_response(res)

@classmethod
def get_gpu_info(cls, host) -> (bool, str):
""" 获取GPU信息 """
url = '/v2.1/sft/gpu'
res = requests.get(f'{host}{url}')
return cls.handle_response(res)
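
A small calling sketch (illustrative, not part of this commit) for the new client method, assuming handle_response keeps the (success, payload) tuple convention of the other methods in this file and that the payload is the raw nvidia-smi -q -x XML:

from bisheng.api.services.sft_backend import SFTBackend
from bisheng.api.utils import parse_gpus, parse_server_host

sft_endpoint = '192.168.0.10:8001'  # placeholder for Server.sft_endpoint stored in the DB
ok, payload = SFTBackend.get_gpu_info(parse_server_host(sft_endpoint))
if ok:
    gpus = parse_gpus(payload)  # parse_gpus is added in api/utils.py below
else:
    print('gpu query failed:', payload)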
30 changes: 28 additions & 2 deletions src/backend/bisheng/api/utils.py
@@ -1,3 +1,6 @@
import xml.dom.minidom
from typing import Dict, List

from bisheng.api.v1.schemas import StreamData
from bisheng.database.base import session_getter
from bisheng.database.models.role_access import AccessType, RoleAccess
@@ -228,8 +231,8 @@ def access_check(payload: dict, owner_user_id: int, target_id: int, type: Access


def get_L2_param_from_flow(
flow_data: dict,
flow_id: str,
flow_data: dict,
flow_id: str,
):
graph = Graph.from_payload(flow_data)
node_id = []
@@ -292,3 +295,26 @@ def parse_server_host(endpoint: str):
""" 将数据库中的endpoints解析为http请求的host """
endpoint = endpoint.replace('http://', '').split('/')[0]
return f'http://{endpoint}'


# Parse the output of nvidia-smi -q -x into display-ready data
def parse_gpus(gpu_str: str) -> List[Dict]:
dom_tree = xml.dom.minidom.parseString(gpu_str)
collections = dom_tree.documentElement
gpus = collections.getElementsByTagName('gpu')
res = []
for one in gpus:
fb_mem_elem = one.getElementsByTagName('fb_memory_usage')[0]
gpu_uuid_elem = one.getElementsByTagName('uuid')[0]
gpu_id_elem = one.getElementsByTagName('minor_number')[0]
gpu_total_mem = fb_mem_elem.getElementsByTagName('total')[0]
free_mem = fb_mem_elem.getElementsByTagName('free')[0]
gpu_utility_elem = one.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0]
res.append({
'gpu_uuid': gpu_uuid_elem.firstChild.data,
'gpu_id': gpu_id_elem.firstChild.data,
'gpu_total_mem': '%.2f G' % (float(gpu_total_mem.firstChild.data.split(' ')[0]) / 1024),
'gpu_used_mem': '%.2f G' % (float(free_mem.firstChild.data.split(' ')[0]) / 1024),
'gpu_utility': round(float(gpu_utility_elem.firstChild.data.split(' ')[0]) * 100, 2)
})
return res
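
An illustrative call to parse_gpus() (not part of this commit) with a stripped-down nvidia-smi -q -x payload containing only the tags the function reads; real output has many more fields and the values below are made up:

from bisheng.api.utils import parse_gpus

sample_xml = """<?xml version="1.0" ?>
<nvidia_smi_log>
  <gpu id="00000000:3B:00.0">
    <minor_number>0</minor_number>
    <uuid>GPU-0000aaaa-bbbb-cccc-dddd-eeeeffff0000</uuid>
    <fb_memory_usage>
      <total>81251 MiB</total>
      <free>80952 MiB</free>
    </fb_memory_usage>
    <utilization>
      <gpu_util>3 %</gpu_util>
    </utilization>
  </gpu>
</nvidia_smi_log>"""

print(parse_gpus(sample_xml))
# [{'gpu_uuid': 'GPU-0000aaaa-bbbb-cccc-dddd-eeeeffff0000', 'gpu_id': '0',
#   'gpu_total_mem': '79.35 G', 'gpu_used_mem': '79.05 G', 'gpu_utility': 300.0}]

Note that gpu_used_mem is derived from the <free> element and gpu_utility from gpu_util scaled by 100, mirroring the implementation above.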
11 changes: 10 additions & 1 deletion src/backend/bisheng/api/v1/finetune.py
@@ -77,6 +77,7 @@ async def get_job(*,
server: int = Query(default=None, description='关联的RT服务ID'),
status: str = Query(default='', title='多个以英文逗号,分隔',
description='训练任务的状态,1: 训练中 2: 训练失败 3: 任务中止 4: 训练成功 5: 发布完成'),
model_name: Optional[str] = Query(default='', description='模型名称,模糊搜索'),
page: Optional[int] = Query(default=1, description='页码'),
limit: Optional[int] = Query(default=10, description='每页条数'),
Authorize: AuthJWT = Depends()):
@@ -85,7 +86,7 @@ async def get_job(*,
status_list = []
if status.strip():
status_list = [int(one) for one in status.strip().split(',')]
req_data = FinetuneList(server=server, status=status_list, page=page, limit=limit)
req_data = FinetuneList(server=server, status=status_list, model_name=model_name, page=page, limit=limit)
return FinetuneService.get_all_job(req_data)


@@ -159,3 +160,11 @@ async def get_download_url(*,
return resp_200(data={
'url': download_url
})


@router.get('/gpu', response_model=UnifiedResponseModel)
async def get_gpu_info(*,
Authorize: AuthJWT = Depends()):
# get login user
Authorize.jwt_required()
return FinetuneService.get_gpu_info()
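
A hypothetical client call to the new endpoint (not part of this commit); the host, the /api/v1/finetune prefix, and the auth header are assumptions, since the router's mount point is not shown in this diff, and a valid login JWT is required by Authorize.jwt_required():

import requests

resp = requests.get(
    'http://127.0.0.1:7860/api/v1/finetune/gpu',  # host and prefix are assumptions
    headers={'Authorization': 'Bearer <jwt>'},    # placeholder credential
)
print(resp.json())  # expected to wrap the list built by FinetuneService.get_gpu_info()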