diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index cca3d69064..f1fb28a365 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -1,22 +1,4 @@ -# -*- coding: utf-8 -*- - -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - import abc -from concurrent import futures import datetime import functools import inspect @@ -25,41 +7,29 @@ import sys import threading import time -from typing import ( - Any, - Callable, - Dict, - List, - Iterable, - Optional, - Sequence, - Tuple, - Type, - TypeVar, - Union, -) - -from google.api_core import operation -from google.api_core import retry -from google.auth import credentials as auth_credentials -from google.cloud.aiplatform import initializer -from google.cloud.aiplatform import utils -from google.cloud.aiplatform.compat.types import ( - encryption_spec as gca_encryption_spec, -) -from google.cloud.aiplatform.constants import base as base_constants -import proto +from concurrent import futures +from typing import (Any, Callable, Dict, Iterable, List, Optional, Sequence, + Tuple, Type, TypeVar, Union) +import proto +from codeflash.verification.codeflash_capture import codeflash_capture +from google.api_core import operation, retry +from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 as field_mask from google.protobuf import json_format -# This is the default retry callback to be used with get methods. -_DEFAULT_RETRY = retry.Retry() +from google.cloud.aiplatform import initializer, utils +from google.cloud.aiplatform.compat.types import \ + encryption_spec as gca_encryption_spec +from google.cloud.aiplatform.constants import base as base_constants +from functools import lru_cache +_DEFAULT_RETRY = retry.Retry() class VertexLogger(logging.getLoggerClass()): """Logging wrapper class with high level helper methods.""" + @codeflash_capture(function_name='VertexLogger.__init__', tmp_dir_path='/tmp/codeflash_rnon9p_c/test_return_values', tests_root='/home/ubuntu/work/repo/tests', is_fto=False) def __init__(self, name: str): """Initializes logger with optional name. @@ -69,11 +39,7 @@ def __init__(self, name: str): super().__init__(name) self.setLevel(logging.INFO) - def log_create_with_lro( - self, - cls: Type["VertexAiResourceNoun"], - lro: Optional[operation.Operation] = None, - ): + def log_create_with_lro(self, cls: Type['VertexAiResourceNoun'], lro: Optional[operation.Operation]=None): """Logs create event with LRO. Args: @@ -82,19 +48,11 @@ def log_create_with_lro( lro (operation.Operation): Optional. Backing LRO for creation. 
""" - self.info(f"Creating {cls.__name__}") - + self.info(f'Creating {cls.__name__}') if lro: - self.info(f"Create {cls.__name__} backing LRO: {lro.operation.name}") - - def log_create_complete( - self, - cls: Type["VertexAiResourceNoun"], - resource: proto.Message, - variable_name: str, - *, - module_name: str = "aiplatform", - ): + self.info(f'Create {cls.__name__} backing LRO: {lro.operation.name}') + + def log_create_complete(self, cls: Type['VertexAiResourceNoun'], resource: proto.Message, variable_name: str, *, module_name: str='aiplatform'): """Logs create event is complete. Will also include code snippet to instantiate resource in SDK. @@ -110,18 +68,11 @@ def log_create_complete( The module namespace under which the Vertex AI Resource Noun is available. Defaults to `aiplatform`. """ - self.info(f"{cls.__name__} created. Resource name: {resource.name}") - self.info(f"To use this {cls.__name__} in another session:") + self.info(f'{cls.__name__} created. Resource name: {resource.name}') + self.info(f'To use this {cls.__name__} in another session:') self.info(f"{variable_name} = {module_name}.{cls.__name__}('{resource.name}')") - def log_create_complete_with_getter( - self, - cls: Type["VertexAiResourceNoun"], - resource: proto.Message, - variable_name: str, - *, - module_name: str = "aiplatform", - ): + def log_create_complete_with_getter(self, cls: Type['VertexAiResourceNoun'], resource: proto.Message, variable_name: str, *, module_name: str='aiplatform'): """Logs create event is complete. Will also include code snippet to instantiate resource in SDK. @@ -137,47 +88,31 @@ def log_create_complete_with_getter( The module namespace under which the Vertex AI Resource Noun is available. Defaults to `aiplatform`. """ - self.info(f"{cls.__name__} created. Resource name: {resource.name}") - self.info(f"To use this {cls.__name__} in another session:") + self.info(f'{cls.__name__} created. Resource name: {resource.name}') + self.info(f'To use this {cls.__name__} in another session:') usage_message = f"{module_name}.{cls.__name__}.get('{resource.name}')" - self.info(f"{variable_name} = {usage_message}") + self.info(f'{variable_name} = {usage_message}') - def log_delete_with_lro( - self, - resource: Type["VertexAiResourceNoun"], - lro: Optional[operation.Operation] = None, - ): + def log_delete_with_lro(self, resource: Type['VertexAiResourceNoun'], lro: Optional[operation.Operation]=None): """Logs delete event with LRO. Args: resource: Vertex AI resource that will be deleted. lro: Backing LRO for creation. """ - self.info( - f"Deleting {resource.__class__.__name__} resource: {resource.resource_name}" - ) - + self.info(f'Deleting {resource.__class__.__name__} resource: {resource.resource_name}') if lro: - self.info( - f"Delete {resource.__class__.__name__} backing LRO: {lro.operation.name}" - ) + self.info(f'Delete {resource.__class__.__name__} backing LRO: {lro.operation.name}') - def log_delete_complete( - self, - resource: Type["VertexAiResourceNoun"], - ): + def log_delete_complete(self, resource: Type['VertexAiResourceNoun']): """Logs delete event is complete. Args: resource: Vertex AI resource that was deleted. """ - self.info( - f"{resource.__class__.__name__} resource {resource.resource_name} deleted." 
- ) + self.info(f'{resource.__class__.__name__} resource {resource.resource_name} deleted.') - def log_action_start_against_resource( - self, action: str, noun: str, resource_noun_obj: "VertexAiResourceNoun" - ): + def log_action_start_against_resource(self, action: str, noun: str, resource_noun_obj: 'VertexAiResourceNoun'): """Logs intention to start an action against a resource. Args: @@ -186,17 +121,9 @@ def log_action_start_against_resource( resource_noun_obj (VertexAiResourceNoun): Resource noun object the action is acting against. """ - self.info( - f"{action} {resource_noun_obj.__class__.__name__} {noun}: {resource_noun_obj.resource_name}" - ) + self.info(f'{action} {resource_noun_obj.__class__.__name__} {noun}: {resource_noun_obj.resource_name}') - def log_action_started_against_resource_with_lro( - self, - action: str, - noun: str, - cls: Type["VertexAiResourceNoun"], - lro: operation.Operation, - ): + def log_action_started_against_resource_with_lro(self, action: str, noun: str, cls: Type['VertexAiResourceNoun'], lro: operation.Operation): """Logs an action started against a resource with lro. Args: @@ -206,11 +133,9 @@ def log_action_started_against_resource_with_lro( Resource noun object the action is acting against. lro (operation.Operation): Backing LRO for action. """ - self.info(f"{action} {cls.__name__} {noun} backing LRO: {lro.operation.name}") + self.info(f'{action} {cls.__name__} {noun} backing LRO: {lro.operation.name}') - def log_action_completed_against_resource( - self, noun: str, action: str, resource_noun_obj: "VertexAiResourceNoun" - ): + def log_action_completed_against_resource(self, noun: str, action: str, resource_noun_obj: 'VertexAiResourceNoun'): """Logs action completed against resource. Args: @@ -219,50 +144,34 @@ def log_action_completed_against_resource( resource_noun_obj (VertexAiResourceNoun): Resource noun object the action is acting against """ - self.info( - f"{resource_noun_obj.__class__.__name__} {noun} {action}. Resource name: {resource_noun_obj.resource_name}" - ) - + self.info(f'{resource_noun_obj.__class__.__name__} {noun} {action}. Resource name: {resource_noun_obj.resource_name}') -def Logger(name: str) -> VertexLogger: # pylint: disable=invalid-name +def Logger(name: str) -> VertexLogger: old_class = logging.getLoggerClass() try: logging.setLoggerClass(VertexLogger) logger = logging.getLogger(name) - - # To avoid writing duplicate logs, skip adding the new handler if - # StreamHandler already exists in logger hierarchy. parent_logger = logger while parent_logger: for handler in parent_logger.handlers: if isinstance(handler, logging.StreamHandler): return logger parent_logger = parent_logger.parent - handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) logger.addHandler(handler) - return logger finally: logging.setLoggerClass(old_class) - - _LOGGER = Logger(__name__) - class FutureManager(metaclass=abc.ABCMeta): """Tracks concurrent futures against this object.""" + @codeflash_capture(function_name='FutureManager.__init__', tmp_dir_path='/tmp/codeflash_0kmeha_d/test_return_values', tests_root='/home/ubuntu/work/repo/tests', is_fto=False) def __init__(self): self.__latest_future_lock = threading.Lock() - - # Always points to the latest future. All submitted futures will always - # form a dependency on the latest future. self.__latest_future = None - - # Caches Exception of any executed future. Once one exception occurs - # all additional futures should fail and any additional invocations will block. 
self._exception = None def _raise_future_exception(self): @@ -278,13 +187,11 @@ def _complete_future(self, future: futures.Future): Args: future (futures.Future): Required. A future to complete. """ - with self.__latest_future_lock: try: - future.result() # raises + future.result() except Exception as e: self._exception = e - if self.__latest_future is future: self.__latest_future = None @@ -302,7 +209,6 @@ def wait(self): future = self.__latest_future if future: futures.wait([future], return_when=futures.FIRST_EXCEPTION) - self._raise_future_exception() @property @@ -320,15 +226,7 @@ def _latest_future(self, future: Optional[futures.Future]): if future: future.add_done_callback(self._complete_future) - def _submit( - self, - method: Callable[..., Any], - args: Sequence[Any], - kwargs: Dict[str, Any], - additional_dependencies: Optional[Sequence[futures.Future]] = None, - callbacks: Optional[Sequence[Callable[[futures.Future], Any]]] = None, - internal_callbacks: Iterable[Callable[[Any], Any]] = None, - ) -> futures.Future: + def _submit(self, method: Callable[..., Any], args: Sequence[Any], kwargs: Dict[str, Any], additional_dependencies: Optional[Sequence[futures.Future]]=None, callbacks: Optional[Sequence[Callable[[futures.Future], Any]]]=None, internal_callbacks: Iterable[Callable[[Any], Any]]=None) -> futures.Future: """Submit a method as a future against this object. Args: @@ -346,13 +244,7 @@ def _submit( future (Future): Future of the submitted method call. """ - def wait_for_dependencies_and_invoke( - deps: Sequence[futures.Future], - method: Callable[..., Any], - args: Sequence[Any], - kwargs: Dict[str, Any], - internal_callbacks: Iterable[Callable[[Any], Any]], - ) -> Any: + def wait_for_dependencies_and_invoke(deps: Sequence[futures.Future], method: Callable[..., Any], args: Sequence[Any], kwargs: Dict[str, Any], internal_callbacks: Iterable[Callable[[Any], Any]]) -> Any: """Wrapper method to wait on any dependencies before submitting method. @@ -367,93 +259,50 @@ def wait_for_dependencies_and_invoke( internal_callbacks: (Callable[[Any], Any]): Callbacks that take the result of method. """ - for future in set(deps): future.result() - result = method(*args, **kwargs) - - # call callbacks from within future if internal_callbacks: for callback in internal_callbacks: callback(result) - return result - - # Retrieves any dependencies from arguments. 
- deps = [ - arg._latest_future - for arg in list(args) + list(kwargs.values()) - if isinstance(arg, FutureManager) - ] - - # Retrieves exceptions and raises - # if any upstream dependency has an exception - exceptions = [ - arg._exception - for arg in list(args) + list(kwargs.values()) - if isinstance(arg, FutureManager) and arg._exception - ] - + deps = [arg._latest_future for arg in list(args) + list(kwargs.values()) if isinstance(arg, FutureManager)] + exceptions = [arg._exception for arg in list(args) + list(kwargs.values()) if isinstance(arg, FutureManager) and arg._exception] if exceptions: raise exceptions[0] - - # filter out objects that do not have pending tasks deps = [dep for dep in deps if dep] - if additional_dependencies: deps.extend(additional_dependencies) - with self.__latest_future_lock: - - # form a dependency on the latest future of this object if self.__latest_future: deps.append(self.__latest_future) - - self.__latest_future = initializer.global_pool.submit( - wait_for_dependencies_and_invoke, - deps=deps, - method=method, - args=args, - kwargs=kwargs, - internal_callbacks=internal_callbacks, - ) - + self.__latest_future = initializer.global_pool.submit(wait_for_dependencies_and_invoke, deps=deps, method=method, args=args, kwargs=kwargs, internal_callbacks=internal_callbacks) future = self.__latest_future - - # Clean up callback captures exception as well as removes future. - # May execute immediately and take lock. - future.add_done_callback(self._complete_future) - if callbacks: for c in callbacks: future.add_done_callback(c) - return future @classmethod @abc.abstractmethod - def _empty_constructor(cls) -> "FutureManager": + def _empty_constructor(cls) -> 'FutureManager': """Should construct object with all non FutureManager attributes as None.""" pass @abc.abstractmethod - def _sync_object_with_future_result(self, result: "FutureManager"): + def _sync_object_with_future_result(self, result: 'FutureManager'): """Should sync the object from _empty_constructor with result of future.""" def __repr__(self) -> str: if self._exception: - return f"{object.__repr__(self)} failed with {str(self._exception)}" - + return f'{object.__repr__(self)} failed with {str(self._exception)}' if self.__latest_future: - return f"{object.__repr__(self)} is waiting for upstream dependencies to complete." - + return f'{object.__repr__(self)} is waiting for upstream dependencies to complete.' return object.__repr__(self) - class VertexAiResourceNoun(metaclass=abc.ABCMeta): """Base class the Vertex AI resource nouns. @@ -508,15 +357,10 @@ def _parse_resource_name_method(cls) -> str: def _format_resource_name_method(self) -> str: """Method name on GAPIC client to format a resource name.""" pass - - # Override this value with staticmethod - # to use custom resource id validators per resource _resource_id_validator: Optional[Callable[[str], None]] = None @staticmethod - def _revisioned_resource_id_validator( - resource_id: str, - ) -> None: + def _revisioned_resource_id_validator(resource_id: str) -> None: """Some revisioned resource names can have '@' in them to separate the resource ID from the revision ID. Thus, they need their own resource id validator. @@ -528,16 +372,11 @@ def _revisioned_resource_id_validator( Raises: ValueError: If a `resource_id` doesn't conform to appropriate revision syntax. 
""" - if not re.compile(r"^[\w-]+@?[\w-]+$").match(resource_id): - raise ValueError(f"Resource {resource_id} is not a valid resource ID.") - - def __init__( - self, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - resource_name: Optional[str] = None, - ): + if not re.compile('^[\\w-]+@?[\\w-]+$').match(resource_id): + raise ValueError(f'Resource {resource_id} is not a valid resource ID.') + + @codeflash_capture(function_name='VertexAiResourceNoun.__init__', tmp_dir_path='/tmp/codeflash_isahfmnp/test_return_values', tests_root='/home/ubuntu/work/repo/tests', is_fto=False) + def __init__(self, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, resource_name: Optional[str]=None): """Initializes class with project, location, and api_client. Args: @@ -547,37 +386,19 @@ def __init__( credentials to use when accessing interacting with resource noun. resource_name(str): A fully-qualified resource name or ID. """ - if resource_name: - project, location = self._get_and_validate_project_location( - resource_name=resource_name, project=project, location=location - ) - + (project, location) = self._get_and_validate_project_location(resource_name=resource_name, project=project, location=location) self.project = project or initializer.global_config.project self.location = location or initializer.global_config.location self.credentials = credentials or initializer.global_config.credentials - appended_user_agent = None if base_constants.USER_AGENT_SDK_COMMAND: - appended_user_agent = [ - f"sdk_command/{base_constants.USER_AGENT_SDK_COMMAND}" - ] - # Reset the value for the USER_AGENT_SDK_COMMAND to avoid counting future unrelated api calls. - base_constants.USER_AGENT_SDK_COMMAND = "" - - self.api_client = self._instantiate_client( - location=self.location, - credentials=self.credentials, - appended_user_agent=appended_user_agent, - ) + appended_user_agent = [f'sdk_command/{base_constants.USER_AGENT_SDK_COMMAND}'] + base_constants.USER_AGENT_SDK_COMMAND = '' + self.api_client = self._instantiate_client(location=self.location, credentials=self.credentials, appended_user_agent=appended_user_agent) @classmethod - def _instantiate_client( - cls, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - appended_user_agent: Optional[List[str]] = None, - ) -> utils.VertexAiServiceClientWithOverride: + def _instantiate_client(cls, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, appended_user_agent: Optional[List[str]]=None) -> utils.VertexAiServiceClientWithOverride: """Helper method to instantiate service client for resource noun. Args: @@ -592,12 +413,7 @@ def _instantiate_client( client (utils.VertexAiServiceClientWithOverride): Initialized service client for this service noun with optional overrides. """ - return initializer.global_config.create_client( - client_class=cls.client_class, - credentials=credentials, - location_override=location, - appended_user_agent=appended_user_agent, - ) + return initializer.global_config.create_client(client_class=cls.client_class, credentials=credentials, location_override=location, appended_user_agent=appended_user_agent) @classmethod def _parse_resource_name(cls, resource_name: str) -> Dict[str, str]: @@ -609,10 +425,9 @@ def _parse_resource_name(cls, resource_name: str) -> Dict[str, str]: Returns: Dictionary of component segments. 
""" - # gets the underlying wrapped gapic client class - return getattr( - cls.client_class.get_gapic_client_class(), cls._parse_resource_name_method - )(resource_name) + # Use cached method lookup for speedup when called repeatedly + method = _get_parse_resource_name_method_for_cls(cls) + return method(resource_name) @classmethod def _format_resource_name(cls, **kwargs: str) -> str: @@ -628,17 +443,9 @@ def _format_resource_name(cls, **kwargs: str) -> str: Returns: Resource name. """ - # gets the underlying wrapped gapic client class - return getattr( - cls.client_class.get_gapic_client_class(), cls._format_resource_name_method - )(**kwargs) - - def _get_and_validate_project_location( - self, - resource_name: str, - project: Optional[str] = None, - location: Optional[str] = None, - ) -> Tuple[str, str]: + return getattr(cls.client_class.get_gapic_client_class(), cls._format_resource_name_method)(**kwargs) + + def _get_and_validate_project_location(self, resource_name: str, project: Optional[str]=None, location: Optional[str]=None) -> Tuple[str, str]: """Validate the project and location for the resource. Args: @@ -649,25 +456,14 @@ def _get_and_validate_project_location( Raises: RuntimeError: If location is different from resource location """ - fields = self._parse_resource_name(resource_name) - if not fields: - return project, location + return (project, location) + if location and fields['location'] != location: + raise RuntimeError(f"location {location} is provided, but different from the resource location {fields['location']}") + return (fields['project'], fields['location']) - if location and fields["location"] != location: - raise RuntimeError( - f"location {location} is provided, but different from " - f"the resource location {fields['location']}" - ) - - return fields["project"], fields["location"] - - def _get_gca_resource( - self, - resource_name: str, - parent_resource_name_fields: Optional[Dict[str, str]] = None, - ) -> proto.Message: + def _get_gca_resource(self, resource_name: str, parent_resource_name_fields: Optional[Dict[str, str]]=None) -> proto.Message: """Returns GAPIC service representation of client class resource. Args: @@ -677,31 +473,18 @@ def _get_gca_resource( will be used to compose the resource name if only resource ID is given. Should not include project and location. 
""" - resource_name = utils.full_resource_name( - resource_name=resource_name, - resource_noun=self._resource_noun, - parse_resource_name_method=self._parse_resource_name, - format_resource_name_method=self._format_resource_name, - project=self.project, - location=self.location, - parent_resource_name_fields=parent_resource_name_fields, - resource_id_validator=self._resource_id_validator, - ) - - return getattr(self.api_client, self._getter_method)( - name=resource_name, retry=_DEFAULT_RETRY - ) + resource_name = utils.full_resource_name(resource_name=resource_name, resource_noun=self._resource_noun, parse_resource_name_method=self._parse_resource_name, format_resource_name_method=self._format_resource_name, project=self.project, location=self.location, parent_resource_name_fields=parent_resource_name_fields, resource_id_validator=self._resource_id_validator) + return getattr(self.api_client, self._getter_method)(name=resource_name, retry=_DEFAULT_RETRY) def _sync_gca_resource(self): """Sync GAPIC service representation of client class resource.""" - self._gca_resource = self._get_gca_resource(resource_name=self.resource_name) @property def name(self) -> str: """Name of this resource.""" self._assert_gca_resource_is_available() - return self._gca_resource.name.split("/")[-1] + return self._gca_resource.name.split('/')[-1] @property def _project_tuple(self) -> Tuple[Optional[str], Optional[str]]: @@ -710,13 +493,11 @@ def _project_tuple(self) -> Tuple[Optional[str], Optional[str]]: Another option is to use resource_manager_utils but requires the caller have resource manager get role. """ - # we may not have the project if project inferred from the resource name maybe_project_id = self.project if self._gca_resource is not None and self._gca_resource.name: - project_no = self._parse_resource_name(self._gca_resource.name)["project"] + project_no = self._parse_resource_name(self._gca_resource.name)['project'] else: project_no = None - if maybe_project_id == project_no: return (None, project_no) else: @@ -754,7 +535,7 @@ def encryption_spec(self) -> Optional[gca_encryption_spec.EncryptionSpec]: be encrypted with the provided encryption key. """ self._assert_gca_resource_is_available() - return getattr(self._gca_resource, "encryption_spec") + return getattr(self._gca_resource, 'encryption_spec') @property def labels(self) -> Dict[str, str]: @@ -787,30 +568,23 @@ def _assert_gca_resource_is_available(self) -> None: RuntimeError: If _gca_resource is has not been created. 
""" if self._gca_resource is None: - raise RuntimeError( - f"{self.__class__.__name__} resource has not been created" - ) + raise RuntimeError(f'{self.__class__.__name__} resource has not been created') def __repr__(self) -> str: - return f"{object.__repr__(self)} \nresource name: {self.resource_name}" + return f'{object.__repr__(self)} \nresource name: {self.resource_name}' def to_dict(self) -> Dict[str, Any]: """Returns the resource proto as a dictionary.""" return json_format.MessageToDict(self._gca_resource._pb) @classmethod - def _generate_display_name(cls, prefix: Optional[str] = None) -> str: + def _generate_display_name(cls, prefix: Optional[str]=None) -> str: """Returns a display name containing class name and time string.""" if not prefix: prefix = cls.__name__ - return prefix + " " + datetime.datetime.now().isoformat(sep=" ") + return prefix + ' ' + datetime.datetime.now().isoformat(sep=' ') - -def optional_sync( - construct_object_on_arg: Optional[str] = None, - return_input_arg: Optional[str] = None, - bind_future_to_self: bool = True, -): +def optional_sync(construct_object_on_arg: Optional[str]=None, return_input_arg: Optional[str]=None, bind_future_to_self: bool=True): """Decorator for VertexAiResourceNounWithFutureManager with optional sync support. @@ -845,119 +619,51 @@ def optional_run_in_thread(method: Callable[..., Any]): @functools.wraps(method) def wrapper(*args, **kwargs): """Wraps method.""" - sync = kwargs.pop("sync", True) + sync = kwargs.pop('sync', True) bound_args = inspect.signature(method).bind(*args, **kwargs) - self = bound_args.arguments.get("self") + self = bound_args.arguments.get('self') calling_object_latest_future = None - - # check to see if this object has any exceptions if self: calling_object_latest_future = self._latest_future self._raise_future_exception() - - # if sync then wait for any Futures to complete and execute if sync: if self: VertexAiResourceNounWithFutureManager.wait(self) return method(*args, **kwargs) - - # callbacks to call within the Future (in same Thread) internal_callbacks = [] - # callbacks to add to the Future (may or may not be in same Thread) callbacks = [] - # additional Future dependencies to capture dependencies = [] - - # all methods should have type signatures - return_type = get_annotation_class( - inspect.getfullargspec(method).annotations["return"] - ) - - # object produced by the method + return_type = get_annotation_class(inspect.getfullargspec(method).annotations['return']) returned_object = bound_args.arguments.get(return_input_arg) - - # is a classmethod that creates the object and returns it if args and inspect.isclass(args[0]): - - # assumes class in classmethod is the resource noun - returned_object = ( - args[0]._empty_constructor() - if not returned_object - else returned_object - ) + returned_object = args[0]._empty_constructor() if not returned_object else returned_object self = returned_object - - else: # instance method - # if we're returning an input object + else: if returned_object and returned_object is not self: - - # make sure the input object doesn't have any exceptions - # from previous futures returned_object._raise_future_exception() - - # if the future will be associated with both the returned object - # and calling object then we need to add additional callback - # to remove the future from the returned object - - # if we need to construct a new empty returned object - should_construct = not returned_object and bound_args.arguments.get( - construct_object_on_arg, not 
construct_object_on_arg - ) - + should_construct = not returned_object and bound_args.arguments.get(construct_object_on_arg, not construct_object_on_arg) if should_construct: if return_type is not None: returned_object = return_type._empty_constructor() - - # if the future will be associated with both the returned object - # and calling object then we need to add additional callback - # to remove the future from the returned object if returned_object and bind_future_to_self: callbacks.append(returned_object._complete_future) - if returned_object: - # sync objects after future completes - internal_callbacks.append( - returned_object._sync_object_with_future_result - ) - - # If the future is not associated with the calling object - # then the return object future needs to form a dependency on the - # the latest future in the calling object. + internal_callbacks.append(returned_object._sync_object_with_future_result) if not bind_future_to_self: if calling_object_latest_future: dependencies.append(calling_object_latest_future) self = returned_object - - future = self._submit( - method=method, - callbacks=callbacks, - internal_callbacks=internal_callbacks, - additional_dependencies=dependencies, - args=[], - kwargs=bound_args.arguments, - ) - - # if the calling object is the one that submitted then add it's future - # to the returned object + future = self._submit(method=method, callbacks=callbacks, internal_callbacks=internal_callbacks, additional_dependencies=dependencies, args=[], kwargs=bound_args.arguments) if returned_object and returned_object is not self: returned_object._latest_future = future - return returned_object - return wrapper - return optional_run_in_thread - class _VertexAiResourceNounPlus(VertexAiResourceNoun): + @classmethod - def _empty_constructor( - cls, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - resource_name: Optional[str] = None, - ) -> "_VertexAiResourceNounPlus": + def _empty_constructor(cls, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, resource_name: Optional[str]=None) -> '_VertexAiResourceNounPlus': """Initializes with all attributes set to None. Args: @@ -971,24 +677,12 @@ def _empty_constructor( An instance of this class with attributes set to None. """ self = cls.__new__(cls) - VertexAiResourceNoun.__init__( - self, - project=project, - location=location, - credentials=credentials, - resource_name=resource_name, - ) + VertexAiResourceNoun.__init__(self, project=project, location=location, credentials=credentials, resource_name=resource_name) self._gca_resource = None return self @classmethod - def _construct_sdk_resource_from_gapic( - cls, - gapic_resource: proto.Message, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - ) -> VertexAiResourceNoun: + def _construct_sdk_resource_from_gapic(cls, gapic_resource: proto.Message, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None) -> VertexAiResourceNoun: """Given a GAPIC resource object, return the SDK representation. Args: @@ -1009,31 +703,13 @@ def _construct_sdk_resource_from_gapic( VertexAiResourceNoun: An initialized SDK object that represents GAPIC type. 
""" - resource_name_parts = utils.extract_project_and_location_from_parent( - gapic_resource.name - ) - sdk_resource = cls._empty_constructor( - project=resource_name_parts.get("project") or project, - location=resource_name_parts.get("location") or location, - credentials=credentials, - ) + resource_name_parts = utils.extract_project_and_location_from_parent(gapic_resource.name) + sdk_resource = cls._empty_constructor(project=resource_name_parts.get('project') or project, location=resource_name_parts.get('location') or location, credentials=credentials) sdk_resource._gca_resource = gapic_resource return sdk_resource - # TODO(b/144545165): Improve documentation for list filtering once available - # TODO(b/184910159): Expose `page_size` field in list method @classmethod - def _list( - cls, - cls_filter: Callable[[proto.Message], bool] = lambda _: True, - filter: Optional[str] = None, - order_by: Optional[str] = None, - read_mask: Optional[field_mask.FieldMask] = None, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - parent: Optional[str] = None, - ) -> List[VertexAiResourceNoun]: + def _list(cls, cls_filter: Callable[[proto.Message], bool]=lambda _: True, filter: Optional[str]=None, order_by: Optional[str]=None, read_mask: Optional[field_mask.FieldMask]=None, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, parent: Optional[str]=None) -> List[VertexAiResourceNoun]: """Private method to list all instances of this Vertex AI Resource, takes a `cls_filter` arg to filter to a particular SDK resource subclass. @@ -1077,59 +753,22 @@ def _list( if parent: parent_resources = utils.extract_project_and_location_from_parent(parent) if parent_resources: - project, location = ( - parent_resources["project"], - parent_resources["location"], - ) - - resource = cls._empty_constructor( - project=project, location=location, credentials=credentials - ) - - # Fetch credentials once and re-use for all `_empty_constructor()` calls + (project, location) = (parent_resources['project'], parent_resources['location']) + resource = cls._empty_constructor(project=project, location=location, credentials=credentials) creds = resource.credentials - resource_list_method = getattr(resource.api_client, resource._list_method) - - list_request = { - "parent": parent - or initializer.global_config.common_location_path( - project=project, location=location - ), - } - - # `read_mask` is only passed from PipelineJob.list() for now + list_request = {'parent': parent or initializer.global_config.common_location_path(project=project, location=location)} if read_mask is not None: - list_request["read_mask"] = read_mask - + list_request['read_mask'] = read_mask if filter: - list_request["filter"] = filter - + list_request['filter'] = filter if order_by: - list_request["order_by"] = order_by - + list_request['order_by'] = order_by resource_list = resource_list_method(request=list_request) or [] - - return [ - cls._construct_sdk_resource_from_gapic( - gapic_resource, project=project, location=location, credentials=creds - ) - for gapic_resource in resource_list - if cls_filter(gapic_resource) - ] + return [cls._construct_sdk_resource_from_gapic(gapic_resource, project=project, location=location, credentials=creds) for gapic_resource in resource_list if cls_filter(gapic_resource)] @classmethod - def _list_with_local_order( - cls, - cls_filter: Callable[[proto.Message], bool] = lambda _: 
True, - filter: Optional[str] = None, - order_by: Optional[str] = None, - read_mask: Optional[field_mask.FieldMask] = None, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - parent: Optional[str] = None, - ) -> List[VertexAiResourceNoun]: + def _list_with_local_order(cls, cls_filter: Callable[[proto.Message], bool]=lambda _: True, filter: Optional[str]=None, order_by: Optional[str]=None, read_mask: Optional[field_mask.FieldMask]=None, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, parent: Optional[str]=None) -> List[VertexAiResourceNoun]: """Private method to list all instances of this Vertex AI Resource, takes a `cls_filter` arg to filter to a particular SDK resource subclass. Provides client-side sorting when a list API doesn't support @@ -1171,55 +810,30 @@ def _list_with_local_order( Returns: List[VertexAiResourceNoun] - A list of SDK resource objects """ - - li = cls._list( - cls_filter=cls_filter, - filter=filter, - order_by=None, # This method will handle the ordering locally - read_mask=read_mask, - project=project, - location=location, - credentials=credentials, - parent=parent, - ) - + li = cls._list(cls_filter=cls_filter, filter=filter, order_by=None, read_mask=read_mask, project=project, location=location, credentials=credentials, parent=parent) if order_by: - desc = "desc" in order_by - order_by = order_by.replace("desc", "") - order_by = order_by.split(",") - - li.sort( - key=lambda x: tuple(getattr(x, field.strip()) for field in order_by), - reverse=desc, - ) - + desc = 'desc' in order_by + order_by = order_by.replace('desc', '') + order_by = order_by.split(',') + li.sort(key=lambda x: tuple((getattr(x, field.strip()) for field in order_by)), reverse=desc) return li def _delete(self) -> None: """Deletes this Vertex AI resource. WARNING: This deletion is permanent.""" - _LOGGER.log_action_start_against_resource("Deleting", "", self) - possible_lro = getattr(self.api_client, self._delete_method)( - name=self.resource_name - ) - + _LOGGER.log_action_start_against_resource('Deleting', '', self) + possible_lro = getattr(self.api_client, self._delete_method)(name=self.resource_name) if possible_lro: - _LOGGER.log_action_completed_against_resource("deleted.", "", self) + _LOGGER.log_action_completed_against_resource('deleted.', '', self) _LOGGER.log_delete_with_lro(self, possible_lro) possible_lro.result() _LOGGER.log_delete_complete(self) - class VertexAiResourceNounWithFutureManager(_VertexAiResourceNounPlus, FutureManager): """Allows optional asynchronous calls to this Vertex AI Resource Nouns.""" - def __init__( - self, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - resource_name: Optional[str] = None, - ): + @codeflash_capture(function_name='VertexAiResourceNounWithFutureManager.__init__', tmp_dir_path='/tmp/codeflash_isahfmnp/test_return_values', tests_root='/home/ubuntu/work/repo/tests', is_fto=False) + def __init__(self, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, resource_name: Optional[str]=None): """Initializes class with project, location, and api_client. Args: @@ -1230,23 +844,11 @@ def __init__( resource noun. resource_name(str): A fully-qualified resource name or ID. 
""" - _VertexAiResourceNounPlus.__init__( - self, - project=project, - location=location, - credentials=credentials, - resource_name=resource_name, - ) + _VertexAiResourceNounPlus.__init__(self, project=project, location=location, credentials=credentials, resource_name=resource_name) FutureManager.__init__(self) @classmethod - def _empty_constructor( - cls, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - resource_name: Optional[str] = None, - ) -> "VertexAiResourceNounWithFutureManager": + def _empty_constructor(cls, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, resource_name: Optional[str]=None) -> 'VertexAiResourceNounWithFutureManager': """Initializes with all attributes set to None. The attributes should be populated after a future is complete. This allows @@ -1263,56 +865,29 @@ def _empty_constructor( An instance of this class with attributes set to None. """ self = cls.__new__(cls) - VertexAiResourceNoun.__init__( - self, - project=project, - location=location, - credentials=credentials, - resource_name=resource_name, - ) + VertexAiResourceNoun.__init__(self, project=project, location=location, credentials=credentials, resource_name=resource_name) FutureManager.__init__(self) self._gca_resource = None return self - def _sync_object_with_future_result( - self, result: "VertexAiResourceNounWithFutureManager" - ): + def _sync_object_with_future_result(self, result: 'VertexAiResourceNounWithFutureManager'): """Populates attributes from a Future result to this object. Args: result: VertexAiResourceNounWithFutureManager Required. Result of future with same type as this object. """ - sync_attributes = [ - "project", - "location", - "api_client", - "_gca_resource", - "credentials", - ] - optional_sync_attributes = [ - "_authorized_session", - "_raw_predict_request_url", - ] - + sync_attributes = ['project', 'location', 'api_client', '_gca_resource', 'credentials'] + optional_sync_attributes = ['_authorized_session', '_raw_predict_request_url'] for attribute in sync_attributes: setattr(self, attribute, getattr(result, attribute)) - for attribute in optional_sync_attributes: value = getattr(result, attribute, None) if value: setattr(self, attribute, value) @classmethod - def list( - cls, - filter: Optional[str] = None, - order_by: Optional[str] = None, - project: Optional[str] = None, - location: Optional[str] = None, - credentials: Optional[auth_credentials.Credentials] = None, - parent: Optional[str] = None, - ) -> List[VertexAiResourceNoun]: + def list(cls, filter: Optional[str]=None, order_by: Optional[str]=None, project: Optional[str]=None, location: Optional[str]=None, credentials: Optional[auth_credentials.Credentials]=None, parent: Optional[str]=None) -> List[VertexAiResourceNoun]: """List all instances of this Vertex AI Resource. Example Usage: @@ -1346,18 +921,10 @@ def list( Returns: List[VertexAiResourceNoun] - A list of SDK resource objects """ - - return cls._list( - filter=filter, - order_by=order_by, - project=project, - location=location, - credentials=credentials, - parent=parent, - ) + return cls._list(filter=filter, order_by=order_by, project=project, location=location, credentials=credentials, parent=parent) @optional_sync() - def delete(self, sync: bool = True) -> None: + def delete(self, sync: bool=True) -> None: """Deletes this Vertex AI resource. WARNING: This deletion is permanent. 
@@ -1372,7 +939,6 @@ def delete(self, sync: bool = True) -> None: def __repr__(self) -> str: if self._gca_resource and self._resource_is_available: return VertexAiResourceNoun.__repr__(self) - return FutureManager.__repr__(self) def _wait_for_resource_creation(self) -> None: @@ -1389,21 +955,12 @@ def _wait_for_resource_creation(self) -> None: Raises: RuntimeError: If the resource has not been scheduled to be created. """ - - # If the user calls this but didn't actually invoke an API to create - if self._are_futures_done() and not getattr(self._gca_resource, "name", None): + if self._are_futures_done() and (not getattr(self._gca_resource, 'name', None)): self._raise_future_exception() - raise RuntimeError( - f"{self.__class__.__name__} resource is not scheduled to be created." - ) - - while not getattr(self._gca_resource, "name", None): - # breaks out of loop if creation has failed async - if self._are_futures_done() and not getattr( - self._gca_resource, "name", None - ): + raise RuntimeError(f'{self.__class__.__name__} resource is not scheduled to be created.') + while not getattr(self._gca_resource, 'name', None): + if self._are_futures_done() and (not getattr(self._gca_resource, 'name', None)): self._raise_future_exception() - time.sleep(1) def _assert_gca_resource_is_available(self) -> None: @@ -1415,16 +972,8 @@ def _assert_gca_resource_is_available(self) -> None: Raises: RuntimeError: When resource has not been created. """ - if not getattr(self._gca_resource, "name", None): - raise RuntimeError( - f"{self.__class__.__name__} resource has not been created." - + ( - f" Resource failed with: {self._exception}" - if self._exception - else "" - ) - ) - + if not getattr(self._gca_resource, 'name', None): + raise RuntimeError(f'{self.__class__.__name__} resource has not been created.' + (f' Resource failed with: {self._exception}' if self._exception else '')) def get_annotation_class(annotation: type) -> type: """Helper method to retrieve type annotation. @@ -1432,12 +981,13 @@ def get_annotation_class(annotation: type) -> type: Args: annotation (type): Type hint """ - # typing.Optional - if getattr(annotation, "__origin__", None) is Union: + if getattr(annotation, '__origin__', None) is Union: return annotation.__args__[0] - return annotation +@lru_cache(maxsize=32) +def _get_parse_resource_name_method_for_cls(cls): + return getattr(cls.client_class.get_gapic_client_class(), cls._parse_resource_name_method) class DoneMixin(abc.ABC): """An abstract class for implementing a done method, indicating @@ -1450,7 +1000,6 @@ def done(self) -> bool: """Method indicating whether a job has completed.""" pass - class StatefulResource(DoneMixin): """Extends DoneMixin to check whether a job returning a stateful resource has compted.""" @@ -1475,10 +1024,8 @@ def done(self) -> bool: """ if self.state in self._valid_done_states: return True - return False - class VertexAiStatefulResource(VertexAiResourceNounWithFutureManager, StatefulResource): """Extends StatefulResource to include a check for self._gca_resource.""" @@ -1490,31 +1037,20 @@ def done(self) -> bool: """ if self._gca_resource and self._gca_resource.name: return super().done() - return False - - -# PreviewClass type variable -PreviewClass = TypeVar("PreviewClass", bound=VertexAiResourceNoun) - +PreviewClass = TypeVar('PreviewClass', bound=VertexAiResourceNoun) class PreviewMixin(abc.ABC): """An abstract class for adding preview functionality to certain classes. 
    A child class that inherits from both this Mixin and another parent
    class allows the child class to introduce preview features.
    """
-
     _preview_class: Type[PreviewClass]
-    """Class that is currently in preview or has a preview feature.
-    Class must have `resource_name` and `credentials` attributes.
-    """
+    'Class that is currently in preview or has a preview feature.\n    Class must have `resource_name` and `credentials` attributes.\n    '
 
     @property
     def preview(self) -> PreviewClass:
         """Exposes features available in preview for this class."""
-        if not hasattr(self, "_preview_instance"):
-            self._preview_instance = self._preview_class(
-                self.resource_name, credentials=self.credentials
-            )
-
+        if not hasattr(self, '_preview_instance'):
+            self._preview_instance = self._preview_class(self.resource_name, credentials=self.credentials)
         return self._preview_instance
diff --git a/google/cloud/aiplatform/utils/console_utils.py b/google/cloud/aiplatform/utils/console_utils.py
index c108b0605e..39f3981136 100644
--- a/google/cloud/aiplatform/utils/console_utils.py
+++ b/google/cloud/aiplatform/utils/console_utils.py
@@ -31,6 +31,9 @@ def custom_job_tensorboard_console_uri(
     """Helper method to create console uri to tensorboard from custom job resource."""
     # projects+40556267596+locations+us-central1+tensorboards+740208820004847616+experiments+2214368039829241856
     fields = tensorboard.Tensorboard._parse_resource_name(tensorboard_resource_name)
-    experiment_resource_name = f"{tensorboard_resource_name}/experiments/{custom_job_resource_name.split('/')[-1]}"
+    custom_job_id = custom_job_resource_name.split("/")[-1]
+    experiment_resource_name = (
+        f"{tensorboard_resource_name}/experiments/{custom_job_id}"
+    )
     uri_experiment_resource_name = experiment_resource_name.replace("/", "+")
    return f"https://{fields['location']}.tensorboard.googleusercontent.com/experiment/{uri_experiment_resource_name}"
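
Note on the main functional change in base.py above: VertexAiResourceNoun._parse_resource_name now resolves its GAPIC parse method through _get_parse_resource_name_method_for_cls, a module-level helper wrapped in functools.lru_cache(maxsize=32), so the get_gapic_client_class()/getattr lookup runs once per resource class instead of on every call. The sketch below is a minimal, self-contained illustration of that cached-lookup pattern; the names _FakeGapicClient, _FakeClientWithOverride, Dataset, and parse_dataset_path are invented stand-ins for this example, not real Vertex AI SDK types.

from functools import lru_cache


@lru_cache(maxsize=32)
def _get_parse_resource_name_method_for_cls(cls):
    # Classes are hashable, so lru_cache can memoize the resolved function per class,
    # skipping the repeated attribute lookups on subsequent calls.
    return getattr(cls.client_class.get_gapic_client_class(), cls._parse_resource_name_method)


class _FakeGapicClient:
    """Invented stand-in for a generated GAPIC client class."""

    @staticmethod
    def parse_dataset_path(resource_name: str) -> dict:
        # Real GAPIC path parsers return the path segments as a dict; this one
        # handles only the simple "projects/.../locations/.../datasets/..." shape.
        project, _, location, _, dataset = resource_name.split("/")[-5:]
        return {"project": project, "location": location, "dataset": dataset}


class _FakeClientWithOverride:
    """Invented stand-in for the SDK's client wrapper exposing the GAPIC class."""

    @staticmethod
    def get_gapic_client_class():
        return _FakeGapicClient


class Dataset:
    """Invented resource noun wired up the same way as the classes in base.py."""

    client_class = _FakeClientWithOverride
    _parse_resource_name_method = "parse_dataset_path"

    @classmethod
    def _parse_resource_name(cls, resource_name: str) -> dict:
        # First call resolves the GAPIC method; later calls hit the lru_cache entry.
        return _get_parse_resource_name_method_for_cls(cls)(resource_name)


if __name__ == "__main__":
    name = "projects/my-project/locations/us-central1/datasets/123"
    print(Dataset._parse_resource_name(name))
    # {'project': 'my-project', 'location': 'us-central1', 'dataset': '123'}

One trade-off of this pattern: lru_cache holds a strong reference to each class it has seen, which is harmless for a fixed, small set of resource classes (and the maxsize=32 bound keeps the cache small) but would matter if classes were generated dynamically.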