Skip to content

Commit

Permalink
art: copilot non-streaming output
Browse files Browse the repository at this point in the history
  • Loading branch information
v_frgfeng committed Feb 19, 2024
1 parent 9692d29 commit 5ce8313
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 46 deletions.
18 changes: 9 additions & 9 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,31 @@ changelog:
- go mod tidy
groups:
- title: ":arrow_up: Dependency updates" # 依赖更新
regexp: '^.*?(deps|arrow_up)(\([[:word:]]+\))??!?:.+$'
regexp: '^.*?(deps|:arrow_up)\(deps\)!?:.+$'
order: 300
- title: ":sparkles: Features" # 新功能: 例如,新功能、新页面、新组件等
regexp: '^.*?(feat|sparkles)(\([[:word:]]+\))??!?:.+$'
regexp: '^.*?(feat|:sparkles)(\([[:word:]]+\))??!?:.+$'
order: 100
- title: ":art: Refactor" # 重构: 例如,重构代码、格式化代码、移动文件等
regexp: '^.*?(refactor|art)(\([[:word:]]+\))??!?:.+$'
regexp: '^.*?(refactor|:art)(\([[:word:]]+\))??!?:.+$'
order: 150
- title: ":lock: Security updates" # 安全更新: 例如,添加安全头、更新 .gitignore、更新 .gitattributes 等
regexp: '^.*?sec(\([[:word:]]+\))??!?:.+$'
regexp: '^.*?(sec|:lock)(\([[:word:]]+\))??!?:.+$'
order: 150
- title: ":bug: Bug fixes" # 修复错误: 例如,修复 bug、修复拼写错误等
regexp: '^.*?(fix|bug)(\([[:word:]]+\))??!?:.+$'
order: 200
- title: ":zap: Perf" # 优化相关,比如提升性能、体验。
regexp: '^.*?(perf|zap)(\([[:word:]]+\))??!?:.+$'
regexp: '^.*?(perf|:zap)(\([[:word:]]+\))??!?:.+$'
order: 300
- title: "Chore" # 杂务:通常用于描述其他不属于新功能、修复错误或重构代码的提交。 例如,更新文档、清理代码、更新依赖项、删除文件等
regexp: '^.*?chore(\([[:word:]]+\))??!?:.+$'
order: 300
- title: ":memo: Documentation updates" # 文档更新
regexp: ^.*?(doc|memo)(\([[:word:]]+\))??!?:.+$
- title: ":memo: Documentation updates"
regexp: ^.*?(doc|:memo)(\([[:word:]]+\))??!?:.+$
order: 400
- title: ":lipstick: Style" # 样式更新,页面优化
regexp: ^.*?(style|lipstick)(\([[:word:]]+\))??!?:.+$
- title: ":lipstick: Style"
regexp: ^.*?(style|:lipstick)(\([[:word:]]+\))??!?:.+$
order: 400
- title: "Build process updates"
regexp: ^.*?build(\([[:word:]]+\))??!?:.+$
Expand Down
54 changes: 20 additions & 34 deletions api/copilot/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ func (co *CopilotApi) CompletionsHandler(c *gin.Context) {
return
}

// 默认流式输出
isStream := true
if b, ok := req["stream"]; ok && b.(bool) == false {
isStream = false
}

