Skip to content

Commit

Permalink
Allow passing in existing StatsTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
oxtoacart committed May 28, 2019
1 parent 96d4456 commit dc3eeb0
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 100 deletions.
25 changes: 9 additions & 16 deletions gonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,6 @@ type Server interface {
// Serve starts processing packets and blocks until finished
Serve() error

// Count of accepted packets
AcceptedPackets() int

// Count of invalid packets (unknown destination, wrong IP version, etc.)
InvalidPackets() int

// Count of packets dropped due to being stalled writing down or upstream, being unable to assign a port
// open a connection, etc.
DroppedPackets() int

// Number of TCP connections being tracked
NumTCPConns() int

// Number of UDP connections being tracked
NumUDPConns() int

// Close stops the server and cleans up resources
Close() error
}
Expand All @@ -85,6 +69,12 @@ type Opts struct {
// automatically closed. The default is <DefaultIdleTimeout>.
IdleTimeout time.Duration

// StatsTracker allows specifying an existing StatsTracker to use for tracking
// stats. If not specified, one will be created using the configured StatsInterval.
// Note - the StatsTracker has to be manually closed using its Close() method, otherwise
// it will keep logging stats.
StatsTracker *StatsTracker

// StatsInterval controls how frequently to display stats. Defaults to
// <DefaultStatsInterval>.
StatsInterval time.Duration
Expand Down Expand Up @@ -115,6 +105,9 @@ func (opts *Opts) ApplyDefaults() error {
if opts.StatsInterval <= 0 {
opts.StatsInterval = DefaultStatsInterval
}
if opts.StatsTracker == nil {
opts.StatsTracker = NewStatsTracker(opts.StatsInterval)
}
if opts.OnOutbound == nil {
opts.OnOutbound = func(pkt *IPPacket) {}
}
Expand Down
18 changes: 6 additions & 12 deletions gonat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ import (
)

type server struct {
acceptedPackets int64
invalidPackets int64
droppedPackets int64
numTCPConns int64
numUDPConns int64

tcpSocket io.ReadWriteCloser
udpSocket io.ReadWriteCloser
downstream io.ReadWriter
Expand Down Expand Up @@ -106,7 +100,7 @@ func (s *server) Serve() error {
return errors.New("Unable to obtain connection for managing conntrack: %v", err)
}

ops.Go(s.trackStats)
s.opts.StatsTracker.start()
ops.Go(s.dispatch)
ops.Go(s.writeToDownstream)
return s.readFromDownstream()
Expand Down Expand Up @@ -175,12 +169,12 @@ func (s *server) onPacketFromDownstream(pkt *IPPacket) {
s.connsByDownFT[downFT] = c
s.connsByUpFT[upFT] = c
c.markActive()
s.addConn(pkt.IPProto)
s.opts.StatsTracker.addConn(pkt.IPProto)
}
select {
case c.toUpstream <- pkt:
log.Tracef("Transmit -- %v -> %v", c.downFT, c.upFT)
s.acceptedPacket()
s.opts.StatsTracker.acceptedPacket()
default:
// don't block if we're stalled writing upstream
log.Tracef("Stalled writing packet %v upstream", downFT)
Expand Down Expand Up @@ -209,7 +203,7 @@ func (s *server) onPacketFromUpstream(pkt *IPPacket) {
case s.toDownstream <- pkt:
// okay
log.Tracef("Transmit -- %v <- %v", c.downFT, c.upFT)
s.acceptedPacket()
s.opts.StatsTracker.acceptedPacket()
default:
log.Tracef("Stalled writing packet %v downstream", c.downFT)
s.dropPacket(pkt)
Expand Down Expand Up @@ -332,12 +326,12 @@ func (s *server) readFromUpstream(socket io.ReadWriteCloser) {
}

func (s *server) rejectPacket(b []byte) {
s.invalidPacket()
s.opts.StatsTracker.invalidPacket()
s.bufferPool.Put(b)
}

func (s *server) dropPacket(pkt *IPPacket) {
s.droppedPacket()
s.opts.StatsTracker.droppedPacket()
s.bufferPool.Put(pkt.Raw)
}

Expand Down
1 change: 1 addition & 0 deletions gonat_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestEndToEnd(t *testing.T) {
assert.Equal(t, io.EOF, s.Serve())
_s := s.(*server)
assert.True(t, _s.bufferPool.NumPooled() > 0, "buffers should be returned to pool")
_s.opts.StatsTracker.Close()
close(finishedCh)
}()
return func() error { return nil }, nil
Expand Down
124 changes: 124 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package gonat

import (
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/getlantern/ops"
)

// StatsTracker tracks statistics for one or more gonat servers.
type StatsTracker struct {
acceptedPackets int64
invalidPackets int64
droppedPackets int64
numTCPConns int64
numUDPConns int64
statsInterval time.Duration
startOnce sync.Once
stop chan interface{}
stopped chan interface{}
}

// NewStatsTracker creates a new StatsTracker that will log stats at the given statsInterval.
// Logging only begins once a Server using this StatsTracker is started, and continues until
// Stop is called
func NewStatsTracker(statsInterval time.Duration) *StatsTracker {
return &StatsTracker{
statsInterval: statsInterval,
stop: make(chan interface{}),
stopped: make(chan interface{}),
}
}

// Stop stops the StatsTracker
func (s *StatsTracker) Close() error {
select {
case <-s.stop:
// already stopped
default:
close(s.stop)
}
<-s.stopped
return nil
}

func (s *StatsTracker) start() {
s.startOnce.Do(func() {
ops.Go(s.trackStats)
})
}

func (s *StatsTracker) trackStats() {
defer close(s.stopped)
ticker := time.NewTicker(s.statsInterval)
defer ticker.Stop()

for {
select {
case <-s.stop:
return
case <-ticker.C:
log.Debugf("TCP Conns: %v UDP Conns: %v", s.NumTCPConns(), s.NumUDPConns())
log.Debugf("Invalid Packets: %d Accepted Packets: %d Dropped Packets: %d", s.InvalidPackets(), s.AcceptedPackets(), s.DroppedPackets())
}
}
}

func (s *StatsTracker) acceptedPacket() {
atomic.AddInt64(&s.acceptedPackets, 1)
}

// AcceptedPackets gives a count of accepted packets
func (s *StatsTracker) AcceptedPackets() int {
return int(atomic.LoadInt64(&s.acceptedPackets))
}

func (s *StatsTracker) invalidPacket() {
atomic.AddInt64(&s.invalidPackets, 1)
}

// InvalidPackets gives a count of invalid packets (unknown destination, wrong IP version, etc.)
func (s *StatsTracker) InvalidPackets() int {
return int(atomic.LoadInt64(&s.invalidPackets))
}

func (s *StatsTracker) droppedPacket() {
atomic.AddInt64(&s.droppedPackets, 1)
}

// DroppedPackets gives a count of packets dropped due to being stalled writing down or upstream,
// being unable to assign a port open a connection, etc.
func (s *StatsTracker) DroppedPackets() int {
return int(atomic.LoadInt64(&s.droppedPackets))
}

func (s *StatsTracker) addConn(proto uint8) {
switch proto {
case syscall.IPPROTO_TCP:
atomic.AddInt64(&s.numTCPConns, 1)
case syscall.IPPROTO_UDP:
atomic.AddInt64(&s.numUDPConns, 1)
}
}

func (s *StatsTracker) removeConn(proto uint8) {
switch proto {
case syscall.IPPROTO_TCP:
atomic.AddInt64(&s.numTCPConns, -1)
case syscall.IPPROTO_UDP:
atomic.AddInt64(&s.numUDPConns, -1)
}
}

// NumTCPConns gives a count of the number of TCP connections being tracked
func (s *StatsTracker) NumTCPConns() int {
return int(atomic.LoadInt64(&s.numTCPConns))
}

// NumUDPConns gives a count of the number of UDP connections being tracked
func (s *StatsTracker) NumUDPConns() int {
return int(atomic.LoadInt64(&s.numUDPConns))
}
72 changes: 0 additions & 72 deletions stats_linux.go

This file was deleted.

0 comments on commit dc3eeb0

Please sign in to comment.