Skip to content

Commit

Permalink
fix: Print error for schema classes
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 487277666
  • Loading branch information
jaycee-li authored and Copybara-Service committed Nov 9, 2022
1 parent fafb0e2 commit 13e2165
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 6 deletions.
57 changes: 54 additions & 3 deletions google/cloud/aiplatform/metadata/schema/base_artifact.py
Expand Up @@ -17,10 +17,9 @@

import abc

from typing import Optional, Dict
from typing import Any, Optional, Dict

from google.auth import credentials as auth_credentials

from google.cloud.aiplatform.compat.types import artifact as gca_artifact
from google.cloud.aiplatform.metadata import artifact
from google.cloud.aiplatform.constants import base as base_constants
Expand Down Expand Up @@ -58,7 +57,7 @@ def __init__(
various structure and field requirements for the metadata field.
Args:
resource_id (str):
artifact_id (str):
Optional. The <resource_id> portion of the Artifact name with
the following format, this is globally unique in a metadataStore:
projects/123/locations/us-central1/metadataStores/<metadata_store_id>/artifacts/<resource_id>.
Expand All @@ -82,6 +81,8 @@ def __init__(
Pipelines), and the system does not prescribe or
check the validity of state transitions.
"""
# initialize the exception to resolve the FutureManager exception.
self._exception = None
# resource_id is not stored in the proto. Create method uses the
# resource_id along with project_id and location to construct an
# resource_name which is stored in the proto message.
Expand Down Expand Up @@ -178,3 +179,53 @@ def create(
# Reinstantiate this class using the newly created resource.
self._init_with_resource_name(artifact_name=new_artifact_instance.resource_name)
return self

def sync_resource(self):
"""Syncs local resource with the resource in metadata store.
Raises:
RuntimeError: if the artifact resource hasn't been created.
"""
if self._gca_resource.name:
super().sync_resource()
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def update(
self,
metadata: Optional[Dict[str, Any]] = None,
description: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Updates an existing Artifact resource with new metadata.
Args:
metadata (Dict):
Optional. metadata contains the updated metadata information.
description (str):
Optional. Description describes the resource to be updated.
credentials (auth_credentials.Credentials):
Custom credentials to use to update this resource. Overrides
credentials set in aiplatform.init.
Raises:
RuntimeError: if the artifact resource hasn't been created.
"""
if self._gca_resource.name:
super().update(
metadata=metadata,
description=description,
credentials=credentials,
)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def __repr__(self) -> str:
if self._gca_resource.name:
return super().__repr__()
else:
return f"{object.__repr__(self)}\nschema_title: {self.schema_title}"
74 changes: 73 additions & 1 deletion google/cloud/aiplatform/metadata/schema/base_context.py
Expand Up @@ -17,11 +17,14 @@

import abc

from typing import Optional, Dict
from typing import Dict, List, Optional, Sequence

from google.auth import credentials as auth_credentials

from google.cloud.aiplatform.compat.types import context as gca_context
from google.cloud.aiplatform.compat.types import (
lineage_subgraph as gca_lineage_subgraph,
)
from google.cloud.aiplatform.constants import base as base_constants
from google.cloud.aiplatform.metadata import constants
from google.cloud.aiplatform.metadata import context
Expand Down Expand Up @@ -64,6 +67,8 @@ def __init__(
description (str):
Optional. Describes the purpose of the Context to be created.
"""
# initialize the exception to resolve the FutureManager exception.
self._exception = None
# resource_id is not stored in the proto. Create method uses the
# resource_id along with project_id and location to construct an
# resource_name which is stored in the proto message.
Expand Down Expand Up @@ -154,3 +159,70 @@ def create(
# Reinstantiate this class using the newly created resource.
self._init_with_resource_name(context_name=new_context.resource_name)
return self

def add_artifacts_and_executions(
self,
artifact_resource_names: Optional[Sequence[str]] = None,
execution_resource_names: Optional[Sequence[str]] = None,
):
"""Associate Executions and attribute Artifacts to a given Context.
Args:
artifact_resource_names (Sequence[str]):
Optional. The full resource name of Artifacts to attribute to
the Context.
execution_resource_names (Sequence[str]):
Optional. The full resource name of Executions to associate with
the Context.
Raises:
RuntimeError: if Context resource hasn't been created.
"""
if self._gca_resource.name:
super().add_artifacts_and_executions(
artifact_resource_names=artifact_resource_names,
execution_resource_names=execution_resource_names,
)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def add_context_children(self, contexts: List[context.Context]):
"""Adds the provided contexts as children of this context.
Args:
contexts (List[_Context]): Contexts to add as children.
Raises:
RuntimeError: if Context resource hasn't been created.
"""
if self._gca_resource.name:
super().add_context_children(contexts)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def query_lineage_subgraph(self) -> gca_lineage_subgraph.LineageSubgraph:
"""Queries lineage subgraph of this context.
Returns:
lineage subgraph(gca_lineage_subgraph.LineageSubgraph):
Lineage subgraph of this Context.
Raises:
RuntimeError: if Context resource hasn't been created.
"""
if self._gca_resource.name:
return super().query_lineage_subgraph()
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def __repr__(self) -> str:
if self._gca_resource.name:
return super().__repr__()
else:
return f"{object.__repr__(self)}\nschema_title: {self.schema_title}"
114 changes: 112 additions & 2 deletions google/cloud/aiplatform/metadata/schema/base_execution.py
Expand Up @@ -17,12 +17,14 @@

import abc

from typing import Optional, Dict
from typing import Any, Dict, List, Optional, Union

from google.auth import credentials as auth_credentials

from google.cloud.aiplatform import models
from google.cloud.aiplatform.compat.types import execution as gca_execution
from google.cloud.aiplatform.constants import base as base_constants
from google.cloud.aiplatform.metadata import artifact
from google.cloud.aiplatform.metadata import constants
from google.cloud.aiplatform.metadata import execution
from google.cloud.aiplatform.metadata import metadata
Expand Down Expand Up @@ -70,7 +72,8 @@ def __init__(
description (str):
Optional. Describes the purpose of the Execution to be created.
"""

# initialize the exception to resolve the FutureManager exception.
self._exception = None
# resource_id is not stored in the proto. Create method uses the
# resource_id along with project_id and location to construct an
# resource_name which is stored in the proto message.
Expand Down Expand Up @@ -249,3 +252,110 @@ def start_execution(
execution_name=new_execution_instance.resource_name
)
return self

def assign_input_artifacts(
self, artifacts: List[Union[artifact.Artifact, models.Model]]
):
"""Assigns Artifacts as inputs to this Executions.
Args:
artifacts (List[Union[artifact.Artifact, models.Model]]):
Required. Artifacts to assign as input.
Raises:
RuntimeError: if Execution resource hasn't been created.
"""
if self._gca_resource.name:
super().assign_input_artifacts(artifacts)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def assign_output_artifacts(
self, artifacts: List[Union[artifact.Artifact, models.Model]]
):
"""Assigns Artifacts as outputs to this Executions.
Args:
artifacts (List[Union[artifact.Artifact, models.Model]]):
Required. Artifacts to assign as input.
Raises:
RuntimeError: if Execution resource hasn't been created.
"""
if self._gca_resource.name:
super().assign_output_artifacts(artifacts)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def get_input_artifacts(self) -> List[artifact.Artifact]:
"""Get the input Artifacts of this Execution.
Returns:
List of input Artifacts.
Raises:
RuntimeError: if Execution resource hasn't been created.
"""
if self._gca_resource.name:
return super().get_input_artifacts()
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def get_output_artifacts(self) -> List[artifact.Artifact]:
"""Get the output Artifacts of this Execution.
Returns:
List of output Artifacts.
Raises:
RuntimeError: if Execution resource hasn't been created.
"""
if self._gca_resource.name:
return super().get_output_artifacts()
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def update(
self,
state: Optional[gca_execution.Execution.State] = None,
description: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
):
"""Update this Execution.
Args:
state (gca_execution.Execution.State):
Optional. State of this Execution.
description (str):
Optional. Describes the purpose of the Execution to be created.
metadata (Dict[str, Any):
Optional. Contains the metadata information that will be stored
in the Execution.
Raises:
RuntimeError: if Execution resource hasn't been created.
"""
if self._gca_resource.name:
super().update(
state=state,
description=description,
metadata=metadata,
)
else:
raise RuntimeError(
f"{self.__class__.__name__} resource has not been created."
)

def __repr__(self) -> str:
if self._gca_resource.name:
return super().__repr__()
else:
return f"{object.__repr__(self)}\nschema_title: {self.schema_title}"

0 comments on commit 13e2165

Please sign in to comment.