This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
plugin_config.go
62 lines (54 loc) · 2.07 KB
/
plugin_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package task
import (
"context"
"strings"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/logger"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/k8s"
)
// WranglePluginsAndGenerateFinalList builds the list of plugin entries that
// should be loaded, drawing from the core and K8s plugins exposed by pr.
//
// The set of enabled plugin IDs is read from cfg. When cfg is nil, or the set
// it yields is empty, every registered plugin is treated as enabled.
// Otherwise a plugin is kept only if its lower-cased ID is a member of that
// set.
//
// Core plugin entries are appended unchanged. Each K8s plugin is wrapped in a
// new core.PluginEntry that carries the lower-cased ID and whose LoadPlugin
// builds a plugin manager via k8s.NewPluginManagerWithBackOff; all of those
// wrappers share one back-off controller.
//
// The returned error is always nil.
func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) {
	allowed := sets.NewString()
	if cfg != nil {
		allowed = cfg.GetEnabledPluginsSet()
	}
	// An empty allow-list means "no filtering": everything is enabled.
	enableAll := allowed.Len() == 0

	logger.Infof(ctx, "Enabled plugins: %v", allowed.List())
	logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", enableAll)

	var selected []core.PluginEntry
	for _, coreEntry := range pr.GetCorePlugins() {
		name := strings.ToLower(coreEntry.ID)
		if enableAll || allowed.Has(name) {
			logger.Infof(ctx, "Plugin [%s] ENABLED", name)
			selected = append(selected, coreEntry)
			continue
		}
		logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", name)
	}

	// One back-off controller is shared by every K8s plugin wrapper below.
	sharedBackOff := backoff.NewController(ctx)

	k8sEntries := pr.GetK8sPlugins()
	for idx := range k8sEntries {
		// Take a per-iteration copy so the loader closure below captures this
		// entry rather than a variable shared across iterations.
		entry := k8sEntries[idx]
		name := strings.ToLower(entry.ID)
		if enableAll || allowed.Has(name) {
			logger.Infof(ctx, "K8s Plugin [%s] is ENABLED.", name)
			loader := func(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) {
				return k8s.NewPluginManagerWithBackOff(ctx, iCtx, entry, sharedBackOff)
			}
			selected = append(selected, core.PluginEntry{
				ID:                  name,
				RegisteredTaskTypes: entry.RegisteredTaskTypes,
				LoadPlugin:          loader,
				IsDefault:           entry.IsDefault,
			})
			continue
		}
		logger.Infof(ctx, "K8s Plugin [%s] is DISABLED (not found in enabled plugins list).", name)
	}

	return selected, nil
}