-
Notifications
You must be signed in to change notification settings - Fork 0
/
sshc.go
385 lines (329 loc) · 10 KB
/
sshc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
package ssh_mesh
import (
"context"
"errors"
"io"
"log"
"net"
"net/http"
"strconv"
"strings"
"time"
"log/slog"
"github.com/costinm/ssh-mesh/nio"
"golang.org/x/crypto/ssh"
)
// Client side of SSH mesh.
// Dial opens one TCP or H2 connection to addr and runs the SSH handshake
// over it via DialConn. It blocks until the SSH handshake is done.
//
// An addr starting with "https://" is dialed as an H2 stream using the nio
// wrappers; any other addr is dialed as plain TCP. ctx bounds connection
// establishment in both cases.
func (sc *SSHCMux) Dial(ctx context.Context, addr string) error {
	if strings.HasPrefix(addr, "https://") {
		// 'nio' has wrappers for streaming over h2 and h2c
		tcon, err := nio.NewStreamH2(ctx, http.DefaultClient, addr, sshVip, sc.mds)
		if err != nil {
			return err
		}
		return sc.DialConn(ctx, tcon, addr)
	}

	// Use DialContext so that cancelling ctx (or its deadline) aborts a
	// connection attempt that would otherwise hang until the OS times out.
	var d net.Dialer
	tcon, err := d.DialContext(ctx, "tcp", addr)
	if err != nil {
		return err
	}
	return sc.DialConn(ctx, tcon, addr)
}
// DialConn performs the SSH client handshake over the already established
// connection tcon and records the resulting state on sc (NetConn, SSHConn,
// chans, reqs, LastConnected, SSHClient).
//
// Authentication offers the client signer as a public key and, when sc.mds
// is set, a password callback that fetches a token for "ssh://"+addr.
// After a successful handshake a goroutine is started to accept
// "forwarded-tcpip" channels opened by the peer.
func (sc *SSHCMux) DialConn(ctx context.Context, tcon net.Conn, addr string) error {
	sc.NetConn = tcon

	// hostname is passed back from addr, empty string
	verifyHost := func(hostname string, remote net.Addr, key ssh.PublicKey) error {
		// A successful check against the configured CertChecker accepts the host.
		if sc.CertChecker != nil && sc.CertChecker.CheckHostKey(hostname, remote, key) == nil {
			return nil
		}
		if len(sc.AuthorizedCA) != 0 {
			slog.Info("SSHC rejected host", "host", hostname, "addr", remote, "key", key)
			return errors.New("Server not authenticated")
		}
		// Not in mesh mode - allow any hosts, they're only used for jumping, not trusted.
		slog.Info("Permissive/test host, no CA", "host", hostname, "addr", remote, "key", key)
		return nil
	}

	clientCfg := &ssh.ClientConfig{
		Auth:   []ssh.AuthMethod{ssh.PublicKeys(sc.SignerClient)},
		Config: ssh.Config{},
		//ClientVersion: version,
		Timeout:         3 * time.Second,
		User:            sc.Mesh.Name,
		HostKeyCallback: verifyHost,
	}

	if sc.mds != nil {
		tokenAuth := ssh.PasswordCallback(func() (string, error) {
			token, err := sc.mds.GetToken(ctx, "ssh://"+addr)
			if err != nil {
				return "", err
			}
			return token, nil
		})
		clientCfg.Auth = append(clientCfg.Auth, tokenAuth)
	}

	conn, newChans, globalReqs, err := ssh.NewClientConn(tcon, addr, clientCfg)
	if err != nil {
		return err
	}
	sc.SSHConn = conn
	sc.chans = newChans
	sc.reqs = globalReqs
	sc.LastConnected = time.Now()

	// NewClient will handle requests (reply false) and channel opens (via
	// channelHandlers created by HandleChannelOpen), and keep track of
	// reverse forwards (close all on done).
	//
	// It is the only way to use the Session implementation in the core
	// library - which is equivalent to OpenChannel("session").
	client := ssh.NewClient(conn, newChans, globalReqs)
	sc.SSHClient = client

	// The client adds "forwarded-tcpip" and "forwarded-streamlocal" when
	// ListenTCP is called on it, which in turn sends "tcpip-forward" with
	// IP:port and returns a Listener. Instead, we don't call the client's
	// ListenTCP and handle the channels directly (no listener).
	go sc.handleReverseForwardAccept(client.HandleChannelOpen("forwarded-tcpip"))

	// TODO: allow registration of arbitrary handlers for channels
	// TODO: allow registration of HTTP handlers (channels)
	return nil
}
// channelForwardMsg is the payload of the "tcpip-forward" global request
// described in RFC 4254 7.1. ListenTCP fills it positionally and passes it
// to ssh.Marshal, so the field order must not change.
type channelForwardMsg struct {
// addr is the address (or domain) the peer is asked to listen on.
addr string
// rport is the port the peer is asked to listen on.
rport uint32
}
// ListenTCP requests the remote peer open a listening socket on port by
// sending a "tcpip-forward" global request, and returns the port that was
// bound.
//
// Regular SSH servers don't multiplex on port.
//
// RFC 4254 gives the following meaning to domain:
//
//   - "" means that connections are to be accepted on all protocol
//     families supported by the SSH implementation.
//   - "0.0.0.0" means to listen on all IPv4 addresses.
//   - "::" means to listen on all IPv6 addresses.
//   - "localhost" means to listen on all protocol families supported by
//     the SSH implementation on loopback addresses only ([RFC3330] and
//     [RFC3513]).
//   - "127.0.0.1" and "::1" indicate listening on the loopback
//     interfaces for IPv4 and IPv6, respectively.
//
// Port 0 is usually supported; the peer then picks a port and reports it
// in the reply. An error is returned if the request cannot be sent, is
// denied by the peer, or the reply cannot be parsed.
func (c *SSHCMux) ListenTCP(domain string, port uint32) (uint32, error) {
	m := channelForwardMsg{
		domain,
		port,
	}

	ok, resp, err := c.SSHConn.SendRequest("tcpip-forward", true, ssh.Marshal(&m))
	if err != nil {
		return 0, err
	}
	if !ok {
		return 0, errors.New("ssh: tcpip-forward request denied by peer")
	}

	// Per RFC 4254 7.1 the reply only carries the bound port when port 0
	// was requested. For an explicit port a compliant server sends an empty
	// reply, which must not be treated as a parse failure.
	if port != 0 && len(resp) == 0 {
		return port, nil
	}

	var p struct {
		Port uint32
	}
	if err := ssh.Unmarshal(resp, &p); err != nil {
		return 0, err
	}
	return p.Port, nil
}
// forwardedTCPPayload is the extra data of a "forwarded-tcpip" channel open,
// decoded with ssh.Unmarshal in handleReverseForwardAccept.
// See RFC 4254, section 7.2
type forwardedTCPPayload struct {
// Addr and Port identify the forwarded destination; Port selects the
// local target in handleReverseAcceptedStream.
Addr string
Port uint32
// Origin of the connection; only used for logging in this file.
// Note that sish doesn't preserve it by default
OriginAddr string
OriginPort uint32
}
// handleReverseForwardAccept accepts "forwarded-tcpip" channels opened by
// the peer and hands each accepted stream to handleReverseAcceptedStream in
// its own goroutine. It returns when fch is closed.
//
// fch is the channel obtained from HandleChannelOpen("forwarded-tcpip") in
// DialConn.
func (sc *SSHCMux) handleReverseForwardAccept(fch <-chan ssh.NewChannel) {
	// This would needed to kick handleForwardsOnce
	// c.ListenTCP(&net.TCPAddr{}) // any port, ignored
	for newCh := range fch {
		if newCh.ChannelType() != "forwarded-tcpip" {
			continue
		}

		var fwd forwardedTCPPayload
		if err := ssh.Unmarshal(newCh.ExtraData(), &fwd); err != nil {
			newCh.Reject(ssh.ConnectionFailed, "could not parse forwarded-tcpip payload: "+err.Error())
			continue
		}

		// RFC 4254 section 7.2 specifies that incoming addresses should
		// list the address, in string format. It is implied that this
		// should be an IP address, as it would be impossible to connect to
		// it otherwise. The addresses are deliberately not parsed here
		// (no parseTCPAddr on payload.Addr / payload.OriginAddr): the
		// payload is passed on as received.

		stream, streamReqs, err := newCh.Accept()
		if err != nil {
			newCh.Reject(ssh.ConnectionFailed, "could not accept: "+err.Error())
			continue
		}
		go ssh.DiscardRequests(streamReqs)
		go sc.handleReverseAcceptedStream(stream, fwd)
	}
}
// handleReverseAcceptedStream handles streams accepted from the remote
// server (-R) by proxying them to a local destination chosen from the
// forwarded port:
//
//   - ports 22 and 15022 go to "localhost:15022";
//   - a port present in sc.ReverseForwards goes to the mapped address;
//   - otherwise, when sc.Waypoint is set, the stream goes to the forwarded
//     address and port as-is;
//   - anything else is logged and the stream is closed.
//
// This is an extension to the regular SSH client, waypoints can use any
// port. For a regular ssh client, all service ports must be forwarded.
func (sc *SSHCMux) handleReverseAcceptedStream(ch io.ReadWriteCloser,
	fwto forwardedTCPPayload) {
	switch fwto.Port {
	case 22, 15022:
		sc.Proxy(ch, "localhost:15022", "")
		return
	}

	// fwto.Addr will be this hostname. We need the port.
	portKey := strconv.Itoa(int(fwto.Port))
	target := sc.ReverseForwards[portKey]

	switch {
	case target != "":
		// Explicitly configured reverse forward.
	case sc.Waypoint:
		target = net.JoinHostPort(fwto.Addr, portKey)
	default:
		slog.Warn("Unknown port", "port", fwto.Port, "addr", fwto.Addr,
			"origPort", fwto.OriginPort, "oAddr", fwto.OriginAddr)
		ch.Close()
		return
	}

	sc.Proxy(ch, target, "")
}
// Proxy connects an incoming stream to the TCP destination dstp and copies
// data between them, for remotely accepted streams (-R).
//
// If the destination cannot be dialed the failure is logged and ch is
// closed so the remote side is not left with a dangling channel. The s
// parameter is currently unused.
//
// TODO: optimize.
func (sc *SSHCMux) Proxy(ch io.ReadWriteCloser, dstp string, s string) {
	c, err := net.Dial("tcp", dstp)
	if err != nil {
		slog.Error("ssh failed to connect", "addr", dstp, "err", err)
		// Nothing will ever read from or write to ch; release it. The
		// close error is not actionable here.
		ch.Close()
		return
	}
	proxy(ch, c, func(err error, err2 error, i int64, i2 int64) {
		if err == nil && err2 == nil {
			slog.Info("Proxy", "dst", dstp, "in", i, "out", i2)
		} else {
			slog.Info("Proxy", "dst", dstp, "in", i, "out", i2, "errin", err, "errout", err2)
		}
	})
}
// StayConnected will maintain an active connection, typically with a jump
// host. It never returns: it dials addr, registers the reverse forwards,
// waits for the connection to end, and then dials again.
//
// Failed dials are retried with a backoff that doubles on each failure and
// is reset after a successful connection.
//
// 'addr' is the IP:port to connect to - not the 'canonical' service.
func (sshc *SSHCMux) StayConnected(addr string) {
	// TODO: create or use UDS for multiplex.

	// Best effort: if addr is not in host:port form the domain stays empty.
	sshDomain, _, _ := net.SplitHostPort(addr)
	ctx := context.Background()

	const initialBackoff = 1000 * time.Millisecond
	backoff := initialBackoff
	for {
		t0 := time.Now()
		if err := sshc.Dial(ctx, addr); err != nil {
			slog.Info("Dial_error", "err", err, "addr", addr)
			if backoff < 15*time.Minute {
				backoff = 2 * backoff
			}
			time.Sleep(backoff)
			continue
		}
		t1 := time.Now()
		c := sshc.SSHClient

		// Open a session - for example sish sends logs and info (without waiting for shell!)
		//go sshc.ClientSession()

		// This is used for openssh/dropbear - which don't multiplex 22
		// (no automatic jump-host / waypoint feature ).
		if port, err := sshc.ListenTCP("", 0); err != nil {
			log.Println("Failed to forward", err)
		} else {
			slog.Info("JumpHost", "port", port, "addr", addr)
		}

		crv := sshc.Mesh.Name // os.Getenv("K_REVISION")
		if crv != "" {
			if _, err := sshc.ListenTCP(crv+"."+sshDomain, 22); err != nil {
				log.Println("Failed to forward", err)
			}
		}

		for k := range sshc.ReverseForwards {
			// Keys are expected to be decimal port numbers; skip anything
			// else rather than silently requesting a forward on port 0.
			kp, err := strconv.ParseUint(k, 10, 32)
			if err != nil {
				log.Println("Invalid reverse forward port", k, err)
				continue
			}
			if _, err := sshc.ListenTCP(crv+"."+sshDomain, uint32(kp)); err != nil {
				log.Println("Failed to forward", err)
			}
		}

		slog.Info("SSHC_CONNECTED", "hostname", addr, "domain", sshDomain,
			"dial_time", t1.Sub(t0),
			"con_time", time.Since(t0))

		backoff = initialBackoff

		// Block until the connection is closed, then reconnect.
		c.Wait()
		slog.Info("SSHC_DISCONNECTED", "hostname", addr, "domain", sshDomain, "dur", time.Since(t0))
	}
}
// OpenStream opens a new channel of type n on the SSH connection, with data
// as the channel-open payload, and wraps it in a Stream.
// The same channel is used in both directions; requests arriving on the
// channel are discarded.
func (c *SSHCMux) OpenStream(n string, data []byte) (*Stream, error) {
	channel, requests, err := c.SSHConn.OpenChannel(n, data)
	if err != nil {
		return nil, err
	}
	go ssh.DiscardRequests(requests)

	stream := &Stream{
		Channel:   channel,
		clientMux: c,
	}
	return stream, nil
}
// Exec opens a client "session" channel and sends an "exec" request for cmd.
//
// The env argument is not sent yet (see TODO below). On success the returned
// RemoteExec wraps the session channel. An error is returned when there is
// no client connection, the channel cannot be opened, the exec request
// cannot be sent, or the peer denies it; in the last two cases the session
// channel is closed before returning.
func (ssht *SSHCMux) Exec(cmd string, env map[string]string) (*RemoteExec, error) {
	if ssht.SSHConn == nil {
		return nil, errors.New("Only for client connections")
	}

	sessionCh, sessionServerReq, err := ssht.SSHConn.OpenChannel("session", nil)
	if err != nil {
		log.Println("Error opening session", err)
		ssht.SSHConn.Close()
		return nil, err
	}

	re := &RemoteExec{
		Channel:          sessionCh,
		sessionServerReq: sessionServerReq,
	}

	// serverReq will be used only to notify that the session is over, may
	// receive keepalives. The loop ends when the channel is closed.
	go func() {
		for msg := range sessionServerReq {
			// TODO: exit-status, exit-signal messages
			log.Println("SSHCMux: /ssh/srvmsg session message from server ", msg.Type, msg)
			if msg.WantReply {
				msg.Reply(false, nil)
			}
		}
	}()

	req := execMsg{
		Command: cmd,
	}

	// TODO: send env first

	ok, err := sessionCh.SendRequest("exec", true, ssh.Marshal(&req))
	if err != nil {
		// Don't hand back a session whose exec request never went out.
		sessionCh.Close()
		return nil, err
	}
	if !ok {
		log.Println("SSHCMux: Message channel failed", err)
		sessionCh.Close()
		return nil, errors.New("ssh: exec request denied by peer")
	}

	return re, nil
}
// RemoteExec is a "session" channel, as returned by SSHCMux.Exec.
// It embeds the ssh.Channel opened for the session.
type RemoteExec struct {
ssh.Channel
// sessionServerReq is the request channel returned when the session was
// opened; Exec starts a goroutine that drains it.
sessionServerReq <-chan *ssh.Request
}
// execMsg is the payload of the "exec" channel request sent by Exec.
// RFC 4254 Section 6.5.
type execMsg struct {
// Command is the command line passed to Exec.
Command string
}