
feat: Add option to not include time_series_metrics in get_experiment_df call. This will improve execution time for Experiments with a large number of runs.

PiperOrigin-RevId: 630185385
vertex-sdk-bot authored and copybara-github committed May 2, 2024
1 parent 57a5f78 commit 78a95c5
Showing 5 changed files with 119 additions and 13 deletions.
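Taken together, the change threads a keyword-only include_time_series flag from the module-level helper down to the per-run queries. A minimal usage sketch of the new option, mirroring the docstring example in the diff below (project, location, and experiment names are hypothetical placeholders):
```py
from google.cloud import aiplatform

# Hypothetical project, location, and experiment names for illustration.
aiplatform.init(project="my-project", location="us-central1")

my_experiment = aiplatform.Experiment("my-experiment")

# Skip time series metrics to avoid per-run Tensorboard lookups and reduce quota usage.
df = my_experiment.get_data_frame(include_time_series=False)
print(df.head())
```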
29 changes: 27 additions & 2 deletions google/cloud/aiplatform/metadata/experiment_resources.py
@@ -36,6 +36,7 @@
from google.cloud.aiplatform.tensorboard import tensorboard_resource

_LOGGER = base.Logger(__name__)
_HIGH_RUN_COUNT_THRESHOLD = 100 # Used in get_data_frame to make suggestion to user


@dataclass
@@ -403,13 +404,23 @@ def delete(self, *, delete_backing_tensorboard_runs: bool = False):
f"Experiment {self.name} metadata node not found. Skipping deletion."
)

def get_data_frame(self) -> "pd.DataFrame": # noqa: F821
def get_data_frame(
self, *, include_time_series: bool = True
) -> "pd.DataFrame": # noqa: F821
"""Get parameters, metrics, and time series metrics of all runs in this experiment as Dataframe.
```py
my_experiment = aiplatform.Experiment('my-experiment')
df = my_experiment.get_data_frame()
```
Args:
include_time_series (bool):
Optional. Whether or not to include time series metrics in the DataFrame.
Default is True. Setting this to False will significantly improve execution
time and reduce quota-consuming calls. Recommended when time series
metrics are not needed or the number of runs in the Experiment is large.
For time series metrics, consider querying a specific run using
get_time_series_data_frame.
Returns:
pd.DataFrame: Pandas DataFrame of Experiment Runs.
@@ -448,17 +459,30 @@ def get_data_frame(self) -> "pd.DataFrame":  # noqa: F821

executions = execution.Execution.list(filter_str, **service_request_args)

run_count = max([len(contexts), len(executions)])
if include_time_series and run_count > _HIGH_RUN_COUNT_THRESHOLD:
_LOGGER.warning(
f"Number of runs {run_count} is high. Consider setting "
f"include_time_series to False to improve execution performance"
)
if not include_time_series:
_LOGGER.warning(
"include_time_series is set to False. Time series metrics will"
" not be included in this call even if they exist."
)

rows = []
if contexts or executions:
with concurrent.futures.ThreadPoolExecutor(
max_workers=max([len(contexts), len(executions)])
max_workers=run_count
) as executor:
futures = [
executor.submit(
_SUPPORTED_LOGGABLE_RESOURCES[context.Context][
metadata_context.schema_title
]._query_experiment_row,
metadata_context,
include_time_series,
)
for metadata_context in contexts
]
Expand All @@ -470,6 +494,7 @@ def get_data_frame(self) -> "pd.DataFrame": # noqa: F821
metadata_execution.schema_title
]._query_experiment_row,
metadata_execution,
include_time_series,
)
for metadata_execution in executions
)
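For context, a simplified sketch of the fan-out pattern this method uses, with the new flag passed through to each per-run query. The row type and helper below are illustrative stand-ins, not the SDK's internal classes:
```py
import concurrent.futures
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class _Row:
    name: str
    metrics: Dict[str, float] = field(default_factory=dict)
    time_series_metrics: Optional[Dict[str, float]] = None

def _query_row(run_name: str, include_time_series: bool) -> _Row:
    # Stand-in for _query_experiment_row: the expensive time series lookup
    # is skipped entirely when include_time_series is False.
    row = _Row(name=run_name, metrics={"accuracy": 0.9})
    if include_time_series:
        row.time_series_metrics = {"loss": 0.1}  # real code would hit Tensorboard here
    return row

def query_all(run_names: List[str], include_time_series: bool = True) -> List[_Row]:
    # One worker per run, mirroring the max_workers=run_count pattern above.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(len(run_names), 1)) as pool:
        futures = [pool.submit(_query_row, n, include_time_series) for n in run_names]
        return [f.result() for f in futures]

rows = query_all(["run-1", "run-2"], include_time_series=False)
```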
25 changes: 17 additions & 8 deletions google/cloud/aiplatform/metadata/experiment_run_resource.py
@@ -441,17 +441,18 @@ def _initialize_experiment_run(
self,
node: Union[context.Context, execution.Execution],
experiment: Optional[experiment_resources.Experiment] = None,
lookup_tensorboard_run: bool = True,
):
self._experiment = experiment
self._run_name = node.display_name
self._metadata_node = node
self._largest_step = None
self._backing_tensorboard_run = None
self._metadata_metric_artifact = None

if self._is_legacy_experiment_run():
self._metadata_metric_artifact = self._v1_get_metric_artifact()
self._backing_tensorboard_run = None
else:
self._metadata_metric_artifact = None
if not self._is_legacy_experiment_run() and lookup_tensorboard_run:
self._backing_tensorboard_run = self._lookup_tensorboard_run_artifact()

@classmethod
@@ -550,18 +551,25 @@ def _create_v1_experiment_run(

@classmethod
def _query_experiment_row(
cls, node: Union[context.Context, execution.Execution]
cls,
node: Union[context.Context, execution.Execution],
include_time_series: Optional[bool] = True,
) -> experiment_resources._ExperimentRow:
"""Retrieves the runs metric and parameters into an experiment run row.
Args:
node (Union[context._Context, execution.Execution]):
Required. Metadata node instance that represents this run.
include_time_series (bool):
Optional. Whether or not to include time series metrics in the DataFrame.
Default is True.
Returns:
Experiment run row that represents this run.
"""
this_experiment_run = cls.__new__(cls)
this_experiment_run._initialize_experiment_run(node)
this_experiment_run._initialize_experiment_run(
node, lookup_tensorboard_run=include_time_series
)

row = experiment_resources._ExperimentRow(
experiment_run_type=node.schema_title,
@@ -571,9 +579,10 @@ def _query_experiment_row(
row.params = this_experiment_run.get_params()
row.metrics = this_experiment_run.get_metrics()
row.state = this_experiment_run.get_state()
row.time_series_metrics = (
this_experiment_run._get_latest_time_series_metric_columns()
)
if include_time_series:
row.time_series_metrics = (
this_experiment_run._get_latest_time_series_metric_columns()
)

return row

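When per-run time series metrics are still needed, the docstring above points to querying a single run instead of the whole experiment. A hedged sketch, assuming get_time_series_data_frame behaves as described and using placeholder run and experiment names:
```py
from google.cloud import aiplatform

# Placeholders: substitute your own run and experiment names.
run = aiplatform.ExperimentRun(run_name="my-run", experiment="my-experiment")

# Time series metrics for just this run, without paying the cost for every run.
ts_df = run.get_time_series_data_frame()
print(ts_df.head())
```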
14 changes: 12 additions & 2 deletions google/cloud/aiplatform/metadata/metadata.py
@@ -764,7 +764,10 @@ def _validate_experiment_and_run(self, method_name: str):
)

def get_experiment_df(
self, experiment: Optional[str] = None
self,
experiment: Optional[str] = None,
*,
include_time_series: bool = True,
) -> "pd.DataFrame": # noqa: F821
"""Returns a Pandas DataFrame of the parameters and metrics associated with one experiment.
@@ -795,6 +798,13 @@ def get_experiment_df(
experiment (str):
Name of the Experiment to filter results. If not set, return results
of current active experiment.
include_time_series (bool):
Optional. Whether or not to include time series metrics in the DataFrame.
Default is True. Setting this to False will significantly improve execution
time and reduce quota-consuming calls. Recommended when time series
metrics are not needed or the number of runs in the Experiment is large.
For time series metrics, consider querying a specific run using
get_time_series_data_frame.
Returns:
Pandas DataFrame of the Experiment with metrics and parameters.
@@ -809,7 +819,7 @@ def get_experiment_df(
else:
experiment = experiment_resources.Experiment(experiment)

return experiment.get_data_frame()
return experiment.get_data_frame(include_time_series=include_time_series)

def log(
self,
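The module-level helper simply forwards the flag to Experiment.get_data_frame. A minimal sketch with placeholder project, location, and experiment names:
```py
from google.cloud import aiplatform

aiplatform.init(
    project="my-project",
    location="us-central1",
    experiment="my-experiment",
)

# Fast path: parameters, metrics, and state only; no time series columns.
df = aiplatform.get_experiment_df(include_time_series=False)
```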
5 changes: 4 additions & 1 deletion google/cloud/aiplatform/pipeline_jobs.py
@@ -887,7 +887,7 @@ def _get_context(self) -> context.Context:

@classmethod
def _query_experiment_row(
cls, node: context.Context
cls, node: context.Context, include_time_series: Optional[bool] = True
) -> experiment_resources._ExperimentRow:
"""Queries the PipelineJob metadata as an experiment run parameter and metric row.
@@ -898,6 +898,9 @@ def _query_experiment_row(
Args:
node (context._Context):
Required. System.PipelineRun context that represents a PipelineJob Run.
include_time_series (bool):
Optional. Whether or not to include time series metrics in the DataFrame.
Default is True.
Returns:
Experiment run row representing this PipelineJob.
"""
59 changes: 59 additions & 0 deletions tests/system/aiplatform/test_experiments.py
@@ -476,6 +476,65 @@ def test_get_experiments_df(self):
key=lambda d: d["run_name"],
) == sorted(df.fillna(0.0).to_dict("records"), key=lambda d: d["run_name"])

def test_get_experiments_df_include_time_series_false(self):
aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
experiment=self._experiment_name,
)

df = aiplatform.get_experiment_df(include_time_series=False)

pipelines_param_and_metrics = {
"param.dropout_rate": 0.2,
"param.learning_rate": 0.1,
"metric.accuracy": 0.8,
"metric.mse": 1.2,
}

true_df_dict_1 = {f"metric.{key}": value for key, value in _METRICS.items()}
for key, value in _PARAMS.items():
true_df_dict_1[f"param.{key}"] = value

true_df_dict_1["experiment_name"] = self._experiment_name
true_df_dict_1["run_name"] = _RUN
true_df_dict_1["state"] = aiplatform.gapic.Execution.State.COMPLETE.name
true_df_dict_1["run_type"] = aiplatform.metadata.constants.SYSTEM_EXPERIMENT_RUN

true_df_dict_2 = {f"metric.{key}": value for key, value in _METRICS_2.items()}
for key, value in _PARAMS_2.items():
true_df_dict_2[f"param.{key}"] = value

true_df_dict_2["experiment_name"] = self._experiment_name
true_df_dict_2["run_name"] = _RUN_2
true_df_dict_2["state"] = aiplatform.gapic.Execution.State.COMPLETE.name
true_df_dict_2["run_type"] = aiplatform.metadata.constants.SYSTEM_EXPERIMENT_RUN
true_df_dict_2.update(pipelines_param_and_metrics)

true_df_dict_3 = {
"experiment_name": self._experiment_name,
"run_name": self._pipeline_job_id,
"run_type": aiplatform.metadata.constants.SYSTEM_PIPELINE_RUN,
"state": aiplatform.gapic.Execution.State.COMPLETE.name,
}

true_df_dict_3.update(pipelines_param_and_metrics)

for key in pipelines_param_and_metrics.keys():
true_df_dict_1[key] = 0.0
true_df_dict_2[key] = 0.0

for key in _PARAMS.keys():
true_df_dict_3[f"param.{key}"] = 0.0

for key in _METRICS.keys():
true_df_dict_3[f"metric.{key}"] = 0.0

assert sorted(
[true_df_dict_1, true_df_dict_2, true_df_dict_3],
key=lambda d: d["run_name"],
) == sorted(df.fillna(0.0).to_dict("records"), key=lambda d: d["run_name"])

def test_delete_run_does_not_exist_raises_exception(self):
run = aiplatform.ExperimentRun(
run_name=_RUN,
