Skip to content

Commit

Permalink
Merge pull request #5153 from garlick/issue#5145
Browse files Browse the repository at this point in the history
broker: redefine broker.quorum as a size
  • Loading branch information
garlick committed May 5, 2023
2 parents 73b5a75 + 4386b2f commit ebd4459
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 73 deletions.
2 changes: 1 addition & 1 deletion doc/man1/flux-start.rst
Expand Up @@ -77,7 +77,7 @@ OPTIONS
**--test-start-mode**\ =\ *MODE*
Set the start mode. If set to ``all``, all brokers are started immediately.
If set to ``leader``, only the leader is started. Hint: in ``leader`` mode,
use ``--setattr=broker.quorum=0`` to let the initial program start before
use ``--setattr=broker.quorum=1`` to let the initial program start before
the other brokers are online. Default: ``all``.

**--test-rundir**\ =\ *PATH*
Expand Down
6 changes: 3 additions & 3 deletions doc/man1/flux-uptime.rst
Expand Up @@ -59,9 +59,9 @@ init
The local broker is waiting for the ``rc1`` script to complete locally.

quorum
All brokers are waiting for a configured set of brokers to reach **quorum**
state. The default quorum set is all brokers. A Flux system instance
typically defines the quorum set to only the rank 0 broker.
All brokers are waiting for a configured number of brokers to reach
**quorum** state. The default quorum is the instance size. A Flux
system instance typically defines the quorum size to 1.

run
Flux is fully up and running.
Expand Down
6 changes: 3 additions & 3 deletions doc/man7/flux-broker-attributes.rst
Expand Up @@ -107,9 +107,9 @@ broker.pid
The process id of the local broker.

broker.quorum [Updates: C]
An RFC 22 idset representing broker ranks that are required to be online
before the rank 0 broker enters the RUN state and starts the initial
program, if any. Default: all ranks.
The number of brokers that are required to be online before the rank 0
broker enters the RUN state and starts the initial program, if any.
Default: instance size.

broker.quorum-timeout [Updates: C]
The amount of time (in RFC 23 Flux Standard Duration format) that the
Expand Down
2 changes: 1 addition & 1 deletion etc/flux.service.in
Expand Up @@ -19,7 +19,7 @@ ExecStart=/bin/bash -c '\
-Slog-stderr-level=6 \
-Slog-stderr-mode=local \
-Sbroker.rc2_none \
-Sbroker.quorum=0 \
-Sbroker.quorum=1 \
-Sbroker.quorum-timeout=none \
-Sbroker.exit-norestart=42 \
-Sbroker.sd-notify=1 \
Expand Down
103 changes: 52 additions & 51 deletions src/broker/state_machine.c
Expand Up @@ -27,6 +27,7 @@
#include "src/common/libutil/errprintf.h"
#include "src/common/libsubprocess/server.h"
#include "ccan/array_size/array_size.h"
#include "ccan/str/str.h"

#include "state_machine.h"

Expand All @@ -38,8 +39,9 @@
#include "shutdown.h"

struct quorum {
struct idset *want;
struct idset *have; // cumulative on rank 0, batch buffer on rank > 0
uint32_t size;
struct idset *all;
struct idset *online; // cumulative on rank 0, batch buffer on rank > 0
flux_future_t *f;
double timeout;
bool warned;
Expand Down Expand Up @@ -182,21 +184,6 @@ static broker_state_t state_next (broker_state_t current, const char *event)
return current;
}

/* return true if a is a subset of b */
static bool is_subset_of (const struct idset *a, const struct idset *b)
{
struct idset *ids;
int count;

if (!(ids = idset_difference (a, b)))
return false;
count = idset_count (ids);
idset_destroy (ids);
if (count > 0)
return false;
return true;
}

