forked from v2ray/v2ray-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathudp_server.go
171 lines (153 loc) · 3.71 KB
/
udp_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package udp
import (
"sync"
"time"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/ray"
)
type UDPResponseCallback func(destination v2net.Destination, payload *buf.Buffer)
type TimedInboundRay struct {
name string
inboundRay ray.InboundRay
accessed chan bool
server *UDPServer
sync.RWMutex
}
func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *UDPServer) *TimedInboundRay {
r := &TimedInboundRay{
name: name,
inboundRay: inboundRay,
accessed: make(chan bool, 1),
server: server,
}
go r.Monitor()
return r
}
func (v *TimedInboundRay) Monitor() {
for {
time.Sleep(time.Second * 16)
select {
case <-v.accessed:
default:
// Ray not accessed for a while, assuming communication is dead.
v.RLock()
if v.server == nil {
v.RUnlock()
return
}
v.server.RemoveRay(v.name)
v.RUnlock()
v.Release()
return
}
}
}
func (v *TimedInboundRay) InboundInput() ray.OutputStream {
v.RLock()
defer v.RUnlock()
if v.inboundRay == nil {
return nil
}
select {
case v.accessed <- true:
default:
}
return v.inboundRay.InboundInput()
}
func (v *TimedInboundRay) InboundOutput() ray.InputStream {
v.RLock()
defer v.RUnlock()
if v.inboundRay == nil {
return nil
}
select {
case v.accessed <- true:
default:
}
return v.inboundRay.InboundOutput()
}
func (v *TimedInboundRay) Release() {
log.Debug("UDP Server: Releasing TimedInboundRay: ", v.name)
v.Lock()
defer v.Unlock()
if v.server == nil {
return
}
v.server = nil
v.inboundRay.InboundInput().Close()
v.inboundRay.InboundOutput().Release()
v.inboundRay = nil
}
type UDPServer struct {
sync.RWMutex
conns map[string]*TimedInboundRay
packetDispatcher dispatcher.PacketDispatcher
}
func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
return &UDPServer{
conns: make(map[string]*TimedInboundRay),
packetDispatcher: packetDispatcher,
}
}
func (v *UDPServer) RemoveRay(name string) {
v.Lock()
defer v.Unlock()
delete(v.conns, name)
}
func (v *UDPServer) locateExistingAndDispatch(name string, payload *buf.Buffer) bool {
log.Debug("UDP Server: Locating existing connection for ", name)
v.RLock()
defer v.RUnlock()
if entry, found := v.conns[name]; found {
outputStream := entry.InboundInput()
if outputStream == nil {
return false
}
err := outputStream.Write(payload)
if err != nil {
go entry.Release()
return false
}
return true
}
return false
}
func (v *UDPServer) Dispatch(session *proxy.SessionInfo, payload *buf.Buffer, callback UDPResponseCallback) {
source := session.Source
destination := session.Destination
// TODO: Add user to destString
destString := source.String() + "-" + destination.String()
log.Debug("UDP Server: Dispatch request: ", destString)
if v.locateExistingAndDispatch(destString, payload) {
return
}
log.Info("UDP Server: establishing new connection for ", destString)
inboundRay := v.packetDispatcher.DispatchToOutbound(session)
timedInboundRay := NewTimedInboundRay(destString, inboundRay, v)
outputStream := timedInboundRay.InboundInput()
if outputStream != nil {
outputStream.Write(payload)
}
v.Lock()
v.conns[destString] = timedInboundRay
v.Unlock()
go v.handleConnection(timedInboundRay, source, callback)
}
func (v *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
for {
inputStream := inboundRay.InboundOutput()
if inputStream == nil {
break
}
data, err := inboundRay.InboundOutput().Read()
if err != nil {
break
}
callback(source, data)
}
inboundRay.Release()
}