Set MLFlowLogger status to FAILED when training raises an error #12292

Merged
merged 29 commits into master from bugfix/12291_mlflow on Sep 20, 2022
Changes from 25 commits
Commits (29)
2bf6244
bugfix: update MLFlowLogger's status to be FAILED when training raises…
Mar 10, 2022
bee913f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 10, 2022
dbbfef8
disable finalize method if status == failed
Mar 15, 2022
f57dd5a
Merge branch 'bugfix/12291_mlflow' of github.com:ritsuki1227/pytorch-…
Mar 15, 2022
eaf467c
Merge branch 'master' into bugfix/12291_mlflow
awaelchli Mar 20, 2022
5860efa
Merge branch 'master' into bugfix/12291_mlflow
Borda Jun 21, 2022
bd7768b
Merge branch 'master' into bugfix/12291_mlflow
carmocca Jul 23, 2022
7a73573
close finalizer on tensorboard logger & refactor mlflow logger test
Aug 1, 2022
9f5a31e
Merge branch 'master' into bugfix/12291_mlflow
Aug 1, 2022
10e133a
wip: initialize self._version
Aug 31, 2022
156e12b
Merge branch 'master' into bugfix/12291_mlflow
Aug 31, 2022
846c5bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2022
3c39f08
revert the tensorboard logger fix
Sep 11, 2022
48e64dd
bugfix: logger.finalize is called only when the logger is mlflow logg…
Sep 11, 2022
038b87f
Merge branch 'master' into bugfix/12291_mlflow
Sep 11, 2022
40e82a5
revert unnecessary fix
Sep 11, 2022
7510410
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2022
82935b7
bugfix: add patches
Sep 11, 2022
b3d1931
Merge branch 'bugfix/12291_mlflow' of github.com:ritsuki1227/pytorch-…
Sep 11, 2022
09097ef
Merge branch 'master' into bugfix/12291_mlflow
awaelchli Sep 17, 2022
d6438ed
handle mlflow finalize on main process
awaelchli Sep 17, 2022
e379ac0
improve the testing
awaelchli Sep 17, 2022
11a0cc6
adjust other loggers
awaelchli Sep 17, 2022
d923981
handle special tensorboard hparams file saving logic
awaelchli Sep 17, 2022
e1064c2
Merge branch 'master' into bugfix/12291_mlflow
awaelchli Sep 18, 2022
a93d7a4
finalize checkpoints in wandb only on success
awaelchli Sep 19, 2022
6c6c333
Merge branch 'master' into bugfix/12291_mlflow
awaelchli Sep 19, 2022
0b59285
remove unused import
awaelchli Sep 19, 2022
a6d5fd7
add changelog
awaelchli Sep 19, 2022
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/comet.py
@@ -346,6 +346,10 @@ def finalize(self, status: str) -> None:
This happens automatically in the :meth:`~CometLogger.experiment` property, when
``self._experiment`` is set to ``None``, i.e. ``self.reset_experiment()``.
"""
if self._experiment is None:
# When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
# initialized there
return
self.experiment.end()
self.reset_experiment()

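The same early-return guard appears again in the CSV, Neptune, and wandb loggers below. It matters for spawn-based multiprocessing strategies: each logger creates its experiment object lazily on first access, which only ever happens inside the worker processes, so a finalize() on the main process would otherwise create a brand-new experiment just to close it. A minimal standalone sketch of the pattern, with illustrative class names rather than Lightning's actual internals:

from typing import Optional


class FakeExperiment:
    """Stand-in for a third-party experiment handle (illustrative only)."""

    def end(self) -> None:
        print("experiment ended")


class GuardedLogger:
    def __init__(self) -> None:
        self._experiment: Optional[FakeExperiment] = None  # created lazily

    @property
    def experiment(self) -> FakeExperiment:
        # First access creates the experiment; under DDP-spawn this runs only
        # in the worker processes, never on the main process.
        if self._experiment is None:
            self._experiment = FakeExperiment()
        return self._experiment

    def finalize(self, status: str) -> None:
        if self._experiment is None:
            # Main process: nothing was initialized, and going through the
            # `experiment` property here would spawn a fresh run.
            return
        self.experiment.end()
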
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/csv_logs.py
@@ -208,6 +208,10 @@ def save(self) -> None:

@rank_zero_only
def finalize(self, status: str) -> None:
if self._experiment is None:
# When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
# initialized there
return
self.save()

@property
10 changes: 7 additions & 3 deletions src/pytorch_lightning/loggers/mlflow.py
@@ -254,9 +254,13 @@ def log_metrics(self, metrics: Mapping[str, float], step: Optional[int] = None)
self.experiment.log_metric(self.run_id, k, v, timestamp_ms, step)

@rank_zero_only
-    def finalize(self, status: str = "FINISHED") -> None:
-        super().finalize(status)
-        status = "FINISHED" if status == "success" else status
+    def finalize(self, status: str = "success") -> None:
+        if not self._initialized:
+            return
+        if status == "success":
+            status = "FINISHED"
+        elif status == "failed":
+            status = "FAILED"
if self.experiment.get_run(self.run_id):
self.experiment.set_terminated(self.run_id, status)

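MLflow's run-status vocabulary is uppercase (RUNNING, SCHEDULED, FINISHED, FAILED, KILLED) while Lightning passes lowercase "success"/"failed", hence the explicit mapping above. The same termination call, sketched directly against the MLflow client API (the local file-store URI and experiment name are illustrative):

from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="file:./mlruns")
experiment_id = client.create_experiment("status-mapping-demo")
run = client.create_run(experiment_id)

# Lightning hands the logger "failed"; MLflow expects "FAILED"
status = "failed"
mlflow_status = {"success": "FINISHED", "failed": "FAILED"}.get(status, status)

if client.get_run(run.info.run_id):  # mirrors the logger's existence check
    client.set_terminated(run.info.run_id, mlflow_status)

assert client.get_run(run.info.run_id).info.status == "FAILED"
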
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/neptune.py
@@ -430,6 +430,10 @@ def log_metrics(self, metrics: Dict[str, Union[Tensor, float]], step: Optional[i

@rank_zero_only
def finalize(self, status: str) -> None:
if not self._run_instance:
# When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
# initialized there
return
if status:
self.run[self._construct_path_with_prefix("status")] = status

5 changes: 4 additions & 1 deletion src/pytorch_lightning/loggers/tensorboard.py
@@ -271,7 +271,10 @@ def finalize(self, status: str) -> None:
if self._experiment is not None:
self.experiment.flush()
self.experiment.close()
-        self.save()
+
+        if status == "success":
+            # saving hparams happens independent of experiment manager
+            self.save()

@property
def name(self) -> str:
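
From the user's side, the change means a failed run still flushes and closes its event files, but hparams.yaml, which save() writes independently of the SummaryWriter, is no longer written unconditionally. A rough usage sketch (the save_dir is illustrative):

from pytorch_lightning.loggers import TensorBoardLogger

logger = TensorBoardLogger(save_dir="tb_logs")
logger.log_hyperparams({"lr": 1e-3})

logger.finalize("failed")   # flushes and closes the event files only
# logger.finalize("success") would additionally call save(), which is what
# writes hparams.yaml next to the event files
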
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/wandb.py
@@ -552,6 +552,10 @@ def use_artifact(self, artifact: str, artifact_type: Optional[str] = None) -> "w

@rank_zero_only
def finalize(self, status: str) -> None:
if self._experiment is None:
# When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
# initialized there
return
# log checkpoints as artifacts
if self._checkpoint_callback:
self._scan_and_log_checkpoints(self._checkpoint_callback)
4 changes: 4 additions & 0 deletions src/pytorch_lightning/trainer/trainer.py
@@ -644,12 +644,16 @@ def _call_and_handle_interrupt(self, trainer_fn: Callable, *args: Any, **kwargs:
if not self.interrupted:
self.state.status = TrainerStatus.INTERRUPTED
self._call_callback_hooks("on_exception", exception)
for logger in self.loggers:
logger.finalize("failed")
except BaseException as exception:
self.state.status = TrainerStatus.INTERRUPTED
if distributed_available() and self.world_size > 1:
# try syncing remaining processes, kill otherwise
self.strategy.reconciliate_processes(traceback.format_exc())
self._call_callback_hooks("on_exception", exception)
for logger in self.loggers:
logger.finalize("failed")
self._teardown()
# teardown might access the stage so we reset it after
self.state.stage = None
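
Combined with the logger changes above, an exception escaping fit() now leaves a FAILED run behind instead of one stuck in RUNNING. A reproduction sketch, not taken from the PR: it assumes mlflow is installed and uses Lightning's BoringModel test helper; touching logger.experiment up front just guarantees the run exists before the failure:

import pytest
from pytorch_lightning import Trainer
from pytorch_lightning.demos.boring_classes import BoringModel
from pytorch_lightning.loggers import MLFlowLogger


class FailingModel(BoringModel):
    def training_step(self, batch, batch_idx):
        raise RuntimeError("boom")


logger = MLFlowLogger(experiment_name="failure-demo")
_ = logger.experiment  # start the run now (normally triggered by the first logged params/metrics)

trainer = Trainer(logger=logger, max_epochs=1, enable_progress_bar=False)

with pytest.raises(RuntimeError, match="boom"):
    trainer.fit(FailingModel())

# the exception path calls logger.finalize("failed"), so the run is
# terminated with status FAILED instead of being left RUNNING
assert logger.experiment.get_run(logger.run_id).info.status == "FAILED"
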
18 changes: 18 additions & 0 deletions tests/tests_pytorch/loggers/test_mlflow.py
@@ -259,3 +259,21 @@ def test_mlflow_logger_experiment_calls(client, mlflow, time, tmpdir):
logger._mlflow_client.create_experiment.assert_called_once_with(
name="test", artifact_location="my_artifact_location"
)


@mock.patch("pytorch_lightning.loggers.mlflow.mlflow")
@mock.patch("pytorch_lightning.loggers.mlflow.MlflowClient")
def test_mlflow_logger_finalize_when_exception(*_):
logger = MLFlowLogger("test")

# Pretend we are on the main process and failing
assert logger._mlflow_client
assert not logger._initialized
logger.finalize("failed")
logger.experiment.set_terminated.assert_not_called()

# Pretend we are in a worker process and failing
_ = logger.experiment
assert logger._initialized
logger.finalize("failed")
logger.experiment.set_terminated.assert_called_once_with(logger.run_id, "FAILED")
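
The first half of the test hinges on a detail that differs from the other loggers: MLFlowLogger builds its MlflowClient eagerly in __init__, while the run itself is only created on first access to experiment. A plain None check on the client would therefore never fire, which is why the separate _initialized flag exists. A condensed sketch of that distinction (simplified; the real property also looks up or creates the experiment and run):

class EagerClientLogger:
    def __init__(self) -> None:
        self._mlflow_client = object()  # real code: MlflowClient(...), built eagerly
        self._initialized = False       # but no run exists yet, even on the main process

    @property
    def experiment(self):
        if not self._initialized:
            # real code: create the MLflow experiment and run here
            self._initialized = True
        return self._mlflow_client

    def finalize(self, status: str) -> None:
        if not self._initialized:
            return  # a client exists, but there is no run to terminate
        print(f"set_terminated(run_id, {status!r})")
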
19 changes: 18 additions & 1 deletion tests/tests_pytorch/trainer/test_trainer.py
@@ -22,7 +22,7 @@
from pathlib import Path
from re import escape
from unittest import mock
-from unittest.mock import ANY, call, patch
+from unittest.mock import ANY, call, Mock, patch

import cloudpickle
import pytest
@@ -2195,3 +2195,20 @@ def test_trainer_save_checkpoint_no_model_attached():
assert trainer.model is None
with pytest.raises(AttributeError, match="Saving a checkpoint is only possible if a model is attached"):
trainer.save_checkpoint("checkpoint.ckpt")


def test_trainer_calls_logger_finalize_on_exception(tmpdir):
class CustomModel(BoringModel):
def on_fit_start(self):
super().on_fit_start()
raise Exception("logger-finalize")

model = CustomModel()
logger = TensorBoardLogger(save_dir=tmpdir)
logger.finalize = Mock()
trainer = Trainer(logger=logger)

with pytest.raises(Exception, match="logger-finalize"):
trainer.fit(model)

logger.finalize.assert_called_once_with("failed")
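
Note that logger.finalize = Mock() replaces the method outright, so TensorBoard's real cleanup never runs in this test. If the real behavior should still execute while the call is recorded, a wrapping mock does both; a small standalone variant, not part of the PR:

from unittest.mock import Mock

from pytorch_lightning.loggers import TensorBoardLogger

logger = TensorBoardLogger(save_dir="tb_logs")
# wraps= records the call but still delegates to the original bound method
logger.finalize = Mock(wraps=logger.finalize)

logger.finalize("failed")
logger.finalize.assert_called_once_with("failed")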