Skip to content
Permalink
Browse files

clean up unlang_interpret_resumable() and parallel

instead of having magic for parallel in unlang_interpret_resumable(),
parallel just pushes a UNLANG_TYPE_FUNCTION into the childs stack.

The function then runs as the last thing before the child exits.
The function can then notify the parent that the child has exited.
  • Loading branch information
alandekok committed Sep 27, 2019
1 parent 2efe2dc commit 6f0af91fe7644de73d538fef264b4ca64b4535f4
Showing with 68 additions and 86 deletions.
  1. +13 −82 src/lib/unlang/interpret.c
  2. +1 −0 src/lib/unlang/io.c
  3. +53 −4 src/lib/unlang/parallel.c
  4. +1 −0 src/lib/unlang/parallel_priv.h
@@ -1195,91 +1195,22 @@ rlm_rcode_t unlang_interpret_stack_result(REQUEST *request)
*/
void unlang_interpret_resumable(REQUEST *request)
{
REQUEST *parent = request->parent;
unlang_stack_t *stack;
unlang_stack_frame_t *frame;

while (parent) {
int i;
unlang_resume_t *mr;
unlang_parallel_t *state;
#ifndef NDEBUG
bool found = false;
#endif

/*
* Child requests CANNOT be runnable. Only the
* parent request can be runnable. When it runs
* (eventually), the interpreter will walk back
* down the stack, resuming anything that needs resuming.
*/
rad_assert(request->backlog == NULL);
rad_assert(request->runnable_id < 0);

#ifndef NDEBUG
/*
* Look at the current stack.
*/
stack = request->stack;
frame = &stack->frame[stack->depth];

/*
* The current request MUST have been yielded in
* order for someone to mark it resumable.
*/
rad_assert(frame->instruction->type == UNLANG_TYPE_RESUME);
#endif

/*
* Now look at the parents stack. It also must
* have been yielded in order for someone to mark
* the child as resumable.
*/
stack = parent->stack;
frame = &stack->frame[stack->depth];
rad_assert(frame->instruction->type == UNLANG_TYPE_RESUME);

mr = unlang_generic_to_resume(frame->instruction);
(void) talloc_get_type_abort(mr, unlang_resume_t);

if (mr->parent->type != UNLANG_TYPE_PARALLEL) goto next;

state = mr->rctx;

/*
* Find the child and mark it resumable
*/
for (i = 0; i < state->num_children; i++) {
if (state->children[i].state != CHILD_YIELDED) continue;
if (state->children[i].child != request) continue;

state->children[i].state = CHILD_RUNNABLE;
#ifndef NDEBUG
found = true;
#endif
break;
}

/*
* We MUST have found the child here.
*/
rad_assert(found == true);

next:
request = parent;
parent = parent->parent;
}

unlang_stack_t *stack = request->stack;
unlang_stack_frame_t *frame = &stack->frame[stack->depth];

#ifndef NDEBUG
/*
* The current request MUST have been yielded in
* order for someone to mark it resumable.
* The IO code, or children have no idea where they're
* being called from. They just mark the parent
* resumable when they're done. So we have to check here
* for a RESUME frame. If the parent called the child
* directly, then there's no RESUME frame. When the
* child is done, the parent will automatically continue
* running. We threfore don't need to insert the parent
* into the backlog.
*/
stack = request->stack;
frame = &stack->frame[stack->depth];
rad_assert(frame->instruction->type == UNLANG_TYPE_RESUME);
#endif
if (frame->instruction->type != UNLANG_TYPE_RESUME) {
return;
}

rad_assert(request->backlog != NULL);

@@ -86,6 +86,7 @@ REQUEST *unlang_io_subrequest_alloc(REQUEST *parent, fr_dict_t const *namespace,
child->number = parent->number;
child->el = parent->el;
child->server_cs = parent->server_cs;
child->backlog = parent->backlog;

/*
* Initialize all of the async fields.
@@ -29,6 +29,35 @@ RCSID("$Id$")
#include "subrequest_priv.h"
#include "module_priv.h"

/** When the chld is done, tell the parent that we've exited.
*
*/
static unlang_action_t unlang_parallel_child_done(REQUEST *request, UNUSED rlm_rcode_t *presult, UNUSED int *priority, void *uctx)
{
unlang_parallel_child_t *child = uctx;

/*
* If we have a parent, then we're running synchronously
* with it. Tell the parent that we've exited, and it
* can continue.
*
* Otherwise we're a detached child, and we don't tell
* the parent anything. Because we have that kind of
* relationship.
*/
if (request->parent) {
child->state = CHILD_EXITED;
unlang_interpret_resumable(request->parent);
}

/*
* Don't change frame->result, it's the result of the child.
*/

return UNLANG_ACTION_CALCULATE_RESULT;
}


/** Run one or more sub-sections from the parallel section.
*
*/
@@ -39,8 +68,6 @@ static rlm_rcode_t unlang_parallel_run(REQUEST *request, unlang_parallel_t *stat
unlang_parallel_child_state_t done = CHILD_DONE; /* hope that we're done */
REQUEST *child;

// @todo - rdebug running the request.

/*
* The children are created all detached. We just return
* "noop".
@@ -91,11 +118,18 @@ static rlm_rcode_t unlang_parallel_run(REQUEST *request, unlang_parallel_t *stat
}

/*
* Push first instruction for child to execute
* Push a top frame, followed by a frame
* which signals us that the child is
* done, followed by the instruction to
* run in the child.
*/
unlang_interpret_push(child, NULL, RLM_MODULE_NOOP,
UNLANG_NEXT_STOP, UNLANG_TOP_FRAME);
unlang_interpret_push_function(child, NULL, unlang_parallel_child_done,
&state->children[i]);
unlang_interpret_push(child,
state->children[i].instruction, RLM_MODULE_FAIL,
UNLANG_NEXT_STOP, UNLANG_TOP_FRAME);
UNLANG_NEXT_STOP, UNLANG_SUB_FRAME);

/*
* It is often useful to create detached
@@ -138,6 +172,7 @@ static rlm_rcode_t unlang_parallel_run(REQUEST *request, unlang_parallel_t *stat
* Run this entry.
*/
case CHILD_RUNNABLE:
runnable:
RDEBUG2("parallel - running entry %d/%d", i + 1, state->num_children);
result = unlang_interpret_run(state->children[i].child);
if (result == RLM_MODULE_YIELD) {
@@ -218,12 +253,25 @@ static rlm_rcode_t unlang_parallel_run(REQUEST *request, unlang_parallel_t *stat
* Not ready to run.
*/
case CHILD_YIELDED:
if (state->children[i].child->runnable_id >= 0) {
(void) fr_heap_extract(state->children[i].child->backlog,
state->children[i].child);
goto runnable;
}

RDEBUG3("parallel child %d is already YIELDED", i + 1);
rad_assert(state->children[i].child != NULL);
rad_assert(state->children[i].instruction != NULL);
done = CHILD_YIELDED;
continue;

case CHILD_EXITED:
RDEBUG3("parallel child %d has already EXITED", i + 1);
state->children[i].state = CHILD_DONE;
state->children[i].child = NULL; // someone else freed this somewhere
state->children[i].instruction = NULL;
/* FALL-THROUGH */

/*
* Don't need to call this any more.
*/
@@ -304,6 +352,7 @@ static void unlang_parallel_signal(UNUSED REQUEST *request, void *rctx, fr_state
for (i = 0; i < state->num_children; i++) {
switch (state->children[i].state) {
case CHILD_INIT:
case CHILD_EXITED:
case CHILD_DONE:
break;

@@ -39,6 +39,7 @@ typedef enum {
CHILD_INIT = 0, //!< Initial state.
CHILD_RUNNABLE, //!< Child can continue running.
CHILD_YIELDED, //!< Child is yielded waiting on an event.
CHILD_EXITED, //!< Child has exited
CHILD_DONE //!< The child has completed.
} unlang_parallel_child_state_t;

0 comments on commit 6f0af91

Please sign in to comment.
You can’t perform that action at this time.