From 04ac666478bb9072f1e8787833b22ea45ca02a42 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Braun Date: Tue, 22 Feb 2022 15:38:34 +0100 Subject: [PATCH] tools/flow: add Value() to retrieve controller value Currently the value cannot be retrieved when the flow is running. It will panic if the flow is running. Once the flow is terminated it is safe to retrieve the value. Closes #1394 Change-Id: Id1c2851f67aa54c84804f47e7a20ef3e9826a202 Signed-off-by: Jean-Philippe Braun Reviewed-on: https://review.gerrithub.io/c/cue-lang/cue/+/533576 Unity-Result: CUEcueckoo TryBot-Result: CUEcueckoo Reviewed-by: Marcel van Lohuizen --- tools/flow/flow.go | 29 ++++++++++++++++++++++++++++ tools/flow/flow_test.go | 42 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/tools/flow/flow.go b/tools/flow/flow.go index 057dfc44d2..a0923fd71b 100644 --- a/tools/flow/flow.go +++ b/tools/flow/flow.go @@ -68,6 +68,7 @@ package flow import ( "context" + "sync" "cuelang.org/go/cue" "cuelang.org/go/cue/errors" @@ -163,6 +164,9 @@ type Controller struct { context context.Context cancelFunc context.CancelFunc + mut *sync.Mutex + done bool + // keys maps task keys to their index. This allows a recreation of the // Instance while retaining the original task indices. // @@ -210,6 +214,7 @@ func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller { taskCh: make(chan *Task), keys: map[string]*Task{}, + mut: &sync.Mutex{}, } if cfg != nil { @@ -227,9 +232,33 @@ func (c *Controller) Run(ctx context.Context) error { defer c.cancelFunc() c.runLoop() + + // NOTE: track state here as runLoop might add more tasks to the flow + // during the execution so checking current tasks state may not be + // accurate enough to determine that the flow is terminated. + // This is used to determine if the controller value can be retrieved. + // When the controller value is safe to be read concurrently this tracking + // can be removed. + c.mut.Lock() + defer c.mut.Unlock() + c.done = true + return c.errs } +// Value returns the value managed by the controller. +// +// It is safe to use the value only after Run() has returned. +// It panics if the flow is running. +func (c *Controller) Value() cue.Value { + c.mut.Lock() + defer c.mut.Unlock() + if !c.done { + panic("can't retrieve value before flow has terminated") + } + return c.inst +} + // A State indicates the state of a Task. // // The following state diagram indicates the possible state transitions: diff --git a/tools/flow/flow_test.go b/tools/flow/flow_test.go index 04371a4de6..48abad9fb7 100644 --- a/tools/flow/flow_test.go +++ b/tools/flow/flow_test.go @@ -22,6 +22,7 @@ import ( "strings" "sync" "testing" + "time" "cuelang.org/go/cue" "cuelang.org/go/cue/cuecontext" @@ -93,6 +94,40 @@ func TestFlow(t *testing.T) { }) } +func TestFlowValuePanic(t *testing.T) { + f := ` + root: { + a: { + $id: "slow" + out: string + } + } + ` + ctx := cuecontext.New() + v := ctx.CompileString(f) + + start := make(chan bool, 1) + + cfg := &flow.Config{Root: cue.ParsePath("root")} + c := flow.New(cfg, v, taskFunc) + + defer func() { recover() }() + + go func() { + start <- true + c.Run(context.TODO()) + }() + + <-start + + // Wait for the flow to be running + // The task sleeps for 10 Milliseconds + time.Sleep(5 * time.Millisecond) + // Should trigger a panic as the flow is not terminated + c.Value() + t.Errorf("Value() did not panic") +} + func taskFunc(v cue.Value) (flow.Runner, error) { switch name, err := v.Lookup("$id").String(); name { default: @@ -130,6 +165,13 @@ func taskFunc(v cue.Value) (flow.Runner, error) { return nil }), nil + case "slow": + return flow.RunnerFunc(func(t *flow.Task) error { + time.Sleep(10 * time.Millisecond) + t.Fill(map[string]string{"out": "finished"}) + return nil + }), nil + case "sequenced": // This task is used to serialize different runners in case // non-deterministic scheduling is possible.