diff --git a/aixplain/factories/agent_factory/__init__.py b/aixplain/factories/agent_factory/__init__.py index 9440ff81..156f1b54 100644 --- a/aixplain/factories/agent_factory/__init__.py +++ b/aixplain/factories/agent_factory/__init__.py @@ -177,7 +177,9 @@ def create_model_tool( supplier = supplier_ break assert isinstance(supplier, Supplier), f"Supplier {supplier} is not a valid supplier" - return ModelTool(function=function, supplier=supplier, model=model, description=description, parameters=parameters) + return ModelTool( + function=function, supplier=supplier, model=model, name=name, description=description, parameters=parameters + ) @classmethod def create_pipeline_tool( @@ -278,7 +280,6 @@ def create_sql_tool( base_name = os.path.splitext(os.path.basename(source))[0] db_path = os.path.join(os.path.dirname(source), f"{base_name}.db") table_name = tables[0] if tables else None - try: # Create database from CSV schema = create_database_from_csv(source, db_path, table_name) diff --git a/aixplain/factories/model_factory/utils.py b/aixplain/factories/model_factory/utils.py index 589bdf68..a5f489be 100644 --- a/aixplain/factories/model_factory/utils.py +++ b/aixplain/factories/model_factory/utils.py @@ -98,6 +98,7 @@ def create_model_from_response(response: Dict) -> Model: version=response["version"]["id"], inputs=inputs, temperature=temperature, + supports_streaming=response.get("supportsStreaming", False), status=status, ) diff --git a/aixplain/modules/agent/tool/sql_tool.py b/aixplain/modules/agent/tool/sql_tool.py index f2262ee9..5b3f4178 100644 --- a/aixplain/modules/agent/tool/sql_tool.py +++ b/aixplain/modules/agent/tool/sql_tool.py @@ -285,6 +285,7 @@ def __init__( self.tables = tables if isinstance(tables, list) else [tables] if tables else None self.enable_commit = enable_commit self.status = AssetStatus.ONBOARDED # TODO: change to DRAFT when we have a way to onboard the tool + self.validate() # to upload the database def to_dict(self) -> Dict[str, 
Text]: return { @@ -306,7 +307,6 @@ def validate(self): raise SQLToolError("Description is required") if not self.database: raise SQLToolError("Database must be provided") - # Handle database validation if not ( str(self.database).startswith("s3://") diff --git a/aixplain/modules/model/__init__.py b/aixplain/modules/model/__init__.py index 49cf4591..92e48990 100644 --- a/aixplain/modules/model/__init__.py +++ b/aixplain/modules/model/__init__.py @@ -25,6 +25,7 @@ import traceback from aixplain.enums import Supplier, Function from aixplain.modules.asset import Asset +from aixplain.modules.model.model_response_streamer import ModelResponseStreamer from aixplain.modules.model.utils import build_payload, call_run_endpoint from aixplain.utils import config from urllib.parse import urljoin @@ -56,6 +57,7 @@ class Model(Asset): input_params (ModelParameters, optional): input parameters for the function. output_params (Dict, optional): output parameters for the function. model_params (ModelParameters, optional): parameters for the function. + supports_streaming (bool, optional): whether the model supports streaming. Defaults to False. """ def __init__( @@ -73,6 +75,7 @@ def __init__( input_params: Optional[Dict] = None, output_params: Optional[Dict] = None, model_params: Optional[Dict] = None, + supports_streaming: bool = False, status: Optional[AssetStatus] = AssetStatus.ONBOARDED, # default status for models is ONBOARDED **additional_info, ) -> None: @@ -91,6 +94,7 @@ def __init__( input_params (Dict, optional): input parameters for the function. output_params (Dict, optional): output parameters for the function. model_params (Dict, optional): parameters for the function. + supports_streaming (bool, optional): whether the model supports streaming. Defaults to False. status (AssetStatus, optional): status of the model. Defaults to None. 
**additional_info: Any additional Model info to be saved """ @@ -105,6 +109,7 @@ def __init__( self.input_params = input_params self.output_params = output_params self.model_params = ModelParameters(model_params) if model_params else None + self.supports_streaming = supports_streaming if isinstance(status, str): try: status = AssetStatus(status) @@ -232,6 +237,19 @@ def poll(self, poll_url: Text, name: Text = "model_process") -> ModelResponse: completed=False, ) + def run_stream( + self, + data: Union[Text, Dict], + parameters: Optional[Dict] = None, + ) -> ModelResponseStreamer: + assert self.supports_streaming, f"Model '{self.name} ({self.id})' does not support streaming" + payload = build_payload(data=data, parameters=parameters, stream=True) + url = f"{self.url}/{self.id}".replace("api/v1/execute", "api/v2/execute") + logging.debug(f"Model Run Stream: Start service for {url} - {payload}") + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + r = _request_with_retry("post", url, headers=headers, data=payload, stream=True) + return ModelResponseStreamer(r.iter_lines(decode_unicode=True)) + def run( self, data: Union[Text, Dict], @@ -239,7 +257,8 @@ def run( timeout: float = 300, parameters: Optional[Dict] = None, wait_time: float = 0.5, - ) -> ModelResponse: + stream: bool = False, + ) -> Union[ModelResponse, ModelResponseStreamer]: """Runs a model call. Args: @@ -248,10 +267,12 @@ def run( timeout (float, optional): total polling time. Defaults to 300. parameters (Dict, optional): optional parameters to the model. Defaults to None. wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. - + stream (bool, optional): whether to stream the model response. Defaults to False. 
Returns: - Dict: parsed output from model + Union[ModelResponse, ModelResponseStreamer]: parsed output from model """ + if stream: + return self.run_stream(data=data, parameters=parameters) start = time.time() payload = build_payload(data=data, parameters=parameters) url = f"{self.url}/{self.id}".replace("api/v1/execute", "api/v2/execute") diff --git a/aixplain/modules/model/llm_model.py b/aixplain/modules/model/llm_model.py index 0b8e6cf8..546be1b8 100644 --- a/aixplain/modules/model/llm_model.py +++ b/aixplain/modules/model/llm_model.py @@ -25,6 +25,7 @@ import traceback from aixplain.enums import Function, Supplier from aixplain.modules.model import Model +from aixplain.modules.model.model_response_streamer import ModelResponseStreamer from aixplain.modules.model.utils import build_payload, call_run_endpoint from aixplain.utils import config from typing import Union, Optional, List, Text, Dict @@ -108,7 +109,8 @@ def run( timeout: float = 300, parameters: Optional[Dict] = None, wait_time: float = 0.5, - ) -> ModelResponse: + stream: bool = False, + ) -> Union[ModelResponse, ModelResponseStreamer]: """Synchronously running a Large Language Model (LLM) model. Args: @@ -123,9 +125,9 @@ def run( timeout (float, optional): total polling time. Defaults to 300. parameters (Dict, optional): optional parameters to the model. Defaults to None. wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. - + stream (bool, optional): whether to stream the model response. Defaults to False. 
Returns: - Dict: parsed output from model + Union[ModelResponse, ModelResponseStreamer]: parsed output from model """ start = time.time() parameters = parameters or {} @@ -141,10 +143,13 @@ def run( parameters.setdefault("max_tokens", max_tokens) parameters.setdefault("top_p", top_p) + if stream: + return self.run_stream(data=data, parameters=parameters) + payload = build_payload(data=data, parameters=parameters) logging.info(payload) url = f"{self.url}/{self.id}".replace("/api/v1/execute", "/api/v2/execute") - logging.debug(f"Model Run Sync: Start service for {name} - {url}") + logging.debug(f"Model Run Sync: Start service for {name} - {url} - {payload}") response = call_run_endpoint(payload=payload, url=url, api_key=self.api_key) if response["status"] == "IN_PROGRESS": try: diff --git a/aixplain/modules/model/model_response_streamer.py b/aixplain/modules/model/model_response_streamer.py new file mode 100644 index 00000000..f1fef5f8 --- /dev/null +++ b/aixplain/modules/model/model_response_streamer.py @@ -0,0 +1,28 @@ +import json +from typing import Iterator + +from aixplain.modules.model.response import ModelResponse, ResponseStatus + + +class ModelResponseStreamer: + def __init__(self, iterator: Iterator): + self.iterator = iterator + self.status = ResponseStatus.IN_PROGRESS + + def __next__(self): + """ + Returns the next chunk of the response. 
+ """ + line = next(self.iterator).replace("data: ", "") + try: + data = json.loads(line) + except json.JSONDecodeError: + data = {"data": line} + content = data.get("data", "") + if content == "[DONE]": + self.status = ResponseStatus.SUCCESS + content = "" + return ModelResponse(status=self.status, data=content) + + def __iter__(self): + return self diff --git a/aixplain/modules/model/utility_model.py b/aixplain/modules/model/utility_model.py index 1cbbf3da..f6b69ba2 100644 --- a/aixplain/modules/model/utility_model.py +++ b/aixplain/modules/model/utility_model.py @@ -237,7 +237,6 @@ def update(self): DeprecationWarning, stacklevel=2, ) - self.validate() url = urljoin(self.backend_url, f"sdk/utilities/{self.id}") headers = {"x-api-key": f"{self.api_key}", "Content-Type": "application/json"} diff --git a/aixplain/modules/model/utils.py b/aixplain/modules/model/utils.py index 6f3f9319..cc68a347 100644 --- a/aixplain/modules/model/utils.py +++ b/aixplain/modules/model/utils.py @@ -7,12 +7,17 @@ from aixplain.exceptions import get_error_from_status_code -def build_payload(data: Union[Text, Dict], parameters: Optional[Dict] = None): +def build_payload(data: Union[Text, Dict], parameters: Optional[Dict] = None, stream: Optional[bool] = None): from aixplain.factories import FileFactory if parameters is None: parameters = {} + if stream is not None: + if "options" not in parameters: + parameters["options"] = {} + parameters["options"]["stream"] = stream + data = FileFactory.to_link(data) if isinstance(data, dict): payload = data @@ -36,6 +41,7 @@ def call_run_endpoint(url: Text, api_key: Text, payload: Dict) -> Dict: resp = "unspecified error" try: + logging.debug(f"Calling {url} with payload: {payload}") r = _request_with_retry("post", url, headers=headers, data=payload) resp = r.json() except Exception as e: diff --git a/aixplain/v2/agent.py b/aixplain/v2/agent.py index eadb155f..a493f723 100644 --- a/aixplain/v2/agent.py +++ b/aixplain/v2/agent.py @@ -131,6 +131,7 @@ 
def create_custom_python_code_tool( @classmethod def create_sql_tool( cls, + name: str, description: str, source: str, source_type: str, @@ -172,6 +173,7 @@ def create_sql_tool( from aixplain.factories import AgentFactory return AgentFactory.create_sql_tool( + name=name, description=description, source=source, source_type=source_type, diff --git a/tests/functional/agent/agent_functional_test.py b/tests/functional/agent/agent_functional_test.py index 73bab1fe..f4cacfbb 100644 --- a/tests/functional/agent/agent_functional_test.py +++ b/tests/functional/agent/agent_functional_test.py @@ -137,7 +137,7 @@ def test_custom_code_tool(delete_agents_and_team_agents, AgentFactory): ) assert tool is not None assert tool.description == "Add two strings" - assert tool.code == 'def main(aaa: str, bbb: str) -> str:\n """Add two strings"""\n return aaa + bbb' + assert tool.code.startswith("s3://") agent = AgentFactory.create( name="Add Strings Agent", description="Add two strings. Do not directly answer. 
Use the tool to add the strings.", @@ -354,6 +354,7 @@ def test_specific_model_parameters_e2e(tool_config, delete_agents_and_team_agent @pytest.mark.parametrize("AgentFactory", [AgentFactory, v2.Agent]) def test_sql_tool(delete_agents_and_team_agents, AgentFactory): assert delete_agents_and_team_agents + agent = None try: import os @@ -362,6 +363,7 @@ def test_sql_tool(delete_agents_and_team_agents, AgentFactory): f.write("") tool = AgentFactory.create_sql_tool( + name="Teste", description="Execute an SQL query and return the result", source="ftest.db", source_type="sqlite", @@ -394,11 +396,13 @@ def test_sql_tool(delete_agents_and_team_agents, AgentFactory): assert "eve" in str(response["data"]["output"]).lower() finally: os.remove("ftest.db") - agent.delete() + if agent: + agent.delete() @pytest.mark.parametrize("AgentFactory", [AgentFactory, v2.Agent]) def test_sql_tool_with_csv(delete_agents_and_team_agents, AgentFactory): assert delete_agents_and_team_agents + agent = None try: import os import pandas as pd @@ -424,7 +428,11 @@ def test_sql_tool_with_csv(delete_agents_and_team_agents, AgentFactory): # Create SQL tool from CSV tool = AgentFactory.create_sql_tool( - description="Execute SQL queries on employee data", source="test.csv", source_type="csv", tables=["employees"] + name="CSV Tool Test", + description="Execute SQL queries on employee data", + source="test.csv", + source_type="csv", + tables=["employees"], ) # Verify tool setup @@ -470,9 +478,12 @@ def test_sql_tool_with_csv(delete_agents_and_team_agents, AgentFactory): finally: # Cleanup - os.remove("test.csv") - os.remove("test.db") - agent.delete() + if agent: + agent.delete() + if os.path.exists("test.csv"): + os.remove("test.csv") + if os.path.exists("test.db"): + os.remove("test.db") @pytest.mark.parametrize("AgentFactory", [AgentFactory, v2.Agent]) @@ -499,13 +510,6 @@ def test_instructions(delete_agents_and_team_agents, AgentFactory): assert response["data"]["session_id"] is not None assert 
response["data"]["output"] is not None assert "aixplain" in response["data"]["output"].lower() - assert "eve" in response["data"]["output"].lower() - - import os - - # Cleanup - os.remove("test.csv") - os.remove("test.db") agent.delete() @@ -595,4 +599,3 @@ def test_agent_with_pipeline_tool(delete_agents_and_team_agents, AgentFactory): assert "hello" in answer["data"]["output"].lower() assert "hello pipeline" in answer["data"]["intermediate_steps"][0]["tool_steps"][0]["tool"].lower() - diff --git a/tests/functional/model/run_model_test.py b/tests/functional/model/run_model_test.py index dd8cb04e..7fbb74d1 100644 --- a/tests/functional/model/run_model_test.py +++ b/tests/functional/model/run_model_test.py @@ -46,6 +46,25 @@ def test_llm_run(llm_model): assert response["status"] == "SUCCESS" +def test_llm_run_stream(): + """Testing LLMs with streaming""" + from aixplain.modules.model.response import ModelResponse, ResponseStatus + from aixplain.modules.model.model_response_streamer import ModelResponseStreamer + + llm_model = ModelFactory.get("669a63646eb56306647e1091") + + assert isinstance(llm_model, LLM) + response = llm_model.run( + data="This is a test prompt where I expect you to respond with the following phrase: 'This is a test response.'", + stream=True, + ) + assert isinstance(response, ModelResponseStreamer) + for chunk in response: + assert isinstance(chunk, ModelResponse) + assert chunk.data in "This is a test response." 
+ assert response.status == ResponseStatus.SUCCESS + + def test_run_async(): """Testing Model Async""" model = ModelFactory.get("60ddef828d38c51c5885d491") @@ -100,7 +119,6 @@ def run_index_model(index_model): pytest.param(EmbeddingModel.MULTILINGUAL_E5_LARGE, AirParams, id="AIR - Multilingual E5 Large"), pytest.param(EmbeddingModel.BGE_M3, AirParams, id="AIR - BGE M3"), pytest.param(EmbeddingModel.AIXPLAIN_LEGAL_EMBEDDINGS, AirParams, id="AIR - aiXplain Legal Embeddings"), - ], ) def test_index_model(embedding_model, supplier_params): @@ -126,7 +144,6 @@ def test_index_model(embedding_model, supplier_params): pytest.param(EmbeddingModel.MULTILINGUAL_E5_LARGE, AirParams, id="Multilingual E5 Large"), pytest.param(EmbeddingModel.BGE_M3, AirParams, id="BGE M3"), pytest.param(EmbeddingModel.AIXPLAIN_LEGAL_EMBEDDINGS, AirParams, id="aiXplain Legal Embeddings"), - ], ) def test_index_model_with_filter(embedding_model, supplier_params): diff --git a/tests/functional/model/run_utility_model_test.py b/tests/functional/model/run_utility_model_test.py index a2c67ca9..3b6616ae 100644 --- a/tests/functional/model/run_utility_model_test.py +++ b/tests/functional/model/run_utility_model_test.py @@ -272,7 +272,7 @@ def sum_numbers(num1: int, num2: int): # Verify updated state updated_model = ModelFactory.get(utility_model.id) - assert updated_model.status == AssetStatus.DRAFT + assert updated_model.status == AssetStatus.ONBOARDED assert updated_model.name == "sum_numbers_test" assert updated_model.description == "Updated to sum numbers utility" assert len(updated_model.inputs) == 2 @@ -298,17 +298,19 @@ def multiply_numbers(num1: int, num2: int): return num1 * num2 updated_model.code = multiply_numbers - assert updated_model.status == AssetStatus.DRAFT - updated_model.deploy() assert updated_model.status == AssetStatus.ONBOARDED + # the next line should raise an exception + with pytest.raises(Exception, match=".*UtilityModel is already deployed*"): + updated_model.deploy() 
updated_model.save() + assert updated_model.status == AssetStatus.ONBOARDED # Verify partial update final_model = ModelFactory.get(utility_model.id) assert final_model.name == "sum_numbers_test" assert final_model.description == "Updated to sum numbers utility" - assert final_model.status == AssetStatus.DRAFT + assert final_model.status == AssetStatus.ONBOARDED # Test final behavior with new function but same input field names response = final_model.run({"num1": 5, "num2": 7}) assert response.status == "SUCCESS" diff --git a/tests/functional/team_agent/team_agent_functional_test.py b/tests/functional/team_agent/team_agent_functional_test.py index 759f3920..baa0a34b 100644 --- a/tests/functional/team_agent/team_agent_functional_test.py +++ b/tests/functional/team_agent/team_agent_functional_test.py @@ -625,9 +625,9 @@ def test_team_agent_with_instructions(delete_agents_and_team_agents): assert response.status == "SUCCESS" assert "gato" in response.data["output"] - mentalist_steps = eval(response.data["intermediate_steps"][0]["output"]) + mentalist_steps = [json.loads(step) for step in eval(response.data["intermediate_steps"][0]["output"])] - called_agents = set([step["worker"] for step in mentalist_steps]) + called_agents = set([step["agent"] for step in mentalist_steps]) assert len(called_agents) == 1 assert "Agent 2" in called_agents diff --git a/tests/unit/agent/agent_factory_utils_test.py b/tests/unit/agent/agent_factory_utils_test.py index c0104f5c..f1b4da70 100644 --- a/tests/unit/agent/agent_factory_utils_test.py +++ b/tests/unit/agent/agent_factory_utils_test.py @@ -12,6 +12,7 @@ from aixplain.modules.agent import Agent from aixplain.modules.agent.agent_task import AgentTask from aixplain.factories import ModelFactory, PipelineFactory +import os @pytest.fixture @@ -143,7 +144,7 @@ def test_build_tool_error_cases(tool_dict, expected_error): "name": "Test SQL", "description": "Test SQL", "parameters": [ - {"name": "database", "value": "test_db"}, + {"name": 
"database", "value": "test_db.db"}, {"name": "schema", "value": "public"}, {"name": "tables", "value": "table1,table2"}, {"name": "enable_commit", "value": True}, @@ -153,7 +154,7 @@ def test_build_tool_error_cases(tool_dict, expected_error): { "name": "Test SQL", "description": "Test SQL", - "database": "test_db", + "database": "test_db.db", "schema": "public", "tables": ["table1", "table2"], "enable_commit": True, @@ -166,7 +167,7 @@ def test_build_tool_error_cases(tool_dict, expected_error): "name": "Test SQL", "description": "Test SQL with string enable_commit", "parameters": [ - {"name": "database", "value": "test_db"}, + {"name": "database", "value": "test_db.db"}, {"name": "schema", "value": "public"}, {"name": "tables", "value": "table1"}, {"name": "enable_commit", "value": True}, @@ -176,7 +177,7 @@ def test_build_tool_error_cases(tool_dict, expected_error): { "name": "Test SQL", "description": "Test SQL with string enable_commit", - "database": "test_db", + "database": "test_db.db", "schema": "public", "tables": ["table1"], "enable_commit": True, @@ -192,6 +193,8 @@ def test_build_tool_success_cases(tool_dict, expected_type, expected_attrs, mock "aixplain.modules.model.utils.parse_code_decorated", return_value=("print('Hello World')", [], "Test description", "test_name"), ) + mocker.patch("os.path.exists", lambda path: True if path == "test_db.db" else os.path.exists(path)) + mocker.patch("aixplain.factories.file_factory.FileFactory.upload", return_value="s3://mocked-file-path/test_db.db") if tool_dict["type"] == "pipeline": mocker.patch.object( PipelineFactory, @@ -205,6 +208,8 @@ def test_build_tool_success_cases(tool_dict, expected_type, expected_attrs, mock for attr, value in expected_attrs.items(): if attr == "model": assert tool.model == mock_model + elif attr == "database" and value == "test_db.db": + assert getattr(tool, attr) == "s3://mocked-file-path/test_db.db" else: assert getattr(tool, attr) == value diff --git 
a/tests/unit/agent/sql_tool_test.py b/tests/unit/agent/sql_tool_test.py index 1a1dbb84..9f03a279 100644 --- a/tests/unit/agent/sql_tool_test.py +++ b/tests/unit/agent/sql_tool_test.py @@ -59,7 +59,9 @@ def test_create_sql_tool(mocker, tmp_path): ) assert isinstance(tool, SQLTool) assert tool.description == "Test" - assert tool.database == db_path + assert os.path.basename(db_path) in os.path.basename(tool.database) + assert tool.database.startswith("s3://") + assert tool.database.endswith(".db") assert tool.schema == "test" assert tool.tables == ["test", "test2"] @@ -78,7 +80,7 @@ def test_create_sql_tool(mocker, tmp_path): tool_dict = tool.to_dict() assert tool_dict["description"] == "Test" assert tool_dict["parameters"] == [ - {"name": "database", "value": db_path}, + {"name": "database", "value": tool.database}, {"name": "schema", "value": "test"}, {"name": "tables", "value": "test,test2"}, {"name": "enable_commit", "value": False}, @@ -89,7 +91,8 @@ def test_create_sql_tool(mocker, tmp_path): mocker.patch("os.path.exists", return_value=True) mocker.patch("aixplain.modules.agent.tool.sql_tool.get_table_schema", return_value="CREATE TABLE test (id INTEGER)") tool.validate() - assert tool.database == "s3://test.db" + assert tool.database.startswith("s3://") + assert tool.database.endswith(".db") def test_create_database_from_csv(tmp_path): @@ -225,7 +228,8 @@ def test_create_sql_tool_with_schema_inference(tmp_path, mocker): tool.validate() assert tool.schema == schema assert tool.tables == ["test"] - assert tool.database == "s3://test.db" + assert tool.database.startswith("s3://") + assert tool.database.endswith(".db") def test_create_sql_tool_from_csv_with_warnings(tmp_path, mocker): @@ -283,7 +287,7 @@ def test_create_sql_tool_from_csv(tmp_path): assert isinstance(tool, SQLTool) assert tool.description == "Test" assert tool.database.endswith(".db") - assert os.path.exists(tool.database) + assert tool.database.startswith("s3://") # Test schema and table inference 
during validation try: diff --git a/tests/unit/model_test.py b/tests/unit/model_test.py index 943d501d..57ef17d0 100644 --- a/tests/unit/model_test.py +++ b/tests/unit/model_test.py @@ -25,8 +25,8 @@ from aixplain.factories import ModelFactory from aixplain.enums import Function from urllib.parse import urljoin -from aixplain.enums import ResponseStatus -from aixplain.modules.model.response import ModelResponse +from aixplain.modules.model.response import ModelResponse, ResponseStatus +from aixplain.modules.model.model_response_streamer import ModelResponseStreamer import pytest from unittest.mock import patch from aixplain.enums.asset_status import AssetStatus @@ -638,3 +638,19 @@ def test_empty_model_parameters_string(): """Test string representation of empty ModelParameters.""" params = ModelParameters({}) assert str(params) == "No parameters defined" + + +def test_model_response_streamer(): + """Test ModelResponseStreamer class.""" + streamer = ModelResponseStreamer(iter([])) + assert isinstance(streamer, ModelResponseStreamer) + assert streamer.status == ResponseStatus.IN_PROGRESS + + +def test_model_not_supports_streaming(mocker): + """Test that Model.run(stream=True) raises when the model does not support streaming.""" + mocker.patch("aixplain.modules.model.utils.build_payload", return_value={"data": "test"}) + model = Model(id="test-id", name="Test Model", supports_streaming=False) + with pytest.raises(Exception) as excinfo: + model.run(data="test", stream=True) + assert f"Model '{model.name} ({model.id})' does not support streaming" in str(excinfo.value)