Skip to content

Commit 04ac666

Browse files
eonpataponmpvl
authored andcommitted
tools/flow: add Value() to retrieve controller value
Currently the value cannot be retrieved when the flow is running. It will panic if the flow is running. Once the flow is terminated it is safe to retrieve the value. Closes #1394 Change-Id: Id1c2851f67aa54c84804f47e7a20ef3e9826a202 Signed-off-by: Jean-Philippe Braun <eon@patapon.info> Reviewed-on: https://review.gerrithub.io/c/cue-lang/cue/+/533576 Unity-Result: CUEcueckoo <cueckoo@cuelang.org> TryBot-Result: CUEcueckoo <cueckoo@cuelang.org> Reviewed-by: Marcel van Lohuizen <mpvl@gmail.com>
1 parent 699ce65 commit 04ac666

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

tools/flow/flow.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ package flow
6868

6969
import (
7070
"context"
71+
"sync"
7172

7273
"cuelang.org/go/cue"
7374
"cuelang.org/go/cue/errors"
@@ -163,6 +164,9 @@ type Controller struct {
163164
context context.Context
164165
cancelFunc context.CancelFunc
165166

167+
mut *sync.Mutex
168+
done bool
169+
166170
// keys maps task keys to their index. This allows a recreation of the
167171
// Instance while retaining the original task indices.
168172
//
@@ -210,6 +214,7 @@ func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller {
210214

211215
taskCh: make(chan *Task),
212216
keys: map[string]*Task{},
217+
mut: &sync.Mutex{},
213218
}
214219

215220
if cfg != nil {
@@ -227,9 +232,33 @@ func (c *Controller) Run(ctx context.Context) error {
227232
defer c.cancelFunc()
228233

229234
c.runLoop()
235+
236+
// NOTE: track state here as runLoop might add more tasks to the flow
237+
// during the execution so checking current tasks state may not be
238+
// accurate enough to determine that the flow is terminated.
239+
// This is used to determine if the controller value can be retrieved.
240+
// When the controller value is safe to be read concurrently this tracking
241+
// can be removed.
242+
c.mut.Lock()
243+
defer c.mut.Unlock()
244+
c.done = true
245+
230246
return c.errs
231247
}
232248

249+
// Value returns the value managed by the controller.
250+
//
251+
// It is safe to use the value only after Run() has returned.
252+
// It panics if the flow is running.
253+
func (c *Controller) Value() cue.Value {
254+
c.mut.Lock()
255+
defer c.mut.Unlock()
256+
if !c.done {
257+
panic("can't retrieve value before flow has terminated")
258+
}
259+
return c.inst
260+
}
261+
233262
// A State indicates the state of a Task.
234263
//
235264
// The following state diagram indicates the possible state transitions:

tools/flow/flow_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strings"
2323
"sync"
2424
"testing"
25+
"time"
2526

2627
"cuelang.org/go/cue"
2728
"cuelang.org/go/cue/cuecontext"
@@ -93,6 +94,40 @@ func TestFlow(t *testing.T) {
9394
})
9495
}
9596

97+
func TestFlowValuePanic(t *testing.T) {
98+
f := `
99+
root: {
100+
a: {
101+
$id: "slow"
102+
out: string
103+
}
104+
}
105+
`
106+
ctx := cuecontext.New()
107+
v := ctx.CompileString(f)
108+
109+
start := make(chan bool, 1)
110+
111+
cfg := &flow.Config{Root: cue.ParsePath("root")}
112+
c := flow.New(cfg, v, taskFunc)
113+
114+
defer func() { recover() }()
115+
116+
go func() {
117+
start <- true
118+
c.Run(context.TODO())
119+
}()
120+
121+
<-start
122+
123+
// Wait for the flow to be running
124+
// The task sleeps for 10 Milliseconds
125+
time.Sleep(5 * time.Millisecond)
126+
// Should trigger a panic as the flow is not terminated
127+
c.Value()
128+
t.Errorf("Value() did not panic")
129+
}
130+
96131
func taskFunc(v cue.Value) (flow.Runner, error) {
97132
switch name, err := v.Lookup("$id").String(); name {
98133
default:
@@ -130,6 +165,13 @@ func taskFunc(v cue.Value) (flow.Runner, error) {
130165
return nil
131166
}), nil
132167

168+
case "slow":
169+
return flow.RunnerFunc(func(t *flow.Task) error {
170+
time.Sleep(10 * time.Millisecond)
171+
t.Fill(map[string]string{"out": "finished"})
172+
return nil
173+
}), nil
174+
133175
case "sequenced":
134176
// This task is used to serialize different runners in case
135177
// non-deterministic scheduling is possible.

0 commit comments

Comments
 (0)