Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/addon-operator/admission_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/addon-operator/pkg"
)

type AdmissionServer struct {
Expand All @@ -30,7 +32,7 @@ func NewAdmissionServer(listenPort, certsDir string) *AdmissionServer {
func (as *AdmissionServer) RegisterHandler(route string, handler http.Handler) {
if _, ok := as.routes[route]; ok {
log.Fatal("Route is already registered",
slog.String("route", route))
slog.String(pkg.LogKeyRoute, route))
}

as.routes[route] = handler
Expand All @@ -45,7 +47,7 @@ func (as *AdmissionServer) start(ctx context.Context) {
}

log.Debug("Registered admission routes",
slog.String("routes", fmt.Sprintf("%v", as.routes)))
slog.String(pkg.LogKeyRoutes, fmt.Sprintf("%v", as.routes)))

srv := &http.Server{
Addr: fmt.Sprintf(":%s", as.listenPort),
Expand Down
5 changes: 3 additions & 2 deletions pkg/addon-operator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/deckhouse/deckhouse/pkg/log"

"github.com/flant/addon-operator/pkg"
"github.com/flant/addon-operator/pkg/app"
"github.com/flant/addon-operator/pkg/kube_config_manager"
"github.com/flant/addon-operator/pkg/kube_config_manager/backend"
Expand All @@ -25,11 +26,11 @@ func (op *AddonOperator) bootstrap() error {

// Log the path where modules will be searched
log.Info("Search modules",
slog.String("path", app.ModulesDir))
slog.String(pkg.LogKeyPath, app.ModulesDir))

// Log the namespace in which the operator will work
log.Info("Addon-operator namespace",
slog.String("namespace", op.DefaultNamespace))
slog.String(pkg.LogKeyNamespace, op.DefaultNamespace))

// Initialize the debug server for troubleshooting and monitoring
// TODO: rewrite shapp global variables to the addon-operator one
Expand Down
34 changes: 17 additions & 17 deletions pkg/addon-operator/handler_manager_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func (op *AddonOperator) RegisterManagerEventsHandlers() {
// Register handler for schedule events
op.engine.ManagerEventsHandler.WithScheduleEventHandler(func(ctx context.Context, crontab string) []sh_task.Task {
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyBinding: string(htypes.Schedule),
}
logEntry := utils.EnrichLoggerWithLabels(op.Logger, logLabels)
logEntry.Debug("Create tasks for 'schedule' event",
slog.String("event", crontab))
slog.String(pkg.LogKeyEvent, crontab))

// Handle global hook schedule events
return op.ModuleManager.HandleScheduleEvent(
Expand All @@ -40,12 +40,12 @@ func (op *AddonOperator) RegisterManagerEventsHandlers() {
// Register handler for kubernetes events
op.engine.ManagerEventsHandler.WithKubeEventHandler(func(ctx context.Context, kubeEvent types.KubeEvent) []sh_task.Task {
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyBinding: string(htypes.OnKubernetesEvent),
}
logEntry := utils.EnrichLoggerWithLabels(op.Logger, logLabels)
logEntry.Debug("Create tasks for 'kubernetes' event",
slog.String("event", kubeEvent.String()))
slog.String(pkg.LogKeyEvent, kubeEvent.String()))

// Handle kubernetes events for global and module hooks
tailTasks := op.ModuleManager.HandleKubeEvent(
Expand Down Expand Up @@ -73,19 +73,19 @@ func (op *AddonOperator) createGlobalHookTaskFactory(
}

hookLabels := utils.MergeLabels(logLabels, map[string]string{
pkg.LogKeyHook: globalHook.GetName(),
"hook.type": "global",
"queue": info.QueueName,
pkg.LogKeyHook: globalHook.GetName(),
pkg.LogKeyHookType: "global",
pkg.LogKeyQueue: info.QueueName,
})

if len(info.BindingContext) > 0 {
hookLabels["binding.name"] = info.BindingContext[0].Binding
hookLabels[pkg.LogKeyBindingName] = info.BindingContext[0].Binding
if bindingType == htypes.OnKubernetesEvent {
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
hookLabels[pkg.LogKeyWatchEvent] = string(info.BindingContext[0].WatchEvent)
}
}

delete(hookLabels, "task.id")
delete(hookLabels, pkg.LogKeyTaskID)

newTask := sh_task.NewTask(task.GlobalHookRun).
WithLogLabels(hookLabels).
Expand Down Expand Up @@ -118,20 +118,20 @@ func (op *AddonOperator) createModuleHookTaskFactory(
}

hookLabels := utils.MergeLabels(logLabels, map[string]string{
"module": module.GetName(),
pkg.LogKeyHook: moduleHook.GetName(),
"hook.type": "module",
"queue": info.QueueName,
pkg.LogKeyModule: module.GetName(),
pkg.LogKeyHook: moduleHook.GetName(),
pkg.LogKeyHookType: "module",
pkg.LogKeyQueue: info.QueueName,
})

if len(info.BindingContext) > 0 {
hookLabels["binding.name"] = info.BindingContext[0].Binding
hookLabels[pkg.LogKeyBindingName] = info.BindingContext[0].Binding
if bindingType == htypes.OnKubernetesEvent {
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
hookLabels[pkg.LogKeyWatchEvent] = string(info.BindingContext[0].WatchEvent)
}
}

delete(hookLabels, "task.id")
delete(hookLabels, pkg.LogKeyTaskID)

newTask := sh_task.NewTask(task.ModuleHookRun).
WithLogLabels(hookLabels).
Expand Down
38 changes: 19 additions & 19 deletions pkg/addon-operator/handler_module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (

func (op *AddonOperator) StartModuleManagerEventHandler() {
go func() {
logEntry := op.Logger.With("operator.component", "handleManagerEvents")
logEntry := op.Logger.With(pkg.LogKeyOperatorComponent, "handleManagerEvents")
for {
select {
case schedulerEvent := <-op.ModuleManager.SchedulerEventCh():
switch event := schedulerEvent.EncapsulatedEvent.(type) {
// dynamically_enabled_extender
case dynamic_extender.DynamicExtenderEvent:
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
"type": "ModuleScheduler event",
"event_source": "DymicallyEnabledExtenderChanged",
pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyType: "ModuleScheduler event",
pkg.LogKeyEventSource: "DymicallyEnabledExtenderChanged",
}
eventLogEntry := utils.EnrichLoggerWithLabels(logEntry, logLabels)
// if global hooks haven't been run yet, script enabled extender fails due to missing global values
Expand All @@ -50,7 +50,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
// if converge has already begun - restart it immediately
if op.engine.TaskQueues.GetMain().Length() > 0 && RemoveCurrentConvergeTasks(op.getConvergeQueues(), logLabels, op.Logger) && op.ConvergeState.GetPhase() != converge.StandBy {
logEntry.Info("ConvergeModules: global hook dynamic modification detected, restart current converge process",
slog.String("phase", string(op.ConvergeState.GetPhase())))
slog.String(pkg.LogKeyPhase, string(op.ConvergeState.GetPhase())))
op.engine.TaskQueues.GetMain().AddFirst(convergeTask)
op.logTaskAdd(eventLogEntry, "DynamicExtender is updated, put first", convergeTask)
} else {
Expand All @@ -65,9 +65,9 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
// kube_config_extender
case config.KubeConfigEvent:
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
"type": "ModuleScheduler event",
"event_source": "KubeConfigExtenderChanged",
pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyType: "ModuleScheduler event",
pkg.LogKeyEventSource: "KubeConfigExtenderChanged",
}
eventLogEntry := utils.EnrichLoggerWithLabels(logEntry, logLabels)
switch event.Type {
Expand All @@ -77,10 +77,10 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {

case config.KubeConfigChanged:
eventLogEntry.Debug("ModuleManagerEventHandler-KubeConfigChanged",
slog.Bool("globalSectionChanged", event.GlobalSectionChanged),
slog.Any("moduleValuesChanged", event.ModuleValuesChanged),
slog.Any("moduleEnabledStateChanged", event.ModuleEnabledStateChanged),
slog.Any("ModuleMaintenanceChanged", event.ModuleMaintenanceChanged))
slog.Bool(pkg.LogKeyGlobalSectionChanged, event.GlobalSectionChanged),
slog.Any(pkg.LogKeyModuleValuesChanged, event.ModuleValuesChanged),
slog.Any(pkg.LogKeyModuleEnabledStateChanged, event.ModuleEnabledStateChanged),
slog.Any(pkg.LogKeyModuleMaintenanceChanged, event.ModuleMaintenanceChanged))
if !op.ModuleManager.GetKubeConfigValid() {
eventLogEntry.Info("KubeConfig become valid")
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
// if main queue isn't empty and there was another convergeModules task:
if op.engine.TaskQueues.GetMain().Length() > 0 && RemoveCurrentConvergeTasks(op.getConvergeQueues(), logLabels, op.Logger) {
logEntry.Info("ConvergeModules: kube config modification detected, restart current converge process",
slog.String("phase", string(op.ConvergeState.GetPhase())))
slog.String(pkg.LogKeyPhase, string(op.ConvergeState.GetPhase())))
// put ApplyKubeConfig->NewConvergeModulesTask sequence in the beginning of the main queue
if kubeConfigTask != nil {
op.engine.TaskQueues.GetMain().AddFirst(kubeConfigTask)
Expand Down Expand Up @@ -174,8 +174,8 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), reloadTasks[i])
}
logEntry.Info("ConvergeModules: kube config modification detected, append tasks to rerun modules",
slog.Int("count", len(reloadTasks)),
slog.Any("modules", modulesToRerun))
slog.Int(pkg.LogKeyCount, len(reloadTasks)),
slog.Any(pkg.LogKeyModules, modulesToRerun))
op.logTaskAdd(logEntry, "tail", reloadTasks...)
}
}
Expand All @@ -187,8 +187,8 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {

case HelmReleaseStatusEvent := <-op.HelmResourcesManager.Ch():
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
"module": HelmReleaseStatusEvent.ModuleName,
pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(),
pkg.LogKeyModule: HelmReleaseStatusEvent.ModuleName,
}
eventLogEntry := utils.EnrichLoggerWithLabels(logEntry, logLabels)

Expand Down Expand Up @@ -219,8 +219,8 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
op.logTaskAdd(logEntry, fmt.Sprintf("detected %s, append", additionalDescription), newTask)
} else {
eventLogEntry.With("task.flow", "noop").Info("Detected event, ModuleRun task already queued",
slog.String("description", additionalDescription))
eventLogEntry.With(pkg.LogKeyTaskFlow, "noop").Info("Detected event, ModuleRun task already queued",
slog.String(pkg.LogKeyDescription, additionalDescription))
}
}
}
Expand Down
52 changes: 26 additions & 26 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (op *AddonOperator) Setup() error {
return fmt.Errorf("global hooks directory: %s", err)
}
log.Info("global hooks directory",
slog.String("dir", globalHooksDir))
slog.String(pkg.LogKeyDir, globalHooksDir))

tempDir, err := ensureTempDirectory(shapp.TempDir)
if err != nil {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (op *AddonOperator) BootstrapMainQueue(tqs *queue.TaskQueueSet) {
tqs.NewNamedQueue("main", op.TaskService.Handle,
queue.WithCompactionCallback(queueutils.CompactionCallback(op.ModuleManager, op.Logger)),
queue.WithCompactableTypes(queueutils.MergeTasks...),
queue.WithLogger(op.Logger.With("operator.component", "mainQueue")),
queue.WithLogger(op.Logger.With(pkg.LogKeyOperatorComponent, "mainQueue")),
)

tasks := op.CreateBootstrapTasks(logLabels)
Expand All @@ -457,7 +457,7 @@ func (op *AddonOperator) BootstrapMainQueue(tqs *queue.TaskQueueSet) {
// Add "DiscoverHelmReleases" task to detect unknown releases and purge them.
// this task will run only after the first converge, to keep all modules
discoverLabels := utils.MergeLabels(logLabels, map[string]string{
"queue": "main",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(task.DiscoverHelmReleases),
})
discoverTask := sh_task.NewTask(task.DiscoverHelmReleases).
Expand All @@ -482,10 +482,10 @@ func (op *AddonOperator) CreateBootstrapTasks(logLabels map[string]string) []sh_

for _, hookName := range onStartupHooks {
hookLogLabels := utils.MergeLabels(logLabels, map[string]string{
pkg.LogKeyHook: hookName,
"hook.type": "global",
"queue": "main",
pkg.LogKeyBinding: string(htypes.OnStartup),
pkg.LogKeyHook: hookName,
pkg.LogKeyHookType: "global",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(htypes.OnStartup),
})

onStartupBindingContext := bc.BindingContext{Binding: string(htypes.OnStartup)}
Expand All @@ -509,10 +509,10 @@ func (op *AddonOperator) CreateBootstrapTasks(logLabels map[string]string) []sh_
schedHooks := op.ModuleManager.GetGlobalHooksInOrder(htypes.Schedule)
for _, hookName := range schedHooks {
hookLogLabels := utils.MergeLabels(logLabels, map[string]string{
pkg.LogKeyHook: hookName,
"hook.type": "global",
"queue": "main",
pkg.LogKeyBinding: string(task.GlobalHookEnableScheduleBindings),
pkg.LogKeyHook: hookName,
pkg.LogKeyHookType: "global",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(task.GlobalHookEnableScheduleBindings),
})

newTask := sh_task.NewTask(task.GlobalHookEnableScheduleBindings).
Expand All @@ -529,10 +529,10 @@ func (op *AddonOperator) CreateBootstrapTasks(logLabels map[string]string) []sh_
kubeHooks := op.ModuleManager.GetGlobalHooksInOrder(htypes.OnKubernetesEvent)
for _, hookName := range kubeHooks {
hookLogLabels := utils.MergeLabels(logLabels, map[string]string{
pkg.LogKeyHook: hookName,
"hook.type": "global",
"queue": "main",
pkg.LogKeyBinding: string(task.GlobalHookEnableKubernetesBindings),
pkg.LogKeyHook: hookName,
pkg.LogKeyHookType: "global",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(task.GlobalHookEnableKubernetesBindings),
})

newTask := sh_task.NewTask(task.GlobalHookEnableKubernetesBindings).
Expand All @@ -547,7 +547,7 @@ func (op *AddonOperator) CreateBootstrapTasks(logLabels map[string]string) []sh_

// Task to wait for kubernetes.Synchronization.
waitLogLabels := utils.MergeLabels(logLabels, map[string]string{
"queue": "main",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(task.GlobalHookWaitKubernetesSynchronization),
})
waitTask := sh_task.NewTask(task.GlobalHookWaitKubernetesSynchronization).
Expand All @@ -560,7 +560,7 @@ func (op *AddonOperator) CreateBootstrapTasks(logLabels map[string]string) []sh_

// Add "ConvergeModules" task to run modules converge sequence for the first time.
convergeLabels := utils.MergeLabels(logLabels, map[string]string{
"queue": "main",
pkg.LogKeyQueue: "main",
pkg.LogKeyBinding: string(task.ConvergeModules),
})
convergeTask := converge.NewConvergeModulesTask(eventDescription, converge.OperatorStartup, convergeLabels)
Expand All @@ -574,13 +574,13 @@ func (op *AddonOperator) CreateAndStartParallelQueues() {
for i := range app.NumberOfParallelQueues {
queueName := fmt.Sprintf(app.ParallelQueueNamePattern, i)
if op.IsQueueExists(queueName) {
log.Warn("Parallel queue already exists", slog.String("queue", queueName))
log.Warn("Parallel queue already exists", slog.String(pkg.LogKeyQueue, queueName))
continue
}

op.startQueue(queueName, op.TaskService.ParallelHandle)
log.Debug("Parallel queue started",
slog.String("queue", queueName))
slog.String(pkg.LogKeyQueue, queueName))
}
}

Expand All @@ -594,7 +594,7 @@ func (op *AddonOperator) startQueue(queueName string, handler func(ctx context.C
op.engine.TaskQueues.NewNamedQueue(queueName, handler,
queue.WithCompactionCallback(queueutils.CompactionCallback(op.ModuleManager, op.Logger)),
queue.WithCompactableTypes(queueutils.MergeTasks...),
queue.WithLogger(op.Logger.With("operator.component", "queue", "queue", queueName)),
queue.WithLogger(op.Logger.With(pkg.LogKeyOperatorComponent, "queue", "queue", queueName)),
)
op.engine.TaskQueues.GetByName(queueName).Start(op.ctx)
}
Expand All @@ -615,7 +615,7 @@ func (op *AddonOperator) CreateAndStartQueuesForGlobalHooks() {
op.CreateAndStartQueue(hookBinding.Queue)

log.Debug("Queue started for global 'schedule' hook",
slog.String("queue", hookBinding.Queue),
slog.String(pkg.LogKeyQueue, hookBinding.Queue),
slog.String(pkg.LogKeyHook, hookName))
}
}
Expand All @@ -624,7 +624,7 @@ func (op *AddonOperator) CreateAndStartQueuesForGlobalHooks() {
op.CreateAndStartQueue(hookBinding.Queue)

log.Debug("Queue started for global 'kubernetes' hook",
slog.String("queue", hookBinding.Queue),
slog.String(pkg.LogKeyQueue, hookBinding.Queue),
slog.String(pkg.LogKeyHook, hookName))
}
}
Expand All @@ -647,7 +647,7 @@ func (op *AddonOperator) CreateAndStartQueuesForModuleHooks(moduleName string) {
op.CreateAndStartQueue(hookBinding.Queue)

log.Debug("Queue started for module 'schedule'",
slog.String("queue", hookBinding.Queue),
slog.String(pkg.LogKeyQueue, hookBinding.Queue),
slog.String(pkg.LogKeyHook, hook.GetName()))
}
}
Expand All @@ -660,7 +660,7 @@ func (op *AddonOperator) CreateAndStartQueuesForModuleHooks(moduleName string) {
op.CreateAndStartQueue(hookBinding.Queue)

log.Debug("Queue started for module 'kubernetes'",
slog.String("queue", hookBinding.Queue),
slog.String(pkg.LogKeyQueue, hookBinding.Queue),
slog.String(pkg.LogKeyHook, hook.GetName()))
}
}
Expand Down Expand Up @@ -697,8 +697,8 @@ func (op *AddonOperator) CreateReloadModulesTasks(moduleNames []string, logLabel
}

newLogLabels := utils.MergeLabels(logLabels)
newLogLabels["module"] = moduleName
delete(newLogLabels, "task.id")
newLogLabels[pkg.LogKeyModule] = moduleName
delete(newLogLabels, pkg.LogKeyTaskID)

newTask := sh_task.NewTask(task.ModuleRun).
WithLogLabels(newLogLabels).
Expand Down
Loading
Loading