Skip to content

Commit

Permalink
Summit update fixes (#86)
Browse files Browse the repository at this point in the history
* Turn jsrun into managed, change lsf env and orc

* Fix code block for Summit doc

* Move step_id creation for LSF

* Remove outdated comments

* Add LocalStep to LSFLauncher

* Update installation instructions for new modules
  • Loading branch information
al-rigazzi committed Oct 11, 2021
1 parent 572ff27 commit 266efb8
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 82 deletions.
17 changes: 7 additions & 10 deletions doc/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,11 @@ Note that SmartSim and SmartRedis will be downloaded to the working directory
from which these instructions are executed.

.. code-block:: bash
# setup Python and build environment
module load open-ce
conda create -p /ccs/home/$USER/.conda/envs/smartsim --clone open-ce-1.2.0-py38-0
conda activate smartsim
# fix broken cmake module if present
pip uninstall cmake
pip install cmake
conda install git-lfs make -y
git lfs install
module load gcc/9.3.0
module load cuda/11.4.0
module unload xalt
Expand All @@ -458,19 +454,20 @@ from which these instructions are executed.
pushd smartsim
pip install .
pip uninstall cmake
conda install cmake
conda install git-lfs
conda install make
# install PyTorch and TensorFlow backend for the Orchestrator database.
# pip-installed cmake won't use the correct CMAKE_PREFIX_PATH
pip uninstall cmake -y
conda install cmake -y
export Torch_DIR=/ccs/home/$USER/.conda/envs/smartsim/lib/python3.8/site-packages/torch/share/cmake/Torch/
export CFLAGS="$CFLAGS -I/ccs/home/$USER/.conda/envs/smartsim/lib/python3.8/site-packages/tensorflow/include"
export CFLAGS="$CFLAGS -I/ccs/home/$USER/.conda/envs/smarter/lib/python3.8/site-packages/tensorflow/include"
smart --device=gpu --torch_dir $Torch_DIR -v
When executing SmartSim, if you want to use the PyTorch backend in the orchestrator,
you will need to add the PyTorch library path to the environment with:

.. code-block:: bash
export LD_LIBRARY_PATH=/ccs/home/$USER/.conda/envs/smartsim/lib/python3.8/site-packages/torch/lib/:$LD_LIBRARY_PATH
Expand Down
2 changes: 1 addition & 1 deletion smartsim/database/lsfOrchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _build_run_settings(self, exe, exe_args, **kwargs):
old_host = host

erf_sets = {
"rank_count": "1",
"rank": str(shard_id),
"host": str(1 + host),
"cpu": "{" + f"{assigned_smts}:{self.cpus_per_shard}" + "}",
}
Expand Down
32 changes: 30 additions & 2 deletions smartsim/launcher/lsf/lsfCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,37 @@ def bkill(args):
:param args: list of command arguments
:type args: list of str
:return: output and error
:rtype: str
:return: returncode, output and error
:rtype: (int, str, str)
"""
cmd = ["bkill"] + args
returncode, out, error = execute_cmd(cmd)
return returncode, out, error


def jskill(args):
"""Calls LSF jskill with args.
returncode is also supplied in this function.
:param args: list of command arguments
:type args: list of str
:return: returncode, output and error
:rtype: (int, str, str)
"""

cmd = ["jskill"] + args
returncode, out, error = execute_cmd(cmd)
return returncode, out, error

def jslist(args):
"""Calls LSF jslist with args
:param args: List of command arguments
:type args: List of str
:returns: Output and error of jslist
:rtype: (str, str)
"""
cmd = ["jslist"] + args
_, out, err = execute_cmd(cmd)
return out, err
87 changes: 46 additions & 41 deletions smartsim/launcher/lsf/lsfLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from smartsim.settings.settings import RunSettings
import time

import psutil

from ...constants import STATUS_CANCELLED, STATUS_COMPLETED
from ...error import LauncherError, SSConfigError
from ...settings import BsubBatchSettings, JsrunSettings, MpirunSettings
from ...utils import get_logger
from ..launcher import WLMLauncher
from ..step import BsubBatchStep, JsrunStep, MpirunStep
from ..stepInfo import LSFStepInfo
from .lsfCommands import bjobs, bkill
from .lsfParser import parse_bjobs_jobid, parse_bsub, parse_step_id_from_bjobs
from ..step import BsubBatchStep, JsrunStep, MpirunStep, LocalStep
from ..stepInfo import LSFBatchStepInfo, LSFJsrunStepInfo
from .lsfCommands import bjobs, bkill, jslist, jskill
from .lsfParser import parse_jslist_stepid, parse_bjobs_jobid, parse_bsub, parse_max_step_id_from_jslist

logger = get_logger(__name__)

Expand Down Expand Up @@ -78,6 +77,9 @@ def create_step(self, name, cwd, step_settings):
if isinstance(step_settings, MpirunSettings):
step = MpirunStep(name, cwd, step_settings)
return step
if isinstance(step_settings, RunSettings):
step = LocalStep(name, cwd, step_settings)
return step
raise TypeError(
f"RunSettings type {type(step_settings)} not supported by LSF"
)
Expand Down Expand Up @@ -107,24 +109,22 @@ def run(self, step):
if out:
step_id = parse_bsub(out)
logger.debug(f"Gleaned batch job id: {step_id} for {step.name}")
elif isinstance(step, MpirunStep):
elif isinstance(step, JsrunStep):
self.task_manager.start_task(cmd_list, step.cwd)
time.sleep(1)
step_id = self._get_lsf_step_id(step)
logger.debug(f"Gleaned jsrun step id: {step_id} for {step.name}")
else: # isinstance(step, MpirunStep) or isinstance(step, LocalStep)
out, err = step.get_output_files()
# mpirun doesn't direct output for us
# mpirun and local launch don't direct output for us
output = open(out, "w+")
error = open(err, "w+")
task_id = self.task_manager.start_task(
cmd_list, step.cwd, out=output, err=error
)
else:
task_id = self.task_manager.start_task(cmd_list, step.cwd)

# if batch submission did not successfully retrieve job ID
if not step_id and step.managed: # pragma: no cover
step_id = self._get_lsf_step_id(step)

self.step_mapping.add(step.name, step_id, task_id, step.managed)

time.sleep(5)

return step_id

def stop(self, step_name):
Expand All @@ -137,8 +137,11 @@ def stop(self, step_name):
"""
stepmap = self.step_mapping[step_name]
if stepmap.managed:
qdel_rc, _, err = bkill([str(stepmap.step_id)])
if qdel_rc != 0:
if "." in stepmap.step_id:
rc, _, err = jskill([stepmap.step_id.rpartition(".")[-1]])
else:
rc, _, err = bkill([str(stepmap.step_id)])
if rc != 0:
logger.warning(f"Unable to cancel job step {step_name}\n {err}")
if stepmap.task_id:
self.task_manager.remove_task(stepmap.task_id)
Expand All @@ -149,31 +152,24 @@ def stop(self, step_name):
step_info.status = STATUS_CANCELLED # set status to cancelled instead of failed
return step_info

# TODO: use jslist here if it is a JsrunStep
# otherwise, this is only reached in a very rare case where a batch
# job is submitted but no message is receieved
# We exclude this from coverage
def _get_lsf_step_id(self, step, interval=2, trials=5): # pragma: no cover
"""Get the step_id of a step from bjobs (rarely used)
def _get_lsf_step_id(self, step, interval=2, trials=5):
"""Get the step_id of last launched step from jslist
Parses bjobs output by looking for the step name
"""
time.sleep(interval)
step_id = "unassigned"
username = psutil.Process.username()
while trials > 0:
output, _ = bjobs(["-w", "-u", username])
step_id = parse_step_id_from_bjobs(output, step.name)
output, _ = jslist([])
step_id = parse_max_step_id_from_jslist(output)
if step_id:
break
else:
time.sleep(interval)
trials -= 1
if not step_id:
raise LauncherError("Could not find id of launched job step")
return step_id
return f"{step.alloc}.{step_id}"

# TODO: use jslist here if it is a JsrunStep
def _get_managed_step_update(self, step_ids):
"""Get step updates for WLM managed jobs
Expand All @@ -183,17 +179,26 @@ def _get_managed_step_update(self, step_ids):
:rtype: list[StepInfo]
"""
updates = []
# Include recently finished jobs
bjobs_args = ["-a"] + step_ids
bjobs_out, _ = bjobs(bjobs_args)
stats = [parse_bjobs_jobid(bjobs_out, str(step_id)) for step_id in step_ids]
# create LSFStepInfo objects to return

for stat, _ in zip(stats, step_ids):
info = LSFStepInfo(stat, None)
# account for case where job history is not logged by LSF
if info.status == STATUS_COMPLETED:
info.returncode = 0

for step_id in step_ids:

# Batch jobs have integer step id,
# Jsrun processes have {alloc}.{task_id}
# Include recently finished jobs
if "." in str(step_id):
jsrun_step_id = step_id.rpartition(".")[-1]
jslist_out, _ = jslist([])
stat, return_code = parse_jslist_stepid(jslist_out, jsrun_step_id)
info = LSFJsrunStepInfo(stat, return_code)
else:
bjobs_args = ["-a"] + step_ids
bjobs_out, _ = bjobs(bjobs_args)
stat = parse_bjobs_jobid(bjobs_out, str(step_id))
# create LSFBatchStepInfo objects to return
info = LSFBatchStepInfo(stat, None)
# account for case where job history is not logged by LSF
if info.status == STATUS_COMPLETED:
info.returncode = 0

updates.append(info)
return updates
Expand Down
46 changes: 39 additions & 7 deletions smartsim/launcher/lsf/lsfParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,29 @@ def parse_bsub_error(output):
return base_err


def parse_jslist_stepid(output, step_id):
"""Parse and return output of the jslist command run with
options to obrain step status
:param output: output of the bjobs command
:type output: str
:param job_id: allocation id or job step id
:type job_id: str
:return: status and return code
:rtype: (str, str)
"""
result = ("NOTFOUND", None)

for line in output.split("\n"):
if line.strip().startswith(step_id):
line = line.split()
stat = line[6]
return_code = line[5]
result = (stat, return_code)
break

return result

def parse_bjobs_jobid(output, job_id):
"""Parse and return output of the bjobs command run with options
to obtain job status.
Expand Down Expand Up @@ -113,8 +136,11 @@ def parse_bjobs_nodes(output):
return list(dict.fromkeys(nodes))


def parse_step_id_from_bjobs(output, step_name):
"""Parse and return the step id from a bjobs command without args
def parse_max_step_id_from_jslist(output):
"""Parse and return the maximum step id from a jslist command.
This function must be called immedietaly after a call to jsrun,
and before the next one, to ensure the id of the last spawned task is
properly returned
:param output: output bjobs
:type output: str
Expand All @@ -123,13 +149,19 @@ def parse_step_id_from_bjobs(output, step_name):
:return: the step_id
:rtype: str
"""
step_id = None
max_step_id = None

for line in output.split("\n"):
if line.startswith("="):
continue
fields = line.split()
if len(fields) >= 7:
if fields[7] == step_name:
step_id = fields[0]
return step_id
if fields[0].isdigit():
if (max_step_id is None) or (int(fields[0]) > max_step_id):
max_step_id = int(fields[0])

if max_step_id:
return str(max_step_id)
else:
return None

return ""
10 changes: 5 additions & 5 deletions smartsim/launcher/step/lsfStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(self, name, cwd, run_settings):
super().__init__(name, cwd)
self.run_settings = run_settings
self.alloc = None
self.managed = False
self.managed = True
if not self.run_settings.in_batch:
self._set_alloc()

Expand Down Expand Up @@ -168,8 +168,8 @@ def get_launch_cmd(self):
]

if self.run_settings.env_vars:
env_var_str = self.run_settings.format_env_vars()
jsrun_cmd += [env_var_str]
env_var_str_list = self.run_settings.format_env_vars()
jsrun_cmd += env_var_str_list

jsrun_cmd += self.run_settings.format_run_args()
jsrun_cmd += self._build_exe()
Expand Down Expand Up @@ -233,9 +233,9 @@ def _make_mpmd(self):
distr_line = "launch_distribution : packed"

with open(erf_file, "w+") as f:
f.write(distr_line)
f.write(distr_line + "\n")
for line in preamble_lines:
f.write(line)
f.write(line + "\n")
f.write("\n")

# First we list the apps
Expand Down
Loading

0 comments on commit 266efb8

Please sign in to comment.