Skip to content

Commit

Permalink
fix: improved locking of assembler state (#307)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to the project! 💜
Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

- #209

## Short description of the changes

During the debugging of suspected load testing issues, the following
bugs were found and fixed:
- Switch from `sync.Map` to regular map w/ mutex
3b8a008
- Ensure stats accessed concurrently are explicitly atomic
fe2ebd3
- Remove unused `sync.WaitGroup` from TCP stream factory
2e039ad
- Add packet counts for the `matchFound` path in `GetOrStoreRequest()` and `GetOrStoreResponse()`

## How to verify that this has the expected result
On my local Linux machine, I compiled the main branch and the changes in
this PR, and ran them concurrently while sending several thousand HTTP
requests to a random netcat server. Apart from `uptime_ms`, the reported stats are identical:

Main:
```
DBG TCP assembler stats IPdefrag=0 active_streams=0 conn_rejected_FSM=0 event_queue_length=500 goroutines=6 rejected_FSM=0 rejected_Options=0 source_dropped=0 source_if_dropped=0 source_received=4000 total_TCP_bytes=97500 total_streams=500 uptime_ms=25450
6:33PM DBG Stopping TCP assembler assembler_page_usage="pageCache: used: 0:" closed=500
```

PR:
```
DBG TCP assembler stats IPdefrag=0 active_streams=0 conn_rejected_FSM=0 event_queue_length=500 goroutines=6 rejected_FSM=0 rejected_Options=0 source_dropped=0 source_if_dropped=0 source_received=4000 total_TCP_bytes=97500 total_streams=500 uptime_ms=20920
6:33PM DBG Stopping TCP assembler assembler_page_usage="pageCache: used: 0:" closed=500
```

---------

Signed-off-by: Dan Bond <danbond@protonmail.com>
Co-authored-by: Robb Kidd <robb@thekidds.org>
  • Loading branch information
loshz and robbkidd committed Oct 31, 2023
1 parent b214d91 commit 5cc35d2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 59 deletions.
48 changes: 28 additions & 20 deletions assemblers/http_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
)

type httpMatcher struct {
entries *sync.Map
messages map[int64]*entry
mtx *sync.Mutex
}

type entry struct {
Expand All @@ -21,7 +22,8 @@ type entry struct {

func newRequestResponseMatcher() *httpMatcher {
return &httpMatcher{
entries: &sync.Map{},
messages: make(map[int64]*entry),
mtx: &sync.Mutex{},
}
}

Expand All @@ -33,20 +35,23 @@ func newRequestResponseMatcher() *httpMatcher {
// If the response hasn't been seen yet,
// stores the Request for later lookup and returns match as nil and matchFound will be false.
func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request *http.Request, packetCount int) (match *entry, matchFound bool) {
e := &entry{
m.mtx.Lock()
defer m.mtx.Unlock()

if match, matchFound = m.messages[key]; matchFound {
match.request = request
match.requestTimestamp = timestamp
match.requestPacketCount = packetCount
delete(m.messages, key)
return match, matchFound
}

m.messages[key] = &entry{
request: request,
requestTimestamp: timestamp,
requestPacketCount: packetCount,
}

if v, matchFound := m.entries.LoadOrStore(key, e); matchFound {
m.entries.Delete(key)
e = v.(*entry) // reuse allocated &entry{} to hold the match
// found entry has Response, so update it with Request
e.request = request
e.requestTimestamp = timestamp
return e, true
}
return nil, false
}

Expand All @@ -58,19 +63,22 @@ func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request
// If the request hasn't been seen yet,
// stores the Response for later lookup and returns match as nil and matchFound will be false.
func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, response *http.Response, packetCount int) (match *entry, matchFound bool) {
e := &entry{
m.mtx.Lock()
defer m.mtx.Unlock()

if match, matchFound = m.messages[key]; matchFound {
match.response = response
match.responseTimestamp = timestamp
match.responsePacketCount = packetCount
delete(m.messages, key)
return match, matchFound
}

m.messages[key] = &entry{
response: response,
responseTimestamp: timestamp,
responsePacketCount: packetCount,
}

if v, matchFound := m.entries.LoadOrStore(key, e); matchFound {
m.entries.Delete(key)
e = v.(*entry) // reuse allocated &entry{} to hold the match
// found entry has Request, so update it with Response
e.response = response
e.responseTimestamp = timestamp
return e, true
}
return nil, false
}
55 changes: 29 additions & 26 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,39 @@ import (
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcap"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/honeycombio/libhoney-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

var stats struct {
ipdefrag int
totalsz int
rejectFsm int
rejectOpt int
rejectConnFsm int
source_received int
source_dropped int
source_if_dropped int
total_streams uint64
active_streams int64
ipdefrag int
totalsz int

// Below stats could be accessed concurrently, so explicitly
// mark them as atomic.
rejectFsm atomic.Uint64
rejectOpt atomic.Uint64
rejectConnFsm atomic.Uint64
total_streams atomic.Uint64
active_streams atomic.Uint64
source_received atomic.Uint64
source_dropped atomic.Uint64
source_if_dropped atomic.Uint64
}

func IncrementStreamCount() uint64 {
return atomic.AddUint64(&stats.total_streams, 1)
return stats.total_streams.Add(1)
}

func IncrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, 1)
stats.active_streams.Add(1)
}

func DecrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, -1)
stats.active_streams.Add(^uint64(0))
}