if model, ok := req["model"]; ok && model == "gemini-pro" || model == openai.GPT432K {
global.SugarLog.Debugw("CompletionsHandler gemini-pro model", "model", model)
genApi := &genai.GenApi{}
Expand All @@ -115,7 +109,7 @@ func (co *CopilotApi) CompletionsHandler(c *gin.Context) {
response.FailWithOpenAIError(http.StatusUnauthorized, err.Error(), c)
return
}
err = CompletionsRequest(c, req, isStream, copilotToken)
err = CompletionsRequest(c, req, copilotToken)
// 如果 token 过期,重新获取一次 token
if errors.Is(err, TokenExpiredError) {
CopilotTokenCache.Delete(token) // 删除缓存
Expand All @@ -127,7 +121,7 @@ func (co *CopilotApi) CompletionsHandler(c *gin.Context) {
return
}
global.SugarLog.Infow("CompletionsHandler http get token is success")
err = CompletionsRequest(c, req, isStream, coCopilotToken)
err = CompletionsRequest(c, req, coCopilotToken)
if err != nil {
global.SugarLog.Errorw("CompletionsHandler CompletionsRequest retry request error", "err", err)
response.FailWithOpenAIError(http.StatusBadGateway, err.Error(), c)
Expand All @@ -150,19 +144,13 @@ func (co *CopilotApi) CompletionsOfficialHandler(c *gin.Context) {
return
}

// 默认流式输出
isStream := true
if b, ok := req["stream"]; ok && b.(bool) == false {
isStream = false
}

token, err := utils.GetAuthToken(c, "Bearer")
if err != nil {
global.SugarLog.Errorw("CompletionsOfficialHandler get auth token err", "err", err.Error())
response.FailWithOpenAIError(http.StatusUnauthorized, err.Error(), c)
return
}
err = CompletionsRequest(c, req, isStream, token)
err = CompletionsRequest(c, req, token)
if err != nil {
global.SugarLog.Warnw("CompletionsOfficialHandler CompletionsRequest request error", "err", err, "token", token)
response.FailWithOpenAIError(http.StatusInternalServerError, err.Error(), c)
Expand Down Expand Up @@ -190,27 +178,27 @@ func GetCopilotTokenWithCache(token string) (copilotToken string, err error) {
}

// CompletionsRequest 请求 Copilot CompletionsHandler 接口
func CompletionsRequest(c *gin.Context, req map[string]interface{}, isStream bool, copilotToken string) (err error) {
url := global.Config.Copilot.CompletionsURL
func CompletionsRequest(c *gin.Context, req map[string]interface{}, copilotToken string) (err error) {
completionsURL := global.Config.Copilot.CompletionsURL
resp, err := Client.SetRetryCount(1).R().
AddRetryCondition(func(r *resty.Response, err error) bool {
if err != nil && strings.Contains(err.Error(), "connection reset by peer") {
global.SugarLog.Warnw("CompletionsRequest Client connection reset by peer", "stream", isStream, "err", err.Error())
global.SugarLog.Warnw("CompletionsRequest Client connection reset by peer", "err", err.Error())
}
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
global.SugarLog.Warnw("CompletionsRequest Client timeout err, retry", "stream", isStream, "err", netErr.Error())
global.SugarLog.Warnw("CompletionsRequest Client timeout err, retry", "err", netErr.Error())
return true
}
return false
}).
SetDoNotParseResponse(true).
SetHeaders(GetCompletionsHeader(copilotToken)).
SetBody(req).
Post(url)
Post(completionsURL)

if err != nil {
global.SugarLog.Errorw("CompletionsRequest http error", "err", err, "url", url, "req", req, "copilotToken", copilotToken)
global.SugarLog.Errorw("CompletionsRequest http error", "err", err, "completionsURL", completionsURL, "req", req, "copilotToken", copilotToken)
response.FailWithOpenAIError(http.StatusInternalServerError, err.Error(), c)
return
}
Expand All @@ -219,33 +207,31 @@ func CompletionsRequest(c *gin.Context, req map[string]interface{}, isStream boo

respContentType := resp.Header().Get("Content-Type")
if resp.StatusCode() != http.StatusOK {
global.SugarLog.Warnw("CompletionsRequest respContentType", "respContentType", respContentType, "stream", isStream, "statusCode", resp.StatusCode())
global.SugarLog.Warnw("CompletionsRequest respContentType", "respContentType", respContentType, "statusCode", resp.StatusCode())
}

if strings.Contains(respContentType, "text/plain") {
w := c.Writer

if strings.Contains(respContentType, "text/plain") { // 有错误信息
body, err := io.ReadAll(reader)
if err != nil {
global.SugarLog.Errorw("CompletionsHandler reader body err", "stream", isStream, "err", err)
global.SugarLog.Errorw("CompletionsHandler reader body err", "err", err)
return err
}
bodyStr := strings.TrimRight(string(body), "\n")
if bodyStr == "unauthorized: token expired" {
global.SugarLog.Errorw("CompletionsHandler token expired", "stream", isStream, "body", bodyStr)
global.SugarLog.Errorw("CompletionsHandler token expired", "body", bodyStr)
return TokenExpiredError
}
global.SugarLog.Infow("CompletionsHandler response error", "stream", isStream, "body", bodyStr, "copilotToken", copilotToken)
global.SugarLog.Infow("CompletionsHandler response error", "body", bodyStr, "copilotToken", copilotToken)
response.FailWithOpenAIError(resp.StatusCode(), bodyStr, c)
return nil
}

w := c.Writer
// 非流式输出
if !isStream {
utils.SetNotStreamHeaders(c)
} else if strings.Contains(respContentType, "application/json") { // json 格式 非流式
utils.SetJsonHeaders(c)
flusher, _ := w.(http.Flusher)
body, readErr := io.ReadAll(reader)
if readErr != nil {
global.SugarLog.Errorw("CompletionsHandler reader body err", "stream", isStream, "err", readErr)
global.SugarLog.Errorw("CompletionsHandler reader body err", "respContentType", respContentType, "req", req, "err", readErr)
return readErr
}
w.Write(body)
Expand All @@ -260,7 +246,7 @@ func CompletionsRequest(c *gin.Context, req map[string]interface{}, isStream boo
if err == io.EOF {
break
} else if err != nil {
global.SugarLog.Errorw("CompletionsHandler reader err", "stream", isStream, "err", err)
global.SugarLog.Errorw("CompletionsHandler reader err", "err", err)
break
}
w.Write(line)
Expand Down
2 changes: 1 addition & 1 deletion api/genai/genai.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type GenApi struct {
func (g *GenApi) CompletionsHandler(c *gin.Context) {
token, err := utils.GetAuthToken(c, "Bearer")
if err != nil {
global.SugarLog.Errorw("CompletionsHandler Get hearder token error", "error", err)
global.SugarLog.Errorw("CompletionsHandler Get header token error", "error", err)
response.FailWithOpenAIError(http.StatusUnauthorized, err.Error(), c)
return
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,22 @@ func SetEventStreamHeaders(c *gin.Context) {
c.Writer.Header().Set("X-Accel-Buffering", "no") // // 禁用nginx缓存,防止nginx会缓存数据导致数据流是一段一段的
}

// SetNotStreamHeaders 设置非SSE的响应头
func SetNotStreamHeaders(c *gin.Context) {
// SetJsonHeaders 设置非SSE的响应头
func SetJsonHeaders(c *gin.Context) {
c.Writer.Header().Set("Content-Type", "application/json")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("X-Accel-Buffering", "no") // // 禁用nginx缓存,防止nginx会缓存数据导致数据流是一段一段的
}

// SetCustomHeaders 设置自定义的响应头
func SetCustomHeaders(c *gin.Context, contentType string) {
c.Writer.Header().Set("Content-Type", contentType)
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("X-Accel-Buffering", "no") // // 禁用nginx缓存,防止nginx会缓存数据导致数据流是一段一段的
}

0 comments on commit 5ce8313

Please sign in to comment.