Skip to content

Commit

Permalink
Add driver status message when job fails
Browse files Browse the repository at this point in the history
This includes a log from the actual submit command and the output of the
job files; ie. append logging information from the LSF-out and LSF-err
files into the logging.
  • Loading branch information
xjules committed May 24, 2024
1 parent 54cc5d6 commit 0aeb4f1
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class Driver(ABC):

def __init__(self, **kwargs: Dict[str, str]) -> None:
self._event_queue: Optional[asyncio.Queue[Event]] = None
# we will keep the error messages coming from the driver
self._job_error_message_by_iens: Dict[int, str] = {}

@property
def event_queue(self) -> asyncio.Queue[Event]:
Expand Down Expand Up @@ -61,6 +63,12 @@ async def poll(self) -> None:
async def finish(self) -> None:
"""make sure that all the jobs / realizations are complete."""

def read_stdout_and_stderr_files(
self, runpath: str, job_name: str, num_characters_to_read_from_end: int = 300
) -> str:
"""Each driver should provide some output in case of failure."""
return ""

@staticmethod
async def _execute_with_retry(
cmd_with_args: List[str],
Expand Down
7 changes: 7 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ async def _handle_failure(self) -> None:
f"\n\t{self._callback_status_msg}"
)

if msg := self.driver._job_error_message_by_iens.get(self.iens, ""):
error_msg += f"\nDriver reported: {msg}"

error_msg += self.driver.read_stdout_and_stderr_files(
self.real.run_arg.runpath, self.real.run_arg.job_name
)

self.real.run_arg.ensemble_storage.set_failure(
self.real.run_arg.iens, RealizationStorageState.LOAD_FAILURE, error_msg
)
Expand Down
4 changes: 3 additions & 1 deletion src/ert/scheduler/local_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ async def _run(self, iens: int, executable: str, /, *args: str) -> None:
except FileNotFoundError as err:
# /bin/sh uses returncode 127 for FileNotFound, so copy that
# behaviour.
logger.error(f"Realization {iens} failed with {err}")
msg = f"Realization {iens} failed with {err}"
logger.error(msg)
self._job_error_message_by_iens[iens] = msg
await self._dispatch_finished_event(iens, 127)
return

Expand Down
26 changes: 25 additions & 1 deletion src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ async def submit(
retry_interval=self._sleep_time_between_cmd_retries,
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)

match = re.search("Job <([0-9]+)> is submitted to .*queue", process_message)
Expand Down Expand Up @@ -300,7 +301,7 @@ async def kill(self, iens: int) -> None:
job_id,
]

