Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve port-forwarding #1452

Merged
merged 2 commits into from Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 43 additions & 58 deletions pkg/skaffold/kubernetes/port_forward.go
Expand Up @@ -23,15 +23,10 @@ import (
"io"
"os/exec"
"strconv"
"sync"
"syscall"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
Expand All @@ -45,10 +40,10 @@ type PortForwarder struct {
podSelector PodSelector

// forwardedPods is a map of portForwardEntry.key() (string) -> portForwardEntry
forwardedPods *sync.Map
forwardedPods map[string]*portForwardEntry

// forwardedPorts is a map of port (int32) -> container name (string)
forwardedPorts *sync.Map
forwardedPorts map[int32]string
}

type portForwardEntry struct {
Expand All @@ -58,45 +53,51 @@ type portForwardEntry struct {
containerName string
port int32

cmd *exec.Cmd
cancel context.CancelFunc
}

// Forwarder is an interface that can modify and manage port-forward processes
type Forwarder interface {
Forward(*portForwardEntry) error
Stop(*portForwardEntry) error
Forward(context.Context, *portForwardEntry) error
Terminate(*portForwardEntry)
}

type kubectlForwarder struct{}

// Forward port-forwards a pod using kubectl port-forward
// It returns an error only if the process fails or was terminated by a signal other than SIGTERM
func (*kubectlForwarder) Forward(pfe *portForwardEntry) error {
func (*kubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) error {
logrus.Debugf("Port forwarding %s", pfe)

ctx, cancel := context.WithCancel(parentCtx)
pfe.cancel = cancel

portNumber := fmt.Sprintf("%d", pfe.port)
cmd := exec.Command("kubectl", "port-forward", pfe.podName, portNumber, portNumber, "--namespace", pfe.namespace)
pfe.cmd = cmd

cmd := exec.CommandContext(ctx, "kubectl", "port-forward", pfe.podName, portNumber, portNumber, "--namespace", pfe.namespace)
buf := &bytes.Buffer{}
cmd.Stdout = buf
cmd.Stderr = buf

if err := util.RunCmd(cmd); err != nil && !util.IsTerminatedError(err) {
if err := cmd.Start(); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
}
return errors.Wrapf(err, "port forwarding pod: %s/%s, port: %s, err: %s", pfe.namespace, pfe.podName, portNumber, buf.String())
}

go cmd.Wait()

return nil
}

// Stop terminates an existing kubectl port-forward command using SIGTERM
func (*kubectlForwarder) Stop(p *portForwardEntry) error {
// Terminate terminates an existing kubectl port-forward command using SIGTERM
func (*kubectlForwarder) Terminate(p *portForwardEntry) {
logrus.Debugf("Terminating port-forward %s", p)
if p.cmd == nil {
return fmt.Errorf("no port-forward command found for %s", p)
}
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
return errors.Wrap(err, "terminating port-forward process")

if p.cancel != nil {
p.cancel()
}
return nil
}

// NewPortForwarder returns a struct that tracks and port-forwards pods as they are created and modified
Expand All @@ -105,19 +106,16 @@ func NewPortForwarder(out io.Writer, podSelector PodSelector) *PortForwarder {
Forwarder: &kubectlForwarder{},
output: out,
podSelector: podSelector,
forwardedPods: &sync.Map{},
forwardedPorts: &sync.Map{},
forwardedPods: make(map[string]*portForwardEntry),
forwardedPorts: make(map[int32]string),
}
}

func (p *PortForwarder) cleanupPorts() {
p.forwardedPods.Range(func(k, v interface{}) bool {
entry := v.(*portForwardEntry)
if err := p.Stop(entry); err != nil {
logrus.Warnf("cleaning up port forwards: %s", err)
}
return false
})
// Stop terminates all kubectl port-forward commands.
func (p *PortForwarder) Stop() {
for _, entry := range p.forwardedPods {
p.Terminate(entry)
}
}

// Start begins a pod watcher that port forwards any pods involving containers with exposed ports.
Expand All @@ -134,7 +132,6 @@ func (p *PortForwarder) Start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
p.cleanupPorts()
return
case evt, ok := <-watcher.ResultChan():
if !ok {
Expand All @@ -156,14 +153,12 @@ func (p *PortForwarder) Start(ctx context.Context) error {
continue
}

// At this point, we know the event's type if "ADDED" or "MODIFIED".
// At this point, we know the event's type is "ADDED" or "MODIFIED".
// We must take both types into account as it is possible for the pod to have become ready for port-forwarding before we established the watch.
if p.podSelector.Select(pod) && pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
go func() {
if err := p.portForwardPod(pod); err != nil {
logrus.Warnf("port forwarding pod failed: %s", err)
}
}()
if err := p.portForwardPod(ctx, pod); err != nil {
logrus.Warnf("port forwarding pod failed: %s", err)
}
}
}
}
Expand All @@ -172,19 +167,17 @@ func (p *PortForwarder) Start(ctx context.Context) error {
return nil
}

func (p *PortForwarder) portForwardPod(pod *v1.Pod) error {
func (p *PortForwarder) portForwardPod(ctx context.Context, pod *v1.Pod) error {
resourceVersion, err := strconv.Atoi(pod.ResourceVersion)
if err != nil {
return errors.Wrap(err, "converting resource version to integer")
}

var g errgroup.Group

for _, c := range pod.Spec.Containers {
for _, port := range c.Ports {
// If the port is already port-forwarded by another container,
// continue without port-forwarding
currentApp, ok := p.forwardedPorts.Load(port.ContainerPort)
currentApp, ok := p.forwardedPorts[port.ContainerPort]
if ok && currentApp != c.Name {
color.LightYellow.Fprintf(p.output, "Port %d for %s is already in use by container %s\n", port.ContainerPort, c.Name, currentApp)
continue
Expand All @@ -197,32 +190,24 @@ func (p *PortForwarder) portForwardPod(pod *v1.Pod) error {
containerName: c.Name,
port: port.ContainerPort,
}
v, ok := p.forwardedPods.Load(entry.key())

if ok {
prevEntry := v.(*portForwardEntry)
if prevEntry, ok := p.forwardedPods[entry.key()]; ok {
// Check if this is a new generation of pod
if entry.resourceVersion > prevEntry.resourceVersion {
if err := p.Stop(prevEntry); err != nil {
return errors.Wrap(err, "terminating port-forward process")
}
p.Terminate(prevEntry)
}
}

color.Default.Fprintln(p.output, fmt.Sprintf("Port Forwarding %s %d -> %d", entry.podName, entry.port, entry.port))
p.forwardedPods.Store(entry.key(), entry)
p.forwardedPorts.Store(entry.port, entry.containerName)
p.forwardedPods[entry.key()] = entry
p.forwardedPorts[entry.port] = entry.containerName

g.Go(func() error {
return p.Forward(entry)
})
if err := p.Forward(ctx, entry); err != nil {
return errors.Wrap(err, "port forwarding failed")
}
}
}

if err := g.Wait(); err != nil {
return errors.Wrap(err, "port forwarding")
}

return nil
}

Expand Down
16 changes: 7 additions & 9 deletions pkg/skaffold/kubernetes/port_forward_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"io/ioutil"
"reflect"
Expand All @@ -32,27 +33,24 @@ type testForwarder struct {
forwardedPorts map[int32]bool

forwardErr error
stopErr error
}

func (f *testForwarder) Forward(pfe *portForwardEntry) error {
func (f *testForwarder) Forward(ctx context.Context, pfe *portForwardEntry) error {
f.forwardedEntries[pfe.key()] = pfe
f.forwardedPorts[pfe.port] = true
return f.forwardErr
}

func (f *testForwarder) Stop(pfe *portForwardEntry) error {
func (f *testForwarder) Terminate(pfe *portForwardEntry) {
delete(f.forwardedEntries, pfe.key())
delete(f.forwardedPorts, pfe.port)
return f.stopErr
}

func newTestForwarder(forwardErr, stopErr error) *testForwarder {
func newTestForwarder(forwardErr error) *testForwarder {
return &testForwarder{
forwardedEntries: map[string]*portForwardEntry{},
forwardedPorts: map[int32]bool{},
forwardErr: forwardErr,
stopErr: stopErr,
}
}

Expand Down Expand Up @@ -130,7 +128,7 @@ func TestPortForwardPod(t *testing.T) {
expectedPorts: map[int32]bool{
8080: true,
},
forwarder: newTestForwarder(fmt.Errorf(""), nil),
forwarder: newTestForwarder(fmt.Errorf("")),
shouldErr: true,
expectedEntries: map[string]*portForwardEntry{
"containername-8080": {
Expand Down Expand Up @@ -330,12 +328,12 @@ func TestPortForwardPod(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
p := NewPortForwarder(ioutil.Discard, NewImageList())
if test.forwarder == nil {
test.forwarder = newTestForwarder(nil, nil)
test.forwarder = newTestForwarder(nil)
}
p.Forwarder = test.forwarder

for _, pod := range test.pods {
err := p.portForwardPod(pod)
err := p.portForwardPod(context.Background(), pod)
testutil.CheckError(t, test.shouldErr, err)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/skaffold/runner/dev.go
Expand Up @@ -37,6 +37,10 @@ var ErrorConfigurationChanged = errors.New("configuration changed")
// pipeline until interrupted by the user.
func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*latest.Artifact) error {
logger := r.newLogger(out, artifacts)
defer logger.Stop()

portForwarder := kubernetes.NewPortForwarder(out, r.imageList)
defer portForwarder.Stop()

// Create watcher and register artifacts to build current state of files.
changed := changes{}
Expand All @@ -59,7 +63,6 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la

switch {
case changed.needsReload:
logger.Stop()
return ErrorConfigurationChanged
case len(changed.needsResync) > 0:
for _, s := range changed.needsResync {
Expand Down Expand Up @@ -139,8 +142,6 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
}

if r.opts.PortForward {
portForwarder := kubernetes.NewPortForwarder(out, r.imageList)

if err := portForwarder.Start(ctx); err != nil {
return errors.Wrap(err, "starting port-forwarder")
}
Expand Down
38 changes: 0 additions & 38 deletions pkg/skaffold/util/process.go

This file was deleted.