Skip to content

Commit

Permalink
tools/flow: add Value() to retrieve controller value
Browse files Browse the repository at this point in the history
Currently the value cannot be retrieved when the flow is running. It
will panic if the flow is running.

Once the flow is terminated it is safe to retrieve the value.

Closes #1394

Change-Id: Id1c2851f67aa54c84804f47e7a20ef3e9826a202
Signed-off-by: Jean-Philippe Braun <eon@patapon.info>
Reviewed-on: https://review.gerrithub.io/c/cue-lang/cue/+/533576
Unity-Result: CUEcueckoo <cueckoo@cuelang.org>
TryBot-Result: CUEcueckoo <cueckoo@cuelang.org>
Reviewed-by: Marcel van Lohuizen <mpvl@gmail.com>
  • Loading branch information
eonpatapon authored and mpvl committed Mar 3, 2022
1 parent 699ce65 commit 04ac666
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
29 changes: 29 additions & 0 deletions tools/flow/flow.go
Expand Up @@ -68,6 +68,7 @@ package flow

import (
"context"
"sync"

"cuelang.org/go/cue"
"cuelang.org/go/cue/errors"
Expand Down Expand Up @@ -163,6 +164,9 @@ type Controller struct {
context context.Context
cancelFunc context.CancelFunc

mut *sync.Mutex
done bool

// keys maps task keys to their index. This allows a recreation of the
// Instance while retaining the original task indices.
//
Expand Down Expand Up @@ -210,6 +214,7 @@ func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller {

taskCh: make(chan *Task),
keys: map[string]*Task{},
mut: &sync.Mutex{},
}

if cfg != nil {
Expand All @@ -227,9 +232,33 @@ func (c *Controller) Run(ctx context.Context) error {
defer c.cancelFunc()

c.runLoop()

// NOTE: track state here as runLoop might add more tasks to the flow
// during the execution so checking current tasks state may not be
// accurate enough to determine that the flow is terminated.
// This is used to determine if the controller value can be retrieved.
// When the controller value is safe to be read concurrently this tracking
// can be removed.
c.mut.Lock()
defer c.mut.Unlock()
c.done = true

return c.errs
}

// Value returns the value managed by the controller.
//
// It is safe to use the value only after Run() has returned.
// It panics if the flow is running.
func (c *Controller) Value() cue.Value {
c.mut.Lock()
defer c.mut.Unlock()
if !c.done {
panic("can't retrieve value before flow has terminated")
}
return c.inst
}

// A State indicates the state of a Task.
//
// The following state diagram indicates the possible state transitions:
Expand Down
42 changes: 42 additions & 0 deletions tools/flow/flow_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"
"testing"
"time"

"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
Expand Down Expand Up @@ -93,6 +94,40 @@ func TestFlow(t *testing.T) {
})
}

func TestFlowValuePanic(t *testing.T) {
f := `
root: {
a: {
$id: "slow"
out: string
}
}
`
ctx := cuecontext.New()
v := ctx.CompileString(f)

start := make(chan bool, 1)

cfg := &flow.Config{Root: cue.ParsePath("root")}
c := flow.New(cfg, v, taskFunc)

defer func() { recover() }()

go func() {
start <- true
c.Run(context.TODO())
}()

<-start

// Wait for the flow to be running
// The task sleeps for 10 Milliseconds
time.Sleep(5 * time.Millisecond)
// Should trigger a panic as the flow is not terminated
c.Value()
t.Errorf("Value() did not panic")
}

func taskFunc(v cue.Value) (flow.Runner, error) {
switch name, err := v.Lookup("$id").String(); name {
default:
Expand Down Expand Up @@ -130,6 +165,13 @@ func taskFunc(v cue.Value) (flow.Runner, error) {
return nil
}), nil

case "slow":
return flow.RunnerFunc(func(t *flow.Task) error {
time.Sleep(10 * time.Millisecond)
t.Fill(map[string]string{"out": "finished"})
return nil
}), nil

case "sequenced":
// This task is used to serialize different runners in case
// non-deterministic scheduling is possible.
Expand Down

0 comments on commit 04ac666

Please sign in to comment.