From b9c963e5a1315b0b05475123f975973d6be6612b Mon Sep 17 00:00:00 2001 From: Leon van Zyl Date: Sat, 16 Aug 2025 07:48:55 +0200 Subject: [PATCH] doc/ fix python code to include response streaming --- .../open-webui.md | 132 ++++++++++-------- 1 file changed, 70 insertions(+), 62 deletions(-) diff --git a/en/integrations/3rd-party-platform-integration/open-webui.md b/en/integrations/3rd-party-platform-integration/open-webui.md index d5bf8cfd..2f4dbe0f 100644 --- a/en/integrations/3rd-party-platform-integration/open-webui.md +++ b/en/integrations/3rd-party-platform-integration/open-webui.md @@ -2,7 +2,7 @@ [Open WebUI](https://github.com/open-webui/open-webui) is an extensible, feature-rich, and user-friendly _self-hosted AI platform_ designed to operate entirely offline. -[Funcitons](https://docs.openwebui.com/features/plugin/functions/) are like plugins for Open WebUI. We can create a custom [Pipe Function](https://docs.openwebui.com/features/plugin/functions/pipe) that process inputs and generate responses by invoking Flowise Prediction API before returning results to the user. Through this, Flowise can be used in Open WebUI. +[Functions](https://docs.openwebui.com/features/plugin/functions/) are like plugins for Open WebUI. We can create a custom [Pipe Function](https://docs.openwebui.com/features/plugin/functions/pipe) that process inputs and generate responses by invoking Flowise Prediction API before returning results to the user. Through this, Flowise can be used in Open WebUI. ## Setup @@ -82,7 +82,7 @@ class Pipe: return [ { "id": "error", - "name": e, + "name": str(e), }, ] else: @@ -105,84 +105,92 @@ class Pipe: def pipe( self, body: dict, __user__: Optional[dict] = None, __metadata__: dict = None - ) -> Union[str, Generator, Iterator]: - """Process chat messages through Flowise""" + ): try: - print("\nProcessing Flowise request:") - print(f"Request body: {json.dumps(body, indent=2)}") - stream_enabled = body.get("stream", True) + session_id = (__metadata__ or {}).get("chat_id") or "owui-session" + # model can be "flowise." or just "" + model_name = body.get("model", "") + dot = model_name.find(".") + model_id = model_name[dot + 1 :] if dot != -1 else model_name - session_id = __metadata__['chat_id'] - - # Extract model id from the model name - model_id = body["model"][body["model"].find(".") + 1 :] - - # Extract messages from the body - messages = body.get("messages", []) + messages = body.get("messages") or [] if not messages: raise Exception("No messages found in request body") + question = self._process_message_content(messages[-1]) - # Get the current message (last message) - current_message = messages[-1] - question = self._process_message_content(current_message) - - # Prepare request payload according to Flowise API format data = { - "question": question, # Current message - "overrideConfig": { - "sessionId": session_id - }, # Optional configuration, - "streaming": stream_enabled + "question": question, + "overrideConfig": {"sessionId": session_id}, + "streaming": stream_enabled, } headers = { "Authorization": f"Bearer {self.valves.flowise_api_key}", "Content-Type": "application/json", + "Accept": "text/event-stream" if stream_enabled else "application/json", } - print("\nMaking Flowise API request:") - print(f"URL: {self.valves.flowise_url}") - print(f"Headers: {headers}") - print(f"Data: {json.dumps(data, indent=2)}") + url = f"{self.valves.flowise_url}/api/v1/prediction/{model_id}" + with requests.post( + url, json=data, headers=headers, stream=stream_enabled, timeout=60 + ) as r: + r.raise_for_status() - # Make the API request - r = requests.post( - url=f"{self.valves.flowise_url}/api/v1/prediction/{model_id}", - json=data, - headers=headers - ) - r.raise_for_status() + if stream_enabled: + # Ensure correct decoding for SSE (prevents ’ etc.) + r.encoding = "utf-8" + + for raw_line in r.iter_lines(decode_unicode=True): + if not raw_line: + continue + line = raw_line.strip() - # Return response based on streaming preference - if stream_enabled: - for line in r.iter_lines(decode_unicode=True): - if line and line.startswith('data:'): + # Skip keep-alives or non-data fields + if not line.startswith("data:"): + continue + + payload = line[5:].strip() + if payload in ("[DONE]", '"[DONE]"'): + break + + # Flowise usually sends {"event":"token","data":"..."} try: - # Remove 'data:' prefix and parse JSON - json_data = line[5:] # Remove 'data:' prefix - response = json.loads(json_data) - - # Only yield content from token events - if isinstance(response, dict) and response.get("event") == "token": - token_data = response.get("data", "") - if token_data: # Only yield non-empty tokens - yield token_data + obj = json.loads(payload) except json.JSONDecodeError: - # Skip malformed JSON lines + # Occasionally plain text arrives—stream it anyway + if payload: + yield payload continue - else: - response = r.json() - # Only return the text field from the response - if isinstance(response, dict) and "text" in response: - return response["text"] - return "" - except Exception as e: - error_msg = f"Error in Flowise pipe: {str(e)}" - print(error_msg) - return error_msg + if isinstance(obj, dict): + if obj.get("event") == "token": + token = obj.get("data") or "" + if token: + yield token + else: + # Some versions send {"data":{"text":"..."}} + data_field = obj.get("data") + if isinstance(data_field, dict): + text = data_field.get("text") + if text: + yield text + return # end streaming + + # Non-streaming fallback + resp = r.json() + return ( + resp.get("text") or (resp.get("data") or {}).get("text", "") or "" + ) + except requests.HTTPError as http_err: + try: + detail = http_err.response.text[:500] + except Exception: + detail = "" + return f"HTTP error from Flowise: {http_err.response.status_code} {detail}" + except Exception as e: + return f"Error in Flowise pipe: {e}" ``` 4. After Function has been saved, enable it, and click the settings button to put in your Flowise URL and Flowise API Key: @@ -193,9 +201,9 @@ class Pipe: 5. Now when you refresh and click New Chat, you will be able to see the list of flows. You can modify the code to show: -* Only Agentflows V2: `f"{self.valves.flowise_url}/api/v1/chatflows?type=AGENTFLOW"` -* Only Chatflows: `f"{self.valves.flowise_url}/api/v1/chatflows?type=CHATFLOW"` -* Only Assistants: `f"{self.valves.flowise_url}/api/v1/chatflows?type=ASSISTANT"` +- Only Agentflows V2: `f"{self.valves.flowise_url}/api/v1/chatflows?type=AGENTFLOW"` +- Only Chatflows: `f"{self.valves.flowise_url}/api/v1/chatflows?type=CHATFLOW"` +- Only Assistants: `f"{self.valves.flowise_url}/api/v1/chatflows?type=ASSISTANT"`