Commit

Set MLFlowLogger status to FAILED when training raises an error (#12292)

Co-authored-by: Ritsuki Yamada <ritsuki.yamada@uzabase.com>
Co-authored-by: Adrian Wälchli <aedu.waelchli@gmail.com>
Co-authored-by: Jirka <jirka.borovec@seznam.cz>
Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com>
5 people committed Sep 20, 2022
1 parent c0ff7a1 commit 6855f65
Showing 10 changed files with 71 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/pytorch_lightning/CHANGELOG.md
@@ -77,6 +77,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Removed fall-back to `LightningEnvironment` when number of SLURM tasks does not correspond to number of processes in Trainer ([#14300](https://github.com/Lightning-AI/lightning/pull/14300))


- The `MLFlowLogger.finalize()` now sets the status to `FAILED` when an exception occurred in `Trainer`, and sets the status to `FINISHED` on successful completion ([#12292](https://github.com/Lightning-AI/lightning/pull/12292))



### Deprecated

- Deprecated `LightningDeepSpeedModule` ([#14000](https://github.com/Lightning-AI/lightning/pull/14000))
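To illustrate the behavior described in the CHANGELOG entry above, here is a minimal usage sketch: when `Trainer.fit()` raises, the logger's MLflow run ends up terminated with status `FAILED`, while a clean run ends with `FINISHED`. The failing model, the experiment name, and the local `./mlruns` location are assumptions made for this sketch, not part of the commit.

import pytorch_lightning as pl
from pytorch_lightning.demos.boring_classes import BoringModel  # import path assumed; any LightningModule works here
from pytorch_lightning.loggers import MLFlowLogger


class FailingModel(BoringModel):
    def on_fit_start(self):
        # Simulate a failure at the very start of training
        raise RuntimeError("boom")


# Experiment name and save_dir are arbitrary choices for this sketch
mlf_logger = MLFlowLogger(experiment_name="finalize-demo", save_dir="./mlruns")
trainer = pl.Trainer(logger=mlf_logger, max_epochs=1)

try:
    trainer.fit(FailingModel())
except RuntimeError:
    pass

# With this change, the MLflow run created by the logger is terminated with status "FAILED".
# A successful trainer.fit() would instead terminate it with status "FINISHED".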
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/comet.py
@@ -346,6 +346,10 @@ def finalize(self, status: str) -> None:
        This happens automatically in the :meth:`~CometLogger.experiment` property, when
        ``self._experiment`` is set to ``None``, i.e. ``self.reset_experiment()``.
        """
        if self._experiment is None:
            # When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
            # initialized there
            return
        self.experiment.end()
        self.reset_experiment()

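The guard added above, and repeated in the CSV and Neptune loggers below, exists because these loggers create their experiment object lazily. A simplified, hypothetical sketch of that pattern follows; the class and attribute names are illustrative only and do not appear in the diff.

from typing import Optional


class LazyExperimentLogger:
    def __init__(self) -> None:
        self._experiment: Optional[object] = None

    @property
    def experiment(self) -> object:
        # Accessing the property creates the experiment handle on first use
        if self._experiment is None:
            self._experiment = object()  # stands in for an expensive client/run handle
        return self._experiment

    def finalize(self, status: str) -> None:
        if self._experiment is None:
            # With spawn-based multiprocessing, the main process never touched the experiment;
            # going through `self.experiment` here would create a handle just to close it.
            return
        # ... close / flush the real experiment here ...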
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/csv_logs.py
@@ -208,6 +208,10 @@ def save(self) -> None:

    @rank_zero_only
    def finalize(self, status: str) -> None:
        if self._experiment is None:
            # When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
            # initialized there
            return
        self.save()

    @property
10 changes: 7 additions & 3 deletions src/pytorch_lightning/loggers/mlflow.py
@@ -254,9 +254,13 @@ def log_metrics(self, metrics: Mapping[str, float], step: Optional[int] = None)
            self.experiment.log_metric(self.run_id, k, v, timestamp_ms, step)

    @rank_zero_only
-    def finalize(self, status: str = "FINISHED") -> None:
-        super().finalize(status)
-        status = "FINISHED" if status == "success" else status
+    def finalize(self, status: str = "success") -> None:
+        if not self._initialized:
+            return
+        if status == "success":
+            status = "FINISHED"
+        elif status == "failed":
+            status = "FAILED"
        if self.experiment.get_run(self.run_id):
            self.experiment.set_terminated(self.run_id, status)

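Lightning passes generic status strings ("success", "failed") to `finalize()`, while MLflow's `set_terminated()` expects one of its own RunStatus names. Below is a small sketch of the mapping implemented above, written as a standalone helper for illustration; the helper name is hypothetical and not part of the diff.

def _to_mlflow_status(status: str) -> str:
    """Map Lightning's generic finalize status to an MLflow RunStatus name."""
    if status == "success":
        return "FINISHED"
    if status == "failed":
        return "FAILED"
    # Anything else is passed through unchanged
    return status


assert _to_mlflow_status("success") == "FINISHED"
assert _to_mlflow_status("failed") == "FAILED"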
4 changes: 4 additions & 0 deletions src/pytorch_lightning/loggers/neptune.py
@@ -427,6 +427,10 @@ def log_metrics(self, metrics: Dict[str, Union[Tensor, float]], step: Optional[i

    @rank_zero_only
    def finalize(self, status: str) -> None:
        if not self._run_instance:
            # When using multiprocessing, finalize() should be a no-op on the main process, as no experiment has been
            # initialized there
            return
        if status:
            self.run[self._construct_path_with_prefix("status")] = status

5 changes: 4 additions & 1 deletion src/pytorch_lightning/loggers/tensorboard.py
@@ -271,7 +271,10 @@ def finalize(self, status: str) -> None:
        if self._experiment is not None:
            self.experiment.flush()
            self.experiment.close()
-        self.save()
+
+        if status == "success":
+            # saving hparams happens independent of experiment manager
+            self.save()

    @property
    def name(self) -> str:
5 changes: 4 additions & 1 deletion src/pytorch_lightning/loggers/wandb.py
@@ -552,8 +552,11 @@ def use_artifact(self, artifact: str, artifact_type: Optional[str] = None) -> "w

    @rank_zero_only
    def finalize(self, status: str) -> None:
+        if status != "success":
+            # Currently, checkpoints only get logged on success
+            return
        # log checkpoints as artifacts
-        if self._checkpoint_callback:
+        if self._checkpoint_callback and self._experiment is not None:
            self._scan_and_log_checkpoints(self._checkpoint_callback)

    def _scan_and_log_checkpoints(self, checkpoint_callback: "ReferenceType[Checkpoint]") -> None:
4 changes: 4 additions & 0 deletions src/pytorch_lightning/trainer/trainer.py
@@ -644,12 +644,16 @@ def _call_and_handle_interrupt(self, trainer_fn: Callable, *args: Any, **kwargs:
            if not self.interrupted:
                self.state.status = TrainerStatus.INTERRUPTED
                self._call_callback_hooks("on_exception", exception)
                for logger in self.loggers:
                    logger.finalize("failed")
        except BaseException as exception:
            self.state.status = TrainerStatus.INTERRUPTED
            if distributed_available() and self.world_size > 1:
                # try syncing remaining processes, kill otherwise
                self.strategy.reconciliate_processes(traceback.format_exc())
            self._call_callback_hooks("on_exception", exception)
            for logger in self.loggers:
                logger.finalize("failed")
            self._teardown()
            # teardown might access the stage so we reset it after
            self.state.stage = None
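The control flow added above, reduced to its essentials: whether the run is interrupted with KeyboardInterrupt or fails with any other exception, every attached logger now receives `finalize("failed")` before teardown. The sketch below is a simplified illustration, not the actual Trainer code.

def _call_and_handle_interrupt_sketch(trainer, trainer_fn, *args, **kwargs):
    """Simplified illustration of the exception handling around a Trainer entry point."""
    try:
        return trainer_fn(*args, **kwargs)
    except KeyboardInterrupt as exception:
        trainer._call_callback_hooks("on_exception", exception)
        for logger in trainer.loggers:
            logger.finalize("failed")  # every attached logger is told the run failed
    except BaseException as exception:
        trainer._call_callback_hooks("on_exception", exception)
        for logger in trainer.loggers:
            logger.finalize("failed")
        raise  # the real implementation re-raises after teardown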
18 changes: 18 additions & 0 deletions tests/tests_pytorch/loggers/test_mlflow.py
@@ -259,3 +259,21 @@ def test_mlflow_logger_experiment_calls(client, mlflow, time, tmpdir):
    logger._mlflow_client.create_experiment.assert_called_once_with(
        name="test", artifact_location="my_artifact_location"
    )


@mock.patch("pytorch_lightning.loggers.mlflow.mlflow")
@mock.patch("pytorch_lightning.loggers.mlflow.MlflowClient")
def test_mlflow_logger_finalize_when_exception(*_):
    logger = MLFlowLogger("test")

    # Pretend we are on the main process and failing
    assert logger._mlflow_client
    assert not logger._initialized
    logger.finalize("failed")
    logger.experiment.set_terminated.assert_not_called()

    # Pretend we are in a worker process and failing
    _ = logger.experiment
    assert logger._initialized
    logger.finalize("failed")
    logger.experiment.set_terminated.assert_called_once_with(logger.run_id, "FAILED")
19 changes: 18 additions & 1 deletion tests/tests_pytorch/trainer/test_trainer.py
@@ -21,7 +21,7 @@
from copy import deepcopy
from pathlib import Path
from re import escape
-from unittest.mock import ANY, call, patch
+from unittest.mock import ANY, call, Mock, patch

import cloudpickle
import pytest
@@ -2191,3 +2191,20 @@ def test_trainer_save_checkpoint_no_model_attached():
    assert trainer.model is None
    with pytest.raises(AttributeError, match="Saving a checkpoint is only possible if a model is attached"):
        trainer.save_checkpoint("checkpoint.ckpt")


def test_trainer_calls_logger_finalize_on_exception(tmpdir):
    class CustomModel(BoringModel):
        def on_fit_start(self):
            super().on_fit_start()
            raise Exception("logger-finalize")

    model = CustomModel()
    logger = TensorBoardLogger(save_dir=tmpdir)
    logger.finalize = Mock()
    trainer = Trainer(logger=logger)

    with pytest.raises(Exception, match="logger-finalize"):
        trainer.fit(model)

    logger.finalize.assert_called_once_with("failed")
