/
portforward.go
155 lines (136 loc) · 3.61 KB
/
portforward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package kube
import (
"context"
"fmt"
"net/http"
"os"
"strings"
"github.com/main-kube/util/slice"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
type PortForwardA struct {
KubeClient *ClientS
Name string
ServiceName string
Namespace string
LocalPort int
// KubePort is the target port for the pod
KubePort int
Resource string
Condition bool
OwnerName string
// Steams configures where to write or read input from
streams genericclioptions.IOStreams
// stopCh is the channel used to manage the port forward lifecycle
stopCh chan struct{}
// readyCh communicates when the tunnel is ready to receive traffic
readyCh chan struct{}
Notify chan any
}
var out *os.File
func init() {
// out, _ = os.OpenFile("/tmp/ibtwpfp-portforward-log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
out, _ = os.OpenFile("/tmp/log/itsy/logk", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
os.Stderr = out
}
func (pf *PortForwardA) Forward() {
pf.KubeClient = Client
pf.stopCh = make(chan struct{})
pf.readyCh = make(chan struct{})
pf.streams = genericclioptions.IOStreams{
Out: out,
ErrOut: out,
In: os.Stdin,
}
if pf.Resource == "services" {
pf.Resource = "pods"
if err := pf.getFirstPod(); err != nil {
pf.Notify <- err
log.Error(err)
return
}
}
url := pf.KubeClient.API.RESTClient().Post().Resource(pf.Resource).Namespace(pf.Namespace).Name(pf.Name).SubResource("portforward").Prefix("/api/v1").URL()
transport, upgrader, err := spdy.RoundTripperFor(pf.KubeClient.Config)
if err != nil {
pf.Notify <- err
log.Error(err)
return
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, url)
fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", pf.LocalPort, pf.KubePort)}, pf.stopCh, pf.readyCh, pf.streams.Out, pf.streams.ErrOut)
if err != nil {
pf.Notify <- err
log.Error(err)
return
}
if err := fw.ForwardPorts(); err != nil {
pf.Notify <- err
log.Error(err)
if strings.Contains(err.Error(), "pod not found") {
go pf.Forward()
}
return
}
}
func (pf *PortForwardA) Close() {
if pf == nil {
return
}
select {
case pf.stopCh <- struct{}{}:
default:
}
if Map.Get(pf.Namespace).Get(pf.Name) != nil && Map.Get(pf.Namespace).Get(pf.Name).PFs != nil {
slice.Remove(&Map.Get(pf.Namespace).Get(pf.Name).PFs, pf)
}
if Services.Get(pf.Namespace).Get(pf.ServiceName) != nil {
slice.Remove(&Services.Get(pf.Namespace).Get(pf.ServiceName).PFs, pf)
}
}
func (pf *PortForwardA) Ready() {
<-pf.readyCh
}
func (pf *PortForwardA) getFirstPod() error {
serv, err := Client.API.CoreV1().Services(pf.Namespace).Get(context.TODO(), pf.Name, v1.GetOptions{})
if err != nil {
return err
}
var selector string
for k, v := range serv.Spec.Selector {
selector = k + "=" + v
break
}
pods, err := Client.API.CoreV1().Pods(pf.Namespace).List(Client.CTX, v1.ListOptions{
LabelSelector: selector,
Limit: 1,
})
if err != nil {
return err
}
if len(pods.Items) == 0 {
return fmt.Errorf("Service has no pods")
}
pod := pods.Items[0]
pf.Name = pod.Name
pf.OwnerName = pod.ObjectMeta.OwnerReferences[0].Name
podm := Map.Get(pf.Namespace).Get(pf.Name)
podm.PFs = append(podm.PFs, pf)
return nil
}
func (pf *PortForwardA) Copy() *PortForwardA {
return &PortForwardA{
Name: pf.Name,
ServiceName: pf.ServiceName,
Namespace: pf.Namespace,
LocalPort: pf.LocalPort,
KubePort: pf.KubePort,
Resource: pf.Resource,
Condition: false,
OwnerName: pf.OwnerName,
Notify: pf.Notify,
}
}