diff --git a/perf/reader.go b/perf/reader.go
index 3c82070..6ff9213 100644
--- a/perf/reader.go
+++ b/perf/reader.go
@@ -347,6 +347,7 @@ func (pr *Reader) ReadInto(rec *Record) error {
 		return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
 	}
 
+	var checkAllRings = false
 	for {
 		if len(pr.epollRings) == 0 {
 			// NB: The deferred pauseMu.Unlock will panic if Wait panics, which
@@ -355,7 +356,11 @@
 			nEvents, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
 			pr.pauseMu.Lock()
 			if err != nil {
-				return err
+				if errors.Is(err, os.ErrDeadlineExceeded) {
+					checkAllRings = true
+				} else {
+					return err
+				}
 			}
 
 			// Re-validate pr.paused since we dropped pauseMu.
@@ -363,14 +368,25 @@
 				return errMustBePaused
 			}
 
-			for _, event := range pr.epollEvents[:nEvents] {
-				ring := pr.rings[cpuForEvent(&event)]
-				pr.epollRings = append(pr.epollRings, ring)
+			if checkAllRings {
+				for _, ring := range pr.rings {
+					pr.epollRings = append(pr.epollRings, ring)
 
-				// Read the current head pointer now, not every time
-				// we read a record. This prevents a single fast producer
-				// from keeping the reader busy.
-				ring.loadHead()
+					// Read the current head pointer now, not every time
+					// we read a record. This prevents a single fast producer
+					// from keeping the reader busy.
+					ring.loadHead()
+				}
+			} else {
+				for _, event := range pr.epollEvents[:nEvents] {
+					ring := pr.rings[cpuForEvent(&event)]
+					pr.epollRings = append(pr.epollRings, ring)
+
+					// Read the current head pointer now, not every time
+					// we read a record. This prevents a single fast producer
+					// from keeping the reader busy.
+					ring.loadHead()
+				}
 			}
 		}
 
@@ -382,6 +398,9 @@
 			// We've emptied the current ring buffer, process
 			// the next one.
 			pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
+			if checkAllRings && len(pr.epollRings) == 0 {
+				return fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded)
+			}
 			continue
 		}
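
For context, a minimal consumer sketch showing the behaviour this patch enables, assuming the surrounding package is github.com/cilium/ebpf/perf with its NewReader/SetDeadline/ReadInto API; the consume wrapper and the handle/flush callbacks are hypothetical and only illustrate a caller, they are not part of the package:

package main

import (
	"errors"
	"os"
	"time"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
)

// consume reads samples from a PERF_EVENT_ARRAY map and flushes a
// batch whenever the rings go quiet for batchWindow. handle and flush
// are hypothetical callbacks supplied by the caller.
func consume(events *ebpf.Map, batchWindow time.Duration, handle func([]byte), flush func()) error {
	rd, err := perf.NewReader(events, os.Getpagesize())
	if err != nil {
		return err
	}
	defer rd.Close()

	var rec perf.Record
	for {
		rd.SetDeadline(time.Now().Add(batchWindow))

		err := rd.ReadInto(&rec)
		if errors.Is(err, os.ErrDeadlineExceeded) {
			// With this patch the deadline error only surfaces after
			// every ring has been appended to epollRings and drained,
			// so samples epoll never reported are still delivered
			// before the caller sees the timeout.
			flush()
			continue
		}
		if err != nil {
			return err
		}
		handle(rec.RawSample)
	}
}

Note that the patch wraps os.ErrDeadlineExceeded rather than returning a new sentinel, so errors.Is keeps working for callers even though the returned error carries the "epoll wait" prefix.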