Skip to content

Commit

Permalink
Rely on context.Context to stop port-forwarding
Browse files Browse the repository at this point in the history
Signed-off-by: David Gageot <david@gageot.net>
  • Loading branch information
dgageot committed Jan 14, 2019
1 parent 4b872e6 commit cb3fb76
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 102 deletions.
95 changes: 40 additions & 55 deletions pkg/skaffold/kubernetes/port_forward.go
Expand Up @@ -23,15 +23,10 @@ import (
"io"
"os/exec"
"strconv"
"sync"
"syscall"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
Expand All @@ -45,10 +40,10 @@ type PortForwarder struct {
podSelector PodSelector

// forwardedPods is a map of portForwardEntry.key() (string) -> portForwardEntry
forwardedPods *sync.Map
forwardedPods map[string]*portForwardEntry

// forwardedPorts is a map of port (int32) -> container name (string)
forwardedPorts *sync.Map
forwardedPorts map[int32]string
}

type portForwardEntry struct {
Expand All @@ -58,45 +53,51 @@ type portForwardEntry struct {
containerName string
port int32

cmd *exec.Cmd
cancel context.CancelFunc
}

// Forwarder is an interface that can modify and manage port-forward processes
type Forwarder interface {
Forward(*portForwardEntry) error
Terminate(*portForwardEntry) error
Forward(context.Context, *portForwardEntry) error
Terminate(*portForwardEntry)
}

type kubectlForwarder struct{}

// Forward port-forwards a pod using kubectl port-forward
// It returns an error only if the process fails or was terminated by a signal other than SIGTERM
func (*kubectlForwarder) Forward(pfe *portForwardEntry) error {
func (*kubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) error {
logrus.Debugf("Port forwarding %s", pfe)

ctx, cancel := context.WithCancel(parentCtx)
pfe.cancel = cancel

portNumber := fmt.Sprintf("%d", pfe.port)
cmd := exec.Command("kubectl", "port-forward", pfe.podName, portNumber, portNumber, "--namespace", pfe.namespace)
pfe.cmd = cmd

cmd := exec.CommandContext(ctx, "kubectl", "port-forward", pfe.podName, portNumber, portNumber, "--namespace", pfe.namespace)
buf := &bytes.Buffer{}
cmd.Stdout = buf
cmd.Stderr = buf

if err := util.RunCmd(cmd); err != nil && !util.IsTerminatedError(err) {
if err := cmd.Start(); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
}
return errors.Wrapf(err, "port forwarding pod: %s/%s, port: %s, err: %s", pfe.namespace, pfe.podName, portNumber, buf.String())
}

go cmd.Wait()

return nil
}

// Terminate terminates an existing kubectl port-forward command using SIGTERM
func (*kubectlForwarder) Terminate(p *portForwardEntry) error {
func (*kubectlForwarder) Terminate(p *portForwardEntry) {
logrus.Debugf("Terminating port-forward %s", p)
if p.cmd == nil {
return fmt.Errorf("no port-forward command found for %s", p)
}
if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
return errors.Wrap(err, "terminating port-forward process")

if p.cancel != nil {
p.cancel()
}
return nil
}

// NewPortForwarder returns a struct that tracks and port-forwards pods as they are created and modified
Expand All @@ -105,20 +106,16 @@ func NewPortForwarder(out io.Writer, podSelector PodSelector) *PortForwarder {
Forwarder: &kubectlForwarder{},
output: out,
podSelector: podSelector,
forwardedPods: &sync.Map{},
forwardedPorts: &sync.Map{},
forwardedPods: make(map[string]*portForwardEntry),
forwardedPorts: make(map[int32]string),
}
}

// Stop terminates all kubectl port-forward commands.
func (p *PortForwarder) Stop() {
p.forwardedPods.Range(func(k, v interface{}) bool {
entry := v.(*portForwardEntry)
if err := p.Terminate(entry); err != nil {
logrus.Warnf("cleaning up port forwards: %s", err)
}
return true
})
for _, entry := range p.forwardedPods {
p.Terminate(entry)
}
}

// Start begins a pod watcher that port forwards any pods involving containers with exposed ports.
Expand Down Expand Up @@ -156,14 +153,12 @@ func (p *PortForwarder) Start(ctx context.Context) error {
continue
}

// At this point, we know the event's type if "ADDED" or "MODIFIED".
// At this point, we know the event's type is "ADDED" or "MODIFIED".
// We must take both types into account as it is possible for the pod to have become ready for port-forwarding before we established the watch.
if p.podSelector.Select(pod) && pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
go func() {
if err := p.portForwardPod(pod); err != nil {
logrus.Warnf("port forwarding pod failed: %s", err)
}
}()
if err := p.portForwardPod(ctx, pod); err != nil {
logrus.Warnf("port forwarding pod failed: %s", err)
}
}
}
}
Expand All @@ -172,19 +167,17 @@ func (p *PortForwarder) Start(ctx context.Context) error {
return nil
}

