Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] Ring buffer block not automatically released without call to nextPacket() #34

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions capture/afpacket/afpacket/afpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s *Source) NextIPPacket(pBuf capture.IPLayer) (capture.IPLayer, capture.Pa
// must be completed prior to any subsequent call to any Next*() method.
func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType capture.PacketType, ipLayerOffset byte) error) error {

if s.eventHandler.Fd == 0 {
if !s.eventHandler.Fd.IsOpen() {
return errors.New("cannot NextPacketFn() on closed capture source")
}

Expand Down Expand Up @@ -268,8 +268,6 @@ func (s *Source) Close() error {
return err
}

s.eventHandler.Fd = -1

return nil
}

Expand All @@ -278,7 +276,7 @@ func (s *Source) Free() error {
if s == nil {
return errors.New("cannot call Free() on nil capture source")
}
if s.eventHandler.Fd >= 0 {
if !s.eventHandler.Fd.IsOpen() {
return errors.New("cannot call Free() on open capture source, call Close() first")
}

Expand All @@ -289,7 +287,7 @@ func (s *Source) Free() error {

func (s *Source) nextPacketInto(data capture.Packet) (int, error) {

if s.eventHandler.Fd == 0 {
if !s.eventHandler.Fd.IsOpen() {
return -1, errors.New("cannot nextPacketInto() on closed capture source")
}

Expand Down Expand Up @@ -332,7 +330,7 @@ retry:

func (s *Source) nextPayloadInto(data capture.IPLayer) (int, capture.PacketType, uint32, error) {

if s.eventHandler.Fd == 0 {
if !s.eventHandler.Fd.IsOpen() {
return -1, capture.PacketUnknown, 0, errors.New("cannot nextPacketInto() on closed capture source")
}

Expand Down
16 changes: 9 additions & 7 deletions capture/afpacket/afpacket/afpacket_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MockSource struct {
*Source

mockPackets chan capture.Packet
mockFd *socket.MockFileDescriptor
MockFd *socket.MockFileDescriptor
packetAddCallbackFn func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte)
}

Expand All @@ -40,7 +40,7 @@ func (m *MockSource) PacketAddCallbackFn(fn func(payload []byte, totalLen uint32

// AddPacket adds a new mock packet to the source
// This can happen prior to calling run or continuously while consuming data
func (m *MockSource) AddPacket(pkt capture.Packet) {
func (m *MockSource) AddPacket(pkt capture.Packet) error {
m.mockPackets <- pkt

// If a callback function was provided, execute it
Expand All @@ -50,7 +50,9 @@ func (m *MockSource) AddPacket(pkt capture.Packet) {

// We count packets as "seen" when they enter the pipeline, not when they are
// consumed from the buffer
m.mockFd.IncrementPacketCount(1)
m.MockFd.IncrementPacketCount(1)

return nil
}

// AddPacketFromSource consumes a single packet from the provided source and adds it to the source
Expand Down Expand Up @@ -100,7 +102,7 @@ func NewMockSource(iface string, options ...Option) (*MockSource, error) {
return &MockSource{
Source: src,
mockPackets: make(chan capture.Packet, packetBufferDepth),
mockFd: mockFd,
MockFd: mockFd,
}, nil
}

Expand Down Expand Up @@ -156,7 +158,7 @@ func (m *MockSource) RunNoDrain() chan error {
// Queue / trigger a single event equivalent to receiving a new block via the PPOLL syscall and
// instruct the mock socket to not release the semaphore. That way data can be consumed immediately
// at all times
m.mockFd.SetNoRelease(true)
m.MockFd.SetNoRelease(true)
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errs <- err
return
Expand All @@ -165,7 +167,7 @@ func (m *MockSource) RunNoDrain() chan error {
// Continuously mark all blocks as available to the user at the given interval
for {
for i := 0; i < len(packets); i++ {
m.mockFd.Put(packets[i])
m.MockFd.Put(packets[i])
}
}
}(errChan)
Expand All @@ -187,7 +189,7 @@ func (m *MockSource) run(errChan chan error) {

for pkt := range m.mockPackets {

m.mockFd.Put(pkt)
m.MockFd.Put(pkt)

// Queue / trigger an event equivalent to receiving a new packet via the PPOLL syscall
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
Expand Down
14 changes: 6 additions & 8 deletions capture/afpacket/afring/afring.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s *Source) Stats() (capture.Stats, error) {
// Unblock ensures that a potentially ongoing blocking poll operation is released (returning an ErrCaptureUnblock from
// any potentially ongoing call to Next*() that might currently be blocked)
func (s *Source) Unblock() error {
if s == nil || s.eventHandler.Efd < 0 || s.eventHandler.Fd < 0 {
if s == nil || s.eventHandler.Efd < 0 || !s.eventHandler.Fd.IsOpen() {
return errors.New("cannot call Unblock() on nil / closed capture source")
}

Expand All @@ -288,7 +288,7 @@ func (s *Source) Unblock() error {

// Close stops / closes the capture source
func (s *Source) Close() error {
if s == nil || s.eventHandler.Efd < 0 || s.eventHandler.Fd < 0 {
if s == nil || s.eventHandler.Efd < 0 || !s.eventHandler.Fd.IsOpen() {
return errors.New("cannot call Close() on nil / closed capture source")
}

Expand All @@ -300,8 +300,6 @@ func (s *Source) Close() error {
return err
}

s.eventHandler.Fd = -1

return nil
}

Expand All @@ -310,7 +308,7 @@ func (s *Source) Free() error {
if s == nil {
return errors.New("cannot call Free() on nil capture source")
}
if s.eventHandler.Fd >= 0 {
if s.eventHandler.Fd.IsOpen() {
return errors.New("cannot call Free() on open capture source, call Close() first")
}

Expand All @@ -328,9 +326,9 @@ func (s *Source) Link() *link.Link {

func (s *Source) nextPacket() error {

// If the socket is invalid the capture is obviously closed and we return the respective
// If the ring buffer is invalid the capture is obviously closed and we return the respective
// error
if s.eventHandler.Fd < 0 {
if s == nil || s.ring == nil {
return capture.ErrCaptureStopped
}

Expand Down Expand Up @@ -435,7 +433,7 @@ func (s *Source) handleEvent() error {

func setupRingBuffer(sd socket.FileDescriptor, tPacketReq tPacketRequest) ([]byte, event.EvtFileDescriptor, error) {

if sd <= 0 {
if !sd.IsOpen() {
return nil, -1, errors.New("invalid socket")
}

Expand Down
77 changes: 42 additions & 35 deletions capture/afpacket/afring/afring_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -35,8 +36,7 @@ type MockSource struct {
mockBlocks chan int
mockBlockCount int

mockFd *socket.MockFileDescriptor
isClosed bool
MockFd *socket.MockFileDescriptor

packetAddCallbackFn func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewMockSource(iface string, options ...Option) (*MockSource, error) {
return &MockSource{
Source: src,
mockBlocks: make(chan int, src.nBlocks),
mockFd: mockFd,
MockFd: mockFd,
}, nil
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer

// Similar to the actual kernel ring buffer, we count packets as "seen" when they enter
// the pipeline, not when they are consumed from the buffer
m.mockFd.IncrementPacketCount(1)
m.MockFd.IncrementPacketCount(1)

// If a callback function was provided, execute it
if m.packetAddCallbackFn != nil {
Expand Down Expand Up @@ -187,35 +187,18 @@ func (m *MockSource) CanAddPackets() bool {
// the ring buffer / TPacketHeader block retirement setting for population of the ring buffer
func (m *MockSource) Pipe(src capture.Source) chan error {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've simplified the Pipe() method significantly, dropping the whole "fake" block expiry simulation. Thing is: this was racy by definition, because it would create yet another goroutine that was accessing individual parts of the mock ring buffer (and was not guarded by the memory barrier of the getBlockStatus() and markBlock() calls). Fixing that would have required an additional memory barrier, which would also have affected the normal, non-mock process (both in terms of complexity and performance). Given that the expiry feature was nice to have but in no way important, I think opting for simplicity and performance is the best course of action.

errChan := make(chan error)
go func(errs chan error) {
pipe := make(chan error)

// Run the next capture attempt in a goroutine to allow timing out the operation
go func(errs chan error) {
for {
go func() {
pipe <- m.AddPacketFromSource(src)
}()

retry:
select {
// Simulate TPacket block retirement
case <-time.After(time.Duration(m.ringBuffer.tpReq.retire_blk_tov) * time.Millisecond):

// To ensure the process cannot enter a deadlock, block finalization is forced (just as
// it would be the case for the actual ring buffer) even if no packets were received
m.FinalizeBlock(true)
goto retry

case err := <-pipe:
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) {
m.FinalizeBlock(false)
m.Done()
return
}
errs <- err
if err := m.AddPacketFromSource(src); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) {
m.FinalizeBlock(false)
m.Done()

return
}
errs <- err
return
}
}
}(errChan)
Expand Down Expand Up @@ -249,7 +232,7 @@ func (m *MockSource) RunNoDrain(releaseInterval time.Duration) chan error {
// Queue / trigger a single event equivalent to receiving a new block via the PPOLL syscall and
// instruct the mock socket to not release the semaphore. That way data can be consumed immediately
// at all times
m.mockFd.SetNoRelease(true)
m.MockFd.SetNoRelease(true)
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errs <- err
return
Expand All @@ -260,7 +243,7 @@ func (m *MockSource) RunNoDrain(releaseInterval time.Duration) chan error {
for i := 0; i < m.nBlocks; i++ {

// If the ring buffer is empty it was apparently closed / free'd
if m.isClosed || len(m.ringBuffer.ring) == 0 {
if len(m.ringBuffer.ring) == 0 {
errs <- nil
return
}
Expand All @@ -280,6 +263,14 @@ func (m *MockSource) Done() {
close(m.mockBlocks)
}

// ForceBlockRelease releases all blocks to the kernel (in order to "unblock" any potential mock capture
// from the consuming routine without having to attempt a failed packet consumption)
func (m *MockSource) ForceBlockRelease() {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed to work around the conundrum that, after the data has been used, there is no way to release the current block to the kernel without attempting to call nextPacket() again (because the data might still be worked on by the consumer). That in turn leads to a deadlock in tests, where you quite often know (and want to check) exactly how many packets were sent through the pipeline. This way a test can basically signal that it's done consuming all data and release the block to the mock "kernel" (hence allowing the mock source to terminate cleanly and without races).

for i := 0; i < m.nBlocks; i++ {
m.markBlock(i, unix.TP_STATUS_KERNEL)
}
}

//////////////////////////////////////////////////////////////////////////////////////////////////////

func (m *MockSource) run(errChan chan error) {
Expand All @@ -288,7 +279,7 @@ func (m *MockSource) run(errChan chan error) {
for block := range m.mockBlocks {

// If the ring buffer is empty it was apparently closed / free'd
if m.isClosed || len(m.ringBuffer.ring) == 0 {
if len(m.ringBuffer.ring) == 0 {
break
}

Expand All @@ -306,21 +297,37 @@ func (m *MockSource) run(errChan chan error) {
}

func (m *MockSource) getBlockStatus(n int) (status uint32) {
return *(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8]))
return atomic.LoadUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OKOK, turns out some atomic magic is needed (for the mock only, but since the overhead is small and tPacket has no easy way of knowing if it's a mock or not I've just made the whole block status atomic).

}

func (m *MockSource) markBlock(n int, status uint32) {
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])) = status
atomic.StoreUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])), status)
}

func (m *MockSource) hasUserlandBlock() bool {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used in conjunction with ForceBlockRelease() to have the mock wait before really closing (a consumer might still be accessing data).

for i := 0; i < m.nBlocks; i++ {
if m.getBlockStatus(i) != unix.TP_STATUS_KERNEL {
return true
}
}
return false
}

// Close stops / closes the capture source
func (m *MockSource) Close() error {
m.isClosed = true

// Wait until all blocks have been returned to the mock kernel
for m.hasUserlandBlock() {
time.Sleep(10 * time.Millisecond)
}

return m.Source.Close()
}

// Free releases any pending resources from the capture source (must be called after Close())
func (m *MockSource) Free() error {
m.ringBuffer.ring = nil

// panic("stack")
return nil
}
Loading