Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only listen to pods for the current RunID #4097

Merged
merged 1 commit into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions pkg/skaffold/kubernetes/debugging/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,35 @@ var (
)

type ContainerManager struct {
output io.Writer
cli *kubectl.CLI
podSelector kubernetes.PodSelector
namespaces []string
active map[string]string // set of containers that have been notified
aggregate chan watch.Event
output io.Writer
cli *kubectl.CLI
podSelector kubernetes.PodSelector
labelSelector string
namespaces []string
active map[string]string // set of containers that have been notified
aggregate chan watch.Event
}

func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string) *ContainerManager {
func NewContainerManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *ContainerManager {
// Create the channel here as Stop() may be called before Start() when a build fails, thus
// avoiding the possibility of closing a nil channel. Channels are cheap.
return &ContainerManager{output: out, cli: cli, podSelector: podSelector, namespaces: namespaces, active: map[string]string{}, aggregate: make(chan watch.Event)}
return &ContainerManager{
output: out,
cli: cli,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
active: map[string]string{},
aggregate: make(chan watch.Event),
}
}

func (d *ContainerManager) Start(ctx context.Context) error {
if d == nil {
// debug mode probably not enabled
return nil
}
stopWatchers, err := aggregatePodWatcher(d.namespaces, d.aggregate)
stopWatchers, err := aggregatePodWatcher(d.labelSelector, d.namespaces, d.aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing debugging container watcher: %w", err)
Expand Down
26 changes: 14 additions & 12 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ import (

// LogAggregator aggregates the logs for all the deployed pods.
type LogAggregator struct {
output io.Writer
kubectlcli *kubectl.CLI
podSelector PodSelector
namespaces []string
colorPicker ColorPicker
output io.Writer
kubectlcli *kubectl.CLI
podSelector PodSelector
labelSelector string
namespaces []string
colorPicker ColorPicker

muted int32
sinceTime time.Time
Expand All @@ -49,13 +50,14 @@ type LogAggregator struct {
}

// NewLogAggregator creates a new LogAggregator for a given output.
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, namespaces []string) *LogAggregator {
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, baseImageNames []string, podSelector PodSelector, labelSelector string, namespaces []string) *LogAggregator {
return &LogAggregator{
output: out,
kubectlcli: cli,
podSelector: podSelector,
namespaces: namespaces,
colorPicker: NewColorPicker(baseImageNames),
output: out,
kubectlcli: cli,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
colorPicker: NewColorPicker(baseImageNames),
trackedContainers: trackedContainers{
ids: map[string]bool{},
},
Expand All @@ -73,7 +75,7 @@ func (a *LogAggregator) Start(ctx context.Context) error {
a.cancel = cancel

aggregate := make(chan watch.Event)
stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate)
stopWatchers, err := AggregatePodWatcher(a.labelSelector, a.namespaces, aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing aggregate pod watcher: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/portforward/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
)

// NewForwarderManager returns a new port manager which handles starting and stopping port forwarding
func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, namespaces []string, label string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager {
func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string, opts config.PortForwardOptions, userDefined []*latest.PortForwardResource) *ForwarderManager {
if !opts.Enabled {
return emptyForwarderManager
}
Expand All @@ -54,11 +54,11 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes

ForwarderManager := &ForwarderManager{
output: out,
Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, label, userDefined)},
Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, labelSelector, userDefined)},
}

if opts.ForwardPods {
f := NewWatchingPodForwarder(em, podSelector, namespaces)
f := NewWatchingPodForwarder(em, podSelector, labelSelector, namespaces)
ForwarderManager.Forwarders = append(ForwarderManager.Forwarders, f)
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ var (
// container ports within those pods. It also tracks and manages the port-forward connections.
type WatchingPodForwarder struct {
EntryManager
namespaces []string
podSelector kubernetes.PodSelector
namespaces []string
podSelector kubernetes.PodSelector
labelSelector string
}

// NewWatchingPodForwarder returns a struct that tracks and port-forwards pods as they are created and modified
func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, namespaces []string) *WatchingPodForwarder {
func NewWatchingPodForwarder(em EntryManager, podSelector kubernetes.PodSelector, labelSelector string, namespaces []string) *WatchingPodForwarder {
return &WatchingPodForwarder{
EntryManager: em,
podSelector: podSelector,
namespaces: namespaces,
EntryManager: em,
podSelector: podSelector,
labelSelector: labelSelector,
namespaces: namespaces,
}
}

func (p *WatchingPodForwarder) Start(ctx context.Context) error {
aggregate := make(chan watch.Event)
stopWatchers, err := aggregatePodWatcher(p.namespaces, aggregate)
stopWatchers, err := aggregatePodWatcher(p.labelSelector, p.namespaces, aggregate)
if err != nil {
stopWatchers()
return fmt.Errorf("initializing pod watcher: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
forwardedPorts: newForwardedPorts(),
forwardedResources: newForwardedResources(),
}
p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), nil)
p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), "", nil)
if test.forwarder == nil {
test.forwarder = newTestForwarder()
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestStartPodForwarder(t *testing.T) {
client.PrependWatchReactor("*", testutil.SetupFakeWatcher(fakeWatcher))

waitForWatcher := make(chan bool)
t.Override(&aggregatePodWatcher, func(_ []string, aggregate chan<- watch.Event) (func(), error) {
t.Override(&aggregatePodWatcher, func(_ string, _ []string, aggregate chan<- watch.Event) (func(), error) {
go func() {
waitForWatcher <- true
for msg := range fakeWatcher.ResultChan() {
Expand All @@ -487,7 +487,7 @@ func TestStartPodForwarder(t *testing.T) {
imageList := kubernetes.NewImageList()
imageList.Add("image")

p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, nil)
p := NewWatchingPodForwarder(NewEntryManager(ioutil.Discard, nil), imageList, "", nil)
fakeForwarder := newTestForwarder()
p.EntryForwarder = fakeForwarder
p.Start(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion pkg/skaffold/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// AggregatePodWatcher returns a watcher for multiple namespaces.
func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (func(), error) {
func AggregatePodWatcher(labelSelector string, namespaces []string, aggregate chan<- watch.Event) (func(), error) {
watchers := make([]watch.Interface, 0, len(namespaces))
stopWatchers := func() {
for _, w := range watchers {
Expand All @@ -42,6 +42,7 @@ func AggregatePodWatcher(namespaces []string, aggregate chan<- watch.Event) (fun
for _, ns := range namespaces {
watcher, err := kubeclient.CoreV1().Pods(ns).Watch(metav1.ListOptions{
TimeoutSeconds: &forever,
LabelSelector: labelSelector,
})
if err != nil {
stopWatchers()
Expand Down
6 changes: 3 additions & 3 deletions pkg/skaffold/kubernetes/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAggregatePodWatcher(t *testing.T) {
testutil.Run(t, "fail to get client", func(t *testutil.T) {
t.Override(&Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") })

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to get client", err)
Expand All @@ -52,7 +52,7 @@ func TestAggregatePodWatcher(t *testing.T) {
return true, nil, errors.New("unable to watch")
})

cleanup, err := AggregatePodWatcher([]string{"ns"}, nil)
cleanup, err := AggregatePodWatcher("", []string{"ns"}, nil)
defer cleanup()

t.CheckErrorContains("unable to watch", err)
Expand All @@ -63,7 +63,7 @@ func TestAggregatePodWatcher(t *testing.T) {
t.Override(&Client, func() (kubernetes.Interface, error) { return clientset, nil })

events := make(chan watch.Event)
cleanup, err := AggregatePodWatcher([]string{"ns1", "ns2"}, events)
cleanup, err := AggregatePodWatcher("", []string{"ns1", "ns2"}, events)
defer cleanup()
t.CheckNoError(err)

Expand Down
1 change: 1 addition & 0 deletions pkg/skaffold/runner/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ func (r *SkaffoldRunner) createContainerManager(out io.Writer) {
out,
kubectlCLI,
r.podSelector,
r.defaultLabeller.RunIDSelector(),
r.runCtx.Namespaces)
}
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ func (r *SkaffoldRunner) createLogger(out io.Writer, artifacts []*latest.Artifac

func (r *SkaffoldRunner) createLoggerForImages(out io.Writer, images []string) {
kubectlCLI := kubectl.NewFromRunContext(r.runCtx)
r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.runCtx.Namespaces)
r.logger = kubernetes.NewLogAggregator(out, kubectlCLI, images, r.podSelector, r.defaultLabeller.RunIDSelector(), r.runCtx.Namespaces)
}
2 changes: 1 addition & 1 deletion pkg/skaffold/runner/portforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (r *SkaffoldRunner) createForwarder(out io.Writer) {
r.forwarderManager = portforward.NewForwarderManager(out,
kubectlCLI,
r.podSelector,
r.runCtx.Namespaces,
r.defaultLabeller.RunIDSelector(),
r.runCtx.Namespaces,
r.runCtx.Opts.PortForward,
r.portForwardResources)
}