-
Notifications
You must be signed in to change notification settings - Fork 10
/
portforward.go
103 lines (87 loc) · 2.8 KB
/
portforward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package k8s
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
// PortForwarder is a type that implements port forwarding to a pod
type PortForwarder struct {
forwarder *portforward.PortForwarder
stopChan chan struct{}
readyChan <-chan struct{}
sigChan chan os.Signal
done chan struct{}
}
// NewPortForwarder creates a new port forwarder to a pod
func NewPortForwarder(dialer httpstream.Dialer, portSpec string) (*PortForwarder, error) {
stopChan := make(chan struct{})
readyChan := make(chan struct{})
forwarder, err := portforward.New(dialer, []string{portSpec}, stopChan, readyChan, ioutil.Discard, os.Stderr)
if err != nil {
return nil, fmt.Errorf("Error setting up port forwarding: %w", err)
}
return &PortForwarder{
forwarder: forwarder,
stopChan: stopChan,
readyChan: readyChan,
done: make(chan struct{}),
}, nil
}
// Start starts the port forwarding and calls the readyFunc callback function when port forwarding is ready
func (pf *PortForwarder) Start(readyFunc func(pf *PortForwarder) error) error {
// Set up a channel to process OS signals
pf.sigChan = make(chan os.Signal, 1)
signal.Notify(pf.sigChan, os.Interrupt)
// Set up a channel to process forwarding errors
errChan := make(chan error, 1)
// Set up forwarding, and relay errors to errChan so that caller is able to handle it
go func() {
err := pf.forwarder.ForwardPorts()
errChan <- err
}()
// Stop forwarding in case OS signals are received
go func() {
<-pf.sigChan
pf.Stop()
}()
// Process signals and call readyFunc
select {
case <-pf.readyChan:
return readyFunc(pf)
case err := <-errChan:
return fmt.Errorf("Error during port forwarding: %w", err)
}
}
// Stop stops the port forwarding if not stopped already
func (pf *PortForwarder) Stop() {
defer close(pf.done)
signal.Stop(pf.sigChan)
if pf.stopChan != nil {
close(pf.stopChan)
}
}
// Done returns a channel that is closed after Stop has been called.
func (pf *PortForwarder) Done() <-chan struct{} {
return pf.done
}
// DialerToPod constructs a new httpstream.Dialer to connect to a pod for use
// with a PortForwarder
func DialerToPod(conf *rest.Config, clientSet kubernetes.Interface, podName string, namespace string) (httpstream.Dialer, error) {
roundTripper, upgrader, err := spdy.RoundTripperFor(conf)
if err != nil {
return nil, fmt.Errorf("Error setting up round tripper for port forwarding: %w", err)
}
serverURL := clientSet.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward").URL()
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, serverURL), nil
}