Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,30 @@ $ gridtk list --truncate # --truncate or -t
-------- -------- ------- ------- ---------- ---------------- -------- -------------
1 506994 hc.. COMPL.. gridtk logs/gridtk.50.. gridtk subm..
```

For machine-readable output (useful for scripting and AI agents), use `--json`:
```bash
$ gridtk list --json
[
{
"job_id": 1,
"slurm_id": 506994,
"nodes": "hcne01",
"state": "COMPLETED",
"exit_code": "0",
"name": "gridtk",
"output": "logs/gridtk.506994.out",
"dependencies": [],
"command": "gridtk submit job.sh"
}
]
```

The `--json` flag is also available on `submit` and `report`:
```bash
$ gridtk submit --json job.sh
{"job_id": 1, "slurm_id": 506994, "name": "gridtk"}

$ gridtk report --json -j 1
[{"job_id": 1, "name": "gridtk", "state": "COMPLETED", ...}]
```
161 changes: 154 additions & 7 deletions src/gridtk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

import json
import pydoc
import shutil
import tempfile
Expand Down Expand Up @@ -94,6 +95,21 @@ def states_callback(ctx, param, value):
return parse_states(value)


def no_jobs_message(
action: str,
*,
default_states: bool = False,
) -> str:
"""Build a helpful message when no jobs match the given filters."""
msg = f"No jobs were {action}."
if default_states:
msg += (
" Note: the default state filter is active."
" Use --state all to include all jobs."
)
return msg


def job_filters(f_py=None, default_states=None):
"""Filter jobs based on the provided function and default states."""
assert callable(f_py) or f_py is None
Expand Down Expand Up @@ -315,6 +331,13 @@ def process_result(result, **kwargs):
@click.option("--wait-all-nodes", hidden=True)
@click.option("--wckey", hidden=True)
@click.option("--wrap", hidden=True)
@click.option(
"--json",
"output_json",
is_flag=True,
default=False,
help="Output in JSON format",
)
@click.argument("script", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def submit(
Expand All @@ -323,6 +346,7 @@ def submit(
array: str,
dependencies: str,
repeat: int,
output_json: bool,
script: str,
**kwargs,
):
Expand Down Expand Up @@ -362,7 +386,18 @@ def submit(
array=array,
dependencies=dependencies,
)
click.echo(job.id)
if output_json:
click.echo(
json.dumps(
{
"job_id": job.id,
"slurm_id": job.grid_id,
"name": job.name,
}
)
)
else:
click.echo(job.id)
deps = (dependencies or "").split(",")
deps[-1] = f"{deps[-1]}:{job.id}" if deps[-1] else str(job.id)
dependencies = ",".join(deps)
Expand All @@ -389,10 +424,10 @@ def resubmit(
)
if not jobs:
click.echo(
"No jobs were resubmitted. Note that the default state "
"filtering may have excluded some jobs. If you want to "
"resubmit all jobs, please use the option: "
"gridtk resubmit --state all"
no_jobs_message(
"resubmitted",
default_states=True,
)
)
for job in jobs:
click.echo(f"Resubmitted job {job.id}")
Expand All @@ -415,6 +450,13 @@ def resubmit(
default=False,
help="Truncate the output to the terminal width",
)
@click.option(
"--json",
"output_json",
is_flag=True,
default=False,
help="Output in JSON format",
)
@click.pass_context
def list_jobs(
ctx: click.Context,
Expand All @@ -424,12 +466,16 @@ def list_jobs(
dependents: bool,
wrap: bool,
truncate: bool,
output_json: bool,
):
"""List jobs in the queue, similar to sacct and squeue."""
from tabulate import tabulate

from .manager import JobManager

if output_json and (wrap or truncate):
raise click.UsageError(
"--json is mutually exclusive with --wrap and --truncate"
)

def truncate_str(content: str, max_width: int) -> str:
if len(content) > max_width:
return content[: max_width - 3] + ".."
Expand All @@ -440,6 +486,34 @@ def truncate_str(content: str, max_width: int) -> str:
jobs = job_manager.list_jobs(
job_ids=job_ids, states=states, names=names, dependents=dependents
)

if output_json:
jobs_list = []
for job in jobs:
output = job.output_files[0].resolve()
try:
output = output.relative_to(Path.cwd().resolve())
except ValueError:
pass
jobs_list.append(
{
"job_id": job.id,
"slurm_id": job.grid_id,
"nodes": job.nodes,
"state": job.state,
"exit_code": job.exit_code,
"name": job.name,
"output": str(output),
"dependencies": [dep_job for dep_job in job.dependencies_ids],
"command": "gridtk submit " + " ".join(job.command),
}
)
click.echo(json.dumps(jobs_list, indent=2))
session.commit()
return

from tabulate import tabulate

table: dict[str, list[str]] = defaultdict(list)
for job in jobs:
table["job-id"].append(job.id)
Expand Down Expand Up @@ -507,6 +581,8 @@ def truncate_str(content: str, max_width: int) -> str:
maxheadercolwidths=maxcolwidths,
)
)
else:
click.echo(no_jobs_message("found"))
session.commit()


Expand All @@ -528,6 +604,8 @@ def stop(
jobs = job_manager.stop_jobs(
job_ids=job_ids, states=states, names=names, dependents=dependents
)
if not jobs:
click.echo(no_jobs_message("stopped"))
for job in jobs:
click.echo(f"Stopped job {job.id} with slurm id {job.grid_id}")
session.commit()
Expand All @@ -551,6 +629,8 @@ def delete(
jobs = job_manager.delete_jobs(
job_ids=job_ids, states=states, names=names, dependents=dependents
)
if not jobs:
click.echo(no_jobs_message("deleted"))
for job in jobs:
click.echo(f"Deleted job {job.id} with slurm id {job.grid_id}")
session.commit()
Expand All @@ -564,6 +644,13 @@ def delete(
"array_idx",
help="Array index to see the logs for only one item of an array job.",
)
@click.option(
"--json",
"output_json",
is_flag=True,
default=False,
help="Output in JSON format",
)
@click.pass_context
def report(
ctx: click.Context,
Expand All @@ -572,6 +659,7 @@ def report(
names: list[str],
dependents: bool,
array_idx: Optional[str],
output_json: bool,
):
"""Report on jobs in the queue."""
from .manager import JobManager
Expand All @@ -581,6 +669,65 @@ def report(
jobs = job_manager.list_jobs(
job_ids=job_ids, states=states, names=names, dependents=dependents
)
if not jobs:
if output_json:
click.echo(json.dumps([]))
else:
click.echo(no_jobs_message("found"))
session.commit()
return

if output_json:
report_list = []
for job in jobs:
with tempfile.NamedTemporaryFile(mode="w+t", suffix=".sh") as tmpfile:
command = job.submitted_command(tmpfile, session=session)
output_files_list = []
files = job.output_files
error_files = job.error_files
if array_idx is not None:
real_array_idx = job.array_task_ids.index(int(array_idx))
files = files[real_array_idx : real_array_idx + 1]
error_files = error_files[real_array_idx : real_array_idx + 1]
for out_file, err_file in zip(files, error_files):
if out_file.exists():
output_files_list.append(
{
"path": str(out_file),
"content": out_file.open().read(),
}
)
else:
output_files_list.append(
{"path": str(out_file), "content": None}
)
if err_file != out_file:
if err_file.exists():
output_files_list.append(
{
"path": str(err_file),
"content": err_file.open().read(),
}
)
else:
output_files_list.append(
{"path": str(err_file), "content": None}
)
report_list.append(
{
"job_id": job.id,
"name": job.name,
"state": job.state,
"exit_code": job.exit_code,
"nodes": job.nodes,
"command": command,
"output_files": output_files_list,
}
)
click.echo(json.dumps(report_list, indent=2))
session.commit()
return

for job in jobs:
report_text = ""
report_text += f"Job ID: {job.id}\n"
Expand Down
45 changes: 42 additions & 3 deletions tests/test_gridtk.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def test_list_jobs(mock_check_output, runner):
# test when there are no jobs
result = runner.invoke(cli, ["list"])
assert_click_runner_result(result)
assert result.output == ""
assert "No jobs were found." in result.output

# test when there are jobs
submit_job_id = 9876543
Expand Down Expand Up @@ -496,8 +496,9 @@ def test_resubmit_no_jobs(mock_check_output, runner):
)
result = runner.invoke(cli, ["resubmit"])
assert_click_runner_result(result)
assert "No jobs were resubmitted" in result.output
assert "gridtk resubmit --state all" in result.output
assert "No jobs were resubmitted." in result.output
assert "default state filter" in result.output
assert "--state all" in result.output


@patch("subprocess.check_output")
Expand Down Expand Up @@ -693,6 +694,44 @@ def test_submit_with_dependencies(mock_check_output, runner):
)


@patch("subprocess.check_output")
def test_list_json(mock_check_output, runner):
with runner.isolated_filesystem():
submit_job_id = 9876543
_submit_job(
runner=runner, mock_check_output=mock_check_output, job_id=submit_job_id
)

mock_check_output.return_value = _pending_job_sacct_json(submit_job_id)
result = runner.invoke(cli, ["list", "--json"])
assert_click_runner_result(result)
data = json.loads(result.output)
assert isinstance(data, list)
assert len(data) == 1
job = data[0]
assert job["job_id"] == 1
assert job["slurm_id"] == submit_job_id
assert job["name"] == "gridtk"
assert job["state"] == "PENDING"
assert str(job["exit_code"]) == "0"
assert job["nodes"] == "Unassigned"
assert "dependencies" in job
assert "command" in job
assert "output" in job


@patch("subprocess.check_output")
def test_submit_json(mock_check_output, runner):
mock_check_output.return_value = _sbatch_output(123456789)
with runner.isolated_filesystem():
result = runner.invoke(cli, ["submit", "--json", "--wrap=hostname"])
assert_click_runner_result(result)
data = json.loads(result.output)
assert data["job_id"] == 1
assert data["slurm_id"] == 123456789
assert data["name"] == "gridtk"


if __name__ == "__main__":
import sys

Expand Down