func (p *PortForwarder) portForwardPod(pod *v1.Pod) error {
func (p *PortForwarder) portForwardPod(ctx context.Context, pod *v1.Pod) error {
resourceVersion, err := strconv.Atoi(pod.ResourceVersion)
if err != nil {
return errors.Wrap(err, "converting resource version to integer")
}

var g errgroup.Group

for _, c := range pod.Spec.Containers {
for _, port := range c.Ports {
// If the port is already port-forwarded by another container,
// continue without port-forwarding
currentApp, ok := p.forwardedPorts.Load(port.ContainerPort)
currentApp, ok := p.forwardedPorts[port.ContainerPort]
if ok && currentApp != c.Name {
color.LightYellow.Fprintf(p.output, "Port %d for %s is already in use by container %s\n", port.ContainerPort, c.Name, currentApp)
continue
Expand All @@ -197,32 +190,24 @@ func (p *PortForwarder) portForwardPod(pod *v1.Pod) error {
containerName: c.Name,
port: port.ContainerPort,
}
v, ok := p.forwardedPods.Load(entry.key())

if ok {
prevEntry := v.(*portForwardEntry)
if prevEntry, ok := p.forwardedPods[entry.key()]; ok {
// Check if this is a new generation of pod
if entry.resourceVersion > prevEntry.resourceVersion {
if err := p.Terminate(prevEntry); err != nil {
return errors.Wrap(err, "terminating port-forward process")
}
p.Terminate(prevEntry)
}
}

color.Default.Fprintln(p.output, fmt.Sprintf("Port Forwarding %s %d -> %d", entry.podName, entry.port, entry.port))
p.forwardedPods.Store(entry.key(), entry)
p.forwardedPorts.Store(entry.port, entry.containerName)
p.forwardedPods[entry.key()] = entry
p.forwardedPorts[entry.port] = entry.containerName

g.Go(func() error {
return p.Forward(entry)
})
if err := p.Forward(ctx, entry); err != nil {
return errors.Wrap(err, "port forwarding failed")
}
}
}

if err := g.Wait(); err != nil {
return errors.Wrap(err, "port forwarding")
}

return nil
}

Expand Down
16 changes: 7 additions & 9 deletions pkg/skaffold/kubernetes/port_forward_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kubernetes

import (
"context"
"fmt"
"io/ioutil"
"reflect"
Expand All @@ -32,27 +33,24 @@ type testForwarder struct {
forwardedPorts map[int32]bool

forwardErr error
stopErr error
}

func (f *testForwarder) Forward(pfe *portForwardEntry) error {
func (f *testForwarder) Forward(ctx context.Context, pfe *portForwardEntry) error {
f.forwardedEntries[pfe.key()] = pfe
f.forwardedPorts[pfe.port] = true
return f.forwardErr
}

func (f *testForwarder) Terminate(pfe *portForwardEntry) error {
func (f *testForwarder) Terminate(pfe *portForwardEntry) {
delete(f.forwardedEntries, pfe.key())
delete(f.forwardedPorts, pfe.port)
return f.stopErr
}

func newTestForwarder(forwardErr, stopErr error) *testForwarder {
func newTestForwarder(forwardErr error) *testForwarder {
return &testForwarder{
forwardedEntries: map[string]*portForwardEntry{},
forwardedPorts: map[int32]bool{},
forwardErr: forwardErr,
stopErr: stopErr,
}
}

Expand Down Expand Up @@ -130,7 +128,7 @@ func TestPortForwardPod(t *testing.T) {
expectedPorts: map[int32]bool{
8080: true,
},
forwarder: newTestForwarder(fmt.Errorf(""), nil),
forwarder: newTestForwarder(fmt.Errorf("")),
shouldErr: true,
expectedEntries: map[string]*portForwardEntry{
"containername-8080": {
Expand Down Expand Up @@ -330,12 +328,12 @@ func TestPortForwardPod(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
p := NewPortForwarder(ioutil.Discard, NewImageList())
if test.forwarder == nil {
test.forwarder = newTestForwarder(nil, nil)
test.forwarder = newTestForwarder(nil)
}
p.Forwarder = test.forwarder

for _, pod := range test.pods {
err := p.portForwardPod(pod)
err := p.portForwardPod(context.Background(), pod)
testutil.CheckError(t, test.shouldErr, err)
}

Expand Down
38 changes: 0 additions & 38 deletions pkg/skaffold/util/process.go

This file was deleted.

0 comments on commit cb3fb76

Please sign in to comment.