Skip to content

Commit

Permalink
feat: Create Vertex Experiment when uploading Tensorboard logs
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 626105964
  • Loading branch information
vertex-sdk-bot authored and Copybara-Service committed Apr 26, 2024
1 parent 9809a3a commit e3e7c9e
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 169 deletions.
13 changes: 12 additions & 1 deletion google/cloud/aiplatform/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ def set_experiment(
backing_tensorboard: Optional[
Union[str, tensorboard_resource.Tensorboard, bool]
] = None,
project: Optional[str] = None,
location: Optional[str] = None,
):
"""Set the experiment. Will retrieve the Experiment if it exists or create one with the provided name.
Expand All @@ -309,11 +311,20 @@ def set_experiment(
To disable using a backing tensorboard, set `backing_tensorboard` to `False`.
To maintain this behavior, set `experiment_tensorboard` to `False` in subsequent calls to aiplatform.init().
project (str):
Optional. Project where this experiment will be retrieved from or created. Overrides project set in
aiplatform.init.
location (str):
Optional. Location where this experiment will be retrieved from or created. Overrides location set in
aiplatform.init.
"""
self.reset()

experiment = experiment_resources.Experiment.get_or_create(
experiment_name=experiment, description=description
experiment_name=experiment,
description=description,
project=project,
location=location,
)

if backing_tensorboard and not isinstance(backing_tensorboard, bool):
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/aiplatform/tensorboard/logdir_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ def synchronize_runs(self):
In addition, any existing `DirectoryLoader` whose run directory
no longer exists will be deleted.
Modify run name to work with Experiments restrictions.
"""
logger.info("Starting logdir traversal of %s", self._logdir)
runs_seen = set()
for subdir in io_wrapper.GetLogdirSubdirectories(self._logdir):
run = os.path.relpath(subdir, self._logdir)
run = run.replace("/", "-").replace("_", "-")
runs_seen.add(run)
if run not in self._directory_loaders:
logger.info("- Adding run for relative directory %s", run)
Expand Down
67 changes: 31 additions & 36 deletions google/cloud/aiplatform/tensorboard/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@
from collections import defaultdict
import functools
import logging
import os
import re
import time
from typing import ContextManager, Dict, FrozenSet, Generator, Iterable, Optional, Tuple
import uuid

from google.api_core import exceptions
from google.cloud import storage
from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform.compat.services import (
tensorboard_service_client,
)
from google.cloud.aiplatform.compat.types import tensorboard_data
from google.cloud.aiplatform.compat.types import tensorboard_experiment
from google.cloud.aiplatform.compat.types import tensorboard_service
from google.cloud.aiplatform.compat.types import tensorboard_time_series
from google.cloud.aiplatform.metadata import experiment_resources
from google.cloud.aiplatform.tensorboard import logdir_loader
from google.cloud.aiplatform.tensorboard import upload_tracker
from google.cloud.aiplatform.tensorboard import uploader_constants
Expand Down Expand Up @@ -215,47 +214,43 @@ def active_filter(secs):

self._create_additional_senders()

def _create_or_get_experiment(self) -> tensorboard_experiment.TensorboardExperiment:
"""Create an experiment or get an experiment.
Attempts to create an experiment. If the experiment already exists and
creation fails then the experiment will be retrieved.
def create_experiment(self):
"""Creates an Experiment for this upload session.
Returns:
The created or retrieved experiment.
Sets the tensorboard resource and experiment, which will get or create a
Vertex Experiment and associate it with a Tensorboard Experiment.
"""
logger.info("Creating experiment")

