diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 3098407a6..8774d4d1d 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -236,10 +236,14 @@ int CRcvBuffer::dropUpTo(int32_t seqno) m_iStartSeqNo = seqno; // Move forward if there are "read/drop" entries. releaseNextFillerEntries(); - // Set nonread position to the starting position before updating, - // because start position was increased, and preceding packets are invalid. - m_iFirstNonreadPos = m_iStartPos; - updateNonreadPos(); + + // If the nonread position is now behind the starting position, set it to the starting position and update. + // Preceding packets were likely missing, and the non read position can probably be moved further now. + if (CSeqNo::seqcmp(m_iFirstNonreadPos, m_iStartPos) < 0) + { + m_iFirstNonreadPos = m_iStartPos; + updateNonreadPos(); + } if (!m_tsbpd.isEnabled() && m_bMessageAPI) updateFirstReadableOutOfOrder(); return iDropCnt; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index a5d32d9e3..752eb51b6 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6804,6 +6804,11 @@ bool srt::CUDT::isRcvBufferReady() const return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); } +bool srt::CUDT::isRcvBufferReadyNoLock() const +{ + return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); +} + // int by_exception: accepts values of CUDTUnited::ErrorHandling: // - 0 - by return value // - 1 - by exception diff --git a/srtcore/core.h b/srtcore/core.h index e7ca57c50..b83dec6e5 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -720,6 +720,9 @@ class CUDT SRT_ATTR_EXCLUDES(m_RcvBufferLock) bool isRcvBufferReady() const; + SRT_ATTR_REQUIRES2(m_RcvBufferLock) + bool isRcvBufferReadyNoLock() const; + // TSBPD thread main function. static void* tsbpd(void* param); diff --git a/srtcore/group.cpp b/srtcore/group.cpp index fedfdcf76..2952d8c6b 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2295,13 +2295,14 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) m_stats.recv.count(res); updateAvgPayloadSize(res); + bool canReadFurther = false; for (vector::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si) { CUDTSocket* ps = *si; ScopedLock lg(ps->core().m_RcvBufferLock); if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) { - int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); if (cnt > 0) { HLOGC(grlog.Debug, @@ -2309,12 +2310,8 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) << " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo); } } - } - bool canReadFurther = false; - for (vector::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si) - { - CUDTSocket* ps = *si; - if (!ps->core().isRcvBufferReady()) + + if (!ps->core().isRcvBufferReadyNoLock()) m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false); else canReadFurther = true;