From 6db28dcc2bbc5adff85f204e24d774801a281960 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Tue, 3 Jan 2023 09:25:23 +0100 Subject: [PATCH] [core] RCV Buffer Refax: added some utils and simplifications (#2522). --- srtcore/buffer_rcv.cpp | 114 ++++++++++++++++++++--------------------- srtcore/buffer_rcv.h | 20 ++++++-- 2 files changed, 74 insertions(+), 60 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index a7056c3b2..0ab0316d6 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -75,15 +75,15 @@ namespace { #define IF_RCVBUF_DEBUG(instr) (void)0 - // Check if iFirstNonreadPos is in range [iStartPos, (iStartPos + iMaxPosInc) % iSize]. + // Check if iFirstNonreadPos is in range [iStartPos, (iStartPos + iMaxPosOff) % iSize]. // The right edge is included because we expect iFirstNonreadPos to be // right after the last valid packet position if all packets are available. - bool isInRange(int iStartPos, int iMaxPosInc, size_t iSize, int iFirstNonreadPos) + bool isInRange(int iStartPos, int iMaxPosOff, size_t iSize, int iFirstNonreadPos) { if (iFirstNonreadPos == iStartPos) return true; - const int iLastPos = (iStartPos + iMaxPosInc) % iSize; + const int iLastPos = (iStartPos + iMaxPosOff) % iSize; const bool isOverrun = iLastPos < iStartPos; if (isOverrun) @@ -98,7 +98,7 @@ namespace { * RcvBufferNew (circular buffer): * * |<------------------- m_iSize ----------------------------->| - * | |<----------- m_iMaxPosInc ------------>| | + * | |<----------- m_iMaxPosOff ------------>| | * | | | | * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] @@ -112,7 +112,7 @@ namespace { * thread safety: * m_iStartPos: CUDT::m_RecvLock * m_iLastAckPos: CUDT::m_AckLock - * m_iMaxPosInc: none? (modified on add and ack + * m_iMaxPosOff: none? (modified on add and ack */ CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI) @@ -122,7 +122,7 @@ CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool b , m_iStartSeqNo(initSeqNo) , m_iStartPos(0) , m_iFirstNonreadPos(0) - , m_iMaxPosInc(0) + , m_iMaxPosOff(0) , m_iNotch(0) , m_numOutOfOrderPackets(0) , m_iFirstReadableOutOfOrder(-1) @@ -137,7 +137,7 @@ CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool b CRcvBuffer::~CRcvBuffer() { - // Can be optimized by only iterating m_iMaxPosInc from m_iStartPos. + // Can be optimized by only iterating m_iMaxPosOff from m_iStartPos. for (FixedArray::iterator it = m_entries.begin(); it != m_entries.end(); ++it) { if (!it->pUnit) @@ -176,8 +176,8 @@ int CRcvBuffer::insert(CUnit* unit) SRT_ASSERT((m_iStartPos + offset) / m_szSize < 2); const int pos = (m_iStartPos + offset) % m_szSize; - if (offset >= m_iMaxPosInc) - m_iMaxPosInc = offset + 1; + if (offset >= m_iMaxPosOff) + m_iMaxPosOff = offset + 1; // Packet already exists SRT_ASSERT(pos >= 0 && pos < int(m_szSize)); @@ -218,9 +218,9 @@ int CRcvBuffer::dropUpTo(int32_t seqno) return 0; } - m_iMaxPosInc -= len; - if (m_iMaxPosInc < 0) - m_iMaxPosInc = 0; + m_iMaxPosOff -= len; + if (m_iMaxPosOff < 0) + m_iMaxPosOff = 0; const int iDropCnt = len; while (len > 0) @@ -250,7 +250,7 @@ int CRcvBuffer::dropAll() if (empty()) return 0; - const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosInc); + const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff); return dropUpTo(end_seqno); } @@ -259,7 +259,7 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo); // TODO: count bytes as removed? - const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); + const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); if (msgno > 0) // including SRT_MSGNO_NONE and SRT_MSGNO_CONTROL { IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << msgno); @@ -272,7 +272,7 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) continue; // TODO: Break the loop if a massege has been found. No need to search further. - const int32_t msgseq = m_entries[i].pUnit->m_Packet.getMsgSeq(m_bPeerRexmitFlag); + const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag); if (msgseq == msgno) { ++iDropCnt; @@ -318,7 +318,7 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) { // Don't drop messages, if all its packets are already in the buffer. // TODO: Don't drop a several-packet message if all packets are in the buffer. - if (m_entries[i].pUnit && m_entries[i].pUnit->m_Packet.getMsgBoundary() == PB_SOLO) + if (m_entries[i].pUnit && packetAt(i).getMsgBoundary() == PB_SOLO) continue; dropUnitInPos(i); @@ -377,7 +377,7 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) break; } - const CPacket& packet = m_entries[i].pUnit->m_Packet; + const CPacket& packet = packetAt(i); const size_t pktsize = packet.getLength(); const int32_t pktseqno = packet.getSeqNo(); @@ -412,8 +412,8 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) if (updateStartPos) { m_iStartPos = incPos(i); - --m_iMaxPosInc; - SRT_ASSERT(m_iMaxPosInc >= 0); + --m_iMaxPosOff; + SRT_ASSERT(m_iMaxPosOff >= 0); m_iStartSeqNo = CSeqNo::incseq(pktseqno); } else @@ -434,7 +434,7 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) releaseNextFillerEntries(); - if (!isInRange(m_iStartPos, m_iMaxPosInc, m_szSize, m_iFirstNonreadPos)) + if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; //updateNonreadPos(); @@ -502,7 +502,7 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) return -1; } - const srt::CPacket& pkt = m_entries[p].pUnit->m_Packet; + const srt::CPacket& pkt = packetAt(p); if (bTsbPdEnabled) { @@ -530,8 +530,8 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) m_iNotch = 0; m_iStartPos = p; - --m_iMaxPosInc; - SRT_ASSERT(m_iMaxPosInc >= 0); + --m_iMaxPosOff; + SRT_ASSERT(m_iMaxPosOff >= 0); m_iStartSeqNo = CSeqNo::incseq(m_iStartSeqNo); } else @@ -547,7 +547,7 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) // Update positions // Set nonread position to the starting position before updating, // because start position was increased, and preceding packets are invalid. - if (!isInRange(m_iStartPos, m_iMaxPosInc, m_szSize, m_iFirstNonreadPos)) + if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; } @@ -588,10 +588,10 @@ int CRcvBuffer::getTimespan_ms() const if (!m_tsbpd.isEnabled()) return 0; - if (m_iMaxPosInc == 0) + if (m_iMaxPosOff == 0) return 0; - int lastpos = incPos(m_iStartPos, m_iMaxPosInc - 1); + int lastpos = incPos(m_iStartPos, m_iMaxPosOff - 1); // Normally the last position should always be non empty // if TSBPD is enabled (reading out of order is not allowed). // However if decryption of the last packet fails, it may be dropped @@ -615,8 +615,8 @@ int CRcvBuffer::getTimespan_ms() const return 0; const steady_clock::time_point startstamp = - getPktTsbPdTime(m_entries[startpos].pUnit->m_Packet.getMsgTimeStamp()); - const steady_clock::time_point endstamp = getPktTsbPdTime(m_entries[lastpos].pUnit->m_Packet.getMsgTimeStamp()); + getPktTsbPdTime(packetAt(startpos).getMsgTimeStamp()); + const steady_clock::time_point endstamp = getPktTsbPdTime(packetAt(lastpos).getMsgTimeStamp()); if (endstamp < startstamp) return 0; @@ -635,14 +635,14 @@ int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan) const CRcvBuffer::PacketInfo CRcvBuffer::getFirstValidPacketInfo() const { - const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); + const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); for (int i = m_iStartPos; i != end_pos; i = incPos(i)) { // TODO: Maybe check status? if (!m_entries[i].pUnit) continue; - const CPacket& packet = m_entries[i].pUnit->m_Packet; + const CPacket& packet = packetAt(i); const PacketInfo info = { packet.getSeqNo(), i != m_iStartPos, getPktTsbPdTime(packet.getMsgTimeStamp()) }; return info; } @@ -693,7 +693,7 @@ CRcvBuffer::PacketInfo CRcvBuffer::getFirstReadablePacketInfo(time_point time_no { if (hasInorderPackets) { - const CPacket& packet = m_entries[m_iStartPos].pUnit->m_Packet; + const CPacket& packet = packetAt(m_iStartPos); const PacketInfo info = {packet.getSeqNo(), false, time_point()}; return info; } @@ -701,7 +701,7 @@ CRcvBuffer::PacketInfo CRcvBuffer::getFirstReadablePacketInfo(time_point time_no if (m_iFirstReadableOutOfOrder >= 0) { SRT_ASSERT(m_numOutOfOrderPackets > 0); - const CPacket& packet = m_entries[m_iFirstReadableOutOfOrder].pUnit->m_Packet; + const CPacket& packet = packetAt(m_iFirstReadableOutOfOrder); const PacketInfo info = {packet.getSeqNo(), true, time_point()}; return info; } @@ -742,9 +742,9 @@ bool CRcvBuffer::dropUnitInPos(int pos) return false; if (m_tsbpd.isEnabled()) { - updateTsbPdTimeBase(m_entries[pos].pUnit->m_Packet.getMsgTimeStamp()); + updateTsbPdTimeBase(packetAt(pos).getMsgTimeStamp()); } - else if (m_bMessageAPI && !m_entries[pos].pUnit->m_Packet.getMsgOrderFlag()) + else if (m_bMessageAPI && !packetAt(pos).getMsgOrderFlag()) { --m_numOutOfOrderPackets; if (pos == m_iFirstReadableOutOfOrder) @@ -763,24 +763,24 @@ void CRcvBuffer::releaseNextFillerEntries() releaseUnitInPos(pos); pos = incPos(pos); m_iStartPos = pos; - --m_iMaxPosInc; - if (m_iMaxPosInc < 0) - m_iMaxPosInc = 0; + --m_iMaxPosOff; + if (m_iMaxPosOff < 0) + m_iMaxPosOff = 0; } } // TODO: Is this function complete? There are some comments left inside. void CRcvBuffer::updateNonreadPos() { - if (m_iMaxPosInc == 0) + if (m_iMaxPosOff == 0) return; - const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); // The empty position right after the last valid entry. + const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); // The empty position right after the last valid entry. int pos = m_iFirstNonreadPos; while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail) { - if (m_bMessageAPI && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST) == 0) + if (m_bMessageAPI && (packetAt(pos).getMsgBoundary() & PB_FIRST) == 0) break; for (int i = pos; i != end_pos; i = incPos(i)) @@ -791,7 +791,7 @@ void CRcvBuffer::updateNonreadPos() } // Check PB_LAST only in message mode. - if (!m_bMessageAPI || m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) + if (!m_bMessageAPI || packetAt(i).getMsgBoundary() & PB_LAST) { m_iFirstNonreadPos = incPos(i); break; @@ -811,7 +811,7 @@ int CRcvBuffer::findLastMessagePkt() { SRT_ASSERT(m_entries[i].pUnit); - if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST) + if (packetAt(i).getMsgBoundary() & PB_LAST) { return i; } @@ -836,9 +836,9 @@ void CRcvBuffer::onInsertNotInOrderPacket(int insertPos) // Just a sanity check. This function is called when a new packet is added. // So the should be unacknowledged packets. - SRT_ASSERT(m_iMaxPosInc > 0); + SRT_ASSERT(m_iMaxPosOff > 0); SRT_ASSERT(m_entries[insertPos].pUnit); - const CPacket& pkt = m_entries[insertPos].pUnit->m_Packet; + const CPacket& pkt = packetAt(insertPos); const PacketBoundary boundary = pkt.getMsgBoundary(); //if ((boundary & PB_FIRST) && (boundary & PB_LAST)) @@ -866,17 +866,17 @@ void CRcvBuffer::onInsertNotInOrderPacket(int insertPos) bool CRcvBuffer::checkFirstReadableOutOfOrder() { - if (m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder < 0 || m_iMaxPosInc == 0) + if (m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder < 0 || m_iMaxPosOff == 0) return false; - const int endPos = incPos(m_iStartPos, m_iMaxPosInc); + const int endPos = incPos(m_iStartPos, m_iMaxPosOff); int msgno = -1; for (int pos = m_iFirstReadableOutOfOrder; pos != endPos; pos = incPos(pos)) { if (!m_entries[pos].pUnit) return false; - const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + const CPacket& pkt = packetAt(pos); if (pkt.getMsgOrderFlag()) return false; @@ -897,7 +897,7 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() if (hasReadableInorderPkts() || m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder >= 0) return; - if (m_iMaxPosInc == 0) + if (m_iMaxPosOff == 0) return; // TODO: unused variable outOfOrderPktsRemain? @@ -905,7 +905,7 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() // Search further packets to the right. // First check if there are packets to the right. - const int lastPos = (m_iStartPos + m_iMaxPosInc - 1) % m_szSize; + const int lastPos = (m_iStartPos + m_iMaxPosOff - 1) % m_szSize; int posFirst = -1; int posLast = -1; @@ -919,7 +919,7 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() continue; } - const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + const CPacket& pkt = packetAt(pos); if (pkt.getMsgOrderFlag()) // Skip in order packet { @@ -959,7 +959,7 @@ int CRcvBuffer::scanNotInOrderMessageRight(const int startPos, int msgNo) const { // Search further packets to the right. // First check if there are packets to the right. - const int lastPos = (m_iStartPos + m_iMaxPosInc - 1) % m_szSize; + const int lastPos = (m_iStartPos + m_iMaxPosOff - 1) % m_szSize; if (startPos == lastPos) return -1; @@ -970,7 +970,7 @@ int CRcvBuffer::scanNotInOrderMessageRight(const int startPos, int msgNo) const if (!m_entries[pos].pUnit) break; - const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + const CPacket& pkt = packetAt(pos); if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { @@ -1001,7 +1001,7 @@ int CRcvBuffer::scanNotInOrderMessageLeft(const int startPos, int msgNo) const if (!m_entries[pos].pUnit) return -1; - const CPacket& pkt = m_entries[pos].pUnit->m_Packet; + const CPacket& pkt = packetAt(pos); if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { @@ -1057,22 +1057,22 @@ string CRcvBuffer::strFullnessState(int iFirstUnackSeqNo, const time_point& tsNo stringstream ss; ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo - << " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc << ". "; + << " m_iStartPos=" << m_iStartPos << " m_iMaxPosOff=" << m_iMaxPosOff << ". "; ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize << " pkts. "; - if (m_tsbpd.isEnabled() && m_iMaxPosInc > 0) + if (m_tsbpd.isEnabled() && m_iMaxPosOff > 0) { const PacketInfo nextValidPkt = getFirstValidPacketInfo(); ss << "(TSBPD ready in "; if (!is_zero(nextValidPkt.tsbpd_time)) { ss << count_milliseconds(nextValidPkt.tsbpd_time - tsNow) << "ms"; - const int iLastPos = incPos(m_iStartPos, m_iMaxPosInc - 1); + const int iLastPos = incPos(m_iStartPos, m_iMaxPosOff - 1); if (m_entries[iLastPos].pUnit) { ss << ", timespan "; - const uint32_t usPktTimestamp = m_entries[iLastPos].pUnit->m_Packet.getMsgTimeStamp(); + const uint32_t usPktTimestamp = packetAt(iLastPos).getMsgTimeStamp(); ss << count_milliseconds(m_tsbpd.getPktTsbPdTime(usPktTimestamp) - nextValidPkt.tsbpd_time); ss << " ms"; } diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 861ef0caa..e35238b3c 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -23,7 +23,7 @@ namespace srt * Circular receiver buffer. * * |<------------------- m_szSize ---------------------------->| - * | |<------------ m_iMaxPosInc ----------->| | + * | |<------------ m_iMaxPosOff ----------->| | * | | | | * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] @@ -181,7 +181,7 @@ class CRcvBuffer bool empty() const { - return (m_iMaxPosInc == 0); + return (m_iMaxPosOff == 0); } /// Return buffer capacity. @@ -223,6 +223,18 @@ class CRcvBuffer inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; } inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); } inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); } + inline int cmpPos(int pos2, int pos1) const + { + // XXX maybe not the best implementation, but this keeps up to the rule + const int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + m_szSize - m_iStartPos; + const int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + m_szSize - m_iStartPos; + + return off2 - off1; + } + + // NOTE: Assumes that pUnit != NULL + CPacket& packetAt(int pos) { return m_entries[pos].pUnit->m_Packet; } + const CPacket& packetAt(int pos) const { return m_entries[pos].pUnit->m_Packet; } private: void countBytes(int pkts, int bytes); @@ -309,7 +321,7 @@ class CRcvBuffer int m_iStartSeqNo; int m_iStartPos; // the head position for I/O (inclusive) int m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos) - int m_iMaxPosInc; // the furthest data position + int m_iMaxPosOff; // the furthest data position int m_iNotch; // the starting read point of the first unit size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false @@ -340,6 +352,8 @@ class CRcvBuffer time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const; void updateTsbPdTimeBase(uint32_t usPktTimestamp); + bool isTsbPd() const { return m_tsbpd.isEnabled(); } + /// Form a string of the current buffer fullness state. /// number of packets acknowledged, TSBPD readiness, etc. std::string strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const;