From 15fb2d88cf885764e92c9b13e8cd0c7c557e92a8 Mon Sep 17 00:00:00 2001 From: Nader Ziada Date: Fri, 21 Nov 2025 10:02:20 -0500 Subject: [PATCH] feat(kubernetes): add cluster state monitoring with debounced reload Implement automatic detection of cluster state changes (API groups, OpenShift status) with configurable polling and debounce windows. The cluster state watcher runs in the background, invalidates the discovery cache periodically, and triggers a reload callback when changes are detected after a debounce period. - Add clusterStateWatcher to monitor API groups and OpenShift status - Implement debounced reload to avoid excessive reloads during changes - Add WatchClusterState method to Manager with configurable intervals - Integrate cluster state watching in kubeconfig and single cluster providers Signed-off-by: Nader Ziada --- pkg/kubernetes/manager.go | 148 ++++++++++++++++++++++++++ pkg/kubernetes/manager_test.go | 43 ++++++++ pkg/kubernetes/provider_kubeconfig.go | 2 +- pkg/kubernetes/provider_single.go | 1 + pkg/mcp/mcp.go | 34 ++++-- 5 files changed, 216 insertions(+), 12 deletions(-) diff --git a/pkg/kubernetes/manager.go b/pkg/kubernetes/manager.go index 2ba8970e..6851ae35 100644 --- a/pkg/kubernetes/manager.go +++ b/pkg/kubernetes/manager.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" "os" + "sort" "strconv" "strings" + "sync" + "time" "github.com/containers/kubernetes-mcp-server/pkg/config" "github.com/fsnotify/fsnotify" @@ -23,10 +26,38 @@ type Manager struct { staticConfig *config.StaticConfig CloseWatchKubeConfig CloseWatchKubeConfig + + clusterWatcher *clusterStateWatcher +} + +// clusterState represents the cached state of the cluster +type clusterState struct { + apiGroups []string + isOpenShift bool +} + +// clusterStateWatcher monitors cluster state changes and triggers debounced reloads +type clusterStateWatcher struct { + manager *Manager + pollInterval time.Duration + debounceWindow time.Duration + lastKnownState clusterState + reloadCallback func() error + debounceTimer *time.Timer + mu sync.Mutex + stopCh chan struct{} + stoppedCh chan struct{} } var _ Openshift = (*Manager)(nil) +const ( + // DefaultClusterStatePollInterval is the default interval for polling cluster state changes + DefaultClusterStatePollInterval = 30 * time.Second + // DefaultClusterStateDebounceWindow is the default debounce window for cluster state changes + DefaultClusterStateDebounceWindow = 5 * time.Second +) + var ( ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments") ErrorInClusterNotInCluster = errors.New("in-cluster manager cannot be used outside of a cluster") @@ -154,6 +185,9 @@ func (m *Manager) Close() { if m.CloseWatchKubeConfig != nil { _ = m.CloseWatchKubeConfig() } + if m.clusterWatcher != nil { + m.clusterWatcher.stop() + } } func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) { @@ -249,3 +283,117 @@ func applyRateLimitFromEnv(cfg *rest.Config) { } } } + +// WatchClusterState starts a background watcher that periodically polls for cluster state changes +// and triggers a debounced reload when changes are detected. 
+func (m *Manager) WatchClusterState(pollInterval, debounceWindow time.Duration, onClusterStateChange func() error) { + if m.clusterWatcher != nil { + m.clusterWatcher.stop() + } + + watcher := &clusterStateWatcher{ + manager: m, + pollInterval: pollInterval, + debounceWindow: debounceWindow, + reloadCallback: onClusterStateChange, + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + } + + captureState := func() clusterState { + state := clusterState{apiGroups: []string{}} + if groups, err := m.accessControlClientset.DiscoveryClient().ServerGroups(); err == nil { + for _, group := range groups.Groups { + state.apiGroups = append(state.apiGroups, group.Name) + } + sort.Strings(state.apiGroups) + } + state.isOpenShift = m.IsOpenShift(context.Background()) + return state + } + watcher.lastKnownState = captureState() + + m.clusterWatcher = watcher + + // Start background monitoring + go func() { + defer close(watcher.stoppedCh) + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + klog.V(2).Infof("Started cluster state watcher (poll interval: %v, debounce: %v)", pollInterval, debounceWindow) + + for { + select { + case <-watcher.stopCh: + klog.V(2).Info("Stopping cluster state watcher") + return + case <-ticker.C: + // Invalidate discovery cache to get fresh API groups + m.accessControlClientset.DiscoveryClient().Invalidate() + + watcher.mu.Lock() + current := captureState() + klog.V(3).Infof("Polled cluster state: %d API groups, OpenShift=%v", len(current.apiGroups), current.isOpenShift) + + changed := current.isOpenShift != watcher.lastKnownState.isOpenShift || + len(current.apiGroups) != len(watcher.lastKnownState.apiGroups) + + if !changed { + for i := range current.apiGroups { + if current.apiGroups[i] != watcher.lastKnownState.apiGroups[i] { + changed = true + break + } + } + } + + if changed { + klog.V(2).Info("Cluster state changed, scheduling debounced reload") + if watcher.debounceTimer != nil { + watcher.debounceTimer.Stop() + } + watcher.debounceTimer = time.AfterFunc(debounceWindow, func() { + klog.V(2).Info("Debounce window expired, triggering reload") + if err := onClusterStateChange(); err != nil { + klog.Errorf("Failed to reload: %v", err) + } else { + watcher.mu.Lock() + watcher.lastKnownState = captureState() + watcher.mu.Unlock() + klog.V(2).Info("Reload completed") + } + }) + } + watcher.mu.Unlock() + } + } + }() +} + +// stop stops the cluster state watcher +func (w *clusterStateWatcher) stop() { + if w == nil { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + + if w.debounceTimer != nil { + w.debounceTimer.Stop() + } + + if w.stopCh == nil || w.stoppedCh == nil { + return + } + + select { + case <-w.stopCh: + // Already closed or stopped + return + default: + close(w.stopCh) + <-w.stoppedCh + } +} diff --git a/pkg/kubernetes/manager_test.go b/pkg/kubernetes/manager_test.go index aeed934e..4f54b299 100644 --- a/pkg/kubernetes/manager_test.go +++ b/pkg/kubernetes/manager_test.go @@ -228,6 +228,49 @@ func (s *ManagerTestSuite) TestNewManager() { }) } +func (s *ManagerTestSuite) TestClusterStateWatcherStop() { + s.Run("stop() on nil watcher", func() { + var watcher *clusterStateWatcher + // Should not panic + watcher.stop() + }) + + s.Run("stop() on uninitialized watcher (nil channels)", func() { + watcher := &clusterStateWatcher{} + // Should not panic even with nil channels + watcher.stop() + }) + + s.Run("stop() on initialized watcher", func() { + watcher := &clusterStateWatcher{ + stopCh: make(chan struct{}), + stoppedCh: make(chan 
struct{}), + } + // Close the stoppedCh to simulate a running goroutine + go func() { + <-watcher.stopCh + close(watcher.stoppedCh) + }() + // Should not panic and should stop cleanly + watcher.stop() + }) + + s.Run("stop() called multiple times", func() { + watcher := &clusterStateWatcher{ + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + } + go func() { + <-watcher.stopCh + close(watcher.stoppedCh) + }() + // First stop + watcher.stop() + // Second stop should not panic + watcher.stop() + }) +} + func TestManager(t *testing.T) { suite.Run(t, new(ManagerTestSuite)) } diff --git a/pkg/kubernetes/provider_kubeconfig.go b/pkg/kubernetes/provider_kubeconfig.go index e70f0a6c..b46740e1 100644 --- a/pkg/kubernetes/provider_kubeconfig.go +++ b/pkg/kubernetes/provider_kubeconfig.go @@ -120,8 +120,8 @@ func (p *kubeConfigClusterProvider) GetDefaultTarget() string { func (p *kubeConfigClusterProvider) WatchTargets(onKubeConfigChanged func() error) { m := p.managers[p.defaultContext] - m.WatchKubeConfig(onKubeConfigChanged) + m.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, onKubeConfigChanged) } func (p *kubeConfigClusterProvider) Close() { diff --git a/pkg/kubernetes/provider_single.go b/pkg/kubernetes/provider_single.go index 3693d639..1e663f67 100644 --- a/pkg/kubernetes/provider_single.go +++ b/pkg/kubernetes/provider_single.go @@ -87,6 +87,7 @@ func (p *singleClusterProvider) GetTargetParameterName() string { func (p *singleClusterProvider) WatchTargets(watch func() error) { p.manager.WatchKubeConfig(watch) + p.manager.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, watch) } func (p *singleClusterProvider) Close() { diff --git a/pkg/mcp/mcp.go b/pkg/mcp/mcp.go index 6a4a6d2f..8fee520f 100644 --- a/pkg/mcp/mcp.go +++ b/pkg/mcp/mcp.go @@ -98,31 +98,44 @@ func NewServer(configuration Configuration) (*Server, error) { func (s *Server) reloadKubernetesClusterProvider() error { ctx := context.Background() - p, err := internalk8s.NewProvider(s.configuration.StaticConfig) + + newProvider, err := internalk8s.NewProvider(s.configuration.StaticConfig) + if err != nil { + return err + } + + targets, err := newProvider.GetTargets(ctx) if err != nil { + newProvider.Close() return err } - // close the old provider if s.p != nil { s.p.Close() } - s.p = p + s.p = newProvider - targets, err := p.GetTargets(ctx) - if err != nil { + if err := s.rebuildTools(targets); err != nil { return err } + s.p.WatchTargets(s.reloadKubernetesClusterProvider) + + return nil +} + +// rebuildTools rebuilds the MCP tool registry based on the current provider and targets. +// This is called after the provider has been successfully validated and set. 
+func (s *Server) rebuildTools(targets []string) error { filter := CompositeFilter( s.configuration.isToolApplicable, - ShouldIncludeTargetListTool(p.GetTargetParameterName(), targets), + ShouldIncludeTargetListTool(s.p.GetTargetParameterName(), targets), ) mutator := WithTargetParameter( - p.GetDefaultTarget(), - p.GetTargetParameterName(), + s.p.GetDefaultTarget(), + s.p.GetTargetParameterName(), targets, ) @@ -136,7 +149,7 @@ func (s *Server) reloadKubernetesClusterProvider() error { applicableTools := make([]api.ServerTool, 0) s.enabledTools = make([]string, 0) for _, toolset := range s.configuration.Toolsets() { - for _, tool := range toolset.GetTools(p) { + for _, tool := range toolset.GetTools(s.p) { tool := mutator(tool) if !filter(tool) { continue @@ -157,6 +170,7 @@ func (s *Server) reloadKubernetesClusterProvider() error { } s.server.RemoveTools(toolsToRemove...) + // Add new tools for _, tool := range applicableTools { goSdkTool, goSdkToolHandler, err := ServerToolToGoSdkTool(s, tool) if err != nil { @@ -165,8 +179,6 @@ func (s *Server) reloadKubernetesClusterProvider() error { s.server.AddTool(goSdkTool, goSdkToolHandler) } - // start new watch - s.p.WatchTargets(s.reloadKubernetesClusterProvider) return nil }
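
Not part of the patch, but for context: any other caller would wire the new watcher the same way the two providers above do. A minimal sketch, assuming the package lives at github.com/containers/kubernetes-mcp-server/pkg/kubernetes (the import path is inferred from the repository layout, not shown in this diff) and that a *kubernetes.Manager has already been constructed elsewhere; watchWithDefaults is a hypothetical helper name, not an API added by this change.

package example

import (
	"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
)

// watchWithDefaults is a hypothetical helper, not part of the patch. It starts
// cluster state monitoring with the default 30s poll interval and 5s debounce
// window, invoking reload whenever the cluster's API surface changes.
func watchWithDefaults(m *kubernetes.Manager, reload func() error) {
	m.WatchClusterState(
		kubernetes.DefaultClusterStatePollInterval,
		kubernetes.DefaultClusterStateDebounceWindow,
		reload,
	)
}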
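
To make the debounce behaviour concrete, here is a self-contained sketch (standard library only; every name, value, and timing below is illustrative rather than taken from the patch) that follows the same poll-then-debounce shape as the watcher goroutine: each observed change cancels the pending timer and re-arms it, so a burst of changes ends in a single reload once the polled state has stayed quiet for the debounce window.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	const (
		pollInterval   = 200 * time.Millisecond
		debounceWindow = 500 * time.Millisecond
	)

	// Stand-in for the real cluster state: a counter mutated by another goroutine.
	var state atomic.Int64
	reload := func() { fmt.Println("reload fired at", time.Now().Format(time.StampMilli)) }

	// Simulate a burst of state changes; the debounce should collapse them into one reload.
	go func() {
		for i := int64(1); i <= 3; i++ {
			time.Sleep(300 * time.Millisecond)
			state.Store(i)
		}
	}()

	var debounceTimer *time.Timer
	lastKnown := state.Load()

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	deadline := time.After(3 * time.Second)

	for {
		select {
		case <-deadline:
			return
		case <-ticker.C:
			if current := state.Load(); current != lastKnown {
				lastKnown = current
				// A change was observed: cancel any pending reload and re-arm the
				// timer so the reload only runs after a quiet debounce window.
				if debounceTimer != nil {
					debounceTimer.Stop()
				}
				debounceTimer = time.AfterFunc(debounceWindow, reload)
			}
		}
	}
}

Run as-is, it prints a single "reload fired" line for the three simulated changes, which is the property the patch relies on to avoid rebuilding the MCP toolset repeatedly while a cluster's API surface is still settling.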