diff --git a/tests/unit/vertexai/genai/replays/test_get_evaluation_run.py b/tests/unit/vertexai/genai/replays/test_get_evaluation_run.py new file mode 100644 index 0000000000..f346531c9a --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_get_evaluation_run.py @@ -0,0 +1,129 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=protected-access,bad-continuation,missing-function-docstring + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai import types +import datetime +import pytest + + +def test_get_eval_run(client): + """Tests that get_evaluation_run() returns a correctly structured EvaluationRun.""" + evaluation_run_name = ( + "projects/503583131166/locations/us-central1/evaluationRuns/1957799200510967808" + ) + evaluation_run = client.evals.get_evaluation_run(name=evaluation_run_name) + assert isinstance(evaluation_run, types.EvaluationRun) + assert evaluation_run.name == evaluation_run_name + assert evaluation_run.display_name == "test2" + assert evaluation_run.metadata == {"pipeline_id": "4460531348888616960"} + assert evaluation_run.create_time == datetime.datetime( + 2025, 9, 8, 20, 55, 41, 833176, tzinfo=datetime.timezone.utc + ) + assert evaluation_run.completion_time == datetime.datetime( + 2025, 9, 8, 20, 56, 13, 492971, tzinfo=datetime.timezone.utc + ) + assert evaluation_run.state == types.EvaluationRunState.SUCCEEDED + assert evaluation_run.evaluation_set_snapshot == ( + 
def test_get_eval_run_bq_source(client):
    """Verifies get_evaluation_run() hydrates a BigQuery-backed EvaluationRun."""
    run_resource = (
        "projects/503583131166/locations/us-central1/evaluationRuns/1968424880881795072"
    )
    run = client.evals.get_evaluation_run(name=run_resource)

    assert isinstance(run, types.EvaluationRun)
    assert run.name == run_resource
    assert run.display_name == "test1"

    # The replayed run was created from a BigQuery request set, including a
    # rubrics column and a random-sampling configuration.
    expected_bq_set = types.BigQueryRequestSet(
        uri="bq://lakeyk-test-limited.inference_batch_prediction_input.1317387725199900672_1b",
        prompt_column="request",
        rubrics_column="rubric",
        candidate_response_columns={
            "baseline_model_response": "baseline_model_response",
            "checkpoint_1": "checkpoint_1",
            "checkpoint_2": "checkpoint_2",
        },
        sampling_config=types.SamplingConfig(
            sampling_count=100,
            sampling_method=types.SamplingMethod.RANDOM,
            sampling_duration="60s",
        ),
    )
    assert run.data_source.bigquery_request_set == expected_bq_set
@pytest.mark.asyncio
async def test_get_eval_run_async(client):
    """Verifies the async client resolves an EvaluationRun from a bare run ID."""
    run_id = "1957799200510967808"
    full_name = (
        f"projects/503583131166/locations/us-central1/evaluationRuns/{run_id}"
    )

    # Pass only the run ID; the server response carries the full resource name.
    run = await client.aio.evals.get_evaluation_run(name=run_id)

    assert isinstance(run, types.EvaluationRun)
    assert run.name == full_name
    assert run.display_name == "test2"
    expected_bq_set = types.BigQueryRequestSet(
        uri="bq://lakeyk-test-limited.inference_batch_prediction_input.1317387725199900672_1b",
        prompt_column="request",
        candidate_response_columns={
            "baseline_model_response": "baseline_model_response",
            "checkpoint_1": "checkpoint_1",
            "checkpoint_2": "checkpoint_2",
        },
    )
    assert run.data_source.bigquery_request_set == expected_bq_set
def _EvaluationRunDataSource_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Maps a Vertex EvaluationRunDataSource payload to SDK snake_case fields."""
    to_object: dict[str, Any] = {}

    evaluation_set = getv(from_object, ["evaluationSet"])
    if evaluation_set is not None:
        setv(to_object, ["evaluation_set"], evaluation_set)

    bigquery_request_set = getv(from_object, ["bigqueryRequestSet"])
    if bigquery_request_set is not None:
        # Nested message: convert recursively before attaching.
        setv(
            to_object,
            ["bigquery_request_set"],
            _BigQueryRequestSet_from_vertex(bigquery_request_set, to_object),
        )

    return to_object
def _GetEvaluationRunParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict (URL params + config) for GetEvaluationRun."""
    to_object: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        # "name" is a URL path parameter, not part of the request body.
        setv(to_object, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(to_object, ["config"], config)

    return to_object
["sampling_method"], getv(from_object, ["samplingMethod"])) + + if getv(from_object, ["samplingDuration"]) is not None: + setv(to_object, ["sampling_duration"], getv(from_object, ["samplingDuration"])) + + return to_object + + def _ToolCallValidInput_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, @@ -1014,6 +1143,59 @@ def _generate_rubrics( self._api_client._verify_response(return_value) return return_value + def _get_evaluation_run( + self, *, name: str, config: Optional[types.GetEvaluationRunConfigOrDict] = None + ) -> types.EvaluationRun: + """ + Retrieves an EvaluationRun from the resource name. + """ + + parameter_model = types._GetEvaluationRunParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _GetEvaluationRunParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "evaluationRuns/{name}".format_map(request_url_dict) + else: + path = "evaluationRuns/{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. 
+ request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("get", path, request_dict, http_options) + + response_dict = "" if not response.body else json.loads(response.body) + + if self._api_client.vertexai: + response_dict = _EvaluationRun_from_vertex(response_dict) + + return_value = types.EvaluationRun._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + def run(self) -> types.EvaluateInstancesResponse: """Evaluates an instance of a model. @@ -1398,6 +1580,23 @@ def generate_rubrics( ) return types.EvaluationDataset(eval_dataset_df=prompts_with_rubrics) + @_common.experimental_warning( + "The Vertex SDK GenAI evals.get_evaluation_run module is experimental, " + "and may change in future versions." + ) + def get_evaluation_run( + self, + *, + name: str, + config: Optional[types.GetEvaluationRunConfigOrDict] = None, + ) -> types.EvaluationRun: + """Retrieves an EvaluationRun from the resource name.""" + if not name: + raise ValueError("name cannot be empty.") + if name.startswith("projects/"): + name = name.split("/")[-1] + return self._get_evaluation_run(name=name, config=config) + class AsyncEvals(_api_module.BaseModule): @@ -1553,6 +1752,61 @@ async def _generate_rubrics( self._api_client._verify_response(return_value) return return_value + async def _get_evaluation_run( + self, *, name: str, config: Optional[types.GetEvaluationRunConfigOrDict] = None + ) -> types.EvaluationRun: + """ + Retrieves an EvaluationRun from the resource name. 
+ """ + + parameter_model = types._GetEvaluationRunParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _GetEvaluationRunParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "evaluationRuns/{name}".format_map(request_url_dict) + else: + path = "evaluationRuns/{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "get", path, request_dict, http_options + ) + + response_dict = "" if not response.body else json.loads(response.body) + + if self._api_client.vertexai: + response_dict = _EvaluationRun_from_vertex(response_dict) + + return_value = types.EvaluationRun._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + async def batch_evaluate( self, *, @@ -1641,3 +1895,24 @@ async def evaluate_instances( ) return result + + @_common.experimental_warning( + "The Vertex SDK GenAI evals.get_evaluation_run module is experimental, " + "and may change in future versions." + ) + async def get_evaluation_run( + self, + *, + name: str, + config: Optional[types.GetEvaluationRunConfigOrDict] = None, + ) -> types.EvaluationRun: + """ + Retrieves an EvaluationRun from the resource name. 
+ """ + if not name: + raise ValueError("name cannot be empty.") + if name.startswith("projects/"): + name = name.split("/")[-1] + result = await self._get_evaluation_run(name=name, config=config) + + return result diff --git a/vertexai/_genai/types.py b/vertexai/_genai/types.py index cf093020ca..23f4158f2c 100644 --- a/vertexai/_genai/types.py +++ b/vertexai/_genai/types.py @@ -290,6 +290,36 @@ class Importance(_common.CaseInSensitiveEnum): """Low importance.""" +class EvaluationRunState(_common.CaseInSensitiveEnum): + """Represents the state of an evaluation run.""" + + UNSPECIFIED = "UNSPECIFIED" + """Evaluation run state is unspecified.""" + PENDING = "PENDING" + """Evaluation run is pending.""" + RUNNING = "RUNNING" + """Evaluation run is in progress.""" + SUCCEEDED = "SUCCEEDED" + """Evaluation run has succeeded.""" + FAILED = "FAILED" + """Evaluation run failed.""" + CANCELLED = "CANCELLED" + """Evaluation run was cancelled.""" + INFERENCE = "INFERENCE" + """Evaluation run is performing inference.""" + GENERATING_RUBRICS = "GENERATING_RUBRICS" + """Evaluation run is performing rubric generation.""" + + +class SamplingMethod(_common.CaseInSensitiveEnum): + """Represents the sampling method for a BigQuery request set.""" + + UNSPECIFIED = "UNSPECIFIED" + """Sampling method is unspecified.""" + RANDOM = "RANDOM" + """Sampling method is random.""" + + class GenerateMemoriesResponseGeneratedMemoryAction(_common.CaseInSensitiveEnum): """The action to take.""" @@ -2345,6 +2375,192 @@ class GenerateInstanceRubricsResponseDict(TypedDict, total=False): ] +class GetEvaluationRunConfig(_common.BaseModel): + """Config for get evaluation run.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + + +class GetEvaluationRunConfigDict(TypedDict, total=False): + """Config for get evaluation run.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request 
class _GetEvaluationRunParameters(_common.BaseModel):
    """Internal request parameters for evals.get_evaluation_run."""

    # Evaluation run ID (URL path parameter; see
    # _GetEvaluationRunParameters_to_vertex).
    name: Optional[str] = Field(default=None, description="""""")
    # Per-request configuration, e.g. HTTP option overrides.
    config: Optional[GetEvaluationRunConfig] = Field(default=None, description="""""")


class _GetEvaluationRunParametersDict(TypedDict, total=False):
    """Dict form of the internal request parameters for evals.get_evaluation_run."""

    name: Optional[str]
    """"""

    config: Optional[GetEvaluationRunConfigDict]
    """"""


_GetEvaluationRunParametersOrDict = Union[
    _GetEvaluationRunParameters, _GetEvaluationRunParametersDict
]


class SamplingConfig(_common.BaseModel):
    """Sampling config for a BigQuery request set."""

    # Number of rows to sample.
    sampling_count: Optional[int] = Field(default=None, description="""""")
    # Strategy used to pick rows (e.g. RANDOM).
    sampling_method: Optional[SamplingMethod] = Field(default=None, description="""""")
    # Duration string as returned by the API (e.g. "60s") —
    # presumably a proto Duration; TODO confirm exact semantics.
    sampling_duration: Optional[str] = Field(default=None, description="""""")


class SamplingConfigDict(TypedDict, total=False):
    """Dict form of SamplingConfig (sampling config for a BigQuery request set)."""

    sampling_count: Optional[int]
    """"""

    sampling_method: Optional[SamplingMethod]
    """"""

    sampling_duration: Optional[str]
    """"""


SamplingConfigOrDict = Union[SamplingConfig, SamplingConfigDict]
class BigQueryRequestSetDict(TypedDict, total=False):
    """Dict form of BigQueryRequestSet (a BigQuery-backed request set)."""

    uri: Optional[str]
    """"""

    prompt_column: Optional[str]
    """The column name of the prompt in the BigQuery table. Used for EvaluationRun only."""

    rubrics_column: Optional[str]
    """The column name of the rubrics in the BigQuery table. Used for EvaluationRun only."""

    candidate_response_columns: Optional[dict[str, str]]
    """The column name of the response candidates in the BigQuery table. Used for EvaluationRun only."""

    sampling_config: Optional[SamplingConfigDict]
    """The sampling config for the BigQuery request set. Used for EvaluationRun only."""


BigQueryRequestSetOrDict = Union[BigQueryRequestSet, BigQueryRequestSetDict]


class EvaluationRunDataSource(_common.BaseModel):
    """Source of the data evaluated by a run: either a named evaluation set

    resource or an inline BigQuery request set (exactly one is expected to be
    populated in API responses — TODO confirm mutual exclusivity).
    """

    # Resource name of an evaluation set
    # ("projects/.../locations/.../evaluationSets/...").
    evaluation_set: Optional[str] = Field(default=None, description="""""")
    # Inline BigQuery-backed request set.
    bigquery_request_set: Optional[BigQueryRequestSet] = Field(
        default=None, description=""""""
    )


class EvaluationRunDataSourceDict(TypedDict, total=False):
    """Dict form of EvaluationRunDataSource."""

    evaluation_set: Optional[str]
    """"""

    bigquery_request_set: Optional[BigQueryRequestSetDict]
    """"""


EvaluationRunDataSourceOrDict = Union[
    EvaluationRunDataSource, EvaluationRunDataSourceDict
]
class EvaluationRunDict(TypedDict, total=False):
    """Dict form of EvaluationRun (a single evaluation run resource)."""

    name: Optional[str]
    """"""

    display_name: Optional[str]
    """"""

    metadata: Optional[dict[str, Any]]
    """"""

    create_time: Optional[datetime.datetime]
    """"""

    completion_time: Optional[datetime.datetime]
    """"""

    state: Optional[EvaluationRunState]
    """"""

    evaluation_set_snapshot: Optional[str]
    """"""

    error: Optional[genai_types.GoogleRpcStatusDict]
    """"""

    data_source: Optional[EvaluationRunDataSourceDict]
    """"""


EvaluationRunOrDict = Union[EvaluationRun, EvaluationRunDict]