Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add k8s events to task phase updates (#600)
Browse files Browse the repository at this point in the history
* wip: Add k8s events to task phase updates

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Refactor clientset, pluginstate

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Refactor to use batched events

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* go.mod updates

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Linits

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Update flyteidl and flyteplugins versions

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Update to EventReason

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Comments

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

* Readd GroupVersionKind changes

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>

---------

Signed-off-by: Andrew Dye <andrewwdye@gmail.com>
  • Loading branch information
andrewwdye authored Sep 29, 2023
1 parent 2aca906 commit 40fef66
Show file tree
Hide file tree
Showing 19 changed files with 603 additions and 229 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.5.13
github.com/flyteorg/flyteplugins v1.1.30
github.com/flyteorg/flyteidl v1.5.21
github.com/flyteorg/flyteplugins v1.1.31
github.com/flyteorg/flytestdlib v1.0.24
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.13 h1:IQ2Cw+u36ew3BPyRDAcHdzc/GyNEOXOxhKy9jbS4hbo=
github.com/flyteorg/flyteidl v1.5.13/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.1.30 h1:AVqS6Eb9Nr9Z3Mb3CtP04ffAVS9LMx5Q1Z7AyFFk/e0=
github.com/flyteorg/flyteplugins v1.1.30/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg=
github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU=
github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.1.31 h1:9LHEWq6I4/hh4BeSk7qKwgeaBSyedD8V5se54v77yYo=
github.com/flyteorg/flyteplugins v1.1.31/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg=
github.com/flyteorg/flytestdlib v1.0.24 h1:jDvymcjlsTRCwOtxPapro0WZBe3isTz+T3Tiq+mZUuk=
github.com/flyteorg/flytestdlib v1.0.24/go.mod h1:6nXa5g00qFIsgdvQ7jKQMJmDniqO0hG6Z5X5olfduqQ=
github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864=
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, sig
}

// New returns a new FlyteWorkflow controller
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory,
kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {

Expand Down Expand Up @@ -354,13 +354,13 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
if err != nil {
return nil, errors.Wrapf(err, "Failed to create EventSink [%v], error %v", events.GetConfig(ctx).Type, err)
}
gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeclientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1())
gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeClientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1())
if err != nil {
logger.Errorf(ctx, "failed to initialize GC for workflows")
return nil, errors.Wrapf(err, "failed to initialize WF GC")
}

eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeclientset, controllerAgentName, cfg.PublishK8sEvents)
eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeClientset, controllerAgentName, cfg.PublishK8sEvents)
if err != nil {
logger.Errorf(ctx, "failed to event recorder %v", err)
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
Expand All @@ -372,7 +372,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
numWorkers: cfg.Workers,
}

lock, err := leader.NewResourceLock(kubeclientset.CoreV1(), kubeclientset.CoordinationV1(), eventRecorder, cfg.LeaderElection)
lock, err := leader.NewResourceLock(kubeClientset.CoreV1(), kubeClientset.CoordinationV1(), eventRecorder, cfg.LeaderElection)
if err != nil {
logger.Errorf(ctx, "failed to initialize resource lock.")
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
Expand Down Expand Up @@ -440,7 +440,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter

recoveryClient := recovery.NewClient(adminClient)
nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor,
kubeClient, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope)
kubeClient, kubeClientset, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "failed to create node handler factory")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/executors/dag_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name DAGStructure -name DAGStructureWithStartNode -case=underscore