tb_experiment = tensorboard_experiment.TensorboardExperiment(
description=self._description, display_name=self._experiment_display_name
m = re.match(
"projects/(.*)/locations/(.*)/tensorboards/.*",
self._tensorboard_resource_name,
)
project = m[1]
location = m[2]

try:
experiment = self._api.create_tensorboard_experiment(
parent=self._tensorboard_resource_name,
tensorboard_experiment=tb_experiment,
tensorboard_experiment_id=self._experiment_name,
)
existing_experiment = experiment_resources.Experiment.get(
experiment_name=self._experiment_name,
project=project,
location=location,
)
if not existing_experiment:
self._is_brand_new_experiment = True
except exceptions.AlreadyExists:
logger.info("Creating experiment failed. Retrieving experiment.")
experiment_name = os.path.join(
self._tensorboard_resource_name, "experiments", self._experiment_name
)
experiment = self._api.get_tensorboard_experiment(name=experiment_name)
return experiment

def create_experiment(self):
"""Creates an Experiment for this upload session and returns the ID."""

experiment = self._create_or_get_experiment()
self._experiment = experiment
aiplatform.init(
project=project,
location=location,
experiment=self._experiment_name,
experiment_description=self._description,
experiment_tensorboard=self._tensorboard_resource_name,
)
self._tensorboard_experiment_resource_name = (
f"{self._tensorboard_resource_name}/experiments/{self._experiment_name}"
)
self._one_platform_resource_manager = uploader_utils.OnePlatformResourceManager(
self._experiment.name, self._api
self._tensorboard_experiment_resource_name, self._api
)

self._request_sender = _BatchedRequestSender(
self._experiment.name,
self._tensorboard_experiment_resource_name,
self._api,
allowed_plugins=self._allowed_plugins,
upload_limits=self._upload_limits,
Expand All @@ -271,7 +266,7 @@ def create_experiment(self):
# Update partials with experiment name
for sender in self._additional_senders.keys():
self._additional_senders[sender] = self._additional_senders[sender](
experiment_resource_name=self._experiment.name,
experiment_resource_name=self._tensorboard_experiment_resource_name,
)

self._dispatcher = _Dispatcher(
Expand Down Expand Up @@ -310,7 +305,7 @@ def _create_additional_senders(self) -> Dict[str, uploader_utils.RequestSender]:
)

def get_experiment_resource_name(self):
return self._experiment.name
return self._tensorboard_experiment_resource_name

def start_uploading(self):
"""Blocks forever to continuously upload data from the logdir.
Expand Down
84 changes: 30 additions & 54 deletions google/cloud/aiplatform/tensorboard/uploader_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
import re
import time
from typing import Callable, Dict, Generator, List, Optional, Tuple
import uuid

from absl import app
from google.api_core import exceptions
from google.cloud import storage
from google.cloud.aiplatform.compat.services import (
tensorboard_service_client,
)
from google.cloud.aiplatform.compat.types import execution as gca_execution
from google.cloud.aiplatform.compat.types import tensorboard_run
from google.cloud.aiplatform.compat.types import tensorboard_service
from google.cloud.aiplatform.compat.types import tensorboard_time_series
from google.cloud.aiplatform.metadata import experiment_run_resource
from google.cloud.aiplatform.tensorboard import tensorboard_resource
import grpc

from tensorboard.util import tb_logging
Expand Down Expand Up @@ -102,39 +104,20 @@ def __init__(self, experiment_resource_name: str, api: TensorboardServiceClient)

def batch_create_runs(
self, run_names: List[str]
) -> List[tensorboard_run.TensorboardRun]:
) -> List[tensorboard_resource.TensorboardRun]:
"""Batch creates TensorboardRuns.
Args:
run_names: a list of run_names for creating the TensorboardRuns.
Returns:
the created TensorboardRuns
"""
batch_size = OnePlatformResourceManager.CREATE_RUN_BATCH_SIZE
created_runs = []
for i in range(0, len(run_names), batch_size):
one_batch_run_names = run_names[i : i + batch_size]
tb_run_requests = [
tensorboard_service.CreateTensorboardRunRequest(
parent=self._experiment_resource_name,
tensorboard_run=tensorboard_run.TensorboardRun(
display_name=run_name
),
tensorboard_run_id=str(uuid.uuid4()),
)
for run_name in one_batch_run_names
]

tb_runs = self._api.batch_create_tensorboard_runs(
parent=self._experiment_resource_name,
requests=tb_run_requests,
).tensorboard_runs

self._run_name_to_run_resource_name.update(
{run.display_name: run.name for run in tb_runs}
)

created_runs.extend(tb_runs)
for run_name in run_names:
tb_run = self._create_or_get_run_resource(run_name)
created_runs.append(tb_run)
if run_name not in self._run_name_to_run_resource_name:
self._run_name_to_run_resource_name[run_name] = tb_run.resource_name

return created_runs

Expand Down Expand Up @@ -207,13 +190,16 @@ def get_run_resource_name(self, run_name: str) -> str:
"""
if run_name not in self._run_name_to_run_resource_name:
tb_run = self._create_or_get_run_resource(run_name)
self._run_name_to_run_resource_name[run_name] = tb_run.name
self._run_name_to_run_resource_name[run_name] = tb_run.resource_name
return self._run_name_to_run_resource_name[run_name]

def _create_or_get_run_resource(
self, run_name: str
) -> tensorboard_run.TensorboardRun:
"""Creates a new run resource in current tensorboard experiment resource.
"""Creates new experiment run and tensorboard run resources.
The experiment run will be associated with the tensorboard run resource.
This will link all tensorboard run data to the associated experiment.
Args:
run_name (str):
Expand All @@ -224,36 +210,26 @@ def _create_or_get_run_resource(
The TensorboardRun given the run_name.
Raises:
ExistingResourceNotFoundError:
Run name could not be found in resource list.
exceptions.InvalidArgument:
ValueError:
run_name argument is invalid.
"""
tb_run = tensorboard_run.TensorboardRun()
tb_run.display_name = run_name
try:
tb_run = self._api.create_tensorboard_run(
parent=self._experiment_resource_name,
tensorboard_run=tb_run,
tensorboard_run_id=str(uuid.uuid4()),
m = re.match(
"projects/(.*)/locations/(.*)/tensorboards/(.*)/experiments/(.*)",
self._experiment_resource_name,
)
tensorboard = m[3]
experiment = m[4]
experiment_run = experiment_run_resource.ExperimentRun.get(run_name)
if not experiment_run:
experiment_run = experiment_run_resource.ExperimentRun.create(
run_name=run_name,
experiment=experiment,
tensorboard=tensorboard,
state=gca_execution.Execution.State.STATE_UNSPECIFIED,
)
except exceptions.InvalidArgument as e:
# If the run name already exists then retrieve it
if "already exist" in e.message:
runs_pages = self._api.list_tensorboard_runs(
parent=self._experiment_resource_name
)
for tb_run in runs_pages:
if tb_run.display_name == run_name:
break
tb_run_artifact = experiment_run._backing_tensorboard_run
tb_run = tb_run_artifact.resource

if tb_run.display_name != run_name:
raise ExistingResourceNotFoundError(
"Run with name %s already exists but is not resource list."
% run_name
)
else:
raise
return tb_run

def get_time_series_resource_name(
Expand Down
10 changes: 7 additions & 3 deletions tests/unit/aiplatform/test_logdir_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ def test_multiple_writes_to_logdir(self):
writer.add_test_summary("tag_b")
with FileWriter(os.path.join(logdir, "b", "x")) as writer:
writer.add_test_summary("tag_b_x")
with FileWriter(os.path.join(logdir, "b_z")) as writer:
writer.add_test_summary("tag_b_z")
writer_c = FileWriter(os.path.join(logdir, "c"))
writer_c.add_test_summary("tag_c")
writer_c.flush()
Expand All @@ -199,14 +201,15 @@ def test_multiple_writes_to_logdir(self):
{
"a": ["tag_a"],
"b": ["tag_b"],
"b/x": ["tag_b_x"],
"b-x": ["tag_b_x"],
"b-z": ["tag_b_z"],
"c": ["tag_c"],
},
)
# A second load should indicate no new data.
self.assertEqual(
self._extract_run_to_tags(loader.get_run_events()),
{"a": [], "b": [], "b/x": [], "c": []},
{"a": [], "b": [], "b-x": [], "b-z": [], "c": []},
)
# Write some new data to both new and pre-existing event files.
with FileWriter(os.path.join(logdir, "a"), filename_suffix=".other") as writer:
Expand All @@ -225,7 +228,8 @@ def test_multiple_writes_to_logdir(self):
{
"a": ["tag_a_2", "tag_a_3", "tag_a_4"],
"b": [],
"b/x": ["tag_b_x_2"],
"b-x": ["tag_b_x_2"],
"b-z": [],
"c": ["tag_c_2"],
},
)
Expand Down

0 comments on commit e3e7c9e

Please sign in to comment.