Skip to content

Commit

Permalink
Merge pull request #5762 from garlick/groups_cleanup
Browse files Browse the repository at this point in the history
broker: catch an improper use of groups and handle it gracefully
  • Loading branch information
mergify[bot] committed Mar 1, 2024
2 parents 40d6f4c + 1f8d2a2 commit bdd6aec
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 4 deletions.
26 changes: 26 additions & 0 deletions src/broker/groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@
* Optimization: collect contemporaneous JOIN/LEAVE requests at each
* rank for a short time before applying them and sending them upstream.
* During that time, JOINs/LEAVEs of the same key may be combined.
*
* broker.online use case:
* Groups are used for instance quorum detection.  The state machine calls
* groups.join broker.online in the QUORUM state on all ranks.  Rank 0 calls
* groups.get broker.online, which notifies the broker as membership evolves;
* when the quorum condition is satisfied, the state machine transitions
* to RUN.  The 'broker.online' group is also monitored by the resource module
* so that it can inform the scheduler as execution targets go up/down.
*
* broker.torpid use case:
* A broker.torpid group is maintained by the broker overlay (see overlay.c).
* The resource module also monitors broker.torpid and drains torpid nodes.
*/

#if HAVE_CONFIG_H
Expand All @@ -50,6 +62,10 @@

static const double batch_timeout = 0.1;

/* N.B. only one client can join a group per broker. That client's
* join request is cached in group->join_request so that when the client
* disconnects, we can identify its groups and force it to leave them.
*/
struct group {
char *name; // used directly as zhashx key
struct idset *members;
Expand Down Expand Up @@ -441,6 +457,11 @@ static void join_request_cb (flux_t *h,

if (flux_request_unpack (msg, NULL, "{s:s}", "name", &name) < 0)
goto error;
if (!flux_msg_is_local (msg)) {
errno = EPROTO;
errmsg = "groups.join is restricted to the local broker";
goto error;
}
if (!(group = group_lookup (g, name, true)))
goto error;
if (group->join_request) {
Expand Down Expand Up @@ -485,6 +506,11 @@ static void leave_request_cb (flux_t *h,

if (flux_request_unpack (msg, NULL, "{s:s}", "name", &name) < 0)
goto error;
if (!flux_msg_is_local (msg)) {
errno = EPROTO;
errmsg = "groups.leave is restricted to the local broker";
goto error;
}
if (!(group = group_lookup (g, name, false))
|| !group->join_request) {
snprintf (errbuf,
Expand Down
30 changes: 26 additions & 4 deletions t/scripts/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,25 @@ def join(args):
"""
h = flux.Flux()

h.rpc("groups.join", {"name": args.name}).get()
h.rpc("groups.join", {"name": args.name}, nodeid=args.rank).get()
if args.dubjoin:
h.rpc("groups.join", {"name": args.name}).get()
h.rpc("groups.join", {"name": args.name}, nodeid=args.rank).get()

if args.leave:
h.rpc("groups.leave", {"name": args.name}).get()
h.rpc("groups.leave", {"name": args.name}, nodeid=args.rank).get()
if args.dubleave:
h.rpc("groups.leave", {"name": args.name}).get()


def leave(args):
    """
    Leave a group.

    Sends a "groups.leave" request carrying {"name": args.name} to the
    broker rank given by args.rank, then calls get() on the returned
    future so the outcome of the request is collected before returning.
    """
    handle = flux.Flux()

    payload = {"name": args.name}
    future = handle.rpc("groups.leave", payload, nodeid=args.rank)
    future.get()


LOGGER = logging.getLogger("groups")


Expand Down Expand Up @@ -183,15 +192,28 @@ def main():
# join
join_parser = subparsers.add_parser(
"join",
usage="groups join [--dubjoin] [--leave] [--dubleave] name",
usage="groups join [--rank N] [--dubjoin] [--leave] [--dubleave] name",
formatter_class=flux.util.help_formatter(),
)
join_parser.add_argument("--dubjoin", action="store_true")
join_parser.add_argument("--leave", action="store_true")
join_parser.add_argument("--dubleave", action="store_true")
join_parser.add_argument("--rank", type=int, default=flux.constants.FLUX_NODEID_ANY)
join_parser.add_argument("name")
join_parser.set_defaults(func=join)

# leave
leave_parser = subparsers.add_parser(
"leave",
usage="groups leave [--rank N] name",
formatter_class=flux.util.help_formatter(),
)
leave_parser.add_argument(
"--rank", type=int, default=flux.constants.FLUX_NODEID_ANY
)
leave_parser.add_argument("name")
leave_parser.set_defaults(func=leave)

args = parser.parse_args()
args.func(args)

Expand Down
9 changes: 9 additions & 0 deletions t/t0027-broker-groups.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ test_expect_success 'groups.get on rank > 0 fails with reasonable error' '
grep "only available on rank 0" test0.err
'

# A groups.join request addressed to another rank (--rank 1) must be
# rejected: the command has to fail, and its stderr (captured in
# rmtjoin.err) must contain the "restricted to the local broker" message.
test_expect_success 'nonlocal groups.join fails with appropriate error' '
test_must_fail ${GROUPSCMD} join --rank 1 foo 2>rmtjoin.err &&
grep "restricted to the local broker" rmtjoin.err
'
# A groups.leave request addressed to another rank (--rank 1) must be
# rejected: the command has to fail, and its stderr (captured in
# rmtleave.err) must contain the "restricted to the local broker" message.
test_expect_success 'nonlocal groups.leave fails with appropriate error' '
test_must_fail ${GROUPSCMD} leave --rank 1 foo 2>rmtleave.err &&
grep "restricted to the local broker" rmtleave.err
'

# Send a groups.join request with no payload (no "name" key) using an
# inline Python one-liner, and print whatever the RPC's get() returns.
# NOTE(review): presumably used to exercise the malformed-request error
# path of groups.join -- confirm against the tests that call it.
badjoin() {
flux python -c "import flux; print(flux.Flux().rpc(\"groups.join\").get())"
}
Expand Down

0 comments on commit bdd6aec

Please sign in to comment.