Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Wf new plugin iface #19

Closed
wants to merge 70 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
1f410a6
Plugin Refactor to use arbitrary state machines
Sep 6, 2019
b326041
Unit tests working
Sep 7, 2019
11aebfe
Gopkg updated
Sep 7, 2019
91e3720
tests
Sep 9, 2019
c2fde9a
more unit tests
Sep 9, 2019
8fb28a7
tests for handler
Sep 10, 2019
f6415b2
more unit tests
Sep 10, 2019
cd0eaac
More unit tests for dynamic and task
Sep 11, 2019
165c398
updated with mocks
Sep 11, 2019
cbc6e3d
linter fix
Sep 11, 2019
9497444
start and end node tests running
Sep 11, 2019
ffe8af2
Node unit tests working
Sep 12, 2019
f1a8f23
Merge branch 'master' into plugin-refactor
Sep 12, 2019
4b30072
merge issue
Sep 12, 2019
37bf8dd
Final top workflow tests in progress
Sep 12, 2019
413086e
Tests working and compiling
Sep 13, 2019
e45a775
Linter Fix
Sep 13, 2019
a5b1836
loaded container plugin
Sep 13, 2019
58648b9
More unit tests done and eventing tested
Sep 13, 2019
a92d1ca
remove backwards compatibility
Sep 13, 2019
b2e2c7c
Working simple task
Sep 14, 2019
13cc97d
Dynamic nodes working
Sep 16, 2019
a40b620
k8s array being called successfully
Sep 16, 2019
1d7d521
Improved Plugin manager
Sep 17, 2019
e0cb7b9
Updated workflow
Sep 17, 2019
c50e2c4
branch node
Sep 17, 2019
cccd8b7
Merge branch 'plugin-refactor' into branch-wf-node
Sep 17, 2019
8f74d13
All unit tests working and linter issues fixed
Sep 17, 2019
31ad97c
work in progress
Sep 18, 2019
b2c6fe3
updated lock file
Sep 18, 2019
179a19f
Merge branch 'master' into plugin-refactor
Sep 18, 2019
474adaf
fixing tests
Sep 19, 2019
20cb713
some renaming
Sep 19, 2019
2f4c0ac
consistent naming
Sep 19, 2019
4b70288
cr feedback
Sep 20, 2019
b117448
cr feedback
Sep 20, 2019
99bf9eb
Merge branch 'plugin-refactor' into branch-wf-node
Sep 20, 2019
a6dd147
Merge pull request #10 from lyft/branch-wf-node
surindersinghp Sep 20, 2019
4c8b06d
Merge branch 'master' into plugin-refactor
Sep 20, 2019
8fdae34
tests working and fixed compilation errors
Sep 20, 2019
007f5f3
linter fixed
Sep 20, 2019
ecf8e75
Merge branch 'plugin-refactor' of github.com:lyft/flytepropeller into…
Sep 20, 2019
81a0c75
removed unused methods
Sep 21, 2019
8ddac46
Move common code to pluginmachinery
Sep 23, 2019
d11b77c
Not compiling - Cycle
Sep 23, 2019
ee6ca34
Catalog
Sep 24, 2019
a512597
fixing unit tests
Sep 24, 2019
c09438e
Updated handler
Sep 25, 2019
efeb27d
fixed unit tests
Sep 25, 2019
9b7d9bb
Enable k8s && aws batch
EngHabu Sep 27, 2019
1289c93
linter fixes
Sep 27, 2019
4b3374c
linter fix
Sep 27, 2019
6f5ac61
Merge branch 'plugin-refactor' of github.com:lyft/flytepropeller into…
Sep 27, 2019
99597ed
Dep ensure
EngHabu Sep 28, 2019
2ee6a13
Aborting all running nodes on failure or external abort
Sep 29, 2019
beef93e
Merge origin
EngHabu Sep 30, 2019
62d643c
Only enable container and k8s array plugins locally
EngHabu Sep 30, 2019
a69b88c
Fix enabled plugins check
EngHabu Sep 30, 2019
1de8966
Fix enabled-plugins check
EngHabu Sep 30, 2019
a95192a
updated setupcontext
Sep 30, 2019
5a6ad00
Fixes from master
EngHabu Sep 30, 2019
d0ae8f1
Merge branch 'plugin-refactor' of github.com:lyft/flytepropeller into…
EngHabu Sep 30, 2019
1c89ac7
Better plugin wrangling logic
Sep 30, 2019
772a332
update config
Sep 30, 2019
94d2898
Merge branch 'plugin-refactor' of github.com:lyft/flytepropeller into…
EngHabu Sep 30, 2019
0c33208
Merge branch 'plugin-refactor' of github.com:lyft/flytepropeller into…
Sep 30, 2019
c3d7dd2
plugin manager
Sep 30, 2019
f747771
Working cases
Sep 30, 2019
44f852f
rename discovery -> catalog
Sep 30, 2019
56832dc
changing the workflow handler to comply with new interface
Oct 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 90 additions & 95 deletions Gopkg.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ required = [

[[constraint]]
name = "github.com/lyft/flyteplugins"
revision = "798101a85483ce46f727f6f1129890eefd358e72"
source = "https://github.com/lyft/flyteplugins"
version = "^0.1.3"
#version = "^0.1.3"

[[override]]
name = "github.com/lyft/flytestdlib"
Expand Down
12 changes: 0 additions & 12 deletions cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (

config2 "github.com/lyft/flytepropeller/pkg/controller/config"

"github.com/lyft/flyteplugins/go/tasks"

"github.com/lyft/flyteplugins/go/tasks/v1/flytek8s"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flytestdlib/config/viper"
Expand Down Expand Up @@ -193,15 +190,6 @@ func executeRootCmd(cfg *config2.Config) {
limitNamespace = cfg.LimitNamespace
}

err = flytek8s.Initialize(ctx, limitNamespace, cfg.DownstreamEval.Duration)
if err != nil {
logger.Panicf(ctx, "Failed to initialize k8s plugins. Error: %v", err)
}

if err := tasks.Load(ctx); err != nil {
logger.Fatalf(ctx, "Failed to load task plugins. [%v]", err)
}

mgr, err := manager.New(kubecfg, manager.Options{
Namespace: limitNamespace,
SyncPeriod: &cfg.DownstreamEval.Duration,
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package main

import (
// _ "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch"
_ "github.com/lyft/flyteplugins/go/tasks/plugins/array/k8s"
_ "github.com/lyft/flyteplugins/go/tasks/plugins/hive"
_ "github.com/lyft/flyteplugins/go/tasks/plugins/k8s/container"

"github.com/lyft/flytepropeller/cmd/controller/cmd"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (p NodeStatusPrinter) NodeInfo(wName string, node v1alpha1.BaseNode, nodeSt

func (p NodePrinter) BranchNodeInfo(node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) []string {
info := p.BaseNodeInfo(node, nodeStatus)
branchStatus := nodeStatus.GetOrCreateBranchStatus()
branchStatus := nodeStatus.GetBranchStatus()
info = append(info, branchStatus.GetPhase().String())
if branchStatus.GetFinalizedNode() != nil {
info = append(info, *branchStatus.GetFinalizedNode())
Expand Down
33 changes: 7 additions & 26 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ propeller:
capacity: 1000
kube-config: "$HOME/.kube/config"
publish-k8s-events: true
tasks:
task-plugins:
enabled-plugins:
- container
- k8s-array
- qubole-hive-executor
# Sample plugins config
plugins:
# Set of enabled plugins at root level
enabled-plugins:
- container
- waitable
- K8S-ARRAY
# All k8s plugins default configuration
k8s:
inject-finalizer: true
Expand All @@ -46,24 +47,6 @@ plugins:
- FLYTE_AWS_ENDPOINT: "http://minio.flyte:9000"
- FLYTE_AWS_ACCESS_KEY_ID: minio
- FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
# Spark Plugin configuration
spark:
spark-config-default:
- spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
- spark.kubernetes.allocation.batch.size: "50"
- spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
- spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
- spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
- spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
- spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
- spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
- spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
- spark.hadoop.fs.s3a.multipart.threshold: "536870912"
- spark.blacklist.enabled: "true"
- spark.blacklist.timeout: "5m"
# Waitable plugin configuration
waitable:
console-uri: http://localhost:30081/console
# Logging configuration
logs:
kubernetes-enabled: true
Expand All @@ -89,11 +72,9 @@ admin:
endpoint: localhost:8089
insecure: true
catalog-cache:
type: catalog
type: noop
endpoint: datacatalog:8089
insecure: true
errors:
show-source: true
logger:
level: 4
show-source: true
33 changes: 19 additions & 14 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"k8s.io/apimachinery/pkg/types"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
types2 "github.com/lyft/flyteplugins/go/tasks/v1/types"
"github.com/lyft/flytestdlib/storage"
)

Expand Down Expand Up @@ -173,7 +172,7 @@ type MutableDynamicNodeStatus interface {
}

// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
// Phase returns ExecutableBranchNodeStatus, which permits some mutations
// p returns ExecutableBranchNodeStatus, which permits some mutations
type ExecutableBranchNode interface {
GetIf() ExecutableIfBlock
GetElse() *NodeID
Expand Down Expand Up @@ -205,18 +204,19 @@ type MutableNodeStatus interface {
SetCached()
ResetDirty()

GetBranchStatus() MutableBranchNodeStatus
GetOrCreateBranchStatus() MutableBranchNodeStatus
GetOrCreateWorkflowStatus() MutableWorkflowNodeStatus
GetWorkflowStatus() MutableWorkflowNodeStatus
ClearWorkflowStatus()
GetOrCreateTaskStatus() MutableTaskNodeStatus
GetTaskStatus() MutableTaskNodeStatus
ClearTaskStatus()
GetOrCreateSubWorkflowStatus() MutableSubWorkflowNodeStatus
ClearSubWorkflowStatus()
GetOrCreateDynamicNodeStatus() MutableDynamicNodeStatus
GetDynamicNodeStatus() MutableDynamicNodeStatus
ClearDynamicNodeStatus()
}

// Interface for a Node Phase. This provides a mutable API.
// Interface for a Node p. This provides a mutable API.
type ExecutableNodeStatus interface {
NodeStatusGetter
MutableNodeStatus
Expand All @@ -233,7 +233,6 @@ type ExecutableNodeStatus interface {
GetAttempts() uint32
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
GetTaskNodeStatus() ExecutableTaskNodeStatus
GetSubWorkflowNodeStatus() ExecutableSubWorkflowNodeStatus

IsCached() bool
IsDirty() bool
Expand All @@ -249,16 +248,18 @@ type MutableSubWorkflowNodeStatus interface {
}

type ExecutableTaskNodeStatus interface {
GetPhase() types2.TaskPhase
GetPhase() int
GetPhaseVersion() uint32
GetCustomState() types2.CustomState
GetPluginState() []byte
GetPluginStateVersion() uint32
}

type MutableTaskNodeStatus interface {
ExecutableTaskNodeStatus
SetPhase(phase types2.TaskPhase)
SetPhase(phase int)
SetPhaseVersion(version uint32)
SetCustomState(state types2.CustomState)
SetPluginState([]byte)
SetPluginStateVersion(uint32)
}

// Interface for a Child Workflow Node
Expand Down Expand Up @@ -287,7 +288,7 @@ type ExecutableNode interface {
GetRetryStrategy() *RetryStrategy
}

// Interface for the Workflow Phase. This is the mutable portion for a Workflow
// Interface for the Workflow p. This is the mutable portion for a Workflow
type ExecutableWorkflowStatus interface {
NodeStatusGetter
UpdatePhase(p WorkflowPhase, msg string)
Expand Down Expand Up @@ -336,7 +337,7 @@ type ExecutableSubWorkflow interface {
type WorkflowMeta interface {
GetExecutionID() ExecutionID
GetK8sWorkflowID() types.NamespacedName
NewControllerRef() metav1.OwnerReference
GetOwnerReference() metav1.OwnerReference
GetNamespace() string
GetCreationTimestamp() metav1.Time
GetAnnotations() map[string]string
Expand All @@ -345,9 +346,13 @@ type WorkflowMeta interface {
GetServiceAccountName() string
}

type TaskDetailsGetter interface {
GetTask(id TaskID) (ExecutableTask, error)
}

type WorkflowMetaExtended interface {
WorkflowMeta
GetTask(id TaskID) (ExecutableTask, error)
TaskDetailsGetter
FindSubWorkflow(subID WorkflowID) ExecutableSubWorkflow
GetExecutionStatus() ExecutableWorkflowStatus
}
Expand Down
89 changes: 58 additions & 31 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading