-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: more responsive ConfigMap changes
Huge refactoring to speedup reactions to ConfigMap changes. Fixes in enabled state calculations. KubeConfigManager is now responsible to signal if config is changed or invalid. - fix: Update metric if ConfigMap is invalid (malformed module name or module values YAML) - use runtime config to enable debug - add locking ModuleManager is now the only component that runs enabled scripts. It is responsible for calculating ModulesReload/ReloadAll event. It lists Helm releases once at start to detect initial enabled modules and modules to purge. - fix: Fresh config values for enabled scripts (#184, #16) AddonOperator uses task ConvergeModules to handle changes in ConfigMap or global values and reload all modules or only changed. - fix: ModulePurge tasks moved between GlobalSynchronization and first ConvergeModules (#233) - fix: Clear queues from tasks for disabled module or errored hooks on ConfigMap changes (#43) - ref: ConvergeModules is a new multi-phase task. Other: - mocks for HelmResourcesManager and KubeConfigManager - Helm struct instead of global variables helm.NewClient and helm.HealthzHandler - by dependabot: update gomega
- Loading branch information
Showing
99 changed files
with
4,476 additions
and
2,966 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
package addon_operator | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
sh_app "github.com/flant/shell-operator/pkg/app" | ||
"github.com/flant/shell-operator/pkg/config" | ||
"github.com/flant/shell-operator/pkg/debug" | ||
shell_operator "github.com/flant/shell-operator/pkg/shell-operator" | ||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/flant/addon-operator/pkg/app" | ||
"github.com/flant/addon-operator/pkg/helm" | ||
"github.com/flant/addon-operator/pkg/kube_config_manager" | ||
"github.com/flant/addon-operator/pkg/module_manager" | ||
) | ||
|
||
func Init() (*AddonOperator, error) { | ||
runtimeConfig := config.NewConfig() | ||
// Init logging subsystem. | ||
sh_app.SetupLogging(runtimeConfig) | ||
log.Infof(sh_app.AppStartMessage) | ||
|
||
modulesDir, err := shell_operator.RequireExistingDirectory(app.ModulesDir) | ||
if err != nil { | ||
log.Errorf("Fatal: modules directory: %s", err) | ||
return nil, err | ||
} | ||
log.Infof("Modules directory: %s", modulesDir) | ||
|
||
globalHooksDir, err := shell_operator.RequireExistingDirectory(app.GlobalHooksDir) | ||
if err != nil { | ||
log.Errorf("Fatal: global hooks directory: %s", err) | ||
return nil, err | ||
} | ||
log.Infof("Global hooks directory: %s", globalHooksDir) | ||
|
||
tempDir, err := shell_operator.EnsureTempDirectory(sh_app.TempDir) | ||
if err != nil { | ||
log.Errorf("Fatal: temp directory: %s", err) | ||
return nil, err | ||
} | ||
|
||
log.Infof("Addon-operator namespace: %s", app.Namespace) | ||
|
||
op := NewAddonOperator() | ||
op.WithContext(context.Background()) | ||
|
||
// Debug server. | ||
debugServer, err := shell_operator.InitDefaultDebugServer() | ||
if err != nil { | ||
log.Errorf("Fatal: start Debug server: %s", err) | ||
return nil, err | ||
} | ||
|
||
err = shell_operator.AssembleCommonOperator(op.ShellOperator) | ||
if err != nil { | ||
log.Errorf("Fatal: %s", err) | ||
return nil, err | ||
} | ||
|
||
err = AssembleAddonOperator(op, modulesDir, globalHooksDir, tempDir, debugServer, runtimeConfig) | ||
if err != nil { | ||
log.Errorf("Fatal: %s", err) | ||
return nil, err | ||
} | ||
|
||
return op, nil | ||
} | ||
|
||
func AssembleAddonOperator(op *AddonOperator, modulesDir string, globalHooksDir string, tempDir string, debugServer *debug.Server, runtimeConfig *config.Config) (err error) { | ||
RegisterDefaultRoutes(op) | ||
RegisterAddonOperatorMetrics(op.MetricStorage) | ||
StartLiveTicksUpdater(op.MetricStorage) | ||
StartTasksQueueLengthUpdater(op.MetricStorage, op.TaskQueues) | ||
|
||
// Register routes in debug server. | ||
shell_operator.RegisterDebugQueueRoutes(debugServer, op.ShellOperator) | ||
shell_operator.RegisterDebugConfigRoutes(debugServer, runtimeConfig) | ||
RegisterDebugGlobalRoutes(debugServer, op) | ||
RegisterDebugModuleRoutes(debugServer, op) | ||
|
||
// Helm client factory. | ||
op.Helm = helm.New() | ||
op.Helm.WithKubeClient(op.KubeClient) | ||
err = op.Helm.Init() | ||
if err != nil { | ||
return fmt.Errorf("initialize Helm: %s", err) | ||
} | ||
|
||
// Helm resources monitor. | ||
// It uses a separate client-go instance. (Metrics are registered when 'main' client is initialized). | ||
op.HelmResourcesManager, err = InitDefaultHelmResourcesManager(op.ctx, op.MetricStorage) | ||
if err != nil { | ||
return fmt.Errorf("initialize Helm resources manager: %s", err) | ||
} | ||
|
||
SetupModuleManager(op, modulesDir, globalHooksDir, tempDir, runtimeConfig) | ||
|
||
err = op.InitModuleManager() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func SetupModuleManager(op *AddonOperator, modulesDir string, globalHooksDir string, tempDir string, runtimeConfig *config.Config) { | ||
// Create manager to check values in ConfigMap. | ||
op.KubeConfigManager = kube_config_manager.NewKubeConfigManager() | ||
op.KubeConfigManager.WithKubeClient(op.KubeClient) | ||
op.KubeConfigManager.WithContext(op.ctx) | ||
op.KubeConfigManager.WithNamespace(app.Namespace) | ||
op.KubeConfigManager.WithConfigMapName(app.ConfigMapName) | ||
op.KubeConfigManager.WithRuntimeConfig(runtimeConfig) | ||
|
||
// Create manager that runs modules and hooks. | ||
op.ModuleManager = module_manager.NewModuleManager() | ||
op.ModuleManager.WithContext(op.ctx) | ||
op.ModuleManager.WithDirectories(modulesDir, globalHooksDir, tempDir) | ||
op.ModuleManager.WithKubeConfigManager(op.KubeConfigManager) | ||
op.ModuleManager.WithHelm(op.Helm) | ||
op.ModuleManager.WithScheduleManager(op.ScheduleManager) | ||
op.ModuleManager.WithKubeEventManager(op.KubeEventsManager) | ||
op.ModuleManager.WithKubeObjectPatcher(op.ObjectPatcher) | ||
op.ModuleManager.WithMetricStorage(op.MetricStorage) | ||
op.ModuleManager.WithHookMetricStorage(op.HookMetricStorage) | ||
op.ModuleManager.WithHelmResourcesManager(op.HelmResourcesManager) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package addon_operator | ||
|
||
import ( | ||
"time" | ||
|
||
sh_task "github.com/flant/shell-operator/pkg/task" | ||
|
||
. "github.com/flant/addon-operator/pkg/hook/types" | ||
"github.com/flant/addon-operator/pkg/task" | ||
) | ||
|
||
type ConvergeState struct { | ||
Phase ConvergePhase | ||
FirstStarted bool | ||
FirstDone bool | ||
StartedAt int64 | ||
Activation string | ||
} | ||
|
||
type ConvergePhase string | ||
|
||
const ( | ||
StandBy ConvergePhase = "StandBy" | ||
RunBeforeAll ConvergePhase = "RunBeforeAll" | ||
WaitBeforeAll ConvergePhase = "WaitBeforeAll" | ||
WaitDeleteAndRunModules ConvergePhase = "WaitDeleteAndRunModules" | ||
WaitAfterAll ConvergePhase = "WaitAfterAll" | ||
) | ||
|
||
func NewConvergeState() *ConvergeState { | ||
return &ConvergeState{ | ||
Phase: StandBy, | ||
} | ||
} | ||
|
||
const ConvergeEventProp = "converge.event" | ||
|
||
type ConvergeEvent string | ||
|
||
const ( | ||
// OperatorStartup is a first converge during startup. | ||
OperatorStartup ConvergeEvent = "OperatorStartup" | ||
// GlobalValuesChanged is a converge initiated by changing values in the global hook. | ||
GlobalValuesChanged ConvergeEvent = "GlobalValuesChanged" | ||
// KubeConfigChanged is a converge started after changing ConfigMap. | ||
KubeConfigChanged ConvergeEvent = "KubeConfigChanged" | ||
// ReloadAllModules is a converge queued to the | ||
ReloadAllModules ConvergeEvent = "ReloadAllModules" | ||
) | ||
|
||
func IsConvergeTask(t sh_task.Task) bool { | ||
taskType := t.GetType() | ||
switch taskType { | ||
case task.ModuleDelete, task.ModuleRun, task.ConvergeModules: | ||
return true | ||
} | ||
hm := task.HookMetadataAccessor(t) | ||
switch taskType { | ||
case task.GlobalHookRun: | ||
switch hm.BindingType { | ||
case BeforeAll, AfterAll: | ||
return true | ||
} | ||
case task.ModuleHookRun: | ||
if hm.IsSynchronization() { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func IsFirstConvergeTask(t sh_task.Task) bool { | ||
taskType := t.GetType() | ||
switch taskType { | ||
case task.ModulePurge, task.DiscoverHelmReleases, task.GlobalHookEnableKubernetesBindings, task.GlobalHookEnableScheduleBindings: | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
func NewConvergeModulesTask(description string, convergeEvent ConvergeEvent, logLabels map[string]string) sh_task.Task { | ||
convergeTask := sh_task.NewTask(task.ConvergeModules). | ||
WithLogLabels(logLabels). | ||
WithQueueName("main"). | ||
WithMetadata(task.HookMetadata{ | ||
EventDescription: description, | ||
}). | ||
WithQueuedAt(time.Now()) | ||
convergeTask.SetProp(ConvergeEventProp, convergeEvent) | ||
return convergeTask | ||
} |
Oops, something went wrong.