From 599c1fbaa21c61de848d9c82501a60b8b36e60cf Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 14 Feb 2023 15:22:08 +0100 Subject: [PATCH] [core] Reworked the CRcvBuffer::dropMessage(..) function (#2661). Drop by seqno offset first, then refine using msgno. --- srtcore/buffer_rcv.cpp | 131 ++++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 55 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 506f57cf7..3098407a6 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -257,47 +257,8 @@ int CRcvBuffer::dropAll() int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting) { IF_RCVBUF_DEBUG(ScopedLog scoped_log); - IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo); - const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING); - // TODO: count bytes as removed? - int minDroppedOffset = -1; - int iDropCnt = 0; - if (msgno > 0) // excluding SRT_MSGNO_NONE and SRT_MSGNO_CONTROL - { - // First try to drop by message number in case the message starts earlier thtan @a seqnolo. - IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << msgno); - const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); - for (int i = m_iStartPos; i != end_pos; i = incPos(i)) - { - // Can't drop is message number is not known. - // TODO: Maybe check entry status? - if (!m_entries[i].pUnit) - continue; - - const PacketBoundary bnd = packetAt(i).getMsgBoundary(); - const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag); - if (msgseq == msgno) - { - if (bKeepExisting && bnd == PB_SOLO) - { - LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO message packet %" - << packetAt(i).getSeqNo() << "."); - break; - } - - ++iDropCnt; - dropUnitInPos(i); - m_entries[i].status = EntryState_Drop; - if (minDroppedOffset == -1) - minDroppedOffset = offPos(m_iStartPos, i); - - // Break the loop if the end of message has been found. No need to search further. - if (bnd == PB_LAST) - break; - } - } - IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt); - } + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi + << ", msgno " << msgno << " m_iStartSeqNo " << m_iStartSeqNo); // Drop by packet seqno range to also wipe those packets that do not exist in the buffer. const int offset_a = CSeqNo::seqoff(m_iStartSeqNo, seqnolo); @@ -305,27 +266,50 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro if (offset_b < 0) { LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): nothing to drop. Requested [" << seqnolo << "; " - << seqnohi << "]. Buffer start " << m_iStartSeqNo << "."); + << seqnohi << "]. Buffer start " << m_iStartSeqNo << "."); return 0; } - const int end_pos = decPos(m_iStartPos); // Can potentially drop the whole buffer, even if it is empty. + const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING); + int minDroppedOffset = -1; + int iDropCnt = 0; const int start_off = max(0, offset_a); - const int break_pos = incPos(m_iStartPos, offset_b + 1); // The position right after the last packet to drop. - for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != break_pos; i = incPos(i)) + const int start_pos = incPos(m_iStartPos, start_off); + const int end_off = min((int) m_szSize - 1, offset_b + 1); + const int end_pos = incPos(m_iStartPos, end_off); + bool bDropByMsgNo = msgno > SRT_MSGNO_CONTROL; // Excluding both SRT_MSGNO_NONE (-1) and SRT_MSGNO_CONTROL (0). + for (int i = start_pos; i != end_pos; i = incPos(i)) { - // Don't drop messages, if all its packets are already in the buffer. - // TODO: Don't drop a several-packet message if all packets are in the buffer. - if (bKeepExisting && m_entries[i].pUnit && packetAt(i).getMsgBoundary() == PB_SOLO) - { - LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO packet %" << packetAt(i).getSeqNo() << "."); - continue; - } - // Check if the unit was already dropped earlier. if (m_entries[i].status == EntryState_Drop) continue; + if (m_entries[i].pUnit) + { + const PacketBoundary bnd = packetAt(i).getMsgBoundary(); + + // Don't drop messages, if all its packets are already in the buffer. + // TODO: Don't drop a several-packet message if all packets are in the buffer. + if (bKeepExisting && bnd == PB_SOLO) + { + bDropByMsgNo = false; // Solo packet, don't search for the rest of the message. + LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO packet %" << packetAt(i).getSeqNo() << "."); + continue; + } + + const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag); + if (msgno > SRT_MSGNO_CONTROL && msgseq != msgno) + { + LOGC(rbuflog.Warn, log << "CRcvBuffer.dropMessage(): Packet seqno %" << packetAt(i).getSeqNo() << " has msgno " << msgseq << " differs from requested " << msgno); + } + + if (bDropByMsgNo && bnd == PB_FIRST) + { + // First packet of the message is about to be dropped. That was the only reason to search for msgno. + bDropByMsgNo = false; + } + } + dropUnitInPos(i); ++iDropCnt; m_entries[i].status = EntryState_Drop; @@ -333,8 +317,45 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro minDroppedOffset = offPos(m_iStartPos, i); } - HLOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): [" << seqnolo << "; " - << seqnohi << "]."); + if (bDropByMsgNo) + { + // First try to drop by message number in case the message starts earlier thtan @a seqnolo. + // The sender should have the last packet of the message it is requesting to be dropped, + // therefore we don't search forward. + const int stop_pos = decPos(m_iStartPos); + for (int i = start_pos; i != stop_pos; i = decPos(i)) + { + // Can't drop is message number is not known. + if (!m_entries[i].pUnit) // also dropped earlier. + continue; + + const PacketBoundary bnd = packetAt(i).getMsgBoundary(); + const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag); + if (msgseq != msgno) + break; + + if (bKeepExisting && bnd == PB_SOLO) + { + LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO message packet %" + << packetAt(i).getSeqNo() << "."); + break; + } + + ++iDropCnt; + dropUnitInPos(i); + m_entries[i].status = EntryState_Drop; + + if (minDroppedOffset == -1) + minDroppedOffset = offPos(m_iStartPos, i); + else + minDroppedOffset = min(offPos(m_iStartPos, i), minDroppedOffset); + + // Break the loop if the start of message has been found. No need to search further. + if (bnd == PB_FIRST) + break; + } + IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt); + } // Check if units before m_iFirstNonreadPos are dropped. const bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());