-
Notifications
You must be signed in to change notification settings - Fork 91
feat: add vllm-metal #605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: add vllm-metal #605
Changes from all commits
ea914fc
2b409a6
4af3084
81286d9
0f7b99e
8626ed7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -440,74 +440,131 @@ func (c *Client) ChatWithMessagesContext(ctx context.Context, model string, conv | |
| TotalTokens int `json:"total_tokens"` | ||
| } | ||
|
|
||
| scanner := bufio.NewScanner(resp.Body) | ||
| for scanner.Scan() { | ||
| // Check if context was cancelled | ||
| select { | ||
| case <-ctx.Done(): | ||
| return assistantResponse.String(), ctx.Err() | ||
| default: | ||
| } | ||
| // Read the first line to detect if this is SSE streaming or a regular JSON response | ||
| reader := bufio.NewReader(resp.Body) | ||
| firstLine, err := reader.ReadString('\n') | ||
| if err != nil && !errors.Is(err, io.EOF) { | ||
| return assistantResponse.String(), fmt.Errorf("error reading response: %w", err) | ||
| } | ||
| firstLine = strings.TrimSpace(firstLine) | ||
|
|
||
| line := scanner.Text() | ||
| if line == "" { | ||
| continue | ||
| // Check if this is a non-streaming JSON response (doesn't start with "data: ") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if we could use the headers to understand if its a streaming response or not. |
||
| if firstLine != "" && !strings.HasPrefix(firstLine, "data: ") { | ||
| // This might be a regular JSON response - read the rest and try to parse it | ||
| restOfBody, readErr := io.ReadAll(reader) | ||
| if readErr != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error reading response body: %w", readErr) | ||
| } | ||
| fullBody := firstLine + string(restOfBody) | ||
|
|
||
| if !strings.HasPrefix(line, "data: ") { | ||
| continue | ||
| var nonStreamResp OpenAIChatResponse | ||
| if err := json.Unmarshal([]byte(fullBody), &nonStreamResp); err != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error parsing response: %w", err) | ||
| } | ||
|
|
||
| data := strings.TrimPrefix(line, "data: ") | ||
|
|
||
| if data == "[DONE]" { | ||
| break | ||
| // Extract content from non-streaming response | ||
| if len(nonStreamResp.Choices) > 0 && nonStreamResp.Choices[0].Message.Content != "" { | ||
| content := nonStreamResp.Choices[0].Message.Content | ||
| outputFunc(content) | ||
| assistantResponse.WriteString(content) | ||
| } | ||
|
|
||
| var streamResp OpenAIChatResponse | ||
| if err := json.Unmarshal([]byte(data), &streamResp); err != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error parsing stream response: %w", err) | ||
| if nonStreamResp.Usage != nil { | ||
| finalUsage = nonStreamResp.Usage | ||
| } | ||
|
|
||
| if streamResp.Usage != nil { | ||
| finalUsage = streamResp.Usage | ||
| } else { | ||
| // SSE streaming response - process line by line | ||
| scanner := bufio.NewScanner(reader) | ||
|
|
||
| // Process the first line if it was SSE data | ||
| if strings.HasPrefix(firstLine, "data: ") { | ||
| data := strings.TrimPrefix(firstLine, "data: ") | ||
| if data != "[DONE]" { | ||
| var streamResp OpenAIChatResponse | ||
| if err := json.Unmarshal([]byte(data), &streamResp); err == nil { | ||
| if streamResp.Usage != nil { | ||
| finalUsage = streamResp.Usage | ||
| } | ||
| if len(streamResp.Choices) > 0 { | ||
| if streamResp.Choices[0].Delta.Content != "" { | ||
| chunk := streamResp.Choices[0].Delta.Content | ||
| printerState = chatPrinterContent | ||
| outputFunc(chunk) | ||
| assistantResponse.WriteString(chunk) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if len(streamResp.Choices) > 0 { | ||
| if streamResp.Choices[0].Delta.ReasoningContent != "" { | ||
| chunk := streamResp.Choices[0].Delta.ReasoningContent | ||
| if printerState == chatPrinterContent { | ||
| outputFunc("\n\n") | ||
| } | ||
| if printerState != chatPrinterReasoning { | ||
| const thinkingHeader = "Thinking:\n" | ||
| for scanner.Scan() { | ||
| // Check if context was cancelled | ||
| select { | ||
| case <-ctx.Done(): | ||
| return assistantResponse.String(), ctx.Err() | ||
| default: | ||
| } | ||
|
|
||
| line := scanner.Text() | ||
| if line == "" { | ||
| continue | ||
| } | ||
|
|
||
| if !strings.HasPrefix(line, "data: ") { | ||
| continue | ||
| } | ||
|
|
||
| data := strings.TrimPrefix(line, "data: ") | ||
|
|
||
| if data == "[DONE]" { | ||
| break | ||
| } | ||
|
|
||
| var streamResp OpenAIChatResponse | ||
| if err := json.Unmarshal([]byte(data), &streamResp); err != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error parsing stream response: %w", err) | ||
| } | ||
|
|
||
| if streamResp.Usage != nil { | ||
| finalUsage = streamResp.Usage | ||
| } | ||
|
|
||
| if len(streamResp.Choices) > 0 { | ||
| if streamResp.Choices[0].Delta.ReasoningContent != "" { | ||
| chunk := streamResp.Choices[0].Delta.ReasoningContent | ||
| if printerState == chatPrinterContent { | ||
| outputFunc("\n\n") | ||
| } | ||
| if printerState != chatPrinterReasoning { | ||
| const thinkingHeader = "Thinking:\n" | ||
| if reasoningFmt != nil { | ||
| reasoningFmt.Print(thinkingHeader) | ||
| } else { | ||
| outputFunc(thinkingHeader) | ||
| } | ||
| } | ||
| printerState = chatPrinterReasoning | ||
| if reasoningFmt != nil { | ||
| reasoningFmt.Print(thinkingHeader) | ||
| reasoningFmt.Print(chunk) | ||
| } else { | ||
| outputFunc(thinkingHeader) | ||
| outputFunc(chunk) | ||
| } | ||
| } | ||
| printerState = chatPrinterReasoning | ||
| if reasoningFmt != nil { | ||
| reasoningFmt.Print(chunk) | ||
| } else { | ||
| if streamResp.Choices[0].Delta.Content != "" { | ||
| chunk := streamResp.Choices[0].Delta.Content | ||
| if printerState == chatPrinterReasoning { | ||
| outputFunc("\n\n--\n\n") | ||
| } | ||
| printerState = chatPrinterContent | ||
| outputFunc(chunk) | ||
| assistantResponse.WriteString(chunk) | ||
| } | ||
| } | ||
| if streamResp.Choices[0].Delta.Content != "" { | ||
| chunk := streamResp.Choices[0].Delta.Content | ||
| if printerState == chatPrinterReasoning { | ||
| outputFunc("\n\n--\n\n") | ||
| } | ||
| printerState = chatPrinterContent | ||
| outputFunc(chunk) | ||
| assistantResponse.WriteString(chunk) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if err := scanner.Err(); err != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error reading response stream: %w", err) | ||
| if err := scanner.Err(); err != nil { | ||
| return assistantResponse.String(), fmt.Errorf("error reading response stream: %w", err) | ||
| } | ||
| } | ||
doringeman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if finalUsage != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen in case of error, and the error not being
io.EOF?