// An interface that captures the Directed Acyclic Graph structure in which the nodes are connected.
// If NodeLookup and DAGStructure are used together a traversal can be implemented.
type DAGStructure interface {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

// go:generate mockery -case=underscore

type TaskDetailsGetter interface {
GetTask(id v1alpha1.TaskID) (v1alpha1.ExecutableTask, error)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

//go:generate mockery -name Client -case=underscore

// Client is a friendlier controller-runtime client that gets passed to executors
type Client interface {
// GetClient returns a client configured with the Config
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name NodeLookup -case=underscore

// NodeLookup provides a structure that enables looking up all nodes within the current execution hierarchy/context.
// NOTE: execution hierarchy may change the nodes available, this is because when a SubWorkflow is being executed, only
// the nodes within the subworkflow are visible
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/executors/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -name Workflow -case=underscore

type Workflow interface {
Initialize(ctx context.Context) error
HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) error
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/nodes/factory/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package factory
import (
"context"

"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
Expand Down Expand Up @@ -33,6 +35,7 @@ type handlerFactory struct {
workflowLauncher launchplan.Executor
launchPlanReader launchplan.Reader
kubeClient executors.Client
kubeClientset kubernetes.Interface
catalogClient catalog.Client
recoveryClient recovery.Client
eventConfig *config.EventConfig
Expand All @@ -50,7 +53,7 @@ func (f *handlerFactory) GetHandler(kind v1alpha1.NodeKind) (interfaces.NodeHand
}

func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, setup interfaces.SetupContext) error {
t, err := task.New(ctx, f.kubeClient, f.catalogClient, f.eventConfig, f.clusterID, f.scope)
t, err := task.New(ctx, f.kubeClient, f.kubeClientset, f.catalogClient, f.eventConfig, f.clusterID, f.scope)
if err != nil {
return err
}
Expand Down Expand Up @@ -79,13 +82,14 @@ func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, se
}

func NewHandlerFactory(ctx context.Context, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader,
kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig,
kubeClient executors.Client, kubeClientset kubernetes.Interface, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig,
clusterID string, signalClient service.SignalServiceClient, scope promutils.Scope) (interfaces.HandlerFactory, error) {

return &handlerFactory{
workflowLauncher: workflowLauncher,
launchPlanReader: launchPlanReader,
kubeClient: kubeClient,
kubeClientset: kubeClientset,
catalogClient: catalogClient,
recoveryClient: recoveryClient,
eventConfig: eventConfig,
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"runtime/debug"
"time"

"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -195,6 +197,7 @@ type Handler struct {
metrics *metrics
pluginRegistry PluginRegistryIface
kubeClient pluginCore.KubeClient
kubeClientset kubernetes.Interface
secretManager pluginCore.SecretManager
resourceManager resourcemanager.BaseResourceManager
cfg *config.Config
Expand Down Expand Up @@ -229,7 +232,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error

// Create the resource negotiator here
// and then convert it to proxies later and pass them to plugins
enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry)
enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry, t.kubeClientset)
if err != nil {
logger.Errorf(ctx, "Failed to finalize enabled plugins. Error: %s", err)
return err
Expand Down Expand Up @@ -840,7 +843,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionCont
}()
}

func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) {
func New(ctx context.Context, kubeClient executors.Client, kubeClientset kubernetes.Interface, client catalog.Client,
eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) {
// TODO New should take a pointer
async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog"))
if err != nil {
Expand All @@ -866,6 +870,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
},
pluginScope: scope.NewSubScope("plugin"),
kubeClient: kubeClient,
kubeClientset: kubeClientset,
catalog: client,
asyncCatalog: async,
resourceManager: nil,
Expand Down
49 changes: 25 additions & 24 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,48 @@ import (
"fmt"
"testing"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/golang/protobuf/proto"

eventsErr "github.com/flyteorg/flytepropeller/events/errors"

pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sfake "k8s.io/client-go/kubernetes/fake"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"
pluginCatalogMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog/mocks"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
pluginK8s "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"

eventsErr "github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
flyteMocks "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces"
nodeMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces/mocks"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/codex"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager"
rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"
)

var eventConfig = &controllerConfig.EventConfig{
Expand Down Expand Up @@ -242,12 +240,13 @@ func Test_task_Setup(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
sCtx := &nodeMocks.SetupContext{}
fakeKubeClient := mocks.NewFakeKubeClient()
mockClientset := k8sfake.NewSimpleClientset()
sCtx.On("KubeClient").Return(fakeKubeClient)
sCtx.On("OwnerKind").Return("test")
sCtx.On("EnqueueOwner").Return(pluginCore.EnqueueOwner(func(name types.NamespacedName) error { return nil }))
sCtx.On("MetricsScope").Return(promutils.NewTestScope())

tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
tk, err := New(context.TODO(), fakeKubeClient, mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins
tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes
assert.NoError(t, err)
Expand Down Expand Up @@ -1226,7 +1225,8 @@ func Test_task_Finalize(t *testing.T) {

catalog := &pluginCatalogMocks.Client{}
m := tt.fields.defaultPluginCallback()
tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), catalog, eventConfig, testClusterID, promutils.NewTestScope())
mockClientset := k8sfake.NewSimpleClientset()
tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, catalog, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)
tk.defaultPlugin = m
tk.resourceManager = noopRm
Expand All @@ -1245,7 +1245,8 @@ func Test_task_Finalize(t *testing.T) {
}

func TestNew(t *testing.T) {
got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
mockClientset := k8sfake.NewSimpleClientset()
got, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)
assert.NotNil(t, got)
assert.NotNil(t, got.defaultPlugins)
Expand Down
Loading

0 comments on commit 40fef66

Please sign in to comment.