/
portforwarder.go
108 lines (91 loc) · 2.76 KB
/
portforwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package portforwarder
import (
"fmt"
"io"
"net/http"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
type PortForwarder struct {
config *rest.Config
clientset kubernetes.Interface
namespace string
name string
targetPort int
listenAddress string
listenPort int
stopCh chan struct{}
}
// NewPortForwarder creates Port Forwarder for a Pod
// After creating Port Forwarder object, call Start() on it to start forwarding
// channel and Stop() to terminate it
func NewPortForwarder(config *rest.Config, namespace string, pod string, targetPort int, listenAddress string, listenPort int) (*PortForwarder, error) {
pf := &PortForwarder{
config: config,
namespace: namespace,
name: pod,
targetPort: targetPort,
listenAddress: listenAddress,
listenPort: listenPort,
}
var err error
pf.clientset, err = kubernetes.NewForConfig(pf.config)
if err != nil {
return pf, fmt.Errorf("failed to create Kubernetes client: %v", err)
}
return pf, nil
}
// Start Port Forwarding channel
func (p *PortForwarder) Start() error {
p.stopCh = make(chan struct{}, 1)
readyCh := make(chan struct{})
errCh := make(chan error, 1)
url := p.clientset.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(p.namespace).
Name(p.name).
SubResource("portforward").URL()
transport, upgrader, err := spdy.RoundTripperFor(p.config)
if err != nil {
return fmt.Errorf("failed to create dialer: %v", err)
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
ports := []string{
fmt.Sprintf("%d:%d", p.listenPort, p.targetPort),
}
addresses := []string{
p.listenAddress,
}
pf, err := portforward.NewOnAddresses(dialer, addresses, ports, p.stopCh, readyCh, io.Discard, io.Discard)
if err != nil {
return fmt.Errorf("port forward request failed: %v", err)
}
go func() {
errCh <- pf.ForwardPorts()
}()
select {
case err = <-errCh:
return fmt.Errorf("port forward request failed: %v", err)
case <-readyCh:
return nil
}
}
// Stop Port Forwarding channel
func (p *PortForwarder) Stop() {
p.stopCh <- struct{}{}
}