Skip to content

Commit

Permalink
Merge pull request #5732 from grondo/shell-output-limit-relax
Browse files Browse the repository at this point in the history
shell: make kvs output limit configurable and default single-user jobs to unlimited
  • Loading branch information
mergify[bot] committed Feb 13, 2024
2 parents cc54bb9 + 74b4faa commit e71f17b
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 18 deletions.
5 changes: 5 additions & 0 deletions doc/man1/common/job-shell-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@

* - :option:`hwloc.xmlfile`
- Write hwloc XML gathered by job to a file and set ``HWLOC_XMLFILE``

* - :option:`output.limit`
- Set KVS output limit to SIZE bytes, where SIZE may be a floating point
value including optional SI units: k, K, M, G. This value is ignored
if output is directed to a file with :option:`--output`.
8 changes: 8 additions & 0 deletions doc/man1/flux-shell.rst
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ plugins include:
this option directly, as it will be set automatically by options
of higher level commands such as :man1:`flux-submit`.

.. option:: output.limit=SIZE

Truncate KVS output after SIZE bytes have been written. SIZE may
be a floating point value with optional SI units k, K, M, G. A value of
0 is considered unlimited. The default KVS output limit is 10M for jobs
in a multi-user instance or unlimited for single-user instance jobs.
This value is ignored if output is directed to a file.

.. option:: output.{stdout,stderr}.path=PATH

Set job stderr/out file output to PATH.
Expand Down
1 change: 1 addition & 0 deletions doc/man3/flux_shell_get_info.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ with the following layout:
::

