Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancelling in flight flow and task #190

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions action.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
inst.CurrentStep(true)
}
}

// set cancelcontext
inst.SetCancelContext(ctx)
go func() {

defer handler.Done()
Expand Down Expand Up @@ -381,7 +382,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
fa.applyAssertionInterceptor(inst)

handler.HandleResult(returnData, err)
} else if inst.Status() == model.FlowStatusFailed {
} else if inst.Status() == model.FlowStatusFailed || inst.Status() == model.FlowStatusCancelled {
if inst.TracingContext() != nil {
_ = trace.GetTracer().FinishTrace(inst.TracingContext(), inst.GetError())
}
Expand All @@ -394,6 +395,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
logger.Infof("Flow Instance [%s] for event id [%s] completed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String())
} else if inst.Status() == model.FlowStatusFailed {
logger.Infof("Flow Instance [%s] for event id [%s] failed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String())
} else if inst.Status() == model.FlowStatusCancelled {
logger.Infof("Flow Instance [%s] for event id [%s] cancelled in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String())
}

if stateRecorder != nil {
Expand Down
28 changes: 22 additions & 6 deletions instance/ind_instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package instance

import (
"context"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -36,6 +37,8 @@ type IndependentInstance struct {
startTime time.Time
//Instance recorder
instRecorder *stateInstanceRecorder
// context with cancel
canctx context.Context
}

const (
Expand Down Expand Up @@ -122,6 +125,13 @@ func (inst *IndependentInstance) ExecutionTime() time.Duration {
return time.Since(inst.startTime)
}

func (inst *IndependentInstance) GetCancelContext() context.Context {
return inst.canctx
}
func (inst *IndependentInstance) SetCancelContext(ctx context.Context) {
inst.canctx = ctx
}

func (inst *IndependentInstance) GetFlowState(inputs map[string]interface{}) *state.FlowState {
retData, _ := inst.GetReturnData()

Expand Down Expand Up @@ -298,7 +308,8 @@ func (inst *IndependentInstance) DoStep() bool {

// track the fact that the work item was removed from the queue
inst.changeTracker.WorkItemRemoved(workItem)

// injecting cancelcontext further
workItem.taskInst.cancelContext = inst.GetCancelContext()
inst.execTask(behavior, workItem.taskInst)

hasNext = true
Expand Down Expand Up @@ -523,13 +534,15 @@ func (inst *IndependentInstance) propagateSkip(taskEntries []*model.TaskEntry, a

// handleTaskError handles the completion of a task in the Flow Instance
func (inst *IndependentInstance) handleTaskError(taskBehavior model.TaskBehavior, taskInst *TaskInst, err error) {

if taskInst.traceContext != nil {
_ = trace.GetTracer().FinishTrace(taskInst.traceContext, err)
}
// Set task status to failed for subflow activity
taskInst.SetStatus(model.TaskStatusFailed)

if errors.Is(err, context.Canceled) {
taskInst.SetStatus(model.TaskStatusCancelled)
} else {
taskInst.SetStatus(model.TaskStatusFailed)
}
handled, taskEntries := taskBehavior.Error(taskInst, err)

containerInst := taskInst.flowInst
Expand Down Expand Up @@ -615,8 +628,11 @@ func (inst *IndependentInstance) HandleGlobalError(containerInst *Instance, err
} else {
// Print error message if no error handler
inst.logger.Error(err)
containerInst.SetStatus(model.FlowStatusFailed)

if errors.Is(err, context.Canceled) {
containerInst.SetStatus(model.FlowStatusCancelled)
} else {
containerInst.SetStatus(model.FlowStatusFailed)
}
if containerInst != inst.Instance {

// Complete SubflowCreated trace
Expand Down
2 changes: 2 additions & 0 deletions instance/taskevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func convertTaskStatus(code model.TaskStatus) event.Status {
return event.STARTED
case model.TaskStatusFailed:
return event.FAILED
case model.TaskStatusCancelled:
return event.CANCELLED
case model.TaskStatusDone:
return event.COMPLETED
case model.TaskStatusWaiting:
Expand Down
37 changes: 34 additions & 3 deletions instance/taskinst.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package instance

import (
"context"
"fmt"
"github.com/project-flogo/flow/support"
"runtime/debug"
"time"

Expand All @@ -14,6 +14,7 @@ import (
"github.com/project-flogo/core/support/trace"
"github.com/project-flogo/flow/definition"
"github.com/project-flogo/flow/model"
"github.com/project-flogo/flow/support"
)

func NewTaskInst(flowInst *Instance, task *definition.Task) *TaskInst {
Expand Down Expand Up @@ -73,6 +74,8 @@ type TaskInst struct {
returnError error
traceContext trace.TracingContext

cancelContext context.Context

//needed for serialization
taskID string
}
Expand Down Expand Up @@ -149,6 +152,10 @@ func (ti *TaskInst) GetTracingContext() trace.TracingContext {
return ti.traceContext
}

func (ti *TaskInst) GetCancelContext() context.Context {
return ti.cancelContext
}

func (ti *TaskInst) GetSharedTempData() map[string]interface{} {
//todo implement
return nil
Expand Down Expand Up @@ -361,7 +368,27 @@ func (ti *TaskInst) EvalActivity() (done bool, evalErr error) {
// If output interceptor is there then the activity should be mocked and activity evaluation should be skipped.
// In the applyOutputInterceptor step the mock data will be applied to the activity
if !hasOutputInterceptor(ti) {
done, evalErr = actCfg.Activity.Eval(ctx)
type evalReturnObject struct {
done bool
evalerror error
}

evalchan := make(chan evalReturnObject)

go func(evchan chan evalReturnObject) {
done, evalErr = actCfg.Activity.Eval(ctx)
evalchan <- evalReturnObject{done: done, evalerror: evalErr}
}(evalchan)

select {
case evalresult := <-evalchan:
done = evalresult.done
evalErr = evalresult.evalerror

case <-ctx.GetCancelContext().Done():
done = false
evalErr = ctx.GetCancelContext().Err()
}

if evalErr != nil {
e, ok := evalErr.(*activity.Error)
Expand Down Expand Up @@ -606,11 +633,15 @@ func NewErrorObj(taskId string, msg string) map[string]interface{} {
return map[string]interface{}{"activity": taskId, "message": msg, "type": "unknown", "code": "", "data": nil}
}

//DEPRECATED
// DEPRECATED
type LegacyCtx struct {
task *TaskInst
}

func (l *LegacyCtx) GetCancelContext() context.Context {
return l.task.cancelContext
}

func (l *LegacyCtx) GetOutput(name string) interface{} {
val, ok := l.task.outputs[name]
if ok {
Expand Down
3 changes: 3 additions & 0 deletions model/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
// TaskStatusSkipped indicates that the Task was skipped
TaskStatusSkipped TaskStatus = 50

// TaskStatusFailed indicates that the Task failed
TaskStatusCancelled TaskStatus = 70

// TaskStatusFailed indicates that the Task failed
TaskStatusFailed TaskStatus = 100

Expand Down