Skip to content

Commit

Permalink
[core] Perf improvement of group reading.
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Aug 4, 2023
1 parent 31294e3 commit 88ca9cc
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
12 changes: 8 additions & 4 deletions srtcore/buffer_rcv.cpp
Expand Up @@ -236,10 +236,14 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
m_iStartSeqNo = seqno;
// Move forward if there are "read/drop" entries.
releaseNextFillerEntries();
// Set nonread position to the starting position before updating,
// because start position was increased, and preceding packets are invalid.
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();

// If the nonread position is now behind the starting position, set it to the starting position and update.
// Preceding packets were likely missing, and the non read position can probably be moved further now.
if (CSeqNo::seqcmp(m_iFirstNonreadPos, m_iStartPos) < 0)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
Expand Down
5 changes: 5 additions & 0 deletions srtcore/core.cpp
Expand Up @@ -6804,6 +6804,11 @@ bool srt::CUDT::isRcvBufferReady() const
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

bool srt::CUDT::isRcvBufferReadyNoLock() const
{
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

// int by_exception: accepts values of CUDTUnited::ErrorHandling:
// - 0 - by return value
// - 1 - by exception
Expand Down
3 changes: 3 additions & 0 deletions srtcore/core.h
Expand Up @@ -720,6 +720,9 @@ class CUDT
SRT_ATTR_EXCLUDES(m_RcvBufferLock)
bool isRcvBufferReady() const;

SRT_ATTR_REQUIRES2(m_RcvBufferLock)
bool isRcvBufferReadyNoLock() const;

// TSBPD thread main function.
static void* tsbpd(void* param);

Expand Down
11 changes: 4 additions & 7 deletions srtcore/group.cpp
Expand Up @@ -2295,26 +2295,23 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
m_stats.recv.count(res);
updateAvgPayloadSize(res);

bool canReadFurther = false;
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
}
bool canReadFurther = false;
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
if (!ps->core().isRcvBufferReady())

if (!ps->core().isRcvBufferReadyNoLock())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
else
canReadFurther = true;
Expand Down

0 comments on commit 88ca9cc

Please sign in to comment.