Skip to content

Commit

Permalink
Support default PodTemplate for k8s plugin (flyteorg#404)
Browse files Browse the repository at this point in the history
* updated flyteplugins to use local repo

Signed-off-by: Daniel Rammer <daniel@union.ai>

* initializing default pod template informer

Signed-off-by: Daniel Rammer <daniel@union.ai>

* moved k8s plugin configuration

Signed-off-by: Daniel Rammer <daniel@union.ai>

* updated to reflect per-namespace PodTemplate support

Signed-off-by: Daniel Rammer <daniel@union.ai>

* updated flyteplugins version

Signed-off-by: Daniel Rammer <daniel@union.ai>

* fixes lint issue

Signed-off-by: Daniel Rammer <daniel@union.ai>

* updated flyteplugins version

Signed-off-by: Daniel Rammer <daniel@union.ai>
  • Loading branch information
hamersaw committed Mar 16, 2022
1 parent 37b2e32 commit 8a7f2dc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.23.0
github.com/flyteorg/flyteplugins v0.10.12
github.com/flyteorg/flyteplugins v0.10.13
github.com/flyteorg/flytestdlib v0.4.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.23.0 h1:Pjl9Tq1pJfIK0au5PiqPVpl25xTYosN6BruZl+PgWAk=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.10.12 h1:O204oPVLn1/jgSh0RoNk1yFsORv8ur9rBDXlMdjvOKE=
github.com/flyteorg/flyteplugins v0.10.12/go.mod h1:1KU+tAFfm+89/YF7VGg8EoEFT6uEvU2RDcYgN2TOnoE=
github.com/flyteorg/flyteplugins v0.10.13 h1:jNwAv3rELt23e3PumrLWrEUNKfsg6iYfdaldt1pNLtc=
github.com/flyteorg/flyteplugins v0.10.13/go.mod h1:1KU+tAFfm+89/YF7VGg8EoEFT6uEvU2RDcYgN2TOnoE=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.13 h1:TzgqhECRGfOHYH1A7rUwcKEEH2rTtPxGy+oYcif7iBw=
github.com/flyteorg/flytestdlib v0.4.13/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
Expand Down
36 changes: 32 additions & 4 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package controller
import (
"context"
"fmt"
"os"
"runtime/pprof"
"time"

Expand Down Expand Up @@ -38,11 +39,15 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/clock"
k8sInformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
flyteK8sConfig "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
Expand All @@ -55,8 +60,12 @@ import (
"github.com/flyteorg/flytepropeller/pkg/utils"
)

const resourceLevelMonitorCycleDuration = 5 * time.Second
const missing = "missing"
const (
resourceLevelMonitorCycleDuration = 5 * time.Second
missing = "missing"
podDefaultNamespace = "flyte"
podNamespaceEnvVar = "POD_NAMESPACE"
)

type metrics struct {
Scope promutils.Scope
Expand Down Expand Up @@ -303,7 +312,8 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, opt

// New returns a new FlyteWorkflow controller
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {
flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory,
kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {

adminClient, authOpts, err := getAdminClient(ctx)
if err != nil {
Expand Down Expand Up @@ -377,6 +387,16 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
flyteworkflowInformer := flyteworkflowInformerFactory.Flyteworkflow().V1alpha1().FlyteWorkflows()
controller.flyteworkflowSynced = flyteworkflowInformer.Informer().HasSynced

podTemplateInformer := informerFactory.Core().V1().PodTemplates()

// set default namespace for pod template store
podNamespace, found := os.LookupEnv(podNamespaceEnvVar)
if !found {
podNamespace = podDefaultNamespace
}

flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace)

sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
Expand Down Expand Up @@ -424,6 +444,9 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
logger.Info(ctx, "Setting up event handlers")
// Set up an event handler for when FlyteWorkflow resources change
flyteworkflowInformer.Informer().AddEventHandler(controller.getWorkflowUpdatesHandler())

updateHandler := flytek8s.GetPodTemplateUpdatesHandler(&flytek8s.DefaultPodTemplateStore, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateName)
podTemplateInformer.Informer().AddEventHandler(updateHandler)
return controller, nil
}

Expand Down Expand Up @@ -486,6 +509,8 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s
opts := SharedInformerOptions(cfg, defaultNamespace)
flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...)

informerFactory := k8sInformers.NewSharedInformerFactoryWithOptions(kubeClient, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateResync.Duration)

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)

Expand Down Expand Up @@ -517,14 +542,17 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s
}
}(ctx)

c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope)
c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, mgr, propellerScope)
if err != nil {
return errors.Wrap(err, "failed to start FlytePropeller")
} else if c == nil {
return errors.Errorf("Failed to create a new instance of FlytePropeller")
}

go flyteworkflowInformerFactory.Start(ctx.Done())
if flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateName != "" {
go informerFactory.Start(ctx.Done())
}

if err = c.Run(ctx); err != nil {
return errors.Wrapf(err, "Error running FlytePropeller.")
Expand Down

0 comments on commit 8a7f2dc

Please sign in to comment.