Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/knadh/koanf/v2 v2.3.0
github.com/onsi/ginkgo/v2 v2.25.3
github.com/onsi/gomega v1.38.2
github.com/pkg/errors v0.9.1
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.75.1
google.golang.org/protobuf v1.36.9
Expand Down
4 changes: 4 additions & 0 deletions internal/config/log_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package config

const LogPluginId = "plugin.id"
const LogNodeName = "node.name"
5 changes: 2 additions & 3 deletions internal/config/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/fatih/color"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand All @@ -19,7 +18,7 @@ const TraceLevel = zapcore.DebugLevel - 1
func GetLogger(logLevel, logFormat string) (*zap.Logger, error) {
level, err := zap.ParseAtomicLevel(logLevel)
if err != nil {
return nil, errors.Wrap(err, "failed to parse log level")
return nil, fmt.Errorf("failed to parse log level: %w", err)
}

disableStackTrace := true
Expand Down Expand Up @@ -60,7 +59,7 @@ func GetLogger(logLevel, logFormat string) (*zap.Logger, error) {
logger, err := loggerConfig.Build()

if err != nil {
return nil, errors.Wrap(err, "failed to build logger")
return nil, fmt.Errorf("failed to build logger: %w", err)
}
setLoggerConfig(&loggerConfig)
return logger, nil
Expand Down
17 changes: 10 additions & 7 deletions internal/controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func NewNodeReconciler(ctx context.Context, client client.Client, schema *runtim
return nil, err
}

rebootManager, err := utils.NewRebootManager(l, client, restConfig, managerNamespace)
rebootManager, err := utils.NewRebootManager(l, client, restConfig, recorder, managerNamespace)
if err != nil {
return nil, fmt.Errorf("failed to create reboot manager: %w", err)
}
_ = rebootManager.CleanupNode(ctx, "")

drainManager, err := utils.NewDrainManager(ctx, client, restConfig)
drainManager, err := utils.NewDrainManager(ctx, client, restConfig, recorder)
if err != nil {
return nil, fmt.Errorf("failed to create drain manager: %w", err)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

l.Info("Rebooting node")
err = r.rebootManager.RebootNode(ctx, node.Name)
err = r.rebootManager.RebootNode(ctx, node)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -314,6 +314,7 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if err != nil {
return ctrl.Result{}, err
}
r.recorder.Eventf(node, corev1.EventTypeNormal, "Reboot", "Node Rebooted")
return ctrl.Result{RequeueAfter: time.Second}, nil
} else {
l.Info("Node not ready yet after reboot")
Expand Down Expand Up @@ -472,7 +473,7 @@ func (r *nodeReconciler) drain(ctx context.Context, l *zap.Logger, node *drainv1
if node.Status.CurrentState == drainv1.NodeCurrentStateNext {

// Check if cluster is healthy before starting drain
healthy, err := r.drainManager.IsHealthy(ctx)
healthy, err := r.drainManager.IsHealthy(ctx, node)
if err != nil {
return nil, fmt.Errorf("failed to check if cluster is healthy: %w", err)
}
Expand All @@ -481,7 +482,7 @@ func (r *nodeReconciler) drain(ctx context.Context, l *zap.Logger, node *drainv1
}

// Check if drain of node is ok
drainOk, err := r.drainManager.IsDrainOk(ctx, node.Name)
drainOk, err := r.drainManager.IsDrainOk(ctx, node)
if err != nil {
return nil, fmt.Errorf("failed to check if node(%s) is ok to drain: %w", node.Name, err)
}
Expand Down Expand Up @@ -512,7 +513,7 @@ func (r *nodeReconciler) drain(ctx context.Context, l *zap.Logger, node *drainv1
}

// Run Plugin PreDrain
err := r.drainManager.RunPreDrain(ctx, node.Name)
err := r.drainManager.RunPreDrain(ctx, node)
if err != nil {
return nil, fmt.Errorf("failed to run plugin PreDrain for node(%s): %w", node.Name, err)
}
Expand Down Expand Up @@ -568,6 +569,7 @@ func (r *nodeReconciler) drain(ctx context.Context, l *zap.Logger, node *drainv1
return nil, fmt.Errorf("failed to update drained status on node: %w", err)
}

r.recorder.Eventf(node, corev1.EventTypeNormal, "Drained", "Node drained successfully")
return nil, nil
}

Expand All @@ -589,7 +591,7 @@ func (r *nodeReconciler) undrain(ctx context.Context, l *zap.Logger, node *drain
}
}

if err := r.drainManager.RunPostDrain(ctx, node.Name); err != nil {
if err := r.drainManager.RunPostDrain(ctx, node); err != nil {
l.Warn("failed to run PostDrain for node", zap.Error(err))
return false, ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
Expand All @@ -606,6 +608,7 @@ func (r *nodeReconciler) undrain(ctx context.Context, l *zap.Logger, node *drain
return false, ctrl.Result{}, err
}

r.recorder.Eventf(node, corev1.EventTypeNormal, "Undrain", "Undrained successfully")
return true, ctrl.Result{}, nil
}

Expand Down
117 changes: 85 additions & 32 deletions internal/utils/drain-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -11,18 +12,21 @@ import (
"strings"

"github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/slyngdk/node-drain/api/plugins"
pluginv1 "github.com/slyngdk/node-drain/api/plugins/proto/v1"
drainv1 "github.com/slyngdk/node-drain/api/v1"
"github.com/slyngdk/node-drain/internal/config"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
kClient "sigs.k8s.io/controller-runtime/pkg/client"
)

const pluginNotSupported = "Plugin not supported"

type drainClientInfo struct {
*plugins.DrainClient
info *plugins.DrainPluginInfo
Expand All @@ -32,12 +36,13 @@ type DrainManager struct {
l *zap.Logger
client kClient.Client
clientSet *kubernetes.Clientset
recorder record.EventRecorder
pluginClients map[string]*plugin.Client
drainClients map[string]drainClientInfo
pluginFileId map[string]string
}

func NewDrainManager(ctx context.Context, client kClient.Client, restConfig *rest.Config) (*DrainManager, error) {
func NewDrainManager(ctx context.Context, client kClient.Client, restConfig *rest.Config, recorder record.EventRecorder) (*DrainManager, error) {
l, err := config.GetNamedLogger("drain-plugin-client")
if err != nil {
return nil, err
Expand All @@ -52,6 +57,7 @@ func NewDrainManager(ctx context.Context, client kClient.Client, restConfig *res
l: l,
client: client,
clientSet: clientSet,
recorder: recorder,
drainClients: make(map[string]drainClientInfo),
pluginFileId: make(map[string]string),
}
Expand Down Expand Up @@ -192,7 +198,7 @@ func (d *DrainManager) IsClusterNodesHealthy(ctx context.Context) (bool, error)
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady {
if condition.Status != corev1.ConditionTrue {
d.l.Warn("node is not ready", zap.String("node.name", node.Name), zap.String("node.ready", string(condition.Status)))
d.l.Warn("node is not ready", zap.String(config.LogNodeName, node.Name), zap.String("node.ready", string(condition.Status)))
allNodesAreReady = false
}
}
Expand All @@ -201,124 +207,167 @@ func (d *DrainManager) IsClusterNodesHealthy(ctx context.Context) (bool, error)
return allNodesAreReady, nil
}

func (d *DrainManager) IsHealthy(ctx context.Context) (bool, error) {
func (d *DrainManager) IsHealthy(ctx context.Context, node *drainv1.Node) (bool, error) {
d.l.Debug("Checking if cluster is healthy, including drain plugins.")
healthy, err := d.IsClusterNodesHealthy(ctx)
if !healthy || err != nil {
d.recorder.Eventf(node, corev1.EventTypeWarning, "Healthy", "Cluster nodes is not healthy: %t err: %v", healthy, err)
return false, err
}

allModulesHealthy := true
errs := make([]error, 0)
pluginStatus := make([]string, 0, len(d.drainClients))

for id, client := range d.drainClients {
l := d.l.With(zap.String("plugin.id", id))
l.Debug("Checking if drain plugin is supported")
l := d.l.With(zap.String(config.LogPluginId, id))
isSupported, err := client.IsSupported(ctx)
if err != nil {
return false, err
allModulesHealthy = false
errs = append(errs, err)
pluginStatus = append(pluginStatus, pluginEventError(id, err))
continue
}
if !isSupported {
l.Debug("Plugin not supported")
l.Debug(pluginNotSupported)
pluginStatus = append(pluginStatus, fmt.Sprintf("%s: %s", id, pluginNotSupported))
continue
}

l.Debug("Checking if drain plugin is healthy")
isHealthy, err := client.IsHealthy(ctx)
if err != nil {
return false, err
allModulesHealthy = false
errs = append(errs, err)
pluginStatus = append(pluginStatus, pluginEventError(id, err))
continue
}
if !isHealthy {
l.Warn("Plugin is not healthy")
allModulesHealthy = false
pluginStatus = append(pluginStatus, fmt.Sprintf("%s: Not healthy", id))
continue
}

pluginStatus = append(pluginStatus, fmt.Sprintf("%s: OK", id))
}
eventType := corev1.EventTypeNormal
if !allModulesHealthy || len(errs) > 0 {
eventType = corev1.EventTypeWarning
}
d.recorder.Eventf(node, eventType, "Healthy", "IsHealthy:\n%s", strings.Join(pluginStatus, "\n"))

if len(errs) != 0 {
return false, errors.Join(errs...)
}
return allModulesHealthy, err
}

func (d *DrainManager) IsDrainOk(ctx context.Context, nodeName string) (bool, error) {
l := d.l.With(zap.String("node.name", nodeName))
func (d *DrainManager) IsDrainOk(ctx context.Context, node *drainv1.Node) (bool, error) {
l := d.l.With(zap.String(config.LogNodeName, node.Name))
l.Debug("Checking if drain is OK")
allModulesReadyToDrain := true
errs := make([]error, 0)
pluginStatus := make([]string, 0, len(d.drainClients))

for id, client := range d.drainClients {
lp := l.With(zap.String("plugin.id", id))
lp.Debug("Checking if drain plugin is supported")
lp := l.With(zap.String(config.LogPluginId, id))
isSupported, err := client.IsSupported(ctx)
if err != nil {
return false, err
allModulesReadyToDrain = false
errs = append(errs, err)
pluginStatus = append(pluginStatus, pluginEventError(id, err))
continue
}
if !isSupported {
lp.Debug("Plugin not supported")
lp.Debug(pluginNotSupported)
pluginStatus = append(pluginStatus, fmt.Sprintf("%s: %s", id, pluginNotSupported))
continue
}

isDrainOk, err := client.IsDrainOk(ctx, nodeName)
isDrainOk, err := client.IsDrainOk(ctx, node.Name)
if err != nil {
return false, err
allModulesReadyToDrain = false
errs = append(errs, err)
pluginStatus = append(pluginStatus, pluginEventError(id, err))
continue
}
if !isDrainOk {
lp.Warn("plugin is not ready to be drained")
allModulesReadyToDrain = false
pluginStatus = append(pluginStatus, fmt.Sprintf("%s: Not ready to be drained", id))
continue
}

pluginStatus = append(pluginStatus, fmt.Sprintf("%s: OK", id))
}

eventType := corev1.EventTypeNormal
if !allModulesReadyToDrain || len(errs) > 0 {
eventType = corev1.EventTypeWarning
}
d.recorder.Eventf(node, eventType, "DrainOK", "IsDrainOK:\n%s", strings.Join(pluginStatus, "\n"))

if len(errs) != 0 {
return false, errors.Join(errs...)
}

return allModulesReadyToDrain, nil
}

func (d *DrainManager) RunPreDrain(ctx context.Context, nodeName string) error {
l := d.l.With(zap.String("node.name", nodeName))
func (d *DrainManager) RunPreDrain(ctx context.Context, node *drainv1.Node) error {
l := d.l.With(zap.String(config.LogNodeName, node.Name))
l.Debug("Run PreDrain")

for id, client := range d.drainClients {
lp := l.With(zap.String("plugin.id", id))
lp.Debug("Checking if drain plugin is supported")
lp := l.With(zap.String(config.LogPluginId, id))
isSupported, err := client.IsSupported(ctx)
if err != nil {
return err
}
if !isSupported {
lp.Debug("Plugin not supported")
lp.Debug(pluginNotSupported)
continue
}

lp.Debug("Running plugin PreDrain")
err = client.PreDrain(ctx, nodeName)
err = client.PreDrain(ctx, node.Name)
if err != nil {
lp.Debug("Plugin PreDrain failed", zap.Error(err))
return errors.Wrapf(err, "failed PreDrain on plugin: %s", id)
d.recorder.Eventf(node, corev1.EventTypeWarning, "PreDrain", "PreDrain failed for %s: %v", id, err)
return fmt.Errorf("failed PreDrain on plugin: %s %w", id, err)
}
lp.Debug("Plugin PreDrain succeeded without errors")
}
d.recorder.Eventf(node, corev1.EventTypeNormal, "PreDrain", "Node PreDrain succeeded")
return nil
}

func (d *DrainManager) RunPostDrain(ctx context.Context, nodeName string) error {
l := d.l.With(zap.String("node.name", nodeName))
func (d *DrainManager) RunPostDrain(ctx context.Context, node *drainv1.Node) error {
l := d.l.With(zap.String(config.LogNodeName, node.Name))
l.Debug("Run PostDrain")

for id, client := range d.drainClients {
lp := l.With(zap.String("plugin.id", id))
lp.Debug("Checking if drain plugin is supported")
lp := l.With(zap.String(config.LogPluginId, id))
isSupported, err := client.IsSupported(ctx)
if err != nil {
return err
}
if !isSupported {
lp.Debug("Plugin not supported")
lp.Debug(pluginNotSupported)
continue
}

lp.Debug("Running plugin PostDrain")
err = client.PostDrain(ctx, nodeName)
err = client.PostDrain(ctx, node.Name)
if err != nil {
lp.Debug("Plugin PostDrain failed", zap.Error(err))
return errors.Wrapf(err, "failed PostDrain on plugin: %s", id)
d.recorder.Eventf(node, corev1.EventTypeWarning, "PostDrain", "PostDrain failed for %s: %v", id, err)
return fmt.Errorf("failed PostDrain on plugin: %s %w", id, err)
}
lp.Debug("Plugin PostDrain succeeded without errors")
}
d.recorder.Eventf(node, corev1.EventTypeNormal, "PostDrain", "Node PostDrain succeeded")
return nil
}

Expand All @@ -338,7 +387,7 @@ func (d *DrainManager) pluginOutputMonitor(streamName string, l *zap.Logger, plu

if !addedPluginId {
if id, ok := d.pluginFileId[pluginFile]; ok {
l = l.With(zap.String("plugin.id", id))
l = l.With(zap.String(config.LogPluginId, id))
addedPluginId = true
}
}
Expand Down Expand Up @@ -367,3 +416,7 @@ func (d *DrainManager) pluginOutputMonitor(streamName string, l *zap.Logger, plu

return writer
}

func pluginEventError(id string, err error) string {
return fmt.Sprintf("%s: Error: %s", id, err.Error())
}
Loading
Loading