type Context struct {
Expand Down Expand Up @@ -186,7 +190,6 @@ func (h *tcpAssembler) Stop() {
h.streamPool.Dump()
}

h.streamFactory.WaitGoRoutines()
h.logAssemblerStats()
log.Debug().
Int("closed", closed).
Expand All @@ -198,17 +201,17 @@ func (a *tcpAssembler) logAssemblerStats() {
statsFields := map[string]interface{}{
"uptime_ms": time.Since(a.startedAt).Milliseconds(),
"IPdefrag": stats.ipdefrag,
"rejected_FSM": stats.rejectFsm,
"rejected_Options": stats.rejectOpt,
"rejected_FSM": stats.rejectFsm.Load(),
"rejected_Options": stats.rejectOpt.Load(),
"total_TCP_bytes": stats.totalsz,
"conn_rejected_FSM": stats.rejectConnFsm,
"source_received": stats.source_received,
"source_dropped": stats.source_dropped,
"source_if_dropped": stats.source_if_dropped,
"conn_rejected_FSM": stats.rejectConnFsm.Load(),
"source_received": stats.source_received.Load(),
"source_dropped": stats.source_dropped.Load(),
"source_if_dropped": stats.source_if_dropped.Load(),
"event_queue_length": len(a.eventsChan),
"goroutines": runtime.NumGoroutine(),
"total_streams": stats.total_streams,
"active_streams": stats.active_streams,
"total_streams": stats.total_streams.Load(),
"active_streams": stats.active_streams.Load(),
}
statsEvent := libhoney.NewEvent()
statsEvent.Dataset = a.config.StatsDataset
Expand Down Expand Up @@ -261,8 +264,8 @@ func logPcapHandleStats(handle *pcap.Handle) {
log.Error().Err(err).Msg("Failed to get pcap handle stats")
continue
}
stats.source_received += pcapStats.PacketsReceived
stats.source_dropped += pcapStats.PacketsDropped
stats.source_if_dropped += pcapStats.PacketsIfDropped
stats.source_received.Add(uint64(pcapStats.PacketsReceived))
stats.source_dropped.Add(uint64(pcapStats.PacketsDropped))
stats.source_if_dropped.Add(uint64(pcapStats.PacketsIfDropped))
}
}
11 changes: 6 additions & 5 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

// tcpStream represents a TCP stream and receives TCP packets from the gopacket assembler
Expand Down Expand Up @@ -61,10 +62,10 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
// FSM
if !stream.tcpstate.CheckState(tcp, dir) {
// Error("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String())
stats.rejectFsm++
stats.rejectFsm.Add(1)
if !stream.fsmerr {
stream.fsmerr = true
stats.rejectConnFsm++
stats.rejectConnFsm.Add(1)
}
if !stream.config.Ignorefsmerr {
return false
Expand All @@ -74,7 +75,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
err := stream.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
// Error("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err)
stats.rejectOpt++
stats.rejectOpt.Add(1)
if !stream.config.Nooptcheck {
return false
}
Expand All @@ -98,7 +99,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
}
}
if !accept {
stats.rejectOpt++
stats.rejectOpt.Add(1)
}
return accept
}
Expand Down
10 changes: 2 additions & 8 deletions assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package assemblers

import (
"sync"

"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

type tcpStreamFactory struct {
config config.Config
wg sync.WaitGroup
eventsChan chan Event
}

Expand All @@ -31,7 +29,3 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
IncrementActiveStreamCount()
return NewTcpStream(net, transport, factory.config, factory.eventsChan)
}

func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait()
}

0 comments on commit 5cc35d2

Please sign in to comment.