From 4564c9c66ae2de79405012ef237c538a9aca88c5 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Mon, 7 Jul 2025 11:07:16 -0700 Subject: [PATCH] feat: GenAI SDK client - add async version of prompt optimizer PiperOrigin-RevId: 780181794 --- tests/unit/vertexai/genai/replays/conftest.py | 2 + ...est_prompt_optimizer_optimize_job_state.py | 51 +++++++ vertexai/_genai/_prompt_optimizer_utils.py | 42 ++++++ vertexai/_genai/client.py | 13 +- vertexai/_genai/prompt_optimizer.py | 131 +++++++++++++----- vertexai/_genai/types.py | 24 ++-- 6 files changed, 223 insertions(+), 40 deletions(-) create mode 100644 tests/unit/vertexai/genai/replays/test_prompt_optimizer_optimize_job_state.py create mode 100644 vertexai/_genai/_prompt_optimizer_utils.py diff --git a/tests/unit/vertexai/genai/replays/conftest.py b/tests/unit/vertexai/genai/replays/conftest.py index 15cbd9de66..e1e4564215 100644 --- a/tests/unit/vertexai/genai/replays/conftest.py +++ b/tests/unit/vertexai/genai/replays/conftest.py @@ -112,6 +112,8 @@ def client(use_vertex, replays_prefix, http_options, request): ) os.environ["GOOGLE_CLOUD_PROJECT"] = "project-id" os.environ["GOOGLE_CLOUD_LOCATION"] = "location" + os.environ["VAPO_CONFIG_PATH"] = "gs://dummy-test/dummy-config.json" + os.environ["VAPO_SERVICE_ACCOUNT_PROJECT_NUMBER"] = "1234567890" # Set the replay directory to the root directory of the replays. # This is needed to ensure that the replay files are found. diff --git a/tests/unit/vertexai/genai/replays/test_prompt_optimizer_optimize_job_state.py b/tests/unit/vertexai/genai/replays/test_prompt_optimizer_optimize_job_state.py new file mode 100644 index 0000000000..a38cbecb09 --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_prompt_optimizer_optimize_job_state.py @@ -0,0 +1,51 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=protected-access,bad-continuation,missing-function-docstring + +import os + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai._genai import types + + +def test_optimize(client): + """Tests the optimize request parameters method.""" + + if not os.environ.get("VAPO_CONFIG_PATH"): + raise ValueError("VAPO_CONFIG_PATH environment variable is not set.") + if not os.environ.get("VAPO_SERVICE_ACCOUNT_PROJECT_NUMBER"): + raise ValueError( + "VAPO_SERVICE_ACCOUNT_PROJECT_NUMBER " "environment variable is not set." + ) + + config = types.PromptOptimizerVAPOConfig( + config_path=os.environ.get("VAPO_CONFIG_PATH"), + wait_for_completion=True, + service_account_project_number=os.environ.get( + "VAPO_SERVICE_ACCOUNT_PROJECT_NUMBER" + ), + ) + job = client.prompt_optimizer.optimize( + method="vapo", + config=config, + ) + assert job.state == types.JobState.JOB_STATE_SUCCEEDED + + +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), + test_method="prompt_optimizer.optimize", +) diff --git a/vertexai/_genai/_prompt_optimizer_utils.py b/vertexai/_genai/_prompt_optimizer_utils.py new file mode 100644 index 0000000000..2e70ad519b --- /dev/null +++ b/vertexai/_genai/_prompt_optimizer_utils.py @@ -0,0 +1,42 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Utility functions for prompt optimizer.""" + +from . import types + + +def _get_service_account( + config: types.PromptOptimizerVAPOConfigOrDict, +) -> str: + """Get the service account from the config for the custom job.""" + if hasattr(config, "service_account") and config.service_account: + if ( + hasattr(config, "service_account_project_number") + and config.service_account_project_number + ): + raise ValueError( + "Only one of service_account or service_account_project_number " + "can be provided." + ) + return config.service_account + elif ( + hasattr(config, "service_account_project_number") + and config.service_account_project_number + ): + return f"{config.service_account_project_number}-compute@developer.gserviceaccount.com" + else: + raise ValueError( + "Either service_account or service_account_project_number is required." + ) diff --git a/vertexai/_genai/client.py b/vertexai/_genai/client.py index eb70b677b1..b44ea40f00 100644 --- a/vertexai/_genai/client.py +++ b/vertexai/_genai/client.py @@ -32,6 +32,7 @@ def __init__(self, api_client: genai_client.Client): self._api_client = api_client self._evals = None self._agent_engines = None + self._prompt_optimizer = None @property @_common.experimental_warning( @@ -52,7 +53,17 @@ def evals(self): ) from e return self._evals.AsyncEvals(self._api_client) - # TODO(b/424176979): add async prompt optimizer here. + @property + @_common.experimental_warning( + "The Vertex SDK GenAI prompt optimizer module is experimental, " + "and may change in future versions." + ) + def prompt_optimizer(self): + if self._prompt_optimizer is None: + self._prompt_optimizer = importlib.import_module( + ".prompt_optimizer", __package__ + ) + return self._prompt_optimizer.AsyncPromptOptimizer(self._api_client) @property @_common.experimental_warning( diff --git a/vertexai/_genai/prompt_optimizer.py b/vertexai/_genai/prompt_optimizer.py index c1ec2e83f3..d8a105a145 100644 --- a/vertexai/_genai/prompt_optimizer.py +++ b/vertexai/_genai/prompt_optimizer.py @@ -27,6 +27,7 @@ from google.genai._common import get_value_by_path as getv from google.genai._common import set_value_by_path as setv +from . import _prompt_optimizer_utils from . import types @@ -574,6 +575,7 @@ def _wait_for_completion(self, job_name: str) -> None: raise RuntimeError(f"Job failed with state: {job.state}") else: logger.info(f"Job completed with state: {job.state}") + return job def optimize( self, @@ -584,21 +586,15 @@ def optimize( Args: method: The method for optimizing multiple prompts. - config: The config to use. Config consists of the following fields: - - config_path: The gcs path to the config file, e.g. - gs://bucket/config.json. - service_account: Optional. The service - account to use for the custom job. Cannot be provided at the same - time as 'service_account_project_number'. - - service_account_project_number: Optional. The project number used to - construct the default service account: - f"{service_account_project_number}-compute@developer.gserviceaccount.com" - Cannot be provided at the same time as 'service_account'. - - wait_for_completion: Optional. Whether to wait for the job to - complete. Default is True. + config: PromptOptimizerVAPOConfig instance containing the + configuration for prompt optimization. + + Returns: + The custom job that was created. """ if method != "vapo": - raise ValueError("Only vapo methods is currently supported.") + raise ValueError("Only vapo method is currently supported.") if isinstance(config, dict): config = types.PromptOptimizerVAPOConfig(**config) @@ -631,23 +627,7 @@ def optimize( } ] - if config.service_account: - if config.service_account_project_number: - raise ValueError( - "Only one of service_account or" - " service_account_project_number can be provided." - ) - service_account = config.service_account - elif config.project_number: - service_account = ( - f"{config.service_account_project_number}" - "-compute@developer.gserviceaccount.com" - ) - else: - raise ValueError( - "Either service_account or service_account_project_number is" - " required." - ) + service_account = _prompt_optimizer_utils._get_service_account(config) job_spec = types.CustomJobSpec( worker_pool_specs=worker_pool_specs, @@ -672,11 +652,11 @@ def optimize( logger.info("Job created: %s", job.name) # Construct the dashboard URL - dashboard_url = f"https://pantheon.corp.google.com/vertex-ai/locations/{region}/training/{job_id}/cpu?e=13802955&project={project}" + dashboard_url = f"https://console.cloud.google.com/vertex-ai/locations/{region}/training/{job_id}/cpu?project={project}" logger.info("View the job status at: %s", dashboard_url) if wait_for_completion: - self._wait_for_completion(job_id) + job = self._wait_for_completion(job_id) return job @@ -843,3 +823,92 @@ async def _get_custom_job( self._api_client._verify_response(return_value) return return_value + + async def optimize( + self, + method: str, + config: types.PromptOptimizerVAPOConfigOrDict, + ) -> types.CustomJob: + """Call async Vertex AI Prompt Optimizer (VAPO). + + # Todo: b/428953357 - Add example in the README. + Example usage: + client = vertexai.Client(project=PROJECT_NAME, location='us-central1') + vapo_config = vertexai.types.PromptOptimizerVAPOConfig( + config_path="gs://you-bucket-name/your-config.json", + service_account=service_account, + wait_for_completion=True + ) + job = await client.aio.prompt_optimizer.optimize( + method="vapo", config=vapo_config) + + Args: + method: The method for optimizing multiple prompts (currently only + vapo is supported). + config: PromptOptimizerVAPOConfig instance containing the + configuration for prompt optimization. + + Returns: + The custom job that was created. + """ + if method != "vapo": + raise ValueError("Only vapo methods is currently supported.") + + if isinstance(config, dict): + config = types.PromptOptimizerVAPOConfig(**config) + + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + display_name = f"vapo-optimizer-{timestamp}" + + if not config.config_path: + raise ValueError("Config path is required.") + bucket = "/".join(config.config_path.split("/")[:-1]) + + container_uri = "us-docker.pkg.dev/vertex-ai/cair/vaipo:preview_v1_0" + + region = self._api_client.location + project = self._api_client.project + container_args = { + "config": config.config_path, + } + args = ["--%s=%s" % (k, v) for k, v in container_args.items()] + worker_pool_specs = [ + { + "replica_count": 1, + "container_spec": { + "image_uri": container_uri, + "args": args, + }, + "machine_spec": { + "machine_type": "n1-standard-4", + }, + } + ] + + service_account = _prompt_optimizer_utils._get_service_account(config) + + job_spec = types.CustomJobSpec( + worker_pool_specs=worker_pool_specs, + base_output_directory=types.GcsDestination(output_uri_prefix=bucket), + service_account=service_account, + ) + + custom_job = types.CustomJob( + display_name=display_name, + job_spec=job_spec, + ) + + job = await self._create_custom_job_resource( + custom_job=custom_job, + ) + + # Get the job id for the dashboard url and display to the user. + job_resource_name = job.name + job_id = job_resource_name.split("/")[-1] + logger.info("Job created: %s", job.name) + + # Construct the dashboard URL to show to the user. + dashboard_url = f"https://console.cloud.google.com/vertex-ai/locations/{region}/training/{job_id}/cpu?project={project}" + logger.info("View the job status at: %s", dashboard_url) + + return job diff --git a/vertexai/_genai/types.py b/vertexai/_genai/types.py index ba8d68a2fd..79d47f1a26 100644 --- a/vertexai/_genai/types.py +++ b/vertexai/_genai/types.py @@ -5207,29 +5207,37 @@ class PromptOptimizerVAPOConfig(_common.BaseModel): """VAPO Prompt Optimizer Config.""" config_path: Optional[str] = Field( - default=None, description="""The gcs path to the config file.""" + default=None, + description="""The gcs path to the config file, e.g. gs://bucket/config.json.""", + ) + service_account: Optional[str] = Field( + default=None, + description="""The service account to use for the custom job. Cannot be provided at the same time as service_account_project_number.""", ) - service_account: Optional[str] = Field(default=None, description="""""") service_account_project_number: Optional[Union[int, str]] = Field( - default=None, description="""""" + default=None, + description="""The project number used to construct the default service account:{service_account_project_number}-compute@developer.gserviceaccount.comCannot be provided at the same time as "service_account".""", + ) + wait_for_completion: Optional[bool] = Field( + default=True, + description="""Whether to wait for the job tocomplete. Ignored for async jobs.""", ) - wait_for_completion: Optional[bool] = Field(default=True, description="""""") class PromptOptimizerVAPOConfigDict(TypedDict, total=False): """VAPO Prompt Optimizer Config.""" config_path: Optional[str] - """The gcs path to the config file.""" + """The gcs path to the config file, e.g. gs://bucket/config.json.""" service_account: Optional[str] - """""" + """The service account to use for the custom job. Cannot be provided at the same time as service_account_project_number.""" service_account_project_number: Optional[Union[int, str]] - """""" + """The project number used to construct the default service account:{service_account_project_number}-compute@developer.gserviceaccount.comCannot be provided at the same time as "service_account".""" wait_for_completion: Optional[bool] - """""" + """Whether to wait for the job tocomplete. Ignored for async jobs.""" PromptOptimizerVAPOConfigOrDict = Union[