Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/lightning/app/core/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,10 @@ def _aggregate_status_timeout(self, statuses: List[Dict]) -> WorkStatus:
return WorkStatus(**status, count=len(timeout_statuses))

def on_exit(self):
    """Hook invoked while the work is shutting down; override it to add cleanup logic.

    Note: when running in the cloud there is no guarantee that this hook gets called.
    """

def stop(self):
Expand Down
31 changes: 15 additions & 16 deletions tests/integrations_app/apps/idle_timeout/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from lightning.app import CloudCompute, LightningApp, LightningFlow, LightningWork
from lightning.app.storage.path import _artifacts_path, _filesystem
from lightning.app.utilities.enum import WorkStageStatus, WorkStopReasons
from lightning.app.utilities.enum import WorkStageStatus


class SourceFileWriterWork(LightningWork):
Expand Down Expand Up @@ -35,22 +35,21 @@ def run(self):
if self.work.counter == 0:
self.work.run()

elif (
self.work.status.stage == WorkStageStatus.STOPPED
and self.work.status.reason == WorkStopReasons.SIGTERM_SIGNAL_HANDLER
and self.make_check
):
succeeded_status = self.work.statuses[-3]
stopped_status_pending = self.work.statuses[-2]
stopped_status_sigterm = self.work.statuses[-1]
assert succeeded_status.stage == WorkStageStatus.SUCCEEDED
assert stopped_status_pending.stage == WorkStageStatus.STOPPED
assert stopped_status_pending.reason == WorkStopReasons.PENDING
assert stopped_status_sigterm.stage == WorkStageStatus.STOPPED
assert stopped_status_sigterm.reason == WorkStopReasons.SIGTERM_SIGNAL_HANDLER
elif self.work.status.stage == WorkStageStatus.STOPPED and self.make_check:
succeeded_statuses = [status for status in self.work.statuses if status.stage == WorkStageStatus.SUCCEEDED]
# Ensure the work succeeded at some point
assert len(succeeded_statuses) > 0
succeeded_status = succeeded_statuses[-1]

stopped_statuses = [status for status in self.work.statuses if status.stage == WorkStageStatus.STOPPED]

# We want to check that the work started shutting down within the required timeframe, so we take the
# first status that has `stage == STOPPED`.
stopped_status = stopped_statuses[0]

# Note: Account for the controlplane, k8s, SIGTERM handler delays.
assert (stopped_status_pending.timestamp - succeeded_status.timestamp) < 20
assert (stopped_status_sigterm.timestamp - stopped_status_pending.timestamp) < 120
assert (stopped_status.timestamp - succeeded_status.timestamp) < 20

fs = _filesystem()
destination_path = _artifacts_path(self.work) / pathlib.Path(*self.work.path.resolve().parts[1:])
assert fs.exists(destination_path)
Expand Down