process_success, process_message = await self._execute_with_retry(
_, process_message = await self._execute_with_retry(
bkill_with_args,
retry_codes=(FLAKY_SSH_RETURNCODE,),
retries=3,
Expand Down Expand Up @@ -524,3 +525,26 @@ def _build_resource_requirement_arg(self) -> List[str]:

async def finish(self) -> None:
pass

def read_stdout_and_stderr_files(
self, runpath: str, job_name: str, num_characters_to_read_from_end: int = 300
) -> str:
error_msg = ""
stderr_file = Path(runpath) / (job_name + ".LSF-stderr")
if msg := tail_textfile(stderr_file, num_characters_to_read_from_end):
error_msg += f"\n LSF-stderr:\n{msg}"
stdout_file = Path(runpath) / (job_name + ".LSF-stdout")
if msg := tail_textfile(stdout_file, num_characters_to_read_from_end):
error_msg += f"\n LSF-stdout:\n{msg}"
return error_msg


def tail_textfile(file_path: Path, num_chars: int) -> str:
if not file_path.exists():
return f"No output file {file_path}"
with open(file_path, encoding="utf-8") as file:
file.seek(0, 2)
file_end_position = file.tell()
seek_position = max(0, file_end_position - num_chars)
file.seek(seek_position)
return file.read()[-num_chars:]
1 change: 1 addition & 0 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ async def submit(
driverlogger=logger,
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)

job_id_ = process_message
Expand Down
55 changes: 55 additions & 0 deletions tests/integration_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import logging
import os
import random
import stat
import string
from pathlib import Path

import pytest
Expand Down Expand Up @@ -68,6 +70,59 @@ async def test_lsf_dumps_stderr_to_file(tmp_path, job_name):
)


def generate_random_text(size):
letters = string.ascii_letters
return "".join(random.choice(letters) for i in range(size))


@pytest.mark.parametrize("tail_chars_to_read", [(5), (50), (500), (700)])
async def test_lsf_can_retrieve_stdout_and_stderr(
tmp_path, job_name, tail_chars_to_read
):
os.chdir(tmp_path)
driver = LsfDriver()
num_written_characters = 600
_out = generate_random_text(num_written_characters)
_err = generate_random_text(num_written_characters)
await driver.submit(0, "sh", "-c", f"echo {_out} && echo {_err} >&2", name=job_name)
await poll(driver, {0})
message = driver.read_stdout_and_stderr_files(
runpath=".",
job_name=job_name,
num_characters_to_read_from_end=tail_chars_to_read,
)

if tail_chars_to_read > num_written_characters:
assert f"LSF-stderr:\n{_err}" in message
# we get some extra echos after LSF-out
assert f"{_out}" in message
else:
assert f"LSF-stderr:\n{_err}" not in message
assert f"LSF-stderr:\n{_err[-tail_chars_to_read+1:]}" in message
assert f"LSF-stdout:\n{_out}" not in message
assert f"LSF-stdout:\n{_out[-tail_chars_to_read+1:]}" in message


async def test_lsf_cannot_retrieve_stdout_and_stderr(tmp_path, job_name):
os.chdir(tmp_path)
driver = LsfDriver()
num_written_characters = 600
_out = generate_random_text(num_written_characters)
_err = generate_random_text(num_written_characters)
await driver.submit(0, "sh", "-c", f"echo {_out} && echo {_err} >&2", name=job_name)
await poll(driver, {0})
# let's remove the output files
os.remove(job_name + ".LSF-stderr")
os.remove(job_name + ".LSF-stdout")
message = driver.read_stdout_and_stderr_files(
runpath=".",
job_name=job_name,
num_characters_to_read_from_end=1,
)
assert "LSF-err: No output file" in message
assert "LSF-out: No output file" in message


@pytest.mark.parametrize("explicit_runpath", [(True), (False)])
async def test_lsf_info_file_in_runpath(explicit_runpath, tmp_path, job_name):
os.chdir(tmp_path)
Expand Down
6 changes: 6 additions & 0 deletions tests/unit_tests/scheduler/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ async def test_job_run_sends_expected_events(
lambda _: LoadResult(forward_model_ok_result, ""),
)
scheduler = create_scheduler()
monkeypatch.setattr(
scheduler.driver,
"read_stdout_and_stderr_files",
lambda *args: "",
)

scheduler.job.forward_model_ok = MagicMock()
scheduler.job.forward_model_ok.return_value = LoadResult(
forward_model_ok_result, ""
Expand Down
22 changes: 22 additions & 0 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,28 @@ async def test_faulty_bsub(monkeypatch, tmp_path, bsub_script, expectation):
await driver.submit(0, "sleep")


async def test_faulty_bsub_produces_error_log(monkeypatch, tmp_path):
monkeypatch.chdir(tmp_path)
bin_path = Path("bin")
bin_path.mkdir()
monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")

_out = "THIS_IS_OUTPUT"
_err = "THIS_IS_ERROR"
bsub_script = f"echo {_out} && echo {_err} >&2; exit 1"
bsub_path = bin_path / "bsub"
bsub_path.write_text(f"#!/bin/sh\n{bsub_script}")
bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)

driver = LsfDriver()
with pytest.raises(RuntimeError):
await driver.submit(0, "sleep")
assert (
"failed with exit code 1, output: {_out}, and error: {_err}"
in driver._job_error_message_by_iens[0]
)


@pytest.mark.timeout(10)
@pytest.mark.parametrize(
"mocked_iens2jobid, iens_to_kill, bkill_returncode, bkill_stdout, bkill_stderr, expected_logged_error",
Expand Down

0 comments on commit 0aeb4f1

Please sign in to comment.