Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #18 from lyft/barrier-impl
Browse files Browse the repository at this point in the history
Barrier support for task plugins
  • Loading branch information
Ketan Umare committed Oct 15, 2019
2 parents 980760d + 5963199 commit 6158819
Show file tree
Hide file tree
Showing 16 changed files with 471 additions and 153 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type ExecutableTaskNodeStatus interface {
GetPhaseVersion() uint32
GetPluginState() []byte
GetPluginStateVersion() uint32
GetBarrierClockTick() uint32
}

type MutableTaskNodeStatus interface {
Expand All @@ -262,6 +263,7 @@ type MutableTaskNodeStatus interface {
SetPhaseVersion(version uint32)
SetPluginState([]byte)
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
}

// Interface for a Child Workflow Node
Expand Down
32 changes: 0 additions & 32 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 5 additions & 21 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 0 additions & 32 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 10 additions & 21 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,15 @@ type TaskNodeStatus struct {
PhaseVersion uint32 `json:"phaseVersion,omitempty"`
PluginState []byte `json:"pState,omitempty"`
PluginStateVersion uint32 `json:"psv,omitempty"`
BarrierClockTick uint32 `json:"tick,omitempty"`
}

// GetBarrierClockTick returns the value of the BarrierClockTick field of this
// TaskNodeStatus.
func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
return in.BarrierClockTick
}

// SetBarrierClockTick overwrites the BarrierClockTick field of this
// TaskNodeStatus with tick.
func (in *TaskNodeStatus) SetBarrierClockTick(tick uint32) {
in.BarrierClockTick = tick
}

func (in *TaskNodeStatus) SetPluginState(s []byte) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,13 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
nStatus.SetDataDir(nCtx.NodeStatus().GetDataDir())
nStatus.SetParentTaskID(execID)

//cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
// cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
// Check if we have compiled the workflow before:
// If there is a cached compiled Workflow, load and return it.
//if ok, err := f.CacheExists(ctx); err != nil {
// if ok, err := f.CacheExists(ctx); err != nil {
// logger.Warnf(ctx, "Failed to call head on compiled futures file. Error: %v", err)
// return nil, false, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to do HEAD on compiled futures file.")
//} else if ok {
// } else if ok {
// // It exists, load and return it
// compiledWf, err := f.RetrieveCache(ctx)
// if err != nil {
Expand All @@ -271,7 +271,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
// cacheHitStopWatch.Stop()
// return newContextualWorkflow(nCtx.Workflow(), compiledWf, nStatus, compiledWf.Tasks, compiledWf.SubWorkflows), true, nil
// }
//}
// }

// We know for sure that the futures file was generated. Let's read it.
djSpec, err := f.Read(ctx)
Expand Down
22 changes: 15 additions & 7 deletions pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,6 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
nCtx.On("NodeID").Return("n1")
nCtx.On("EnqueueOwnerFunc").Return(func() error { return nil })

w := &flyteMocks.ExecutableWorkflow{}
ws := &flyteMocks.ExecutableWorkflowStatus{}
w.On("GetExecutionStatus").Return(ws)
nCtx.On("Workflow").Return(w)

endNodeStatus := &flyteMocks.ExecutableNodeStatus{}
endNodeStatus.On("GetDataDir").Return(storage.DataReference("end-node"))

Expand All @@ -326,13 +321,26 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
subNs.On("ResetDirty").Return()
subNs.On("GetDataDir").Return(finalOutput)
subNs.On("SetParentTaskID", mock.Anything).Return()
subNs.On("GetNodeExecutionStatus", mock.MatchedBy(func(n v1alpha1.NodeID) bool { return n == v1alpha1.EndNodeID })).Return(endNodeStatus)

dynamicNS := &flyteMocks.ExecutableNodeStatus{}
dynamicNS.On("SetDataDir", mock.Anything).Return()
dynamicNS.On("SetParentTaskID", mock.Anything).Return()
dynamicNS.On("GetNodeExecutionStatus", "n1-Node_1").Return(subNs)
dynamicNS.On("GetNodeExecutionStatus", "n1-Node_2").Return(subNs)
dynamicNS.On("GetNodeExecutionStatus", "n1-Node_3").Return(subNs)
dynamicNS.On("GetNodeExecutionStatus", v1alpha1.EndNodeID).Return(endNodeStatus)

ns := &flyteMocks.ExecutableNodeStatus{}
ns.On("GetDataDir").Return(storage.DataReference("data-dir"))
ns.On("GetNodeExecutionStatus", mock.Anything).Return(subNs)
ns.On("GetNodeExecutionStatus", dynamicNodeID).Return(dynamicNS)
nCtx.On("NodeStatus").Return(ns)