"jobid":I,
"instance_owner":i,
"rank":i,
"size":i,
"ntasks";i,
Expand Down
1 change: 1 addition & 0 deletions src/shell/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
struct flux_shell {
flux_jobid_t jobid;
int broker_rank;
uid_t broker_owner;
char hostname [MAXHOSTNAMELEN + 1];
int protocol_fd[2];

Expand Down
66 changes: 60 additions & 6 deletions src/shell/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "src/common/libeventlog/eventlog.h"
#include "src/common/libeventlog/eventlogger.h"
#include "src/common/libioencode/ioencode.h"
#include "src/common/libutil/parse_size.h"
#include "ccan/str/str.h"

#include "task.h"
Expand All @@ -61,8 +62,7 @@
#include "builtins.h"
#include "log.h"

#define OUTPUT_LIMIT_BYTES 1024*1024*10
#define OUTPUT_LIMIT_STRING "10MB"
#define MULTIUSER_OUTPUT_LIMIT "10M"

enum {
FLUX_OUTPUT_TYPE_TERM = 1,
Expand All @@ -82,6 +82,8 @@ struct shell_output_type_file {

struct shell_output {
flux_shell_t *shell;
const char *kvs_limit_string;
size_t kvs_limit_bytes;
struct eventlogger *ev;
double batch_timeout;
int refcount;
Expand Down Expand Up @@ -341,6 +343,9 @@ static bool check_kvs_output_limit (struct shell_output *out,
size_t *bytesp;
size_t prev;

if (out->kvs_limit_bytes == 0)
return false;

if (is_stdout) {
stream = "stdout";
bytesp = &out->stdout_bytes;
Expand All @@ -353,13 +358,13 @@ static bool check_kvs_output_limit (struct shell_output *out,
prev = *bytesp;
*bytesp += len;

if (*bytesp > OUTPUT_LIMIT_BYTES) {
if (*bytesp > out->kvs_limit_bytes) {
/* Only log an error when the threshold is reached.
*/
if (prev <= OUTPUT_LIMIT_BYTES)
if (prev <= out->kvs_limit_bytes)
shell_warn ("%s will be truncated, %s limit exceeded",
stream,
OUTPUT_LIMIT_STRING);
out->kvs_limit_string);
return true;
}
return false;
Expand All @@ -385,7 +390,7 @@ static int shell_output_kvs (struct shell_output *out)
out->stdout_bytes : out->stderr_bytes;
shell_warn ("%s: %zu of %zu bytes truncated",
is_stdout ? "stdout" : "stderr",
total - OUTPUT_LIMIT_BYTES,
total - out->kvs_limit_bytes,
total);
}
}
Expand Down Expand Up @@ -1135,6 +1140,53 @@ static int shell_lost (flux_plugin_t *p,
return 0;
}

static int get_output_limit (struct shell_output *out)
{
json_t *val = NULL;
uint64_t size;

/* Set default to unlimited (0) for single-user instances,
* O/w use the default multiuser output limit:
*/
if (out->shell->broker_owner == getuid())
out->kvs_limit_string = "0";
else
out->kvs_limit_string = MULTIUSER_OUTPUT_LIMIT;

if (flux_shell_getopt_unpack (out->shell,
"output",
"{s?o}",
"limit", &val) < 0) {
shell_log_error ("Unable to unpack shell output.limit");
return -1;
}
if (val != NULL) {
if (json_is_integer (val)) {
out->kvs_limit_bytes = (size_t) json_integer_value (val);
if (out->kvs_limit_bytes > 0) {
/* Need a string representation of limit for errors
*/
char *s = strdup (encode_size (out->kvs_limit_bytes));
if (s && flux_shell_aux_set (out->shell, NULL, s, free) < 0)
free (s);
else
out->kvs_limit_string = s;
}
return 0;
}
if (!(out->kvs_limit_string = json_string_value (val))) {
shell_log_error ("Unable to convert output.limit to string");
return -1;
}
}
if (parse_size (out->kvs_limit_string, &size) < 0) {
shell_log_errno ("Invalid KVS output.limit=%s", out->kvs_limit_string);
return -1;
}
out->kvs_limit_bytes = (size_t) size;
return 0;
}

struct shell_output *shell_output_create (flux_shell_t *shell)
{
struct shell_output *out;
Expand All @@ -1147,6 +1199,8 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
out->stdout_buffer_type = "line";
out->stderr_buffer_type = "none";

if (get_output_limit (out) < 0)
goto error;
if (shell_output_check_alternate_output (out) < 0)
goto error;
if (shell_output_check_alternate_buffer_type (out) < 0)
Expand Down
24 changes: 23 additions & 1 deletion src/shell/shell.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ static int reconnect (flux_t *h, void *arg)
return 0;
}

static uid_t get_instance_owner (flux_t *h)
{
const char *s;
char *endptr;
int id;

if (!(s = flux_attr_get (h, "security.owner"))) {
shell_log_errno ("error fetching security.owner attribute");
return (uid_t) 0;
}
errno = 0;
id = strtoul (s, &endptr, 10);
if (errno != 0 || *endptr != '\0') {
shell_log_error ("error parsing security.owner=%s", s);
return (uid_t) 0;
}
return (uid_t) id;
}

static void shell_connect_flux (flux_shell_t *shell)
{
uint32_t rank;
Expand All @@ -274,6 +293,8 @@ static void shell_connect_flux (flux_shell_t *shell)
shell_log_errno ("error fetching broker rank");
shell->broker_rank = rank;

shell->broker_owner = get_instance_owner (shell->h);

if (plugstack_call (shell->plugstack, "shell.connect", NULL) < 0)
shell_log_errno ("shell.connect");
}
Expand Down Expand Up @@ -427,9 +448,10 @@ static json_t *flux_shell_get_info_object (flux_shell_t *shell)
return o;

if (!(o = json_pack_ex (&err, 0,
"{ s:I s:i s:i s:i s:s s:O s:O s:{ s:i }}",
"{ s:I s:i s:i s:i s:i s:s s:O s:O s:{ s:i }}",
"jobid", shell->info->jobid,
"rank", shell->info->shell_rank,
"instance_owner", (int) shell->broker_owner,
"size", shell->info->shell_size,
"ntasks", shell->info->total_ntasks,
"service", shell_svc_name (shell->svc),
Expand Down
1 change: 1 addition & 0 deletions src/shell/shell.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const struct taskmap *flux_shell_get_taskmap (flux_shell_t *shell);
/* Return shell info as a JSON string.
* {
* "jobid":I,
* "instance_owner":i,
* "rank":i,
* "size":i,
* "ntasks";i,
Expand Down
7 changes: 7 additions & 0 deletions t/system/0001-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ test_expect_success 'flux jobs lists job with correct userid' '
test_expect_success 'flux proxy can submit jobs to system instance' '
flux proxy $(flux getattr local-uri) flux submit true
'
test_expect_success 'flux-shell limits kvs output to 10M for guest jobs' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >large.in &&
flux run -vvv cat large.in >large.out 2>trunc.err &&
ls -lh large* &&
test_debug "cat trunc.err" &&
grep "stdout.*truncated" trunc.err
'
27 changes: 17 additions & 10 deletions t/t2606-job-shell-output-redirection.t
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,22 @@ test_expect_success 'job-shell: shell errors are captured in error file' '
test_expect_code 127 flux run --error=test.err nosuchcommand &&
grep "nosuchcommand: No such file or directory" test.err
'
test_expect_success LONGTEST 'job-shell: output to kvs is truncated at 10MB' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
flux run cat expected >output 2>truncate.error &&
test_debug "cat truncate.error" &&
grep "stdout.*truncated" truncate.error
'
test_expect_success LONGTEST 'job-shell: stderr to kvs is truncated at 10MB' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
flux run sh -c "cat expected >&2" >truncate2.error 2>&1 &&
grep "stderr.*truncated" truncate2.error
test_expect_success 'job-shell: kvs output truncatation works' '
flux run -o output.limit=5 echo 0123456789 2>trunc.err &&
test_debug "cat trunc.err" &&
grep "stdout.*truncated" trunc.err
'
test_expect_success 'job-shell: stderr truncation works' '
flux run -o output.limit=5 \
sh -c "echo 0123456789 >&2" >trunc2.error 2>&1 &&
grep "stderr.*truncated" trunc2.error
'
test_expect_success LONGTEST 'job-shell: no truncation at 10MB for single-user job' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >10M+ &&
flux run cat 10M+ >10M+.output &&
test_cmp 10M+ 10M+.output
'
test_expect_success 'job-shell: invalid output.limit string is rejected' '
test_must_fail flux run -o output.limit=foo hostname
'
test_done
2 changes: 1 addition & 1 deletion t/t9000-system.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ alias test_expect_success='expect_success_wrap'
#
for testscript in ${FLUX_SOURCE_DIR}/t/system/${T9000_SYSTEM_GLOB}; do
TEST_LABEL="$(basename $testscript)"
source $testscript
. $testscript
done

test_done

0 comments on commit e71f17b

Please sign in to comment.