Skip to content

Commit

Permalink
[core] Reworked the CRcvBuffer::dropMessage(..) function (#2661).
Browse files Browse the repository at this point in the history
Drop by seqno offset first, then refine using msgno.
  • Loading branch information
maxsharabayko committed Feb 14, 2023
1 parent 09fa30c commit 599c1fb
Showing 1 changed file with 76 additions and 55 deletions.
131 changes: 76 additions & 55 deletions srtcore/buffer_rcv.cpp
Expand Up @@ -257,84 +257,105 @@ int CRcvBuffer::dropAll()
int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo);
const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING);
// TODO: count bytes as removed?
int minDroppedOffset = -1;
int iDropCnt = 0;
if (msgno > 0) // excluding SRT_MSGNO_NONE and SRT_MSGNO_CONTROL
{
// First try to drop by message number in case the message starts earlier thtan @a seqnolo.
IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << msgno);
const int end_pos = incPos(m_iStartPos, m_iMaxPosOff);
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
// Can't drop is message number is not known.
// TODO: Maybe check entry status?
if (!m_entries[i].pUnit)
continue;

const PacketBoundary bnd = packetAt(i).getMsgBoundary();
const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag);
if (msgseq == msgno)
{
if (bKeepExisting && bnd == PB_SOLO)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO message packet %"
<< packetAt(i).getSeqNo() << ".");
break;
}

++iDropCnt;
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);

// Break the loop if the end of message has been found. No need to search further.
if (bnd == PB_LAST)
break;
}
}
IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt);
}
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi
<< ", msgno " << msgno << " m_iStartSeqNo " << m_iStartSeqNo);

// Drop by packet seqno range to also wipe those packets that do not exist in the buffer.
const int offset_a = CSeqNo::seqoff(m_iStartSeqNo, seqnolo);
const int offset_b = CSeqNo::seqoff(m_iStartSeqNo, seqnohi);
if (offset_b < 0)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): nothing to drop. Requested [" << seqnolo << "; "
<< seqnohi << "]. Buffer start " << m_iStartSeqNo << ".");
<< seqnohi << "]. Buffer start " << m_iStartSeqNo << ".");
return 0;
}

const int end_pos = decPos(m_iStartPos); // Can potentially drop the whole buffer, even if it is empty.
const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING);
int minDroppedOffset = -1;
int iDropCnt = 0;
const int start_off = max(0, offset_a);
const int break_pos = incPos(m_iStartPos, offset_b + 1); // The position right after the last packet to drop.
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != break_pos; i = incPos(i))
const int start_pos = incPos(m_iStartPos, start_off);
const int end_off = min((int) m_szSize - 1, offset_b + 1);
const int end_pos = incPos(m_iStartPos, end_off);
bool bDropByMsgNo = msgno > SRT_MSGNO_CONTROL; // Excluding both SRT_MSGNO_NONE (-1) and SRT_MSGNO_CONTROL (0).
for (int i = start_pos; i != end_pos; i = incPos(i))
{
// Don't drop messages, if all its packets are already in the buffer.
// TODO: Don't drop a several-packet message if all packets are in the buffer.
if (bKeepExisting && m_entries[i].pUnit && packetAt(i).getMsgBoundary() == PB_SOLO)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO packet %" << packetAt(i).getSeqNo() << ".");
continue;
}

// Check if the unit was already dropped earlier.
if (m_entries[i].status == EntryState_Drop)
continue;

if (m_entries[i].pUnit)
{
const PacketBoundary bnd = packetAt(i).getMsgBoundary();

// Don't drop messages, if all its packets are already in the buffer.
// TODO: Don't drop a several-packet message if all packets are in the buffer.
if (bKeepExisting && bnd == PB_SOLO)
{
bDropByMsgNo = false; // Solo packet, don't search for the rest of the message.
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO packet %" << packetAt(i).getSeqNo() << ".");
continue;
}

const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag);
if (msgno > SRT_MSGNO_CONTROL && msgseq != msgno)
{
LOGC(rbuflog.Warn, log << "CRcvBuffer.dropMessage(): Packet seqno %" << packetAt(i).getSeqNo() << " has msgno " << msgseq << " differs from requested " << msgno);
}

if (bDropByMsgNo && bnd == PB_FIRST)
{
// First packet of the message is about to be dropped. That was the only reason to search for msgno.
bDropByMsgNo = false;
}
}

dropUnitInPos(i);
++iDropCnt;
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}

HLOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): [" << seqnolo << "; "
<< seqnohi << "].");
if (bDropByMsgNo)
{
// First try to drop by message number in case the message starts earlier thtan @a seqnolo.
// The sender should have the last packet of the message it is requesting to be dropped,
// therefore we don't search forward.
const int stop_pos = decPos(m_iStartPos);
for (int i = start_pos; i != stop_pos; i = decPos(i))
{
// Can't drop is message number is not known.
if (!m_entries[i].pUnit) // also dropped earlier.
continue;

const PacketBoundary bnd = packetAt(i).getMsgBoundary();
const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag);
if (msgseq != msgno)
break;

if (bKeepExisting && bnd == PB_SOLO)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO message packet %"
<< packetAt(i).getSeqNo() << ".");
break;
}

++iDropCnt;
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;

if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
else
minDroppedOffset = min(offPos(m_iStartPos, i), minDroppedOffset);

// Break the loop if the start of message has been found. No need to search further.
if (bnd == PB_FIRST)
break;
}
IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt);
}

// Check if units before m_iFirstNonreadPos are dropped.
const bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
Expand Down

0 comments on commit 599c1fb

Please sign in to comment.