Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bump version #1780

Merged
merged 7 commits into from Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/admin_api.go
Expand Up @@ -88,6 +88,7 @@ type StatusResp struct {
Https []ProxyStatusResp `json:"https"`
Stcp []ProxyStatusResp `json:"stcp"`
Xtcp []ProxyStatusResp `json:"xtcp"`
Sudp []ProxyStatusResp `json:"sudp"`
}

type ProxyStatusResp struct {
Expand Down Expand Up @@ -155,6 +156,11 @@ func NewProxyStatusResp(status *proxy.ProxyStatus, serverAddr string) ProxyStatu
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
case *config.SudpProxyConf:
if cfg.LocalPort != 0 {
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
}
psr.Plugin = cfg.Plugin
}
return psr
}
Expand All @@ -171,6 +177,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
res.Https = make([]ProxyStatusResp, 0)
res.Stcp = make([]ProxyStatusResp, 0)
res.Xtcp = make([]ProxyStatusResp, 0)
res.Sudp = make([]ProxyStatusResp, 0)

log.Info("Http request [/api/status]")
defer func() {
Expand All @@ -194,6 +201,8 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
res.Stcp = append(res.Stcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
case "xtcp":
res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
case "sudp":
res.Sudp = append(res.Sudp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
}
}
sort.Sort(ByProxyStatusResp(res.Tcp))
Expand All @@ -202,6 +211,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
sort.Sort(ByProxyStatusResp(res.Https))
sort.Sort(ByProxyStatusResp(res.Stcp))
sort.Sort(ByProxyStatusResp(res.Xtcp))
sort.Sort(ByProxyStatusResp(res.Sudp))
return
}

Expand Down
151 changes: 151 additions & 0 deletions client/proxy/proxy.go
Expand Up @@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl
BaseProxy: &baseProxy,
cfg: cfg,
}
case *config.SudpProxyConf:
pxy = &SudpProxy{
BaseProxy: &baseProxy,
cfg: cfg,
closeCh: make(chan struct{}),
}
}
return
}
Expand Down Expand Up @@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
}

type SudpProxy struct {
*BaseProxy

cfg *config.SudpProxyConf

localAddr *net.UDPAddr

closeCh chan struct{}
}

func (pxy *SudpProxy) Run() (err error) {
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
if err != nil {
return
}
return
}

func (pxy *SudpProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()
select {
case <-pxy.closeCh:
return
default:
close(pxy.closeCh)
}
}

func (pxy *SudpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
xl := pxy.xl
xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())

if pxy.limiter != nil {
rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
}

workConn := conn
readCh := make(chan *msg.UdpPacket, 1024)
sendCh := make(chan msg.Message, 1024)
isClose := false

mu := &sync.Mutex{}

closeFn := func() {
mu.Lock()
defer mu.Unlock()
if isClose {
return
}

isClose = true
if workConn != nil {
workConn.Close()
}
close(readCh)
close(sendCh)
}

// udp service <- frpc <- frps <- frpc visitor <- user
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
defer closeFn()

for {
// first to check sudp proxy is closed or not
select {
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
default:
}

var udpMsg msg.UdpPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
xl.Warn("read from workConn for sudp error: %v", errRet)
return
}

if errRet := errors.PanicToError(func() {
readCh <- &udpMsg
}); errRet != nil {
xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
return
}
}
}

// udp service -> frpc -> frps -> frpc visitor -> user
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
closeFn()
xl.Info("writer goroutine for sudp work connection closed")
}()

var errRet error
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
case *msg.UdpPacket:
xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
case *msg.Ping:
xl.Trace("frpc send ping message to frpc visitor")
}

if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
xl.Error("sudp work write error: %v", errRet)
return
}
}
}

heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
closeFn()
}()

var errRet error
for {
select {
case <-ticker.C:
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
xl.Warn("heartbeat goroutine for sudp work connection closed")
return
}
case <-pxy.closeCh:
xl.Trace("frpc sudp proxy is closed")
return
}
}
}

go workConnSenderFn(workConn, sendCh)
go workConnReaderFn(workConn, readCh)
go heartbeatFn(workConn, sendCh)

udp.Forwarder(pxy.localAddr, readCh, sendCh)
}

// Common handler for tcp work connections.
func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
Expand Down