forked from eycorsican/go-tun2socks
/
udp.go
138 lines (121 loc) · 2.91 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package v2ray
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
vcore "v2ray.com/core"
vsession "v2ray.com/core/common/session"
vsignal "v2ray.com/core/common/signal"
vtask "v2ray.com/core/common/task"
"github.com/eycorsican/go-tun2socks/common/log"
"github.com/eycorsican/go-tun2socks/core"
)
type udpConnEntry struct {
conn net.PacketConn
// `ReadFrom` method of PacketConn given by V2Ray
// won't return the correct remote address, we treat
// all data receive from V2Ray are coming from the
// same remote host, i.e. the `target` that passed
// to `Connect`.
target net.Addr
updater vsignal.ActivityUpdater
}
type udpHandler struct {
sync.Mutex
ctx context.Context
v *vcore.Instance
conns map[core.UDPConn]*udpConnEntry
timeout time.Duration // Maybe override by V2Ray local policies for some conns.
}
func (h *udpHandler) fetchInput(conn core.UDPConn) {
h.Lock()
c, ok := h.conns[conn]
h.Unlock()
if !ok {
return
}
buf := core.NewBytes(core.BufSize)
defer core.FreeBytes(buf)
for {
n, _, err := c.conn.ReadFrom(buf)
if err != nil && n <= 0 {
h.Close(conn)
conn.Close()
return
}
c.updater.Update()
_, err = conn.WriteFrom(buf[:n], c.target)
if err != nil {
h.Close(conn)
conn.Close()
return
}
}
}
func NewUDPHandler(ctx context.Context, instance *vcore.Instance, timeout time.Duration) core.UDPConnHandler {
return &udpHandler{
ctx: ctx,
v: instance,
conns: make(map[core.UDPConn]*udpConnEntry, 16),
timeout: timeout,
}
}
func (h *udpHandler) Connect(conn core.UDPConn, target net.Addr) error {
if target == nil {
return errors.New("nil target is not allowed")
}
sid := vsession.NewID()
ctx := vsession.ContextWithID(h.ctx, sid)
ctx, cancel := context.WithCancel(ctx)
pc, err := vcore.DialUDP(ctx, h.v)
if err != nil {
return errors.New(fmt.Sprintf("dial V proxy connection failed: %v", err))
}
timer := vsignal.CancelAfterInactivity(ctx, cancel, h.timeout)
h.Lock()
h.conns[conn] = &udpConnEntry{
conn: pc,
target: target,
updater: timer,
}
h.Unlock()
fetchTask := func() error {
h.fetchInput(conn)
return nil
}
go func() {
if err := vtask.Run(ctx, fetchTask); err != nil {
pc.Close()
}
}()
log.Infof("new proxy connection for target: %s:%s", target.Network(), target.String())
return nil
}
func (h *udpHandler) DidReceiveTo(conn core.UDPConn, data []byte, addr net.Addr) error {
h.Lock()
c, ok := h.conns[conn]
h.Unlock()
if ok {
_, err := c.conn.WriteTo(data, addr)
c.updater.Update()
if err != nil {
h.Close(conn)
return errors.New(fmt.Sprintf("write remote failed: %v", err))
}
return nil
} else {
h.Close(conn)
return errors.New(fmt.Sprintf("proxy connection %v->%v does not exists", conn.LocalAddr(), c.target))
}
}
func (h *udpHandler) Close(conn core.UDPConn) {
h.Lock()
defer h.Unlock()
if c, found := h.conns[conn]; found {
c.conn.Close()
}
delete(h.conns, conn)
}