Skip to content

Commit

Permalink
Merge pull request #9 from getlantern/ox/speed
Browse files Browse the repository at this point in the history
Sped up packetforward server-side processing by using larger read buf…
  • Loading branch information
oxtoacart committed Apr 21, 2020
2 parents ac9456c + 8ff36cb commit 11933f3
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ steps:
- go test -race
- go test -covermode=count -coverprofile=profile.cov
- go get github.com/mattn/goveralls
- goveralls -coverprofile=profile.cov
- goveralls -service drone.io -coverprofile=profile.cov
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/getlantern/packetforward

go 1.12
go 1.13

require (
github.com/getlantern/errors v0.0.0-20190325191628-abdb3e3e36f7
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type Opts struct {

// BufferPoolSize is the size of the buffer pool in bytes. If not specified, defaults to 1 MB
BufferPoolSize int

// ReadBufferSize is the size of the read buffer for reading framed packets from clients. If not specified, defaults to gonat.MaximumIPPacketSize
ReadBufferSize int
}

type Server interface {
Expand Down
127 changes: 90 additions & 37 deletions server/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package server
import (
"errors"
"io"
"math"
"net"
"sync"
"sync/atomic"
Expand All @@ -30,21 +29,28 @@ var (
const (
// DefaultBufferPoolSize is 1MB
DefaultBufferPoolSize = 1000000

// DefaultReadBufferSize is gonat.MaximumIPPacketSize
DefaultReadBufferSize = gonat.MaximumIPPacketSize
)

const (
maxListenDelay = 1 * time.Second

baseIODelay = 10 * time.Millisecond
maxIODelay = 1 * time.Second
baseIODelay = 250 * time.Millisecond
maxIODelay = 10 * time.Second
)

type server struct {
opts *Opts
clients map[string]*client
clientsMx sync.Mutex
close chan interface{}
closed chan interface{}
successfulReads int64
failedReads int64
successfulWrites int64
failedWrites int64
opts *Opts
clients map[string]*client
clientsMx sync.Mutex
close chan interface{}
closed chan interface{}
}

// NewServer constructs a new unstarted packetforward Server. The server can be started by
Expand All @@ -54,6 +60,10 @@ func NewServer(opts *Opts) (Server, error) {
opts.BufferPoolSize = DefaultBufferPoolSize
}

if opts.ReadBufferSize <= 0 {
opts.ReadBufferSize = DefaultReadBufferSize
}

// Apply defaults
err := opts.ApplyDefaults()
if err != nil {
Expand Down Expand Up @@ -107,7 +117,7 @@ func (s *server) handle(conn net.Conn) {
framedConn := framed.NewReadWriteCloser(conn)
framedConn.EnableBigFrames()
framedConn.DisableThreadSafety()
framedConn.EnableBuffering(gonat.MaximumIPPacketSize)
framedConn.EnableBuffering(s.opts.ReadBufferSize)

// Read client ID
b := make([]byte, 36)
Expand All @@ -128,6 +138,7 @@ func (s *server) handle(conn net.Conn) {
s: s,
framedConn: efc,
}
c.markActive()

gn, err := gonat.NewServer(c, &s.opts.Opts)
if err != nil {
Expand Down Expand Up @@ -172,11 +183,12 @@ func (s *server) Close() error {
}

type client struct {
id string
s *server
framedConn eventual.Value
lastActive int64
mx sync.RWMutex
failedOnCurrentConn int64
lastActive int64
id string
s *server
framedConn eventual.Value
mx sync.RWMutex
}

func (c *client) getFramedConn(timeout time.Duration) *framed.ReadWriteCloser {
Expand All @@ -192,60 +204,78 @@ func (c *client) attach(framedConn io.ReadWriteCloser) {
if oldFramedConn != nil {
go oldFramedConn.Close()
}
atomic.StoreInt64(&c.failedOnCurrentConn, 0)
c.framedConn.Set(framedConn)
}

func (c *client) Read(b bpool.ByteSlice) (int, error) {
i := float64(0)
i := 0
for {
conn := c.getFramedConn(c.s.opts.IdleTimeout)
if conn == nil {
if conn == nil || c.idle() {
return c.finished(io.EOF)
}

if c.isFailedOnCurrentConn() {
// wait for client to reconnect before idling
i = sleepWithExponentialBackoff(i)
continue
}

// we're not failed, let's read
i = 0

n, err := conn.Read(b.Bytes())
if err == nil {
c.markActive()
atomic.AddInt64(&c.s.successfulReads, 1)
return n, err
}
if c.idle() {
return c.finished(io.EOF)
}
// ignore errors and retry, because clients can reconnect
sleepTime := time.Duration(math.Pow(2, i)) * baseIODelay
if sleepTime > maxIODelay {
sleepTime = maxIODelay
}
time.Sleep(sleepTime)
i++

// reading failed, but it might succeed in the future if the client reconnects, so don't give up
atomic.AddInt64(&c.s.failedReads, 1)
c.markFailedOnCurrentConn()
}
}

func (c *client) Write(b bpool.ByteSlice) (int, error) {
i := float64(0)
i := 0
for {
conn := c.getFramedConn(c.s.opts.IdleTimeout)
if conn == nil {
return c.finished(ErrNoConnection)
}
if c.idle() {
return c.finished(idletiming.ErrIdled)
}

if c.isFailedOnCurrentConn() {
// wait for client to reconnect before idling
i = sleepWithExponentialBackoff(i)
continue
}

// we're not failed, let's write
i = 0

n, err := conn.WriteAtomic(b)
if err == nil {
atomic.AddInt64(&c.s.successfulWrites, 1)
c.markActive()
return n, err
}
if c.idle() {
return c.finished(idletiming.ErrIdled)
}
// ignore errors and retry, because clients can reconnect
sleepTime := time.Duration(math.Pow(2, i)) * baseIODelay
if sleepTime > maxIODelay {
sleepTime = maxIODelay
}
time.Sleep(sleepTime)
i++

// writing failed, but it might succeed in the future if the client reconnects, so don't give up
atomic.AddInt64(&c.s.failedWrites, 1)
c.markFailedOnCurrentConn()
}
}

func (c *client) finished(err error) (int, error) {
current := c.getFramedConn(0)
if current != nil {
current.Close()
}
c.s.forgetClient(c.id)
return 0, err
}
Expand All @@ -254,6 +284,29 @@ func (c *client) markActive() {
atomic.StoreInt64(&c.lastActive, time.Now().UnixNano())
}

func (c *client) markFailedOnCurrentConn() {
atomic.StoreInt64(&c.failedOnCurrentConn, 1)
current := c.getFramedConn(0)
if current != nil {
current.Close()
}
}

func (c *client) isFailedOnCurrentConn() bool {
return atomic.LoadInt64(&c.failedOnCurrentConn) == 1
}

func (c *client) idle() bool {
return time.Duration(time.Now().UnixNano()-atomic.LoadInt64(&c.lastActive)) > c.s.opts.IdleTimeout
}

func sleepWithExponentialBackoff(i int) int {
sleepTime := time.Duration(2 << i * baseIODelay)
if sleepTime > maxIODelay {
sleepTime = maxIODelay
} else {
i++
}
time.Sleep(sleepTime)
return i
}
3 changes: 3 additions & 0 deletions server/stats_linux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"sync/atomic"
"time"
)

Expand All @@ -20,6 +21,8 @@ func (s *server) printStats() {
numClients := len(s.clients)
s.clientsMx.Unlock()
log.Debugf("Number of Clients: %d", numClients)
log.Debugf("Reads Succeeded: %d Failed: %d", atomic.LoadInt64(&s.successfulReads), atomic.LoadInt64(&s.failedReads))
log.Debugf("Writes Succeeded: %d Failed: %d", atomic.LoadInt64(&s.successfulWrites), atomic.LoadInt64(&s.failedWrites))
}
}
}

0 comments on commit 11933f3

Please sign in to comment.