Skip to content

Commit

Permalink
feat: add deadline when tun ws #55
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Aug 8, 2021
1 parent 5a7cbc9 commit 7ffa205
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
1 change: 1 addition & 0 deletions internal/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (

MaxMWSSStreamCnt = 100
DialTimeOut = 3 * time.Second
DefaultDeadline = 15 * time.Second

Listen_RAW = "raw"
Listen_WS = "ws"
Expand Down
58 changes: 57 additions & 1 deletion internal/transporter/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net"
"syscall"
"time"

"go.uber.org/atomic"

Expand Down Expand Up @@ -77,7 +78,62 @@ func transport(rw1, rw2 io.ReadWriter, remote string) error {
}()

err := <-errc
// NOTE 我们不关心operror 比如 eof/reset/broken pipe
// NOTE 我们不关心 operror 比如 eof/reset/broken pipe
if err != nil {
if err == io.EOF || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
err = nil
}
}
return err
}

func transportWithTimeOut(conn1 net.Conn, conn2 io.ReadWriteCloser, remote string) error {
// only set oneway deadline is enough
errChan := make(chan error, 2)
// conn1 to conn2
go func() {
buf := BufferPool.Get()
defer BufferPool.Put(buf)
for {
_ = conn1.SetDeadline(time.Now().Add(constant.DefaultDeadline))
rn, err := conn1.Read(buf)
if rn > 0 {
_, err := conn2.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
web.NetWorkTransmitBytes.WithLabelValues(remote).Add(float64(rn * 2))
}
if err != nil {
errChan <- err
return
}
}
}()
// conn2 to conn1
go func() {
buf := BufferPool.Get()
defer BufferPool.Put(buf)
for {
rn, err := conn2.Read(buf)
if rn > 0 {
_, err := conn1.Write(buf[:rn])
if err != nil {
errChan <- err
return
}
web.NetWorkTransmitBytes.WithLabelValues(remote).Add(float64(rn * 2))
_ = conn1.SetDeadline(time.Now().Add(constant.DefaultDeadline))
}
if err != nil {
errChan <- err
return
}
}
}()
err := <-errChan
// NOTE 我们不关心 operror 比如 eof/reset/broken pipe
if err != nil {
if err == io.EOF || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
err = nil
Expand Down
6 changes: 4 additions & 2 deletions internal/transporter/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/Ehco1996/ehco/internal/constant"
"github.com/Ehco1996/ehco/internal/lb"
"github.com/Ehco1996/ehco/internal/logger"
"github.com/Ehco1996/ehco/internal/web"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
defer cancel()
wt := 0
for {
rc.SetReadDeadline(time.Now().Add(time.Second * 15))
_ = rc.SetDeadline(time.Now().Add(constant.DefaultDeadline))
i, err := rc.Read(buf)
if err != nil {
logger.Info(err)
Expand Down Expand Up @@ -92,6 +93,7 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
logger.Info(err)
return
}
_ = rc.SetDeadline(time.Now().Add(constant.DefaultDeadline))
}
web.NetWorkTransmitBytes.WithLabelValues(remote).Add(float64(wt * 2))
}()
Expand Down Expand Up @@ -133,7 +135,7 @@ func (raw *Raw) HandleWsRequset(w http.ResponseWriter, req *http.Request) {
defer rc.Close()

logger.Infof("[tun] HandleWsRequset from:%s to:%s", wsc.RemoteAddr(), rc.RemoteAddr())
if err := transport(rc, wsc, remote); err != nil {
if err := transportWithTimeOut(rc, wsc, remote); err != nil {
logger.Infof("[tun] HandleWsRequset err: %s", err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/transporter/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ func (s *Ws) HandleTCPConn(c *net.TCPConn) error {
}
defer wsc.Close()
logger.Infof("[ws] HandleTCPConn from %s to %s", c.LocalAddr().String(), remote)
return transport(c, wsc, remote)
return transportWithTimeOut(c, wsc, remote)
}

0 comments on commit 7ffa205

Please sign in to comment.