static void action_init (struct state_machine *s)
{
s->ctx->online = true;
Expand Down Expand Up @@ -245,7 +232,7 @@ static void quorum_timer_cb (flux_reactor_t *r,
if (s->state != STATE_QUORUM)
return;

if (!(ids = idset_difference (s->quorum.want, s->quorum.have))
if (!(ids = idset_difference (s->quorum.all, s->quorum.online))
|| !(rankstr = idset_encode (ids, IDSET_FLAG_RANGE))
|| !(hl = hostlist_create ())) {
flux_log_error (h, "error computing slow brokers");
Expand Down Expand Up @@ -749,43 +736,54 @@ static void quorum_check_parent (struct state_machine *s)
}
}

/* Configure the set of broker ranks needed for quorum (default=all).
/* For backwards compatibility, translate "0" and "0-<size-1>" to 1 and <size>,
* respectively, but print a warning on stderr.
*/
static bool quorum_configure_deprecated (struct state_machine *s,
const char *val)
{
char all[64];
snprintf (all, sizeof (all), "0-%lu", (unsigned long)s->ctx->size - 1);
if (streq (val, all))
s->quorum.size = s->ctx->size;
else if (streq (val, "0"))
s->quorum.size = 1;
else
return false;
if (s->ctx->rank == 0) {
log_msg ("warning: broker.quorum is now a size - assuming %lu",
(unsigned long)s->quorum.size);
}
return true;
}

/* Configure the count of broker ranks needed for quorum (default=<size>).
*/
static int quorum_configure (struct state_machine *s)
{
const char *val;
char *tmp;
unsigned long id;

if (attr_get (s->ctx->attrs, "broker.quorum", &val, NULL) == 0) {
if (!(s->quorum.want = idset_decode (val))) {
log_msg ("Error parsing broker.quorum attribute");
return -1;
}
id = idset_last (s->quorum.want);
if (id != IDSET_INVALID_ID && id >= s->ctx->size) {
log_msg ("Error parsing broker.quorum attribute: exceeds size");
return -1;
if (!quorum_configure_deprecated (s, val)) {
errno = 0;
s->quorum.size = strtoul (val, NULL, 10);
if (errno != 0
|| s->quorum.size < 1
|| s->quorum.size > s->ctx->size) {
log_msg ("Error parsing broker.quorum attribute");
errno = EINVAL;
return -1;
}
}
if (attr_delete (s->ctx->attrs, "broker.quorum", true) < 0)
if (attr_set_flags (s->ctx->attrs, "broker.quorum", ATTR_IMMUTABLE) < 0)
return -1;
}
else {
if (!(s->quorum.want = idset_create (s->ctx->size, 0)))
s->quorum.size = s->ctx->size;
char buf[16];
snprintf (buf, sizeof (buf), "%lu", (unsigned long)s->quorum.size);
if (attr_add (s->ctx->attrs, "broker.quorum", buf, ATTR_IMMUTABLE) < 0)
return -1;
if (idset_range_set (s->quorum.want, 0, s->ctx->size - 1) < 0)
return -1;
}
if (!(tmp = idset_encode (s->quorum.want, IDSET_FLAG_RANGE)))
return -1;
if (attr_add (s->ctx->attrs,
"broker.quorum",
tmp,
ATTR_IMMUTABLE) < 0) {
ERRNO_SAFE_WRAP (free, tmp);
return -1;
}
free (tmp);
return 0;
}

Expand Down Expand Up @@ -837,11 +835,10 @@ static void broker_online_cb (flux_future_t *f, void *arg)
return;
}

idset_destroy (s->quorum.have);
s->quorum.have = ids;
if (is_subset_of (s->quorum.want, s->quorum.have)) {
idset_destroy (s->quorum.online);
s->quorum.online = ids;
if (idset_count (s->quorum.online) >= s->quorum.size)
quorum_reached = true;
}

if (strlen (members) > 0
&& (quorum_reached || now - last_update > 5)) {
Expand Down Expand Up @@ -1150,8 +1147,8 @@ void state_machine_destroy (struct state_machine *s)
flux_msglist_destroy (s->wait_requests);
flux_future_destroy (s->monitor.f);
flux_msglist_destroy (s->monitor.requests);
idset_destroy (s->quorum.want);
idset_destroy (s->quorum.have);
idset_destroy (s->quorum.all);
idset_destroy (s->quorum.online);
flux_watcher_destroy (s->quorum.timer);
flux_future_destroy (s->quorum.f);
free (s);
Expand Down Expand Up @@ -1190,8 +1187,12 @@ struct state_machine *state_machine_create (struct broker *ctx)
if (!(s->monitor.f = monitor_parent (ctx->h, s)))
goto error;
}
if (!(s->quorum.have = idset_create (ctx->size, 0)))
if (!(s->quorum.online = idset_create (ctx->size, 0)))
goto error;
if (!(s->quorum.all = idset_create (s->ctx->size, 0))
|| idset_range_set (s->quorum.all, 0, s->ctx->size - 1) < 0)
goto error;

if (quorum_configure (s) < 0
|| quorum_timeout_configure (s) < 0) {
log_err ("error configuring quorum attributes");
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/flux-start.c
Expand Up @@ -540,7 +540,7 @@ int exec_broker (const char *cmd_argz, size_t cmd_argz_len,
struct stat sb;

add_argzf (&argz, &argz_len, "-Sbroker.recovery-mode=1");
add_argzf (&argz, &argz_len, "-Sbroker.quorum=0");
add_argzf (&argz, &argz_len, "-Sbroker.quorum=1");
add_argzf (&argz, &argz_len, "-Slog-stderr-level=5");

// if --recovery has no optional argument, assume this is the system
Expand Down
4 changes: 2 additions & 2 deletions t/sharness.d/flux-sharness.sh
Expand Up @@ -118,7 +118,7 @@ make_bootstrap_config() {
echo "--test-hosts=$fakehosts -o,-c$workdir/conf.d"
echo "--test-exit-mode=${TEST_UNDER_FLUX_EXIT_MODE:-leader}"
echo "--test-exit-timeout=${TEST_UNDER_FLUX_EXIT_TIMEOUT:-0}"
echo "-o,-Sbroker.quorum=${TEST_UNDER_FLUX_QUORUM:-$full}"
echo "-o,-Sbroker.quorum=${TEST_UNDER_FLUX_QUORUM:-$size}"
echo "--test-start-mode=${TEST_UNDER_FLUX_START_MODE:-all}"
echo "-o,-Stbon.topo=${TEST_UNDER_FLUX_TOPO:-custom}"
echo "-o,-Stbon.zmqdebug=1"
Expand Down Expand Up @@ -173,7 +173,7 @@ remove_trashdir_wrapper() {
# - TEST_UNDER_FLUX_EXIT_TIMEOUT
# Set the flux-start exit timeout (default: 0)
# - TEST_UNDER_FLUX_QUORUM
# Set the broker.quorum attribute (default: 0-<highest_rank>)
# Set the broker.quorum attribute (default: <size>)
# - TEST_UNDER_FLUX_START_MODE
# Set the flux-start start mode (default: all)
# - TEST_UNDER_FLUX_TOPO
Expand Down
19 changes: 13 additions & 6 deletions t/t0025-broker-state-machine.t
Expand Up @@ -31,23 +31,30 @@ test_expect_success 'quorum reached on instance with 3 TBON levels' '
'

test_expect_success 'broker.quorum can be set on the command line' '
flux start -s3 ${ARGS} -o,-Sbroker.quorum="0-2" \
flux start -s3 ${ARGS} -o,-Sbroker.quorum=3 \
${GROUPSCMD} get broker.online >full1_explicit.out &&
test_cmp full1.exp full1_explicit.out
'

test_expect_success 'broker fails with malformed broker.quorum' '
test_must_fail flux start ${ARGS} \
-o,-Sbroker.quorum="badids" /bin/true 2>qmalformed.err &&
-o,-Sbroker.quorum=9-10 /bin/true 2>qmalformed.err &&
grep "Error parsing broker.quorum attribute" qmalformed.err
'

test_expect_success 'broker fails with broker.quorum that exceeds size' '
test_must_fail flux start ${ARGS} \
-o,-Sbroker.quorum="0-1" /bin/true 2>qtoobig.err &&
grep "Error parsing broker.quorum attribute: exceeds size" qtoobig.err
-o,-Sbroker.quorum=99 /bin/true 2>qtoobig.err &&
grep "Error parsing broker.quorum attribute" qtoobig.err
'
test_expect_success 'broker.quorum can be 0 for compatibility' '
flux start ${ARGS} -o,-Sbroker.quorum=0 /bin/true 2>compat1.err &&
grep assuming compat1.err
'
test_expect_success 'broker.quorum can be 0-1 (size=2) for compatibility' '
flux start -s2 ${ARGS} -o,-Sbroker.quorum=0-1 /bin/true 2>compat2.err &&
grep assuming compat2.err
'

test_expect_success 'create rc1 that blocks on FIFO for rank != 0' '
cat <<-EOT >rc1_block &&
#!/bin/bash
Expand Down Expand Up @@ -77,7 +84,7 @@ test_expect_success 'instance functions with late-joiner' '
-o,-Slog-stderr-level=6 \
-o,-Sbroker.rc1_path="$(pwd)/rc1_block" \
-o,-Sbroker.rc3_path= \
-o,-Sbroker.quorum="0" \
-o,-Sbroker.quorum=1 \
$(pwd)/rc2_unblock >late.out &&
test_cmp late.exp late.out
'
Expand Down
2 changes: 1 addition & 1 deletion t/t3203-instance-recovery.t
Expand Up @@ -16,7 +16,7 @@ test_expect_success 'start a persistent instance of size 4' '
test_expect_success 'expected broker attributes are set in recovery mode' '
cat >recov_attrs.exp <<-EOT &&
1
0
1
5
EOT
flux start --recovery=$(pwd)/test1 \
Expand Down
6 changes: 3 additions & 3 deletions t/t3301-system-latestart.t
Expand Up @@ -10,15 +10,15 @@ and ensure that it wires up.

. `dirname $0`/sharness.sh

export TEST_UNDER_FLUX_QUORUM=0
export TEST_UNDER_FLUX_QUORUM=1
export TEST_UNDER_FLUX_START_MODE=leader

test_under_flux 2 system

startctl="flux python ${SHARNESS_TEST_SRCDIR}/scripts/startctl.py"

test_expect_success 'broker.quorum was set to 0 by system test personality' '
echo 0 >quorum.exp &&
test_expect_success 'broker.quorum was set to 1 by system test personality' '
echo 1 >quorum.exp &&
flux getattr broker.quorum >quorum.out &&
test_cmp quorum.exp quorum.out
'
Expand Down
2 changes: 1 addition & 1 deletion t/t3302-system-offline.t
Expand Up @@ -12,7 +12,7 @@ confirm that the user gets a reasonable error.
. `dirname $0`/sharness.sh

export TEST_UNDER_FLUX_TOPO=kary:1
export TEST_UNDER_FLUX_QUORUM=0
export TEST_UNDER_FLUX_QUORUM=1
export TEST_UNDER_FLUX_START_MODE=leader

test_under_flux 3 system
Expand Down

0 comments on commit ebd4459

Please sign in to comment.