-
Notifications
You must be signed in to change notification settings - Fork 0
/
forwarder.go
160 lines (147 loc) · 3.41 KB
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package client
import (
"context"
"fmt"
"github.com/SuzukiHonoka/spaceship/internal/transport"
"github.com/SuzukiHonoka/spaceship/internal/transport/rpc"
proxy "github.com/SuzukiHonoka/spaceship/internal/transport/rpc/proto"
"io"
"os"
"time"
)
// Forwarder relays data between a local reader/writer pair and a proxy RPC
// stream.
type Forwarder struct {
	// Ctx is polled by the copy loops; once it is done they stop and return nil.
	Ctx context.Context

	// Stream is the RPC stream that messages are sent to and received from.
	Stream proxy.Proxy_ProxyClient

	// Writer receives the payload of every session message read from Stream.
	Writer io.Writer

	// Reader supplies the bytes that are forwarded to Stream.
	Reader io.Reader

	// LocalAddr carries the address reported by an accepted message. It is
	// closed when the stream reports an error status.
	LocalAddr chan string
}
// NewForwarder returns a Forwarder wired to the given context, RPC stream,
// writer and reader. Its LocalAddr channel is created unbuffered.
func NewForwarder(ctx context.Context, s proxy.Proxy_ProxyClient, w io.Writer, r io.Reader) *Forwarder {
	f := new(Forwarder)
	f.Ctx = ctx
	f.Stream = s
	f.Writer = w
	f.Reader = r
	f.LocalAddr = make(chan string)
	return f
}
// copySRCtoTarget performs a single forwarding step: it reads once from
// f.Reader into buf and sends the bytes that were read to the RPC stream.
//
// Per the io.Reader contract a Read may return n > 0 together with a non-nil
// error (commonly the final chunk alongside io.EOF), so the bytes are
// forwarded before the read error is reported. A read that yields no bytes
// sends nothing.
//
// It returns the Send error if sending fails, otherwise the Read error (nil
// when the read succeeded).
func (f *Forwarder) copySRCtoTarget(buf []byte) error {
	n, readErr := f.Reader.Read(buf)
	if n > 0 {
		// Forward what was read even if readErr is set, so trailing data
		// is not lost.
		if err := f.Stream.Send(&proxy.ProxySRC{Data: buf[:n]}); err != nil {
			return err
		}
	}
	return readErr
}
// CopyTargetToSRC repeatedly handles messages from the RPC stream until the
// forwarder's context is done, in which case it returns nil, or until a single
// step fails, in which case that error is returned.
func (f *Forwarder) CopyTargetToSRC() error {
	msg := new(proxy.ProxyDST)
	done := f.Ctx.Done()
	for {
		// Non-blocking cancellation check before each step.
		select {
		case <-done:
			return nil
		default:
		}
		if err := f.copyTargetToSRC(msg); err != nil {
			return err
		}
	}
}
// copyTargetToSRC receives one message from the RPC stream and acts on its
// status:
//
//   - Session:  the payload is written to f.Writer.
//   - Accepted: the reported address is delivered on f.LocalAddr.
//   - EOF:      io.EOF is returned.
//   - Error:    f.LocalAddr is closed and transport.ErrServerFailed is returned.
//   - other:    an "unknown status" error is returned.
//
// The buf argument is not read; the message returned by Recv is used instead.
// The parameter is kept so the signature stays unchanged for existing callers.
//
// Delivering the address is abandoned when f.Ctx is done. f.LocalAddr is
// unbuffered, so without that escape this goroutine would block forever once
// nobody is receiving from the channel any more. In that case nil is returned
// and the calling loop observes the cancellation itself.
func (f *Forwarder) copyTargetToSRC(buf *proxy.ProxyDST) error {
	msg, err := f.Stream.Recv()
	if err != nil {
		return err
	}

	switch msg.Status {
	case proxy.ProxyStatus_Session:
		if _, err = f.Writer.Write(msg.Data); err != nil {
			return err
		}
	case proxy.ProxyStatus_Accepted:
		select {
		case f.LocalAddr <- msg.Addr:
		case <-f.Ctx.Done():
			// No receiver and the session is over: give up on delivery.
			return nil
		}
	case proxy.ProxyStatus_EOF:
		return io.EOF
	case proxy.ProxyStatus_Error:
		// Closing the channel wakes up anyone waiting for the address.
		close(f.LocalAddr)
		return transport.ErrServerFailed
	default:
		return fmt.Errorf("unknown status: %d", msg.Status)
	}
	return nil
}
// CopySRCtoTarget repeatedly forwards data read from f.Reader to the RPC
// stream, reusing one buffer of transport.BufferSize bytes. It returns nil once
// the forwarder's context is done, or the first error reported by a step.
func (f *Forwarder) CopySRCtoTarget() error {
	chunk := make([]byte, transport.BufferSize)
	done := f.Ctx.Done()
	for {
		// Non-blocking cancellation check before each step.
		select {
		case <-done:
			return nil
		default:
		}
		if err := f.copySRCtoTarget(chunk); err != nil {
			return err
		}
	}
}
// Start runs a proxy session for req.
//
// It first sends the handshake message (id, host and port) on the RPC stream,
// then launches two goroutines: one handling messages coming from the stream
// (CopyTargetToSRC) and one forwarding local data to the stream
// (CopySRCtoTarget).
//
// Start then waits for the handshake outcome:
//   - if an address arrives on f.LocalAddr it is passed on to localAddrChan;
//   - if f.LocalAddr is closed, nothing is passed on;
//   - if rpc.GeneralTimeout elapses first, an error wrapping
//     os.ErrDeadlineExceeded is returned immediately;
//   - if f.Ctx is done first, an error wrapping f.Ctx.Err() is returned
//     immediately instead of waiting out the timeout.
//
// After the handshake it blocks until both goroutines have finished and returns
// the last non-nil error they reported, or nil if neither failed.
func (f *Forwarder) Start(req *transport.Request, localAddrChan chan<- string) error {
	// Handshake: announce the requested target to the server.
	if err := f.Stream.Send(&proxy.ProxySRC{
		Id:   uuid,
		Fqdn: req.Host,
		Port: uint32(req.Port),
	}); err != nil {
		return err
	}

	// Buffered for both goroutines so neither blocks on reporting its result,
	// even when Start has already returned.
	proxyErrChan := make(chan error, 2)

	// RPC stream receiver.
	go func() {
		err := f.CopyTargetToSRC()
		if err != nil {
			err = fmt.Errorf("rpc: src <- server <- %s: %w", req.Host, err)
		}
		proxyErrChan <- err
	}()

	// RPC stream sender.
	go func() {
		err := f.CopySRCtoTarget()
		if err != nil {
			err = fmt.Errorf("rpc: src -> server -> %s: %w", req.Host, err)
		}
		proxyErrChan <- err
	}()

	// Wait for the server's acknowledgement, bounded by the general timeout
	// and by cancellation of the session context.
	t := time.NewTimer(rpc.GeneralTimeout)
	select {
	case <-t.C:
		return fmt.Errorf("rpc: server -> %s timed out: %w", req.Host, os.ErrDeadlineExceeded)
	case <-f.Ctx.Done():
		t.Stop()
		return fmt.Errorf("rpc: server -> %s canceled: %w", req.Host, f.Ctx.Err())
	case localAddr, ok := <-f.LocalAddr:
		// Release the timer on both the delivered and the closed-channel path.
		t.Stop()
		if ok {
			localAddrChan <- localAddr
		}
	}

	// Collect the result of both goroutines; the last non-nil error wins.
	var err error
	for i := 0; i < 2; i++ {
		if proxyErr := <-proxyErrChan; proxyErr != nil {
			err = proxyErr
		}
	}
	return err
}