forked from JayceLau/kt-connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inbound.go
128 lines (114 loc) · 3.65 KB
/
inbound.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package connect
import (
"fmt"
"github.com/alibaba/kt-connect/pkg/kt/cluster"
"strconv"
"strings"
"sync"
"time"
"github.com/alibaba/kt-connect/pkg/kt/channel"
"github.com/alibaba/kt-connect/pkg/kt/options"
"github.com/alibaba/kt-connect/pkg/kt/exec/kubectl"
"github.com/alibaba/kt-connect/pkg/kt/util"
"github.com/rs/zerolog/log"
)
// Inbound maps ports of the shadow pod in the cluster back to local ports.
//
// It builds a kubectl client from the shadow's KubeOptions and a fresh SSH
// channel, then hands all arguments, together with the shadow's options, to
// inbound and returns its result.
func (s *Shadow) Inbound(exposePorts, podName, remoteIP string, credential *util.SSHCredential) (err error) {
	cli := &kubectl.Cli{KubeOptions: s.Options.KubeOptions}
	sshChannel := new(channel.SSHChannel)

	log.Info().Msg("creating shadow inbound(remote->local)")

	return inbound(
		exposePorts, podName, remoteIP, credential,
		s.Options,
		cli,
		sshChannel,
	)
}
// inbound sets up the remote->local path in three steps:
//
//  1. obtain the local SSH port from util.GetRandomSSHPort(remoteIP),
//  2. port-forward that local port to podName via portForward,
//  3. expose every port pair listed in exposePorts through ssh.
//
// It returns the first error from step 1 or 2, and nil otherwise.
// credential and kubernetesCli are accepted but not used by this function.
func inbound(exposePorts, podName, remoteIP string, credential *util.SSHCredential,
	options *options.DaemonOptions,
	kubernetesCli kubectl.CliInterface,
	ssh channel.Channel,
) (err error) {
	log.Info().Msgf("remote %s forward to local %s", remoteIP, exposePorts)

	sshPort, convErr := strconv.Atoi(util.GetRandomSSHPort(remoteIP))
	if convErr != nil {
		return convErr
	}

	if fwdErr := portForward(podName, sshPort, options); fwdErr != nil {
		return fwdErr
	}

	exposeLocalPortsToRemote(ssh, exposePorts, sshPort)
	return nil
}
// portForward creates a cluster client from options.KubeConfig, asks it to
// port-forward localSSHPort to the pod podName in options.Namespace, and then
// sleeps for options.WaitTime seconds to give the forward time to come up.
//
// It returns the error from creating the cluster client or from starting the
// port-forward. On error it returns immediately, without sleeping.
//
// The work is done synchronously: the previous version ran it in a goroutine
// that was waited on right away, which added nothing but allowed the error to
// be shadowed and the WaitGroup to never be released on failure (deadlock).
func portForward(podName string, localSSHPort int, options *options.DaemonOptions) error {
	kubernetes, err := cluster.Create(options.KubeConfig)
	if err != nil {
		// Without a client there is nothing to forward with.
		return err
	}

	if err = kubernetes.PortForward(options.Namespace, podName, localSSHPort); err != nil {
		return err
	}

	log.Info().Msgf("wait(%ds) port-forward successful", options.WaitTime)
	time.Sleep(time.Duration(options.WaitTime) * time.Second)
	return nil
}
// exposeLocalPortsToRemote starts one remote->local forward for every entry of
// the comma-separated exposePorts list and blocks until all of those forwards
// have returned. Each entry is split into its local and remote port by
// getPortMapping.
//
// The WaitGroup is shared with exposeLocalPortToRemote by pointer; passing it
// by value would copy it, so Wait here would return immediately.
func exposeLocalPortsToRemote(ssh channel.Channel, exposePorts string, localSSHPort int) {
	var wg sync.WaitGroup
	// supports multi port pairs
	for _, exposePort := range strings.Split(exposePorts, ",") {
		localPort, remotePort := getPortMapping(exposePort)
		exposeLocalPortToRemote(&wg, ssh, remotePort, localPort, localSSHPort)
	}
	wg.Wait()
}

// exposeLocalPortToRemote registers one unit of work on wg and starts a
// goroutine that calls ssh.ForwardRemoteToLocal through the SSH endpoint at
// 127.0.0.1:localSSHPort, forwarding 0.0.0.0:remotePort to 127.0.0.1:localPort.
//
// A forwarding error is logged, not returned. wg.Done is deferred so the
// WaitGroup is released however the goroutine exits.
func exposeLocalPortToRemote(wg *sync.WaitGroup, ssh channel.Channel, remotePort string, localPort string, localSSHPort int) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		log.Info().Msgf("exposeLocalPortsToRemote request from pod:%s to 127.0.0.1:%s\n", remotePort, localPort)
		err := ssh.ForwardRemoteToLocal(
			// NOTE(review): credentials are hard-coded here; the SSHCredential
			// passed to Inbound is not used on this path.
			&channel.Certificate{
				Username: "root",
				Password: "root",
			},
			fmt.Sprintf("127.0.0.1:%d", localSSHPort),
			fmt.Sprintf("0.0.0.0:%s", remotePort),
			fmt.Sprintf("127.0.0.1:%s", localPort),
		)
		if err != nil {
			log.Error().Msgf("error happen when forward remote request to local %s", err)
		}
		log.Info().Msgf("exposeLocalPortsToRemote request from pod:%s to 127.0.0.1:%s finished\n", remotePort, localPort)
	}()
}
// getPortMapping splits one expose entry into (localPort, remotePort).
//
// An entry without ":" names the same port on both sides. Otherwise the text
// before the first ":" is the remote port and everything after it is the
// local port.
func getPortMapping(exposePort string) (string, string) {
	sep := strings.Index(exposePort, ":")
	if sep < 0 {
		return exposePort, exposePort
	}
	return exposePort[sep+1:], exposePort[:sep]
}