From a88193056ad09dd372f11c731ad3437ca0795efd Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 4 May 2020 18:16:19 +0200 Subject: [PATCH] Only listen to pods for the current RunID Fixes #1753 Signed-off-by: David Gageot --- .../kubernetes/debugging/container_manager.go | 27 ++++++++++++------- pkg/skaffold/kubernetes/log.go | 26 +++++++++--------- .../portforward/forwarder_manager.go | 6 ++--- .../kubernetes/portforward/pod_forwarder.go | 16 ++++++----- .../portforward/pod_forwarder_test.go | 6 ++--- pkg/skaffold/kubernetes/watcher.go | 3 ++- pkg/skaffold/kubernetes/watcher_test.go | 6 ++--- pkg/skaffold/runner/debugging.go | 1 + pkg/skaffold/runner/logger.go | 2 +- pkg/skaffold/runner/portforwarder.go | 2 +- 10 files changed, 55 insertions(+), 40 deletions(-) diff --git a/pkg/skaffold/kubernetes/debugging/container_manager.go b/pkg/skaffold/kubernetes/debugging/container_manager.go index eff8af14a51..5eca561295e 100644 --- a/pkg/skaffold/kubernetes/debugging/container_manager.go +++ b/pkg/skaffold/kubernetes/debugging/container_manager.go @@ -41,18 +41,27 @@ var ( ) type ContainerManager struct { - output io.Writer - cli *kubectl.CLI - podSelector kubernetes.PodSelector - namespaces []string - active map[string]string // set of containers that have been notified - aggregate chan watch.Event + output io.Writer + cli *kubectl.CLI + podSelector kubernetes.PodSelector + labelSelector string + namespaces []string + active map[string]string // set of containers that have been notified + aggregate chan watch.Event } -func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string) *ContainerManager { +func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *ContainerManager { // Create the channel here as Stop() may be called before Start() when a build fails, thus // avoiding the possibility of closing a nil channel. Channels are cheap. - return &ContainerManager{output: out, cli: cli, podSelector: podSelector, namespaces: namespaces, active: map[string]string{}, aggregate: make(chan watch.Event)} + return &ContainerManager{ + output: out, + cli: cli, + podSelector: podSelector, + labelSelector: labelSelector, + namespaces: namespaces, + active: map[string]string{}, + aggregate: make(chan watch.Event), + } } func (d *ContainerManager) Start(ctx context.Context) error { @@ -60,7 +69,7 @@ func (d *ContainerManager) Start(ctx context.Context) error { // debug mode probably not enabled return nil } - stopWatchers, err := aggregatePodWatcher(d.namespaces, d.aggregate) + stopWatchers, err := aggregatePodWatcher(d.labelSelector, d.namespaces, d.aggregate) if err != nil { stopWatchers() return fmt.Errorf("initializing debugging container watcher: %w", err) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 43f53706a6f..45c6948a1ee 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -35,11 +35,12 @@ import ( // LogAggregator aggregates the logs for all the deployed pods. type LogAggregator struct { - output io.Writer - kubectlcli *kubectl.CLI - podSelector PodSelector - namespaces []string - colorPicker ColorPicker + output io.Writer + kubectlcli *kubectl.CLI + podSelector PodSelector + labelSelector string + namespaces []string + colorPicker ColorPicker muted int32 sinceTime time.Time @@ -49,13 +50,14 @@ type LogAggregator struct { } // NewLogAggregator creates a new LogAggregator for a given output. -func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, namespaces []string) *LogAggregator { +func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, labelSelector string, namespaces []string) *LogAggregator { return &LogAggregator{ - output: out, - kubectlcli: cli, - podSelector: podSelector, - namespaces: namespaces, - colorPicker: NewColorPicker(baseImageNames), + output: out, + kubectlcli: cli, + podSelector: podSelector, + labelSelector: labelSelector, + namespaces: namespaces, + colorPicker: NewColorPicker(baseImageNames), trackedContainers: trackedContainers{ ids: map[string]bool{}, }, @@ -73,7 +75,7 @@ func (a *LogAggregator) Start(ctx context.Context) error { a.cancel = cancel aggregate := make(chan watch.Event) - stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate) + stopWatchers, err := AggregatePodWatcher(a.labelSelector, a.namespaces, aggregate) if err != nil { stopWatchers() return fmt.Errorf("initializing aggregate pod watcher: %w", err) diff --git a/pkg/skaffold/kubernetes/portforward/forwarder_manager.go b/pkg/skaffold/kubernetes/portforward/forwarder_manager.go index d876f56d963..3e5e318c217 100644 --- a/pkg/skaffold/kubernetes/portforward/forwarder_manager.go +++ b/pkg/skaffold/kubernetes/portforward/forwarder_manager.go @@ -45,7 +45,7 @@ var ( ) // NewForwarderManager returns a new port manager which handles starting and stopping port forwarding -func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string, label string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager { +func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager { if !opts.Enabled { return emptyForwarderManager } @@ -54,11 +54,11 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes ForwarderManager := &ForwarderManager{ output: out, - Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, label, userDefined)}, + Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, labelSelector, userDefined)}, } if opts.ForwardPods { - f := NewWatchingPodForwarder(em, podSelector, namespaces) + f := NewWatchingPodForwarder(em, podSelector, labelSelector, namespaces) ForwarderManager.Forwarders = append(ForwarderManager.Forwarders, f) } diff --git a/pkg/skaffold/kubernetes/portforward/pod_forwarder.go b/pkg/skaffold/kubernetes/portforward/pod_forwarder.go index b020f7dfeea..fdabaafd724 100644 --- a/pkg/skaffold/kubernetes/portforward/pod_forwarder.go +++ b/pkg/skaffold/kubernetes/portforward/pod_forwarder.go @@ -41,22 +41,24 @@ var ( // container ports within those pods. It also tracks and manages the port-forward connections. type WatchingPodForwarder struct { EntryManager - namespaces []string - podSelector kubernetes.PodSelector + namespaces []string + podSelector kubernetes.PodSelector + labelSelector string } // NewWatchingPodForwarder returns a struct that tracks and port-forwards pods as they are created and modified -func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, namespaces []string) *WatchingPodForwarder { +func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *WatchingPodForwarder { return &WatchingPodForwarder{ - EntryManager: em, - podSelector: podSelector, - namespaces: namespaces, + EntryManager: em, + podSelector: podSelector, + labelSelector: labelSelector, + namespaces: namespaces, } } func (p *WatchingPodForwarder) Start(ctx context.Context) error { aggregate := make(chan watch.Event) - stopWatchers, err := aggregatePodWatcher(p.namespaces, aggregate) + stopWatchers, err := aggregatePodWatcher(p.labelSelector, p.namespaces, aggregate) if err != nil { stopWatchers() return fmt.Errorf("initializing pod watcher: %w", err) diff --git a/pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go b/pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go index ead4d3091b3..ccc18e888e3 100644 --- a/pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go +++ b/pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go @@ -413,7 +413,7 @@ func TestAutomaticPortForwardPod(t *testing.T) { forwardedPorts: newForwardedPorts(), forwardedResources: newForwardedResources(), } - p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), nil) + p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), "", nil) if test.forwarder == nil { test.forwarder = newTestForwarder() } @@ -473,7 +473,7 @@ func TestStartPodForwarder(t *testing.T) { client.PrependWatchReactor("*", testutil.SetupFakeWatcher(fakeWatcher)) waitForWatcher := make(chan bool) - t.Override(&aggregatePodWatcher, func(_ []string, aggregate chan<- watch.Event) (func(), error) { + t.Override(&aggregatePodWatcher, func(_ string, _ []string, aggregate chan<- watch.Event) (func(), error) { go func() { waitForWatcher <- true for msg := range fakeWatcher.ResultChan() { @@ -487,7 +487,7 @@ func TestStartPodForwarder(t *testing.T) { imageList := kubernetes.NewImageList() imageList.Add("image") - p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, nil) + p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, "", nil) fakeForwarder := newTestForwarder() p.EntryForwarder = fakeForwarder p.Start(context.Background()) diff --git a/pkg/skaffold/kubernetes/watcher.go b/pkg/skaffold/kubernetes/watcher.go index 523199673ed..6bf101efd02 100644 --- a/pkg/skaffold/kubernetes/watcher.go +++ b/pkg/skaffold/kubernetes/watcher.go @@ -24,7 +24,7 @@ import ( ) // AggregatePodWatcher returns a watcher for multiple namespaces. -func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (func(), error) { +func AggregatePodWatcher(labelSelector string, namespaces []string, aggregate chan<- watch.Event) (func(), error) { watchers := make([]watch.Interface, 0, len(namespaces)) stopWatchers := func() { for _, w := range watchers { @@ -42,6 +42,7 @@ func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (fun for _, ns := range namespaces { watcher, err := kubeclient.CoreV1().Pods(ns).Watch(metav1.ListOptions{ TimeoutSeconds: &forever, + LabelSelector: labelSelector, }) if err != nil { stopWatchers() diff --git a/pkg/skaffold/kubernetes/watcher_test.go b/pkg/skaffold/kubernetes/watcher_test.go index b02b9d77e72..d24c2e9cf62 100644 --- a/pkg/skaffold/kubernetes/watcher_test.go +++ b/pkg/skaffold/kubernetes/watcher_test.go @@ -38,7 +38,7 @@ func TestAggregatePodWatcher(t *testing.T) { testutil.Run(t, "fail to get client", func(t *testutil.T) { t.Override(&Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") }) - cleanup, err := AggregatePodWatcher([]string{"ns"}, nil) + cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil) defer cleanup() t.CheckErrorContains("unable to get client", err) @@ -52,7 +52,7 @@ func TestAggregatePodWatcher(t *testing.T) { return true, nil, errors.New("unable to watch") }) - cleanup, err := AggregatePodWatcher([]string{"ns"}, nil) + cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil) defer cleanup() t.CheckErrorContains("unable to watch", err) @@ -63,7 +63,7 @@ func TestAggregatePodWatcher(t *testing.T) { t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil }) events := make(chan watch.Event) - cleanup, err := AggregatePodWatcher([]string{"ns1", "ns2"}, events) + cleanup, err := AggregatePodWatcher("", []string{"ns1", "ns2"}, events) defer cleanup() t.CheckNoError(err) diff --git a/pkg/skaffold/runner/debugging.go b/pkg/skaffold/runner/debugging.go index c993be3589a..8d6af607e4c 100644 --- a/pkg/skaffold/runner/debugging.go +++ b/pkg/skaffold/runner/debugging.go @@ -33,5 +33,6 @@ func (r *SkaffoldRunner) createContainerManager(out io.Writer) { out, kubectlCLI, r.podSelector, + r.defaultLabeller.RunIDSelector(), r.runCtx.Namespaces) } diff --git a/pkg/skaffold/runner/logger.go b/pkg/skaffold/runner/logger.go index 7c3b59ab8c3..81027dd439e 100644 --- a/pkg/skaffold/runner/logger.go +++ b/pkg/skaffold/runner/logger.go @@ -34,5 +34,5 @@ func (r *SkaffoldRunner) createLogger(out io.Writer, artifacts []*latest.Artifac func (r *SkaffoldRunner) createLoggerForImages(out io.Writer, images []string) { kubectlCLI := kubectl.NewFromRunContext(r.runCtx) - r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.runCtx.Namespaces) + r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.defaultLabeller.RunIDSelector(), r.runCtx.Namespaces) } diff --git a/pkg/skaffold/runner/portforwarder.go b/pkg/skaffold/runner/portforwarder.go index 4fdc49ff4ef..aae65ce08d7 100644 --- a/pkg/skaffold/runner/portforwarder.go +++ b/pkg/skaffold/runner/portforwarder.go @@ -28,8 +28,8 @@ func (r *SkaffoldRunner) createForwarder(out io.Writer) { r.forwarderManager = portforward.NewForwarderManager(out, kubectlCLI, r.podSelector, - r.runCtx.Namespaces, r.defaultLabeller.RunIDSelector(), + r.runCtx.Namespaces, r.runCtx.Opts.PortForward, r.portForwardResources) }