w := &flyteMocks.ExecutableWorkflow{}
ws := &flyteMocks.ExecutableWorkflowStatus{}
ws.On("GetNodeExecutionStatus", "n1").Return(ns)
w.On("GetExecutionStatus").Return(ws)
nCtx.On("Workflow").Return(w)

r := &nodeMocks.NodeStateReader{}
r.On("GetDynamicNodeState").Return(handler.DynamicNodeState{
Phase: v1alpha1.DynamicNodePhaseExecuting,
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TaskNodeState struct {
PluginPhaseVersion uint32
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginPhaseVersion: tn.GetPhaseVersion(),
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
}
}
return handler.TaskNodeState{}
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/nodes/task/barrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package task

import (
"context"
"time"

"github.com/lyft/flytestdlib/logger"
"k8s.io/apimachinery/pkg/util/cache"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
)

// BarrierKey is an alias for string. It is the key type under which
// BarrierTransition values are recorded and looked up.
type BarrierKey = string

// PluginCallLog carries the transition a plugin requested, so that it can be
// stored as part of a BarrierTransition.
type PluginCallLog struct {
// PluginTransition points to the recorded plugin-requested transition.
// NOTE(review): nil handling is up to the caller — confirm against call sites.
PluginTransition *pluginRequestedTransition
}

// BarrierTransition is the value recorded for a BarrierKey: a barrier clock
// tick together with the plugin call log captured at that tick.
type BarrierTransition struct {
// BarrierClockTick is the clock tick associated with this transition.
BarrierClockTick uint32
// CallLog holds the plugin transition recorded for this tick.
CallLog PluginCallLog
}

// NoBarrierTransition is the sentinel returned by GetPreviousBarrierTransition
// when the barrier is disabled, when no entry is found for the key, or when the
// cached value is not a BarrierTransition. Its BarrierClockTick is 0 and its
// CallLog is the zero value.
var NoBarrierTransition = BarrierTransition{BarrierClockTick: 0}

// barrier records BarrierTransition values in an expiring LRU cache, keyed by
// BarrierKey. When barrierEnabled is false, recording is a no-op and lookups
// return NoBarrierTransition.
type barrier struct {
// barrierCacheExpiration is the TTL passed to the cache for every recorded entry.
barrierCacheExpiration time.Duration
// barrierTransitions is the backing cache; NewLRUBarrier only allocates it
// when the barrier is enabled.
barrierTransitions *cache.LRUExpireCache
// barrierEnabled gates both recording and lookup.
barrierEnabled bool
}

// RecordBarrierTransition stores bt in the transition cache under key k, using
// barrierCacheExpiration as the entry's TTL. It is a no-op when the barrier is
// disabled. The ctx argument is not used.
func (b *barrier) RecordBarrierTransition(ctx context.Context, k BarrierKey, bt BarrierTransition) {
	if !b.barrierEnabled {
		// Disabled barriers have no cache allocated; nothing to record.
		return
	}
	b.barrierTransitions.Add(k, bt, b.barrierCacheExpiration)
}

// GetPreviousBarrierTransition looks up the BarrierTransition previously
// recorded under key k.
//
// It returns NoBarrierTransition when the barrier is disabled, when the cache
// holds no entry for k, or when the cached value is not a BarrierTransition
// (in which case an error is also logged).
func (b *barrier) GetPreviousBarrierTransition(ctx context.Context, k BarrierKey) BarrierTransition {
	if !b.barrierEnabled {
		return NoBarrierTransition
	}

	cached, found := b.barrierTransitions.Get(k)
	if !found {
		return NoBarrierTransition
	}

	if transition, isTransition := cached.(BarrierTransition); isTransition {
		return transition
	}

	// Only BarrierTransition values are ever added by RecordBarrierTransition,
	// so reaching this point indicates an unexpected cache entry.
	logger.Errorf(ctx, "Failed to cast recorded value to BarrierTransition")
	return NoBarrierTransition
}

// NewLRUBarrier builds a barrier from cfg.
//
// When cfg.Enabled is false the returned barrier is disabled and has no cache
// allocated. Otherwise the barrier uses cfg.CacheTTL.Duration as the entry
// expiration and an LRU expire cache sized by cfg.CacheSize.
// NOTE(review): cfg.CacheSize is passed to cache.NewLRUExpireCache unchecked;
// confirm that non-positive sizes are rejected by config validation.
func NewLRUBarrier(_ context.Context, cfg config.BarrierConfig) *barrier {
	if !cfg.Enabled {
		return &barrier{barrierEnabled: false}
	}

	return &barrier{
		barrierEnabled:         true,
		barrierCacheExpiration: cfg.CacheTTL.Duration,
		barrierTransitions:     cache.NewLRUExpireCache(cfg.CacheSize),
	}
}
Loading

0 comments on commit 6158819

Please sign in to comment.