Skip to content

Commit

Permalink
Refactor afring mock sources to simplify use and avoid race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
fako1024 committed May 16, 2023
1 parent bfed528 commit aba2da7
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 54 deletions.
60 changes: 8 additions & 52 deletions capture/afpacket/afring/afring_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type MockSource struct {

mockBlocks chan int
mockBlockCount int
isNoDrain bool

MockFd *socket.MockFileDescriptor

Expand Down Expand Up @@ -215,55 +214,13 @@ func (m *MockSource) Pipe(src capture.Source, doneReadingChan chan struct{}) (er

// Run executes processing of packets in the background, mimicking the function of an actual kernel
// packet ring buffer
func (m *MockSource) Run() chan error {
func (m *MockSource) Run() <-chan error {
errChan := make(chan error)
go m.run(errChan)

return errChan
}

// RunNoDrain acts as a high-throughput mode to allow continuous reading the same data currently in the
// mock buffer without consuming it and with minimal overhead from handling the mock socket / semaphore
// It is intended to be used in benchmarks using the mock source to minimize measurement noise from the
// mock implementation itself
func (m *MockSource) RunNoDrain(releaseInterval time.Duration) chan error {

m.isNoDrain = true
m.FinalizeBlock(false)
m.MockFd.SetNoRelease(true)

errChan := make(chan error)
go func(errs chan error) {

defer close(errs)

// Queue / trigger a single event equivalent to receiving a new block via the PPOLL syscall and
// instruct the mock socket to not release the semaphore. That way data can be consumed immediately
// at all times
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errs <- err
return
}

// Continuously mark all blocks as available to the user at the given interval
for {
for i := 0; i < m.nBlocks; i++ {

// If the ring buffer is empty it was apparently closed / free'd
if len(m.ringBuffer.ring) == 0 {
errs <- nil
return
}

m.markBlock(i, unix.TP_STATUS_USER)
time.Sleep(releaseInterval)
}
}
}(errChan)

return errChan
}

// Done notifies the mock source that no more mock packets will be added, causing the ring buffer
// filling routine / channel to terminate once all packets have been written to the ring buffer
func (m *MockSource) Done() {
Expand All @@ -280,7 +237,7 @@ func (m *MockSource) ForceBlockRelease() {

//////////////////////////////////////////////////////////////////////////////////////////////////////

func (m *MockSource) run(errChan chan error) {
func (m *MockSource) run(errChan chan<- error) {
defer close(errChan)

for block := range m.mockBlocks {
Expand Down Expand Up @@ -323,20 +280,19 @@ func (m *MockSource) hasUserlandBlock() bool {
// Close stops / closes the capture source
func (m *MockSource) Close() error {

// Wait until all blocks have been retuned to the mock kernel
if m.isNoDrain {
m.ringBuffer.ring = nil
} else {
for m.hasUserlandBlock() {
time.Sleep(10 * time.Millisecond)
}
// Ensure that all blocks / packets have been consumed
for m.hasUserlandBlock() {
time.Sleep(10 * time.Millisecond)
}

return m.Source.Close()
}

// Free releases any pending resources from the capture source (must be called after Close())
func (m *MockSource) Free() error {
for m.MockFd.IsOpen() {
time.Sleep(10 * time.Millisecond)
}
m.ringBuffer.ring = nil
return nil
}
98 changes: 98 additions & 0 deletions capture/afpacket/afring/afring_mock_nodrain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package afring

import (
"sync/atomic"
"time"

"github.com/fako1024/slimcap/event"
"golang.org/x/sys/unix"
)

// MockSourceNoDrain denotes a fully mocked, high-throughput ring buffer source, behaving just like one
// with the notable exception that blocks / packets are not drained but reused instead.
// Since it wraps a regular Source, it can be used as a stand-in replacement without any further
// code modifications:
//
// src, err := afring.NewSource("eth0", <options>...)
// ==>
// src, err := afring.NewMockSourceNoDrain("eth0", <options>...)
type MockSourceNoDrain struct {
*MockSource

closing atomic.Bool
doneClosing chan struct{}
}

// NewMockSourceNoDrain instantiates a new high-throughput mock ring buffer source, wrapping a regular Source
func NewMockSourceNoDrain(iface string, options ...Option) (*MockSourceNoDrain, error) {
mockSrc, err := NewMockSource(iface, options...)
if err != nil {
return nil, err
}

return &MockSourceNoDrain{
MockSource: mockSrc,
doneClosing: make(chan struct{}, 1),
}, nil
}

// Run acts as a high-throughput mode to allow continuous reading the same data currently in the
// mock buffer without consuming it and with minimal overhead from handling the mock socket / semaphore
// It is intended to be used in benchmarks using the mock source to minimize measurement noise from the
// mock implementation itself
func (m *MockSourceNoDrain) Run(releaseInterval time.Duration) <-chan error {

m.FinalizeBlock(false)
m.MockFd.SetNoRelease(true)

errChan := make(chan error)
go func(errs chan error) {

defer close(errs)

// Queue / trigger a single event equivalent to receiving a new block via the PPOLL syscall and
// instruct the mock socket to not release the semaphore. That way data can be consumed immediately
// at all times
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errs <- err
return
}

// Continuously mark all blocks as available to the user at the given interval
for {
for i := 0; i < m.nBlocks; i++ {

// If the mocks source is closing retun
if m.closing.Load() {
m.doneClosing <- struct{}{}
errs <- nil
return
}

m.markBlock(i, unix.TP_STATUS_USER)
time.Sleep(releaseInterval)
}
}
}(errChan)

return errChan
}

// Done notifies the mock source that no more mock packets will be added, causing the ring buffer
// filling routine to terminate
func (m *MockSourceNoDrain) Done() {
m.closing.Store(true)
}

//////////////////////////////////////////////////////////////////////////////////////////////////////

// Close stops / closes the capture source
func (m *MockSourceNoDrain) Close() error {

m.Done()

// Ensure that the Run() routine has terminated to avoid a race condition
<-m.doneClosing

return m.Source.Close()
}
32 changes: 30 additions & 2 deletions capture/afpacket/afring/afring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,34 @@ func TestClosedSource(t *testing.T) {
require.ErrorIs(t, err, capture.ErrCaptureStopped)
}

func TestClosedSourceNoDrain(t *testing.T) {

// Setup a mock source
mockSrc, err := NewMockSourceNoDrain("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
Promiscuous(false),
BufferSize(1024*1024, 5),
)
require.Nil(t, err)
mockSrc.Run(time.Millisecond)

// Close it right away
require.Nil(t, mockSrc.Close())

// Attempt to read from the source
pkt, err := mockSrc.NextPacket(nil)
require.Nil(t, pkt)
require.ErrorIs(t, err, capture.ErrCaptureStopped)

// Free resources
require.Nil(t, mockSrc.Free())

// Attempt to read from the source
pkt, err = mockSrc.NextPacket(nil)
require.Nil(t, pkt)
require.ErrorIs(t, err, capture.ErrCaptureStopped)
}

func TestFillRingBuffer(t *testing.T) {

// Setup the original mock source
Expand Down Expand Up @@ -366,7 +394,7 @@ func BenchmarkCaptureMethods(b *testing.B) {
require.Nil(b, err)

// Setup a mock source
mockSrc, err := NewMockSource("mock",
mockSrc, err := NewMockSourceNoDrain("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
Promiscuous(false),
)
Expand All @@ -375,7 +403,7 @@ func BenchmarkCaptureMethods(b *testing.B) {
for mockSrc.CanAddPackets() {
require.Nil(b, mockSrc.AddPacket(testPacket))
}
mockSrc.RunNoDrain(time.Microsecond)
mockSrc.Run(time.Microsecond)

b.Run("NextPacket", func(b *testing.B) {
b.ReportAllocs()
Expand Down

0 comments on commit aba2da7

Please sign in to comment.