diff --git a/CMakeLists.txt b/CMakeLists.txt index 967651138..0c126fb3d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -160,7 +160,6 @@ option(USE_BUSY_WAITING "Enable more accurate sending times at a cost of potenti option(USE_GNUSTL "Get c++ library/headers from the gnustl.pc" OFF) option(ENABLE_SOCK_CLOEXEC "Enable setting SOCK_CLOEXEC on a socket" ON) option(ENABLE_SHOW_PROJECT_CONFIG "Enable show Project Configuration" OFF) -option(ENABLE_NEW_RCVBUFFER "Enable new receiver buffer implementation" ON) option(ENABLE_CLANG_TSA "Enable Clang Thread Safety Analysis" OFF) @@ -533,14 +532,6 @@ if (ENABLE_SOCK_CLOEXEC) add_definitions(-DENABLE_SOCK_CLOEXEC=1) endif() -if (ENABLE_NEW_RCVBUFFER) - add_definitions(-DENABLE_NEW_RCVBUFFER=1) - message(STATUS "RECEIVER_BUFFER: NEW") -else() - remove_definitions(-DENABLE_NEW_RCVBUFFER) - message(STATUS "RECEIVER_BUFFER: OLD") -endif() - if (CMAKE_MAJOR_VERSION LESS 3) set (FORCE_CXX_STANDARD_GNUONLY 1) endif() diff --git a/docs/build/build-options.md b/docs/build/build-options.md index 4968cef88..12a24b59e 100644 --- a/docs/build/build-options.md +++ b/docs/build/build-options.md @@ -40,7 +40,6 @@ Option details are given further below. | [`ENABLE_INET_PTON`](#enable_inet_pton) | 1.3.2 | `BOOL` | ON | Enables usage of the `inet_pton` function used to resolve the network endpoint name into an IP address. | | [`ENABLE_LOGGING`](#enable_logging) | 1.2.0 | `BOOL` | ON | Enables normal logging, including errors. | | [`ENABLE_MONOTONIC_CLOCK`](#enable_monotonic_clock) | 1.4.0 | `BOOL` | ON* | Enforces the use of `clock_gettime` with a monotonic clock that is independent of the currently set time in the system. | -| [`ENABLE_NEW_RCVBUFFER`](#enable_new_rcvbuffer) | 1.5.0 | `BOOL` | ON | Enables the new implementation of the receiver buffer with behavior and code improvements. | | [`ENABLE_PROFILE`](#enable_profile) | 1.2.0 | `BOOL` | OFF | Enables code instrumentation for profiling (only for GNU-compatible compilers). | | [`ENABLE_RELATIVE_LIBPATH`](#enable_relative_libpath) | 1.3.2 | `BOOL` | OFF | Enables adding a relative path to a library for linking against a shared SRT library by reaching out to a sibling directory. | | [`ENABLE_SHARED`](#enable_shared--enable_static) | 1.2.0 | `BOOL` | ON | Enables building SRT as a shared library. | @@ -349,22 +348,9 @@ clock (as configured in the resources used in the operation). However the current time of the monotonic clock can only be obtained by the `clock_gettime` function. - - - [:arrow_up:   Back to List of Build Options](#list-of-build-options) - -#### ENABLE_NEW_RCVBUFFER -**`--enable-new-rcvbuffer`** (default: ON) - -When ON, this option enables the newest implementation of the receiver buffer -with behavior and code improvements. Note that while it is still possible to fall -back to the old receiver buffer implementation, eventually the new implementation -will be the only one available. - - #### ENABLE_PROFILE **`--enable-profile`** (default: OFF) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 6336c56b6..5603a82a1 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2597,11 +2597,7 @@ void srt::CUDTUnited::checkBrokenSockets() // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when // this function is called (isRcvDataReady also checks if the // available data is "ready to play"). -#if ENABLE_NEW_RCVBUFFER && s->core().m_pRcvBuffer->hasAvailablePackets()) -#else - && s->core().m_pRcvBuffer->isRcvDataAvailable()) -#endif { const int bc = s->core().m_iBrokenCounter.load(); if (bc > 0) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 20149446f..01f73e017 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -805,1494 +805,4 @@ void CSndBuffer::increase() << " (total size: " << m_iSize << " bytes)"); } -//////////////////////////////////////////////////////////////////////////////// - -#if (!ENABLE_NEW_RCVBUFFER) - -/* - * RcvBuffer (circular buffer): - * - * |<------------------- m_iSize ----------------------------->| - * | |<--- acked pkts -->|<--- m_iMaxPos --->| | - * | | | | | - * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ - * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] - * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ - * | | | | - * | | \__last pkt received - * | \___ m_iLastAckPos: last ack sent - * \___ m_iStartPos: first message to read - * - * m_pUnit[i]->m_iFlag: 0:free, 1:good, 2:passack, 3:dropped - * - * thread safety: - * m_iStartPos: CUDT::m_RecvLock - * m_iLastAckPos: CUDT::m_AckLock - * m_iMaxPos: none? (modified on add and ack - */ - -CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts) - : m_pUnit(NULL) - , m_iSize(bufsize_pkts) - , m_pUnitQueue(queue) - , m_iStartPos(0) - , m_iLastAckPos(0) - , m_iMaxPos(0) - , m_iNotch(0) - , m_BytesCountLock() - , m_iBytesCount(0) - , m_iAckedPktsCount(0) - , m_iAckedBytesCount(0) - , m_uAvgPayloadSz(7 * 188) -{ - m_pUnit = new CUnit*[m_iSize]; - for (int i = 0; i < m_iSize; ++i) - m_pUnit[i] = NULL; - - setupMutex(m_BytesCountLock, "BytesCount"); -} - -CRcvBuffer::~CRcvBuffer() -{ - for (int i = 0; i < m_iSize; ++i) - { - if (m_pUnit[i] != NULL) - { - m_pUnitQueue->makeUnitFree(m_pUnit[i]); - } - } - - delete[] m_pUnit; - - releaseMutex(m_BytesCountLock); -} - -void CRcvBuffer::countBytes(int pkts, int bytes, bool acked) -{ - /* - * Byte counter changes from both sides (Recv & Ack) of the buffer - * so the higher level lock is not enough for thread safe op. - * - * pkts are... - * added (bytes>0, acked=false), - * acked (bytes>0, acked=true), - * removed (bytes<0, acked=n/a) - */ - ScopedLock cg(m_BytesCountLock); - - if (!acked) // adding new pkt in RcvBuffer - { - m_iBytesCount += bytes; /* added or removed bytes from rcv buffer */ - if (bytes > 0) /* Assuming one pkt when adding bytes */ - m_uAvgPayloadSz = ((m_uAvgPayloadSz * (100 - 1)) + bytes) / 100; - } - else // acking/removing pkts to/from buffer - { - m_iAckedPktsCount += pkts; /* acked or removed pkts from rcv buffer */ - m_iAckedBytesCount += bytes; /* acked or removed bytes from rcv buffer */ - - if (bytes < 0) - m_iBytesCount += bytes; /* removed bytes from rcv buffer */ - } -} - -int CRcvBuffer::addData(CUnit* unit, int offset) -{ - SRT_ASSERT(unit != NULL); - if (offset >= getAvailBufSize()) - return -1; - - const int pos = (m_iLastAckPos + offset) % m_iSize; - if (offset >= m_iMaxPos) - m_iMaxPos = offset + 1; - - if (m_pUnit[pos] != NULL) - { - HLOGC(qrlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " rejected, already exists"); - return -1; - } - m_pUnit[pos] = unit; - countBytes(1, (int)unit->m_Packet.getLength()); - - m_pUnitQueue->makeUnitGood(unit); - - HLOGC(qrlog.Debug, - log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " accepted, off=" << offset << " POS=" << pos); - return 0; -} - -int CRcvBuffer::readBuffer(char* data, int len) -{ - int p = m_iStartPos; - int lastack = m_iLastAckPos; - int rs = len; - IF_HEAVY_LOGGING(char* begin = data); - - const bool bTsbPdEnabled = m_tsbpd.isEnabled(); - const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point()); - - HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack); - while ((p != lastack) && (rs > 0)) - { - if (m_pUnit[p] == NULL) - { - LOGC(brlog.Error, log << CONID() << "IPE readBuffer on null packet pointer"); - return -1; - } - - const CPacket& pkt = m_pUnit[p]->m_Packet; - - if (bTsbPdEnabled) - { - HLOGC(brlog.Debug, - log << CONID() << "readBuffer: chk if time2play:" - << " NOW=" << FormatTime(now) - << " PKT TS=" << FormatTime(getPktTsbPdTime(pkt.getMsgTimeStamp()))); - - if ((getPktTsbPdTime(pkt.getMsgTimeStamp()) > now)) - break; /* too early for this unit, return whatever was copied */ - } - - const int pktlen = (int) pkt.getLength(); - const int remain_pktlen = pktlen - m_iNotch; - - const int unitsize = std::min(remain_pktlen, rs); - - HLOGC(brlog.Debug, - log << CONID() << "readBuffer: copying buffer #" << p << " targetpos=" << int(data - begin) - << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize - rs)); - memcpy((data), pkt.m_pcData + m_iNotch, unitsize); - - data += unitsize; - - if (rs >= remain_pktlen) - { - freeUnitAt(p); - p = shiftFwd(p); - - m_iNotch = 0; - } - else - m_iNotch += rs; - - rs -= unitsize; - } - - /* we removed acked bytes form receive buffer */ - countBytes(-1, -(len - rs), true); - m_iStartPos = p; - - return len - rs; -} - -int CRcvBuffer::readBufferToFile(fstream& ofs, int len) -{ - int p = m_iStartPos; - int lastack = m_iLastAckPos; - int rs = len; - - int32_t trace_seq SRT_ATR_UNUSED = SRT_SEQNO_NONE; - int trace_shift SRT_ATR_UNUSED = -1; - - while ((p != lastack) && (rs > 0)) - { -#if ENABLE_LOGGING - ++trace_shift; -#endif - // Skip empty units. Note that this shouldn't happen - // in case of a file transfer. - if (!m_pUnit[p]) - { - p = shiftFwd(p); - LOGC(brlog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission, last good %" - << trace_seq << " + " << trace_shift); - continue; - } - - const CPacket& pkt = m_pUnit[p]->m_Packet; - -#if ENABLE_LOGGING - trace_seq = pkt.getSeqNo(); -#endif - const int pktlen = (int) pkt.getLength(); - const int remain_pktlen = pktlen - m_iNotch; - - const int unitsize = std::min(remain_pktlen, rs); - - ofs.write(pkt.m_pcData + m_iNotch, unitsize); - if (ofs.fail()) - break; - - if (rs >= remain_pktlen) - { - freeUnitAt(p); - p = shiftFwd(p); - - m_iNotch = 0; - } - else - m_iNotch += rs; - - rs -= unitsize; - } - - /* we removed acked bytes form receive buffer */ - countBytes(-1, -(len - rs), true); - m_iStartPos = p; - - return len - rs; -} - -int CRcvBuffer::ackData(int len) -{ - SRT_ASSERT(len < m_iSize); - SRT_ASSERT(len > 0); - int end = shift(m_iLastAckPos, len); - - { - int pkts = 0; - int bytes = 0; - for (int i = m_iLastAckPos; i != end; i = shiftFwd(i)) - { - if (m_pUnit[i] == NULL) - continue; - - pkts++; - bytes += (int)m_pUnit[i]->m_Packet.getLength(); - } - if (pkts > 0) - countBytes(pkts, bytes, true); - } - - HLOGC(brlog.Debug, - log << "ackData: shift by " << len << ", start=" << m_iStartPos << " end=" << m_iLastAckPos << " -> " << end); - - m_iLastAckPos = end; - m_iMaxPos -= len; - if (m_iMaxPos < 0) - m_iMaxPos = 0; - - // Returned value is the distance towards the starting - // position from m_iLastAckPos, which is in sync with CUDT::m_iRcvLastSkipAck. - // This should help determine the sequence number at first read-ready position. - - const int dist = m_iLastAckPos - m_iStartPos; - if (dist < 0) - return dist + m_iSize; - return dist; -} - -void CRcvBuffer::skipData(int len) -{ - /* - * Caller need protect both AckLock and RecvLock - * to move both m_iStartPos and m_iLastAckPost - */ - if (m_iStartPos == m_iLastAckPos) - m_iStartPos = (m_iStartPos + len) % m_iSize; - m_iLastAckPos = (m_iLastAckPos + len) % m_iSize; - m_iMaxPos -= len; - if (m_iMaxPos < 0) - m_iMaxPos = 0; -} - -size_t CRcvBuffer::dropData(int len) -{ - // This function does the same as skipData, although skipData - // should work in the condition of absence of data, so no need - // to force the units in the range to be freed. This function - // works in more general condition where we don't know if there - // are any data in the given range, but want to remove these - // "sequence positions" from the buffer, whether there are data - // at them or not. - - size_t stats_bytes = 0; - - int p = m_iStartPos; - int past_q = shift(p, len); - while (p != past_q) - { - if (m_pUnit[p] && m_pUnit[p]->m_iFlag == CUnit::GOOD) - { - stats_bytes += m_pUnit[p]->m_Packet.getLength(); - freeUnitAt(p); - } - - p = shiftFwd(p); - } - - m_iStartPos = past_q; - return stats_bytes; -} - -bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime, - bool& w_passack, - int32_t& w_skipseqno, - int32_t& w_curpktseq, - int32_t base_seq) -{ - HLOGC(brlog.Debug, log << "getRcvFirstMsg: base_seq=" << base_seq); - w_skipseqno = SRT_SEQNO_NONE; - w_passack = false; - // tsbpdtime will be retrieved by the below call - // Returned values: - // - tsbpdtime: real time when the packet is ready to play (whether ready to play or not) - // - w_passack: false (the report concerns a packet with an exactly next sequence) - // - w_skipseqno == SRT_SEQNO_NONE: no packets to skip towards the first RTP - // - w_curpktseq: that exactly packet that is reported (for debugging purposes) - // - @return: whether the reported packet is ready to play - - /* Check the acknowledged packets */ - // getRcvReadyMsg returns true if the time to play for the first message - // that larger than base_seq is in the past. - if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1, base_seq)) - { - HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq); - return true; - } - else if (!is_zero(w_tsbpdtime)) - { - HLOGC(brlog.Debug, log << "getRcvFirstMsg: packets found, but in future"); - // This means that a message next to be played, has been found, - // but the time to play is in future. - return false; - } - - // Falling here means that there are NO PACKETS in the ACK-ed region - // (m_iStartPos - m_iLastAckPos), but we may have something in the - // region (m_iLastAckPos - (m_iLastAckPos+m_iMaxPos)), that is, packets - // that may be separated from the last ACK-ed by lost ones. - - // Below this line we have only two options: - // - m_iMaxPos == 0, which means that no more packets are in the buffer - // - returned: tsbpdtime=0, w_passack=true, w_skipseqno=SRT_SEQNO_NONE, w_curpktseq=, @return false - // - m_iMaxPos > 0, which means that there are packets arrived after a lost packet: - // - returned: tsbpdtime=PKT.TS, w_passack=true, w_skipseqno=PKT.SEQ, w_curpktseq=PKT, @return LOCAL(PKT.TS) <= - // NOW - - /* - * No acked packets ready but caller want to know next packet to wait for - * Check the not yet acked packets that may be stuck by missing packet(s). - */ - bool haslost = false; - int last_ready_pos = -1; - steady_clock::time_point tsbpdtime = steady_clock::time_point(); - w_tsbpdtime = steady_clock::time_point(); - w_passack = true; - - // XXX SUSPECTED ISSUE with this algorithm: - // The above call to getRcvReadyMsg() should report as to whether: - // - there is an EXACTLY NEXT SEQUENCE packet - // - this packet is ready to play. - // - // Situations handled after the call are when: - // - there's the next sequence packet available and it is ready to play - // - there are no packets at all, ready to play or not - // - // So, the remaining situation is that THERE ARE PACKETS that follow - // the current sequence, but they are not ready to play. This includes - // packets that have the exactly next sequence and packets that jump - // over a lost packet. - // - // As the getRcvReadyMsg() function walks through the incoming units - // to see if there's anything that satisfies these conditions, it *SHOULD* - // be also capable of checking if the next available packet, if it is - // there, is the next sequence packet or not. Retrieving this exactly - // packet would be most useful, as the test for play-readiness and - // sequentiality can be done on it directly. - // - // When done so, the below loop would be completely unnecessary. - - // Logical description of the below algorithm: - // 1. update w_tsbpdtime and w_curpktseq if found one packet ready to play - // - keep check the next packet if still smaller than base_seq - // 2. set w_skipseqno if found packets before w_curpktseq lost - // if no packets larger than base_seq ready to play, return the largest RTP - // else return the first one that larger than base_seq and rady to play - - for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i)) - { - if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD) - { - /* There are packets in the sequence not received yet */ - haslost = true; - HLOGC(brlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i); - } - else - { - tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp()); - /* Packet ready to play */ - if (tsbpdtime <= steady_clock::now()) - { - // If the last ready-to-play packet exists, free it. - if (!is_zero(w_tsbpdtime)) { - HLOGC(brlog.Debug, - log << "getRcvFirstMsg: found next ready packet, free last %" - << w_curpktseq << " POS=" << last_ready_pos); - SRT_ASSERT(w_curpktseq != SRT_SEQNO_NONE); - freeUnitAt(last_ready_pos); - } - w_tsbpdtime = tsbpdtime; - w_curpktseq = m_pUnit[i]->m_Packet.m_iSeqNo; - last_ready_pos = i; - if (haslost) - w_skipseqno = w_curpktseq; - - if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0) - { - HLOGC(brlog.Debug, - log << "getRcvFirstMsg: found ready packet %" << w_curpktseq - << " but not larger than base_seq, try next"); - continue; - } - - HLOGC(brlog.Debug, - log << "getRcvFirstMsg: found ready packet, nSKIPPED: " - << ((i - m_iLastAckPos + m_iSize) % m_iSize)); - - // NOTE: if haslost is not set, it means that this is the VERY FIRST - // packet, that is, packet currently at pos = m_iLastAckPos. There's no - // possibility that it is so otherwise because: - // - if this first good packet is ready to play, THIS HERE RETURNS NOW. - // ... - return true; - } - - if (!is_zero(w_tsbpdtime)) { - return true; - } - HLOGC(brlog.Debug, - log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: " - << ((i - m_iLastAckPos + m_iSize) % m_iSize)); - // ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO, - // just states that there's no ready packet to play. - // ... - return false; - } - // ... and if this first packet WASN'T GOOD, the loop continues, however since now - // the 'haslost' is set, which means that it continues only to find the first valid - // packet after stating that the very first packet isn't valid. - } - if (!is_zero(w_tsbpdtime)) { - return true; - } - HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS"); - return false; -} - -steady_clock::time_point CRcvBuffer::debugGetDeliveryTime(int offset) -{ - int i; - if (offset > 0) - i = shift(m_iStartPos, offset); - else - i = m_iStartPos; - - CUnit* u = m_pUnit[i]; - if (!u || u->m_iFlag != CUnit::GOOD) - return steady_clock::time_point(); - - return getPktTsbPdTime(u->m_Packet.getMsgTimeStamp()); -} - -int32_t CRcvBuffer::getTopMsgno() const -{ - if (m_iStartPos == m_iLastAckPos) - return SRT_MSGNO_NONE; // No message is waiting - - if (!m_pUnit[m_iStartPos]) - return SRT_MSGNO_NONE; // pity - - return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq(); -} - -bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq) -{ - const bool havelimit = upto != -1; - int end = -1, past_end = -1; - if (havelimit) - { - int stretch = (m_iSize + m_iStartPos - m_iLastAckPos) % m_iSize; - if (upto > stretch) - { - HLOGC(brlog.Debug, log << "position back " << upto << " exceeds stretch " << stretch); - // Do nothing. This position is already gone. - return false; - } - - end = m_iLastAckPos - upto; - if (end < 0) - end += m_iSize; - past_end = shiftFwd(end); // For in-loop comparison - HLOGC(brlog.Debug, log << "getRcvReadyMsg: will read from position " << end); - } - - // NOTE: position m_iLastAckPos in the buffer represents the sequence number of - // CUDT::m_iRcvLastSkipAck. Therefore 'upto' contains a positive value that should - // be decreased from m_iLastAckPos to get the position in the buffer that represents - // the sequence number up to which we'd like to read. - IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED"); - - for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i)) - { - // In case when we want to read only up to given sequence number, stop - // the loop if this number was reached. This number must be extracted from - // the buffer and any following must wait here for "better times". Note - // that the unit that points to the requested sequence must remain in - // the buffer, unless there is no valid packet at that position, in which - // case it is allowed to point to the NEXT sequence towards it, however - // if it does, this cell must remain in the buffer for prospective recovery. - if (havelimit && i == past_end) - break; - - bool freeunit = false; - - /* Skip any invalid skipped/dropped packets */ - if (m_pUnit[i] == NULL) - { - HLOGC(brlog.Debug, - log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize) - << " SKIPPED - no unit there"); - m_iStartPos = shiftFwd(m_iStartPos); - continue; - } - - w_curpktseq = m_pUnit[i]->m_Packet.getSeqNo(); - - if (m_pUnit[i]->m_iFlag != CUnit::GOOD) - { - HLOGC(brlog.Debug, - log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize) - << " SKIPPED - unit not good"); - freeunit = true; - } - else - { - // This does: - // 1. Get the TSBPD time of the unit. Stop and return false if this unit - // is not yet ready to play. - // 2. If it's ready to play, check also if it's decrypted. If not, skip it. - // 3. Check also if it's larger than base_seq, if not, skip it. - // 4. If it's ready to play, decrypted and larger than base, stop and return it. - if (!havelimit) - { - w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp()); - const steady_clock::duration towait = (w_tsbpdtime - steady_clock::now()); - if (towait.count() > 0) - { - HLOGC(brlog.Debug, - log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize) - << " pkt %" << w_curpktseq << " NOT ready to play (only in " << count_milliseconds(towait) - << "ms)"); - return false; - } - - if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC) - { - IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED"); - freeunit = true; /* packet not decrypted */ - } - else if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqcmp(w_curpktseq, base_seq) <= 0) - { - IF_HEAVY_LOGGING(reason = "smaller than base_seq"); - w_tsbpdtime = steady_clock::time_point(); - freeunit = true; - } - else - { - HLOGC(brlog.Debug, - log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize) - << " pkt %" << w_curpktseq << " ready to play (delayed " << count_milliseconds(towait) - << "ms)"); - return true; - } - } - // In this case: - // 1. We don't even look into the packet if this is not the requested sequence. - // All packets that are earlier than the required sequence will be dropped. - // 2. When found the packet with expected sequence number, and the condition for - // good unit is passed, we get the timestamp. - // 3. If the packet is not decrypted, we allow it to be removed - // 4. If we reached the required sequence, and the packet is good, KEEP IT in the buffer, - // and return with the pointer pointing to this very buffer. Only then return true. - else - { - // We have a limit up to which the reading will be done, - // no matter if the time has come or not - although retrieve it. - if (i == end) - { - HLOGC(brlog.Debug, log << "CAUGHT required seq position " << i); - // We have the packet we need. Extract its data. - w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp()); - - // If we have a decryption failure, allow the unit to be released. - if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC) - { - IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED"); - freeunit = true; /* packet not decrypted */ - } - else - { - // Stop here and keep the packet in the buffer, so it will be - // next extracted. - HLOGC(brlog.Debug, - log << "getRcvReadyMsg: packet seq=" << w_curpktseq << " ready for extraction"); - return true; - } - } - else - { - HLOGC(brlog.Debug, log << "SKIPPING position " << i); - // Continue the loop and remove the current packet because - // its sequence number is too old. - freeunit = true; - } - } - } - - if (freeunit) - { - HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED: " << reason); - /* removed skipped, dropped, undecryptable bytes from rcv buffer */ - const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength(); - countBytes(-1, -rmbytes, true); - - freeUnitAt(i); - m_iStartPos = shiftFwd(m_iStartPos); - } - } - - HLOGC(brlog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason); - return false; -} - -/* - * Return receivable data status (packet timestamp_us ready to play if TsbPd mode) - * Return playtime (tsbpdtime) of 1st packet in queue, ready to play or not - * - * Return data ready to be received (packet timestamp_us ready to play if TsbPd mode) - * Using getRcvDataSize() to know if there is something to read as it was widely - * used in the code (core.cpp) is expensive in TsbPD mode, hence this simpler function - * that only check if first packet in queue is ready. - */ -bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance) -{ - w_tsbpdtime = steady_clock::time_point(); - - if (m_tsbpd.isEnabled()) - { - const CPacket* pkt = getRcvReadyPacket(seqdistance); - if (!pkt) - { - HLOGC(brlog.Debug, log << "isRcvDataReady: packet NOT extracted."); - return false; - } - - /* - * Acknowledged data is available, - * Only say ready if time to deliver. - * Report the timestamp, ready or not. - */ - w_curpktseq = pkt->getSeqNo(); - w_tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp()); - - // If seqdistance was passed, then return true no matter what the - // TSBPD time states. - if (seqdistance != -1 || w_tsbpdtime <= steady_clock::now()) - { - HLOGC(brlog.Debug, - log << "isRcvDataReady: packet extracted seqdistance=" << seqdistance - << " TsbPdTime=" << FormatTime(w_tsbpdtime)); - return true; - } - - HLOGC(brlog.Debug, log << "isRcvDataReady: packet extracted, but NOT READY"); - return false; - } - - return isRcvDataAvailable(); -} - -// XXX This function may be called only after checking -// if m_bTsbPdMode. -CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance) -{ - // If asked for readiness of a packet at given sequence distance - // (that is, we need to extract the packet with given sequence number), - // only check if this cell is occupied in the buffer, and if so, - // if it's occupied with a "good" unit. That's all. It doesn't - // matter whether it's ready to play. - if (seqdistance != -1) - { - // Note: seqdistance is the value to to go BACKWARDS from m_iLastAckPos, - // which is the position that is in sync with CUDT::m_iRcvLastSkipAck. This - // position is the sequence number of a packet that is NOT received, but it's - // expected to be received as next. So the minimum value of seqdistance is 1. - - // SANITY CHECK - if (seqdistance == 0) - { - LOGC(brlog.Fatal, log << "IPE: trying to extract packet past the last ACK-ed!"); - return 0; - } - - if (seqdistance > getRcvDataSize()) - { - HLOGC(brlog.Debug, - log << "getRcvReadyPacket: Sequence offset=" << seqdistance - << " is in the past (start=" << m_iStartPos << " end=" << m_iLastAckPos << ")"); - return 0; - } - - int i = shift(m_iLastAckPos, -seqdistance); - if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD) - { - HLOGC(brlog.Debug, log << "getRcvReadyPacket: FOUND PACKET %" << m_pUnit[i]->m_Packet.getSeqNo()); - return &m_pUnit[i]->m_Packet; - } - - HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED."); - return 0; - } - - IF_HEAVY_LOGGING(int nskipped = 0); - for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i)) - { - /* - * Skip missing packets that did not arrive in time. - */ - if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD) - { - HLOGC(brlog.Debug, - log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " (" - << nskipped << " empty cells skipped)"); - return &m_pUnit[i]->m_Packet; - } - IF_HEAVY_LOGGING(++nskipped); - } - - return 0; -} - -#if ENABLE_HEAVY_LOGGING -// This function is for debug purposes only and it's called only -// from within HLOG* macros. -void CRcvBuffer::reportBufferStats() const -{ - int nmissing = 0; - int32_t low_seq = SRT_SEQNO_NONE, high_seq = SRT_SEQNO_NONE; - int32_t low_ts = 0, high_ts = 0; - - for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize) - { - if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD) - { - low_seq = m_pUnit[i]->m_Packet.m_iSeqNo; - low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp; - break; - } - ++nmissing; - } - - // Not sure if a packet MUST BE at the last ack pos position, so check, just in case. - int n = m_iLastAckPos; - if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD) - { - high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp; - high_seq = m_pUnit[n]->m_Packet.m_iSeqNo; - } - else - { - // Possibilities are: - // m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined. - // No packet: low_ts == 0, so high_ts == 0, too. - high_ts = low_ts; - } - // The 32-bit timestamps are relative and roll over oftten; what - // we really need is the timestamp difference. The only place where - // we can ask for the time base is the upper time because when trying - // to receive the time base for the lower time we'd break the requirement - // for monotonic clock. - - uint64_t upper_time = high_ts; - uint64_t lower_time = low_ts; - - if (lower_time > upper_time) - upper_time += uint64_t(CPacket::MAX_TIMESTAMP) + 1; - - int32_t timespan = upper_time - lower_time; - int seqspan = 0; - if (low_seq != SRT_SEQNO_NONE && high_seq != SRT_SEQNO_NONE) - { - seqspan = CSeqNo::seqoff(low_seq, high_seq); - } - - LOGC(brlog.Debug, - log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing - << "pkts"); - LOGC(brlog.Debug, - log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << lower_time << " hi=" << upper_time << ")"); -} - -#endif // ENABLE_HEAVY_LOGGING - -bool CRcvBuffer::isRcvDataReady() -{ - steady_clock::time_point tsbpdtime; - int32_t seq; - - return isRcvDataReady((tsbpdtime), (seq), -1); -} - -int CRcvBuffer::getAvailBufSize() const -{ - // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer" - return m_iSize - getRcvDataSize() - 1; -} - -int CRcvBuffer::getRcvDataSize() const -{ - if (m_iLastAckPos >= m_iStartPos) - return m_iLastAckPos - m_iStartPos; - - return m_iSize + m_iLastAckPos - m_iStartPos; -} - -int CRcvBuffer::debugGetSize() const -{ - // Does exactly the same as getRcvDataSize, but - // it should be used FOR INFORMATIONAL PURPOSES ONLY. - // The source values might be changed in another thread - // during the calculation, although worst case the - // resulting value may differ to the real buffer size by 1. - int from = m_iStartPos, to = m_iLastAckPos; - int size = to - from; - if (size < 0) - size += m_iSize; - - return size; -} - -/* Return moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */ -int CRcvBuffer::getRcvAvgDataSize(int& bytes, int& timespan) -{ - // Average number of packets and timespan could be small, - // so rounding is beneficial, while for the number of - // bytes in the buffer is a higher value, so rounding can be omitted, - // but probably better to round all three values. - timespan = round_val(m_mavg.timespan_ms()); - bytes = round_val(m_mavg.bytes()); - return round_val(m_mavg.pkts()); -} - -/* Update moving average of acked data pkts, bytes, and timespan (ms) of the receive buffer */ -void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now) -{ - if (!m_mavg.isTimeToUpdate(now)) - return; - - int bytes = 0; - int timespan_ms = 0; - const int pkts = getRcvDataSize(bytes, timespan_ms); - m_mavg.update(now, pkts, bytes, timespan_ms); -} - -/* Return acked data pkts, bytes, and timespan (ms) of the receive buffer */ -int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan) -{ - timespan = 0; - if (m_tsbpd.isEnabled()) - { - // Get a valid startpos. - // Skip invalid entries in the beginning, if any. - int startpos = m_iStartPos; - for (; startpos != m_iLastAckPos; startpos = shiftFwd(startpos)) - { - if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag)) - break; - } - - int endpos = m_iLastAckPos; - - if (m_iLastAckPos != startpos) - { - /* - * |<--- DataSpan ---->|<- m_iMaxPos ->| - * +---+---+---+---+---+---+---+---+---+---+---+--- - * | | 1 | 1 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | | m_pUnits[] - * +---+---+---+---+---+---+---+---+---+---+---+--- - * | | - * \_ m_iStartPos \_ m_iLastAckPos - * - * m_pUnits[startpos] shall be valid (->m_iFlag==CUnit::GOOD). - * If m_pUnits[m_iLastAckPos-1] is not valid (NULL or ->m_iFlag!=CUnit::GOOD), - * it means m_pUnits[m_iLastAckPos] is valid since a valid unit is needed to skip. - * Favor m_pUnits[m_iLastAckPos] if valid over [m_iLastAckPos-1] to include the whole acked interval. - */ - if ((m_iMaxPos <= 0) || (!m_pUnit[m_iLastAckPos]) || (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD)) - { - endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1); - } - - if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos])) - { - const steady_clock::time_point startstamp = - getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp()); - const steady_clock::time_point endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp()); - /* - * There are sampling conditions where spantime is < 0 (big unsigned value). - * It has been observed after changing the SRT latency from 450 to 200 on the sender. - * - * Possible packet order corruption when dropping packet, - * cause by bad thread protection when adding packet in queue - * was later discovered and fixed. Security below kept. - * - * DateTime RecvRate LostRate DropRate AvailBw RTT RecvBufs PdDelay - * 2014-12-08T15:04:25-0500 4712 110 0 96509 33.710 393 450 - * 2014-12-08T15:04:35-0500 4512 95 0 107771 33.493 1496542976 200 - * 2014-12-08T15:04:40-0500 4213 106 3 107352 53.657 9499425 200 - * 2014-12-08T15:04:45-0500 4575 104 0 102194 53.614 59666 200 - * 2014-12-08T15:04:50-0500 4475 124 0 100543 53.526 505 200 - */ - if (endstamp > startstamp) - timespan = count_milliseconds(endstamp - startstamp); - } - /* - * Timespan can be less then 1000 us (1 ms) if few packets. - * Also, if there is only one pkt in buffer, the time difference will be 0. - * Therefore, always add 1 ms if not empty. - */ - if (0 < m_iAckedPktsCount) - timespan += 1; - } - } - HLOGF(brlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan); - bytes = m_iAckedBytesCount; - return m_iAckedPktsCount; -} - -unsigned CRcvBuffer::getRcvAvgPayloadSize() const -{ - return m_uAvgPayloadSz; -} - -CRcvBuffer::ReadingState CRcvBuffer::debugGetReadingState() const -{ - ReadingState readstate; - - readstate.iNumAcknowledged = 0; - readstate.iNumUnacknowledged = m_iMaxPos; - - if ((NULL != m_pUnit[m_iStartPos]) && (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD)) - { - if (m_tsbpd.isEnabled()) - readstate.tsStart = m_tsbpd.getPktTsbPdTime(m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp()); - - readstate.iNumAcknowledged = m_iLastAckPos > m_iStartPos - ? m_iLastAckPos - m_iStartPos - : m_iLastAckPos + (m_iSize - m_iStartPos); - } - - // All further stats are valid if TSBPD is enabled. - if (!m_tsbpd.isEnabled()) - return readstate; - - // m_iLastAckPos points to the first unacknowledged packet - const int iLastAckPos = (m_iLastAckPos - 1) % m_iSize; - if (m_iLastAckPos != m_iStartPos && (NULL != m_pUnit[iLastAckPos]) && (m_pUnit[iLastAckPos]->m_iFlag == CUnit::GOOD)) - { - readstate.tsLastAck = m_tsbpd.getPktTsbPdTime(m_pUnit[iLastAckPos]->m_Packet.getMsgTimeStamp()); - } - - const int iEndPos = (m_iLastAckPos + m_iMaxPos - 1) % m_iSize; - if (m_iMaxPos == 0) - { - readstate.tsEnd = readstate.tsLastAck; - } - else if ((NULL != m_pUnit[iEndPos]) && (m_pUnit[iEndPos]->m_iFlag == CUnit::GOOD)) - { - readstate.tsEnd = m_tsbpd.getPktTsbPdTime(m_pUnit[iEndPos]->m_Packet.getMsgTimeStamp()); - } - - return readstate; -} - -string CRcvBuffer::strFullnessState(const time_point& tsNow) const -{ - const ReadingState bufstate = debugGetReadingState(); - stringstream ss; - - ss << "Space avail " << getAvailBufSize() << "/" << m_iSize; - ss << " pkts. Packets ACKed: " << bufstate.iNumAcknowledged; - if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsLastAck)) - { - ss << " (TSBPD ready in "; - ss << count_milliseconds(bufstate.tsStart - tsNow); - ss << " : "; - ss << count_milliseconds(bufstate.tsLastAck - tsNow); - ss << " ms)"; - } - - ss << ", not ACKed: " << bufstate.iNumUnacknowledged; - if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsEnd)) - { - ss << ", timespan "; - ss << count_milliseconds(bufstate.tsEnd - bufstate.tsStart); - ss << " ms"; - } - - ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms."; - return ss.str(); -} - -void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag) -{ - for (int i = m_iStartPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i)) - if ((m_pUnit[i] != NULL) && (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno)) - m_pUnit[i]->m_iFlag = CUnit::DROPPED; -} - -void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase, - bool wrp, - uint32_t delay, - const steady_clock::duration& udrift) -{ - m_tsbpd.applyGroupTime(timebase, wrp, delay, udrift); -} - -void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase, - bool wrp, - const steady_clock::duration& udrift) -{ - m_tsbpd.applyGroupDrift(timebase, wrp, udrift); -} - -void CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, bool& w_wrp, steady_clock::duration& w_udrift) -{ - return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift); -} - -steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t usPktTimestamp) -{ - // Updating TSBPD time here is not very accurate and prevents from making the function constant. - // For now preserving the existing behavior. - m_tsbpd.updateTsbPdTimeBase(usPktTimestamp); - return m_tsbpd.getPktTsbPdTime(usPktTimestamp); -} - -void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay) -{ - const bool no_wrap_check = false; - m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay); -} - -bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, const time_point& tsPktArrival, int rtt) -{ - return m_tsbpd.addDriftSample(timestamp_us, tsPktArrival, rtt); -} - -int CRcvBuffer::readMsg(char* data, int len) -{ - SRT_MSGCTRL dummy = srt_msgctrl_default; - return readMsg(data, len, (dummy), -1); -} - -// NOTE: The order of ref-arguments is odd because: -// - data and len shall be close to one another -// - upto is last because it's a kind of unusual argument that has a default value -int CRcvBuffer::readMsg(char* data, int len, SRT_MSGCTRL& w_msgctl, int upto) -{ - int p = -1, q = -1; - bool passack; - - bool empty = accessMsg((p), (q), (passack), (w_msgctl.srctime), upto); - if (empty) - return 0; - - // This should happen just once. By 'empty' condition - // we have a guarantee that m_pUnit[p] exists and is valid. - CPacket& pkt1 = m_pUnit[p]->m_Packet; - - // This returns the sequence number and message number to - // the API caller. - w_msgctl.pktseq = pkt1.getSeqNo(); - w_msgctl.msgno = pkt1.getMsgSeq(); - - return extractData((data), len, p, q, passack); -} - -#ifdef SRT_DEBUG_TSBPD_OUTJITTER -void CRcvBuffer::debugTraceJitter(time_point playtime) -{ - uint64_t ms = count_microseconds(steady_clock::now() - playtime); - if (ms / 10 < 10) - m_ulPdHisto[0][ms / 10]++; - else if (ms / 100 < 10) - m_ulPdHisto[1][ms / 100]++; - else if (ms / 1000 < 10) - m_ulPdHisto[2][ms / 1000]++; - else - m_ulPdHisto[3][1]++; -} -#endif /* SRT_DEBUG_TSBPD_OUTJITTER */ - -bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto) -{ - // This function should do the following: - // 1. Find the first packet starting the next message (or just next packet) - // 2. When found something ready for extraction, return true. - // 3. w_p and w_q point the index range for extraction - // 4. passack decides if this range shall be removed after extraction - - bool empty = true; - - if (m_tsbpd.isEnabled()) - { - w_passack = false; - int seq = 0; - - steady_clock::time_point play_time; - const bool isReady = getRcvReadyMsg(play_time, (seq), upto); - w_playtime = count_microseconds(play_time.time_since_epoch()); - - if (isReady) - { - empty = false; - // In TSBPD mode you always read one message - // at a time and a message always fits in one UDP packet, - // so in one "unit". - w_p = w_q = m_iStartPos; - - debugTraceJitter(play_time); - } - } - else - { - w_playtime = 0; - if (scanMsg((w_p), (w_q), (w_passack))) - empty = false; - } - - return empty; -} - -int CRcvBuffer::extractData(char* data, int len, int p, int q, bool passack) -{ - SRT_ASSERT(len > 0); - int rs = len > 0 ? len : 0; - const int past_q = shiftFwd(q); - while (p != past_q) - { - const int pktlen = (int)m_pUnit[p]->m_Packet.getLength(); - // When unitsize is less than pktlen, only a fragment is copied to the output 'data', - // but still the whole packet is removed from the receiver buffer. - if (pktlen > 0) - countBytes(-1, -pktlen, true); - - const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen; - - HLOGC(brlog.Debug, log << "readMsg: checking unit POS=" << p); - - if (unitsize > 0) - { - memcpy((data), m_pUnit[p]->m_Packet.m_pcData, unitsize); - data += unitsize; - rs -= unitsize; - IF_HEAVY_LOGGING(readMsgHeavyLogging(p)); - } - else - { - HLOGC(brlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT"); - } - - // Note special case for live mode (one packet per message and TSBPD=on): - // - p == q (that is, this loop passes only once) - // - no passack (the unit is always removed from the buffer) - if (!passack) - { - HLOGC(brlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p); - freeUnitAt(p); - } - else - { - HLOGC(brlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p); - m_pUnit[p]->m_iFlag = CUnit::PASSACK; - } - - p = shiftFwd(p); - } - - if (!passack) - m_iStartPos = past_q; - - HLOGC(brlog.Debug, - log << "rcvBuf/extractData: begin=" << m_iStartPos << " reporting extraction size=" << (len - rs)); - - return len - rs; -} - -string CRcvBuffer::debugTimeState(size_t first_n_pkts) const -{ - stringstream ss; - int ipos = m_iStartPos; - for (size_t i = 0; i < first_n_pkts; ++i, ipos = CSeqNo::incseq(ipos)) - { - const CUnit* unit = m_pUnit[ipos]; - if (!unit) - { - ss << "pkt[" << i << "] missing, "; - continue; - } - - const CPacket& pkt = unit->m_Packet; - ss << "pkt[" << i << "] ts=" << pkt.getMsgTimeStamp() << ", "; - } - return ss.str(); -} - -#if ENABLE_HEAVY_LOGGING -void CRcvBuffer::readMsgHeavyLogging(int p) -{ - static steady_clock::time_point prev_now; - static steady_clock::time_point prev_srctime; - const CPacket& pkt = m_pUnit[p]->m_Packet; - - const int32_t seq = pkt.m_iSeqNo; - - steady_clock::time_point nowtime = steady_clock::now(); - steady_clock::time_point srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()); - - const int64_t timediff_ms = count_milliseconds(nowtime - srctime); - const int64_t nowdiff_ms = is_zero(prev_now) ? count_milliseconds(nowtime - prev_now) : 0; - const int64_t srctimediff_ms = is_zero(prev_srctime) ? count_milliseconds(srctime - prev_srctime) : 0; - - const int next_p = shiftFwd(p); - CUnit* u = m_pUnit[next_p]; - string next_playtime; - if (u && u->m_iFlag == CUnit::GOOD) - { - next_playtime = FormatTime(getPktTsbPdTime(u->m_Packet.getMsgTimeStamp())); - } - else - { - next_playtime = "NONE"; - } - - LOGC(brlog.Debug, - log << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << FormatTime(srctime) << " in " << timediff_ms - << "ms - TIME-PREVIOUS: PKT: " << srctimediff_ms << " LOCAL: " << nowdiff_ms << " !" - << BufferStamp(pkt.data(), pkt.size()) << " NEXT pkt T=" << next_playtime); - - prev_now = nowtime; - prev_srctime = srctime; -} -#endif - -bool CRcvBuffer::scanMsg(int& w_p, int& w_q, bool& w_passack) -{ - // empty buffer - if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0)) - { - HLOGC(brlog.Debug, log << "scanMsg: empty buffer"); - return false; - } - - int rmpkts = 0; - int rmbytes = 0; - // skip all bad msgs at the beginning - // This loop rolls until the "buffer is empty" (head == tail), - // in particular, there's no unit accessible for the reader. - while (m_iStartPos != m_iLastAckPos) - { - // Roll up to the first valid unit - if (!m_pUnit[m_iStartPos]) - { - if (++m_iStartPos == m_iSize) - m_iStartPos = 0; - continue; - } - - // Note: PB_FIRST | PB_LAST == PB_SOLO. - // testing if boundary() & PB_FIRST tests if the msg is first OR solo. - if (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST) - { - bool good = true; - - // look ahead for the whole message - - // We expect to see either of: - // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] - // [PB_SOLO] - // but not: - // [PB_FIRST] NULL ... - // [PB_FIRST] FREE/PASSACK/DROPPED... - // If the message didn't look as expected, interrupt this. - - // This begins with a message starting at m_iStartPos - // up to m_iLastAckPos OR until the PB_LAST message is found. - // If any of the units on this way isn't good, this OUTER loop - // will be interrupted. - for (int i = m_iStartPos; i != m_iLastAckPos;) - { - if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD) - { - good = false; - break; - } - - // Likewise, boundary() & PB_LAST will be satisfied for last OR solo. - if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST) - break; - - if (++i == m_iSize) - i = 0; - } - - if (good) - break; - } - - rmpkts++; - rmbytes += (int) freeUnitAt((size_t) m_iStartPos); - - m_iStartPos = shiftFwd(m_iStartPos); - } - /* we removed bytes form receive buffer */ - countBytes(-rmpkts, -rmbytes, true); - - // Not sure if this is correct, but this above 'while' loop exits - // under the following conditions only: - // - m_iStartPos == m_iLastAckPos (that makes passack = true) - // - found at least GOOD unit with PB_FIRST and not all messages up to PB_LAST are good, - // in which case it returns with m_iStartPos <% m_iLastAckPos (earlier) - // Also all units that lied before m_iStartPos are removed. - - w_p = -1; // message head - w_q = m_iStartPos; // message tail - w_passack = m_iStartPos == m_iLastAckPos; - bool found = false; - - // looking for the first message - //>>m_pUnit[size + m_iMaxPos] is not valid - - // XXX Would be nice to make some very thorough refactoring here. - - // This rolls by q variable from m_iStartPos up to m_iLastAckPos, - // actually from the first message up to the one with PB_LAST - // or PB_SOLO boundary. - - // The 'i' variable used in this loop is just a stub and it's - // even hard to define the unit here. It is "shift towards - // m_iStartPos", so the upper value is m_iMaxPos + size. - // m_iMaxPos is itself relative to m_iLastAckPos, so - // the upper value is m_iMaxPos + difference between - // m_iLastAckPos and m_iStartPos, so that this value is relative - // to m_iStartPos. - // - // The 'i' value isn't used anywhere, although the 'q' value rolls - // in this loop in sync with 'i', with the difference that 'q' is - // wrapped around, and 'i' is just incremented normally. - // - // This makes that this loop rolls in the range by 'q' from - // m_iStartPos to m_iStartPos + UPPER, - // where UPPER = m_iLastAckPos -% m_iStartPos + m_iMaxPos - // This embraces the range from the current reading head up to - // the last packet ever received. - // - // 'passack' is set to true when the 'q' has passed through - // the border of m_iLastAckPos and fallen into the range - // of unacknowledged packets. - - for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++i) - { - if (m_pUnit[w_q] && m_pUnit[w_q]->m_iFlag == CUnit::GOOD) - { - // Equivalent pseudocode: - // PacketBoundary bound = m_pUnit[w_q]->m_Packet.getMsgBoundary(); - // if ( IsSet(bound, PB_FIRST) ) - // w_p = w_q; - // if ( IsSet(bound, PB_LAST) && w_p != -1 ) - // found = true; - // - // Not implemented this way because it uselessly check w_p for -1 - // also after setting it explicitly. - - switch (m_pUnit[w_q]->m_Packet.getMsgBoundary()) - { - case PB_SOLO: // 11 - w_p = w_q; - found = true; - break; - - case PB_FIRST: // 10 - w_p = w_q; - break; - - case PB_LAST: // 01 - if (w_p != -1) - found = true; - break; - - case PB_SUBSEQUENT:; // do nothing (caught first, rolling for last) - } - } - else - { - // a hole in this message, not valid, restart search - w_p = -1; - } - - // 'found' is set when the current iteration hit a message with PB_LAST - // (including PB_SOLO since the very first message). - if (found) - { - // the msg has to be ack'ed or it is allowed to read out of order, and was not read before - if (!w_passack || !m_pUnit[w_q]->m_Packet.getMsgOrderFlag()) - { - HLOGC(brlog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER."); - break; - } - - found = false; - } - - if (++w_q == m_iSize) - w_q = 0; - - if (w_q == m_iLastAckPos) - w_passack = true; - } - - // no msg found - if (!found) - { - // NOTE: - // This situation may only happen if: - // - Found a packet with PB_FIRST, so w_p = w_q at the moment when it was found - // - Possibly found following components of that message up to shifted w_q - // - Found no terminal packet (PB_LAST) for that message. - - // if the message is larger than the receiver buffer, return part of the message - if ((w_p != -1) && (shiftFwd(w_q) == w_p)) - { - HLOGC(brlog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE."); - found = true; - } - else - { - HLOGC(brlog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << w_p << " q=" << w_q); - } - } - else - { - HLOGC(brlog.Debug, - log << "scanMsg: extracted message p=" << w_p << " q=" << w_q << " (" - << ((w_q - w_p + m_iSize + 1) % m_iSize) << " packets)"); - } - - return found; -} - -#endif // !ENABLE_NEW_RCVBUFFER - } // namespace srt diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 48bcb4311..ac4e85b3d 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -309,303 +309,6 @@ class CSndBuffer CSndBuffer& operator=(const CSndBuffer&); }; -//////////////////////////////////////////////////////////////////////////////// - -#if (!ENABLE_NEW_RCVBUFFER) - -class CRcvBuffer -{ - typedef sync::steady_clock::time_point time_point; - typedef sync::steady_clock::duration duration; - -public: - // XXX There's currently no way to access the socket ID set for - // whatever the queue is currently working for. Required to find - // some way to do this, possibly by having a "reverse pointer". - // Currently just "unimplemented". - std::string CONID() const { return ""; } - - static const int DEFAULT_SIZE = 65536; - /// Construct the buffer. - /// @param [in] queue CUnitQueue that actually holds the units (packets) - /// @param [in] bufsize_pkts in units (packets) - CRcvBuffer(CUnitQueue* queue, int bufsize_pkts = DEFAULT_SIZE); - ~CRcvBuffer(); - -public: - /// Write data into the buffer. - /// @param [in] unit pointer to a data unit containing new packet - /// @param [in] offset offset from last ACK point. - /// @return 0 is success, -1 if data is repeated. - int addData(CUnit* unit, int offset); - - /// Read data into a user buffer. - /// @param [in] data pointer to user buffer. - /// @param [in] len length of user buffer. - /// @return size of data read. - int readBuffer(char* data, int len); - - /// Read data directly into file. - /// @param [in] file C++ file stream. - /// @param [in] len expected length of data to write into the file. - /// @return size of data read. - int readBufferToFile(std::fstream& ofs, int len); - - /// Update the ACK point of the buffer. - /// @param [in] len number of units to be acknowledged. - /// @return 1 if a user buffer is fulfilled, otherwise 0. - int ackData(int len); - - /// Query how many buffer space left for data receiving. - /// Actually only acknowledged packets, that are still in the buffer, - /// are considered to take buffer space. - /// - /// @return size of available buffer space (including user buffer) for data receiving. - /// Not counting unacknowledged packets. - int getAvailBufSize() const; - - /// Query how many data has been continuously received (for reading) and ready to play (tsbpdtime < now). - /// @return size of valid (continous) data for reading. - int getRcvDataSize() const; - - /// Query how many data was received and acknowledged. - /// @param [out] bytes bytes - /// @param [out] spantime spantime - /// @return size in pkts of acked data. - int getRcvDataSize(int& bytes, int& spantime); - - /// Query a 1 sec moving average of how many data was received and acknowledged. - /// @param [out] bytes bytes - /// @param [out] spantime spantime - /// @return size in pkts of acked data. - int getRcvAvgDataSize(int& bytes, int& spantime); - - /// Query how many data of the receive buffer is acknowledged. - /// @param [in] now current time in us. - /// @return none. - void updRcvAvgDataSize(const time_point& now); - - /// Query the received average payload size. - /// @return size (bytes) of payload size - unsigned getRcvAvgPayloadSize() const; - - struct ReadingState - { - time_point tsStart; - time_point tsLastAck; - time_point tsEnd; - int iNumAcknowledged; - int iNumUnacknowledged; - }; - - ReadingState debugGetReadingState() const; - - /// Form a string of the current buffer fullness state. - /// number of packets acknowledged, TSBPD readiness, etc. - std::string strFullnessState(const time_point& tsNow) const; - - /// Mark the message to be dropped from the message list. - /// @param [in] msgno message number. - /// @param [in] using_rexmit_flag whether the MSGNO field uses rexmit flag (if not, one more bit is part of the - /// msgno value) - void dropMsg(int32_t msgno, bool using_rexmit_flag); - - /// read a message. - /// @param [out] data buffer to write the message into. - /// @param [in] len size of the buffer. - /// @return actuall size of data read. - int readMsg(char* data, int len); - -#if ENABLE_HEAVY_LOGGING - void readMsgHeavyLogging(int p); -#endif - - /// read a message. - /// @param [out] data buffer to write the message into. - /// @param [in] len size of the buffer. - /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay - /// @return actuall size of data read. - int readMsg(char* data, int len, SRT_MSGCTRL& w_mctrl, int upto); - - /// Query if data is ready to read (tsbpdtime <= now if TsbPD is active). - /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay - /// of next packet in recv buffer, ready or not. - /// @param [out] curpktseq Sequence number of the packet if there is one ready to play - /// @return true if ready to play, false otherwise (tsbpdtime may be !0 in - /// both cases). - bool isRcvDataReady(time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance); - -#ifdef SRT_DEBUG_TSBPD_OUTJITTER - void debugTraceJitter(time_point t); -#else - void debugTraceJitter(time_point) {} -#endif /* SRT_DEBUG_TSBPD_OUTJITTER */ - - bool isRcvDataReady(); - bool isRcvDataAvailable() { return m_iLastAckPos != m_iStartPos; } - CPacket* getRcvReadyPacket(int32_t seqdistance); - - /// Set TimeStamp-Based Packet Delivery Rx Mode - /// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay - /// @param [in] delay aggreed TsbPD delay - void setRcvTsbPdMode(const time_point& timebase, const duration& delay); - - /// Add packet timestamp for drift caclculation and compensation - /// @param [in] timestamp packet time stamp - /// @param [in] tsPktArrival arrival time of the packet used to extract the drift sample. - /// @param [in] rtt RTT sample - bool addRcvTsbPdDriftSample(uint32_t timestamp, const time_point& tsPktArrival, int rtt); - -#ifdef SRT_DEBUG_TSBPD_DRIFT - void printDriftHistogram(int64_t iDrift); - void printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg); -#endif - - /// Get information on the 1st message in queue. - // Parameters (of the 1st packet queue, ready to play or not): - /// @param [out] w_tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 - /// if none - /// @param [out] w_passack true if 1st ready packet is not yet acknowleged (allowed to be delivered to the app) - /// @param [out] w_skipseqno SRT_SEQNO_NONE or seq number of 1st unacknowledged pkt ready to play preceeded by - /// missing packets. - /// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base if exist packet ready-to-play - /// and larger than base - /// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true - /// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE: - /// IF skipseqno != SRT_SEQNO_NONE, packet ready to play preceeded by missing packets.; - /// IF skipseqno == SRT_SEQNO_NONE, no missing packet but 1st not ready to play. - bool getRcvFirstMsg(time_point& w_tsbpdtime, - bool& w_passack, - int32_t& w_skipseqno, - int32_t& w_curpktseq, - int32_t base_seq = SRT_SEQNO_NONE); - - /// Update the ACK point of the buffer. - /// @param [in] len size of data to be skip & acknowledged. - void skipData(int len); - -#if ENABLE_HEAVY_LOGGING - void reportBufferStats() const; // Heavy logging Debug only -#endif - bool empty() const - { - // This will not always return the intended value, - // that is, it may return false when the buffer really is - // empty - but it will return true then in one of next calls. - // This function will be always called again at some point - // if it returned false, and on true the connection - // is going to be broken - so this behavior is acceptable. - return m_iStartPos == m_iLastAckPos; - } - bool full() const { return m_iStartPos == (m_iLastAckPos + 1) % m_iSize; } - int capacity() const { return m_iSize; } - -private: - /// This gives up unit at index p. The unit is given back to the - /// free unit storage for further assignment for the new incoming - /// data. - size_t freeUnitAt(size_t p) - { - CUnit* u = m_pUnit[p]; - m_pUnit[p] = NULL; - size_t rmbytes = u->m_Packet.getLength(); - m_pUnitQueue->makeUnitFree(u); - return rmbytes; - } - - /// Adjust receive queue to 1st ready to play message (tsbpdtime < now). - /// Parameters (of the 1st packet queue, ready to play or not): - /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if - /// none - /// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base - /// @retval true 1st packet ready to play without discontinuity (no hole) - /// @retval false tsbpdtime = 0: no packet ready to play - bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq = SRT_SEQNO_NONE); - -public: - /// @brief Get clock drift in microseconds. - int64_t getDrift() const { return m_tsbpd.drift(); } - -public: - int32_t getTopMsgno() const; - - void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift); - - void applyGroupTime(const time_point& timebase, bool wrapcheck, uint32_t delay, const duration& udrift); - void applyGroupDrift(const time_point& timebase, bool wrapcheck, const duration& udrift); - time_point getPktTsbPdTime(uint32_t timestamp); - int debugGetSize() const; - time_point debugGetDeliveryTime(int offset); - - size_t dropData(int len); - -private: - int extractData(char* data, int len, int p, int q, bool passack); - bool accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto); - - /// Describes the state of the first N packets - std::string debugTimeState(size_t first_n_pkts) const; - - /// thread safe bytes counter of the Recv & Ack buffer - /// @param [in] pkts acked or removed pkts from rcv buffer (used with acked = true) - /// @param [in] bytes number of bytes added/delete (if negative) to/from rcv buffer. - /// @param [in] acked true when adding new pkt in RcvBuffer; false when acking/removing pkts to/from buffer - void countBytes(int pkts, int bytes, bool acked = false); - -private: - bool scanMsg(int& w_start, int& w_end, bool& w_passack); - - int shift(int basepos, int shift) const { return (basepos + shift) % m_iSize; } - - /// Simplified versions with ++ and --; avoid using division instruction - int shiftFwd(int basepos) const - { - if (++basepos == m_iSize) - return 0; - return basepos; - } - - int shiftBack(int basepos) const - { - if (basepos == 0) - return m_iSize - 1; - return --basepos; - } - -private: - CUnit** m_pUnit; // Array of pointed units collected in the buffer - const int m_iSize; // Size of the internal array of CUnit* items - CUnitQueue* m_pUnitQueue; // the shared unit queue - - int m_iStartPos; // HEAD: first packet available for reading - int m_iLastAckPos; // the last ACKed position (exclusive), follows the last readable - // EMPTY: m_iStartPos = m_iLastAckPos FULL: m_iStartPos = m_iLastAckPos + 1 - int m_iMaxPos; // delta between acked-TAIL and reception-TAIL - - int m_iNotch; // the starting read point of the first unit - // (this is required for stream reading mode; it's - // the position in the first unit in the list - // up to which data are already retrieved; - // in message reading mode it's unused and always 0) - - sync::Mutex m_BytesCountLock; // used to protect counters operations - int m_iBytesCount; // Number of payload bytes in the buffer - int m_iAckedPktsCount; // Number of acknowledged pkts in the buffer - int m_iAckedBytesCount; // Number of acknowledged payload bytes in the buffer - unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation - - CTsbpdTime m_tsbpd; - - AvgBufSize m_mavg; - -private: - CRcvBuffer(); - CRcvBuffer(const CRcvBuffer&); - CRcvBuffer& operator=(const CRcvBuffer&); -}; - -#endif // !ENABLE_NEW_RCVBUFFER - } // namespace srt #endif diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 974bb68de..d32685e2f 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -1,4 +1,3 @@ -#if ENABLE_NEW_RCVBUFFER #include #include #include "buffer_rcv.h" @@ -1073,5 +1072,3 @@ void CRcvBufferNew::updRcvAvgDataSize(const steady_clock::time_point& now) } } // namespace srt - -#endif // ENABLE_NEW_RCVBUFFER diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index ab5ebc165..018ef8cd1 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -11,8 +11,6 @@ #ifndef INC_SRT_BUFFER_RCV_H #define INC_SRT_BUFFER_RCV_H -#if ENABLE_NEW_RCVBUFFER - #include "buffer.h" // AvgBufSize #include "common.h" #include "queue.h" @@ -363,5 +361,4 @@ class CRcvBufferNew } // namespace srt -#endif // ENABLE_NEW_RCVBUFFER #endif // INC_SRT_BUFFER_RCV_H diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d049f4c40..d031c2434 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3357,9 +3357,7 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp) // time to not fill a network window. enterCS(m_RecvLock); m_pRcvBuffer->applyGroupTime(rcv_buffer_time_base, rcv_buffer_wrap_period, m_iTsbPdDelay_ms * 1000, rcv_buffer_udrift); -#if ENABLE_NEW_RCVBUFFER m_pRcvBuffer->setPeerRexmitFlag(m_bPeerRexmitFlag); -#endif leaveCS(m_RecvLock); HLOGF(gmlog.Debug, "AFTER HS: Set Rcv TsbPd mode: delay=%u.%03us GROUP TIME BASE: %s%s", @@ -5237,7 +5235,6 @@ void srt::CUDT::rendezvousSwitchState(UDTRequestType& w_rsptype, bool& w_needs_e * This thread runs only if TsbPd mode is enabled * Hold received packets until its time to 'play' them, at PktTimeStamp + TsbPdDelay. */ -#if ENABLE_NEW_RCVBUFFER void * srt::CUDT::tsbpd(void* param) { CUDT* self = (CUDT*)param; @@ -5438,225 +5435,6 @@ int srt::CUDT::rcvDropTooLateUpTo(int seqno) return iDropCnt; } -#else -void * srt::CUDT::tsbpd(void *param) -{ - CUDT *self = (CUDT *)param; - - THREAD_STATE_INIT("SRT:TsbPd"); - -#if ENABLE_BONDING - // Make the TSBPD thread a "client" of the group, - // which will ensure that the group will not be physically - // deleted until this thread exits. - // NOTE: DO NOT LEAD TO EVER CANCEL THE THREAD!!! - CUDTUnited::GroupKeeper gkeeper (self->uglobal(), self->m_parent); -#endif - - UniqueLock recv_lock (self->m_RecvLock); - CSync recvdata_cc (self->m_RecvDataCond, recv_lock); - CSync tsbpd_cc (self->m_RcvTsbPdCond, recv_lock); - - self->m_bTsbPdAckWakeup = true; - while (!self->m_bClosing) - { - int32_t current_pkt_seq = 0; - steady_clock::time_point tsbpdtime; - bool rxready = false; - int32_t rcv_base_seq = SRT_SEQNO_NONE; -#if ENABLE_BONDING - bool shall_update_group = false; - if (gkeeper.group) - { - // Functions called below will lock m_GroupLock, which in hierarchy - // lies after m_RecvLock. Must unlock m_RecvLock to be able to lock - // m_GroupLock inside the calls. - InvertedLock unrecv(self->m_RecvLock); - rcv_base_seq = gkeeper.group->getRcvBaseSeqNo(); - } -#endif - - enterCS(self->m_RcvBufferLock); - - self->m_pRcvBuffer->updRcvAvgDataSize(steady_clock::now()); - - if (self->m_bTLPktDrop) - { - int32_t skiptoseqno = SRT_SEQNO_NONE; - bool passack = true; // Get next packet to wait for even if not acked - rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq); - - HLOGC(tslog.Debug, - log << boolalpha << "NEXT PKT CHECK: rdy=" << rxready << " passack=" << passack << " skipto=%" - << skiptoseqno << " current=%" << current_pkt_seq << " buf-base=%" << self->m_iRcvLastSkipAck); - /* - * VALUES RETURNED: - * - * rxready: if true, packet at head of queue ready to play - * tsbpdtime: timestamp of packet at head of queue, ready or not. 0 if none. - * passack: if true, ready head of queue not yet acknowledged - * skiptoseqno: sequence number of packet at head of queue if ready to play but - * some preceeding packets are missing (need to be skipped). -1 if none. - */ - if (rxready) - { - /* Packet ready to play according to time stamp but... */ - int seqlen = CSeqNo::seqoff(self->m_iRcvLastSkipAck, skiptoseqno); - - if (skiptoseqno != SRT_SEQNO_NONE && seqlen > 0) - { - /* - * skiptoseqno != SRT_SEQNO_NONE, - * packet ready to play but preceeded by missing packets (hole). - */ - - self->updateForgotten(seqlen, self->m_iRcvLastSkipAck, skiptoseqno); - self->m_pRcvBuffer->skipData(seqlen); - - self->m_iRcvLastSkipAck = skiptoseqno; -#if ENABLE_BONDING - shall_update_group = true; -#endif - -#if ENABLE_LOGGING - int64_t timediff_us = 0; - if (!is_zero(tsbpdtime)) - timediff_us = count_microseconds(steady_clock::now() - tsbpdtime); -#if ENABLE_HEAVY_LOGGING - HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: DROPSEQ: up to seqno %" << CSeqNo::decseq(skiptoseqno) << " (" - << seqlen << " packets) playable at " << FormatTime(tsbpdtime) << " delayed " - << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms"); -#endif - LOGC(brlog.Warn, - log << self->CONID() << "RCV-DROPPED " << seqlen << " packet(s), packet seqno %" << skiptoseqno - << " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') - << (timediff_us % 1000) << " ms"); -#endif - - tsbpdtime = steady_clock::time_point(); //Next sent ack will unblock - rxready = false; - } - else if (passack) - { - /* Packets ready to play but not yet acknowledged (should happen within 10ms) */ - rxready = false; - tsbpdtime = steady_clock::time_point(); // Next sent ack will unblock - } /* else packet ready to play */ - } /* else packets not ready to play */ - } - else - { - rxready = self->m_pRcvBuffer->isRcvDataReady((tsbpdtime), (current_pkt_seq), -1 /*get first ready*/); - } - leaveCS(self->m_RcvBufferLock); - - if (rxready) - { - HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << current_pkt_seq << " (belated " - << (count_milliseconds(steady_clock::now() - tsbpdtime)) << "ms)"); - /* - * There are packets ready to be delivered - * signal a waiting "recv" call if there is any data available - */ - if (self->m_config.bSynRecving) - { - recvdata_cc.notify_one_locked(recv_lock); - } - /* - * Set EPOLL_IN to wakeup any thread waiting on epoll - */ - self->uglobal().m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true); -#if ENABLE_BONDING - // If this is NULL, it means: - // - the socket never was a group member - // - the socket was a group member, but: - // - was just removed as a part of closure - // - and will never be member of the group anymore - - // If this is not NULL, it means: - // - This socket is currently member of the group - // - This socket WAS a member of the group, though possibly removed from it already, BUT: - // - the group that this socket IS OR WAS member of is in the GroupKeeper - // - the GroupKeeper prevents the group from being deleted - // - it is then completely safe to access the group here, - // EVEN IF THE SOCKET THAT WAS ITS MEMBER IS BEING DELETED. - - // It is ensured that the group object exists here because GroupKeeper - // keeps it busy, even if you just closed the socket, remove it as a member - // or even the group is empty and was explicitly closed. - if (gkeeper.group) - { - // Functions called below will lock m_GroupLock, which in hierarchy - // lies after m_RecvLock. Must unlock m_RecvLock to be able to lock - // m_GroupLock inside the calls. - InvertedLock unrecv(self->m_RecvLock); - // The current "APP reader" needs to simply decide as to whether - // the next CUDTGroup::recv() call should return with no blocking or not. - // When the group is read-ready, it should update its pollers as it sees fit. - - // NOTE: this call will set lock to m_GroupOf->m_GroupLock - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: GROUP: checking if %" << current_pkt_seq << " makes group readable"); - gkeeper.group->updateReadState(self->m_SocketID, current_pkt_seq); - - if (shall_update_group) - { - // A group may need to update the parallelly used idle links, - // should it have any. Pass the current socket position in order - // to skip it from the group loop. - // NOTE: SELF LOCKING. - gkeeper.group->updateLatestRcv(self->m_parent); - } - } -#endif - CGlobEvent::triggerEvent(); - tsbpdtime = steady_clock::time_point(); - } - - if (!is_zero(tsbpdtime)) - { - IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsbpdtime - steady_clock::now()); - /* - * Buffer at head of queue is not ready to play. - * Schedule wakeup when it will be. - */ - self->m_bTsbPdAckWakeup = false; - HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq - << " T=" << FormatTime(tsbpdtime) << " - waiting " << count_milliseconds(timediff) << "ms"); - THREAD_PAUSED(); - tsbpd_cc.wait_until(tsbpdtime); - THREAD_RESUMED(); - } - else - { - /* - * We have just signaled epoll; or - * receive queue is empty; or - * next buffer to deliver is not in receive queue (missing packet in sequence). - * - * Block until woken up by one of the following event: - * - All ready-to-play packets have been pulled and EPOLL_IN cleared (then loop to block until next pkt time - * if any) - * - New buffers ACKed - * - Closing the connection - */ - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); - self->m_bTsbPdAckWakeup = true; - THREAD_PAUSED(); - tsbpd_cc.wait(); - THREAD_RESUMED(); - } - - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); - } - THREAD_EXIT(); - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); - return NULL; -} -#endif // ENABLE_NEW_RCVBUFFER - void srt::CUDT::setInitialRcvSeq(int32_t isn) { m_iRcvLastAck = isn; @@ -5667,7 +5445,6 @@ void srt::CUDT::setInitialRcvSeq(int32_t isn) m_iRcvLastAckAck = isn; m_iRcvCurrSeqNo = CSeqNo::decseq(isn); -#if ENABLE_NEW_RCVBUFFER sync::ScopedLock rb(m_RcvBufferLock); if (m_pRcvBuffer) { @@ -5682,7 +5459,6 @@ void srt::CUDT::setInitialRcvSeq(int32_t isn) m_pRcvBuffer->setStartSeqNo(m_iRcvLastSkipAck); } -#endif } void srt::CUDT::updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno) @@ -5730,12 +5506,8 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd try { m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize); -#if ENABLE_NEW_RCVBUFFER SRT_ASSERT(m_iISN != -1); m_pRcvBuffer = new srt::CRcvBufferNew(m_iISN, m_config.iRcvBufSize, m_pRcvQueue->m_pUnitQueue, m_config.bMessageAPI); -#else - m_pRcvBuffer = new CRcvBuffer(m_pRcvQueue->m_pUnitQueue, m_config.iRcvBufSize); -#endif // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_config.iFlightFlagSize); @@ -6912,21 +6684,13 @@ size_t srt::CUDT::getAvailRcvBufferSizeLock() const size_t srt::CUDT::getAvailRcvBufferSizeNoLock() const { -#if ENABLE_NEW_RCVBUFFER return m_pRcvBuffer->getAvailSize(m_iRcvLastAck); -#else - return m_pRcvBuffer->getAvailBufSize(); -#endif } bool srt::CUDT::isRcvBufferReady() const { ScopedLock lck(m_RcvBufferLock); -#if ENABLE_NEW_RCVBUFFER return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); -#else - return m_pRcvBuffer->isRcvDataReady(); -#endif } // int by_exception: accepts values of CUDTUnited::ErrorHandling: @@ -6969,13 +6733,9 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ { HLOGC(arlog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality"); enterCS(m_RcvBufferLock); -#if ENABLE_NEW_RCVBUFFER const int res = (m_pRcvBuffer->isRcvDataReady(steady_clock::now())) ? m_pRcvBuffer->readMessage(data, len, &w_mctrl) : 0; -#else - const int res = m_pRcvBuffer->readMsg(data, len); -#endif leaveCS(m_RcvBufferLock); w_mctrl.srctime = 0; @@ -7009,21 +6769,13 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ return res; } -#if !ENABLE_NEW_RCVBUFFER - const int seqdistance = -1; -#endif - if (!m_config.bSynRecving) { HLOGC(arlog.Debug, log << CONID() << "receiveMessage: BEGIN ASYNC MODE. Going to extract payload size=" << len); enterCS(m_RcvBufferLock); -#if ENABLE_NEW_RCVBUFFER const int res = (m_pRcvBuffer->isRcvDataReady(steady_clock::now())) ? m_pRcvBuffer->readMessage(data, len, &w_mctrl) : 0; -#else - const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance); -#endif leaveCS(m_RcvBufferLock); HLOGC(arlog.Debug, log << CONID() << "AFTER readMsg: (NON-BLOCKING) result=" << res); @@ -7085,13 +6837,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ do { -#if ENABLE_NEW_RCVBUFFER if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now())) -#else - steady_clock::time_point tstime SRT_ATR_UNUSED; - int32_t seqno; - if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady((tstime), (seqno), seqdistance)) -#endif { /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) @@ -7147,11 +6893,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ */ enterCS(m_RcvBufferLock); -#if ENABLE_NEW_RCVBUFFER res = m_pRcvBuffer->readMessage((data), len, &w_mctrl); -#else - res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance); -#endif leaveCS(m_RcvBufferLock); HLOGC(arlog.Debug, log << CONID() << "AFTER readMsg: (BLOCKING) result=" << res); @@ -7832,19 +7574,9 @@ void srt::CUDT::ackDataUpTo(int32_t ack) m_iRcvLastAck = ack; m_iRcvLastSkipAck = ack; - -#if !ENABLE_NEW_RCVBUFFER - // NOTE: This is new towards UDT and prevents spurious - // wakeup of select/epoll functions when no new packets - // were signed off for extraction. - if (acksize > 0) - { - m_pRcvBuffer->ackData(acksize); - } -#endif } -#if ENABLE_BONDING && ENABLE_NEW_RCVBUFFER +#if ENABLE_BONDING void srt::CUDT::dropToGroupRecvBase() { int32_t group_recv_base = SRT_SEQNO_NONE; if (m_parent->m_GroupOf) @@ -8071,7 +7803,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) string reason = "first lost"; // just for "a reason" of giving particular % for ACK #endif -#if ENABLE_BONDING && ENABLE_NEW_RCVBUFFER +#if ENABLE_BONDING dropToGroupRecvBase(); #endif @@ -8120,11 +7852,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) ackDataUpTo(ack); #if ENABLE_BONDING -#if ENABLE_NEW_RCVBUFFER const int32_t group_read_seq = m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()).seqno; -#else - const int32_t group_read_seq = CSeqNo::decseq(ack); -#endif #endif InvertedLock un_bufflock (m_RcvBufferLock); @@ -8184,11 +7912,9 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) { CUniqueSync rdcc (m_RecvLock, m_RecvDataCond); -#if ENABLE_NEW_RCVBUFFER // Locks m_RcvBufferLock, which is unlocked above by InvertedLock un_bufflock. // Must check read-readiness under m_RecvLock to protect the epoll from concurrent changes in readBuffer() if (isRcvBufferReady()) -#endif { if (m_config.bSynRecving) { @@ -9014,7 +8740,6 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) { const bool using_rexmit_flag = m_bPeerRexmitFlag; ScopedLock rblock(m_RcvBufferLock); -#if ENABLE_NEW_RCVBUFFER const int iDropCnt = m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag)); if (iDropCnt > 0) @@ -9029,9 +8754,6 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); leaveCS(m_StatsLock); } -#else - m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag); -#endif } // When the drop request was received, it means that there are // packets for which there will never be ACK sent; if the TSBPD thread @@ -9186,18 +8908,12 @@ void srt::CUDT::updateSrtRcvSettings() ScopedLock lock(m_RecvLock); // NOTE: remember to also update synchronizeWithGroup() if more settings are updated here. -#if ENABLE_NEW_RCVBUFFER m_pRcvBuffer->setPeerRexmitFlag(m_bPeerRexmitFlag); -#endif // XXX m_bGroupTsbPd is ignored with SRT_ENABLE_APP_READER if (m_bTsbPd || m_bGroupTsbPd) { -#if ENABLE_NEW_RCVBUFFER m_pRcvBuffer->setTsbPdMode(m_tsRcvPeerStartTime, false, milliseconds_from(m_iTsbPdDelay_ms)); -#else - m_pRcvBuffer->setRcvTsbPdMode(m_tsRcvPeerStartTime, milliseconds_from(m_iTsbPdDelay_ms)); -#endif HLOGF(cnlog.Debug, "AFTER HS: Set Rcv TsbPd mode%s: delay=%u.%03us RCV START: %s", @@ -10235,29 +9951,18 @@ int srt::CUDT::processData(CUnit* in_unit) } else { -#if ENABLE_NEW_RCVBUFFER LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo << ", insert offset " << offset << ". " << m_pRcvBuffer->strFullnessState( qrlog.Debug.CheckEnabled(), m_iRcvLastSkipAck, steady_clock::now())); -#else - LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo - << ", insert offset " << offset << ". " - << m_pRcvBuffer->strFullnessState(steady_clock::now()) - ); -#endif return -1; } } bool adding_successful = true; -#if ENABLE_NEW_RCVBUFFER if (m_pRcvBuffer->insert(u) < 0) -#else - if (m_pRcvBuffer->addData(u, offset) < 0) -#endif { // addData returns -1 if at the m_iLastAckPos+offset position there already is a packet. // So this packet is "redundant". @@ -11853,10 +11558,8 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr } #endif -#if ENABLE_NEW_RCVBUFFER ScopedLock lck(m_RcvBufferLock); m_pRcvBuffer->updateTsbPdTimeBase(ctrlpkt.getMsgTimeStamp()); if (m_config.bDriftTracer) m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, -1); -#endif } diff --git a/srtcore/core.h b/srtcore/core.h index e12a195d8..297df24c7 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -416,11 +416,7 @@ class CUDT SRTU_PROPERTY_RO(SRTSOCKET, id, m_SocketID); SRTU_PROPERTY_RO(bool, isClosing, m_bClosing); -#if ENABLE_NEW_RCVBUFFER SRTU_PROPERTY_RO(srt::CRcvBufferNew*, rcvBuffer, m_pRcvBuffer); -#else - SRTU_PROPERTY_RO(CRcvBuffer*, rcvBuffer, m_pRcvBuffer); -#endif SRTU_PROPERTY_RO(bool, isTLPktDrop, m_bTLPktDrop); SRTU_PROPERTY_RO(bool, isSynReceiving, m_config.bSynRecving); SRTU_PROPERTY_RR(sync::Condition*, recvDataCond, &m_RecvDataCond); @@ -711,13 +707,11 @@ class CUDT // TSBPD thread main function. static void* tsbpd(void* param); -#if ENABLE_NEW_RCVBUFFER /// Drop too late packets (receiver side). Updaet loss lists and ACK positions. /// The @a seqno packet itself is not dropped. /// @param seqno [in] The sequence number of the first packets following those to be dropped. /// @return The number of packets dropped. int rcvDropTooLateUpTo(int seqno); -#endif void updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno); @@ -895,12 +889,7 @@ class CUDT int32_t m_iReXmitCount; // Re-Transmit Count since last ACK private: // Receiving related data -#if ENABLE_NEW_RCVBUFFER - CRcvBufferNew* m_pRcvBuffer; //< Receiver buffer -#else - CRcvBuffer* m_pRcvBuffer; //< Receiver buffer -#endif - + CRcvBufferNew* m_pRcvBuffer; //< Receiver buffer SRT_ATTR_GUARDED_BY(m_RcvLossLock) CRcvLossList* m_pRcvLossList; //< Receiver loss list SRT_ATTR_GUARDED_BY(m_RcvLossLock) @@ -1084,7 +1073,7 @@ class CUDT /// @param seq first unacknowledged packet sequence number. void ackDataUpTo(int32_t seq); -#if ENABLE_BONDING && ENABLE_NEW_RCVBUFFER +#if ENABLE_BONDING /// @brief Drop packets in the recv buffer behind group_recv_base. /// Updates m_iRcvLastSkipAck if it's behind group_recv_base. void dropToGroupRecvBase(); diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 5975cc9ae..f2c424ba0 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2134,7 +2134,6 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno) return false; } -#ifdef ENABLE_NEW_RCVBUFFER int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { // First, acquire GlobControlLock to make sure all member sockets still exist @@ -2320,544 +2319,6 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); } -#else -// The "app reader" version of the reading function. -// This reads the packets from every socket treating them as independent -// and prepared to work with the application. Then packets are sorted out -// by getting the sequence number. -int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) -{ - typedef map::iterator pit_t; - // Later iteration over it might be less efficient than - // by vector, but we'll also often try to check a single id - // if it was ever seen broken, so that it's skipped. - set broken; - size_t output_size = 0; - - // First, acquire GlobControlLock to make sure all member sockets still exist - enterCS(m_Global.m_GlobControlLock); - ScopedLock guard(m_GroupLock); - - if (m_bClosing) - { - // The group could be set closing in the meantime, but if - // this is only about to be set by another thread, this thread - // must fist wait for being able to acquire this lock. - // The group will not be deleted now because it is added usage counter - // by this call, but will be released once it exits. - leaveCS(m_Global.m_GlobControlLock); - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - } - - // Now, still under lock, check if all sockets still can be dispatched - send_CheckValidSockets(); - leaveCS(m_Global.m_GlobControlLock); - - if (m_bClosing) - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - - for (;;) - { - if (!m_bOpened || !m_bConnected) - { - LOGC(grlog.Error, - log << boolalpha << "group/recv: ERROR opened=" << m_bOpened << " connected=" << m_bConnected); - throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); - } - - // Check first the ahead packets if you have any to deliver. - if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !m_Positions.empty()) - { - // This function also updates the group sequence pointer. - ReadPos* pos = checkPacketAhead(); - if (pos) - { - if (size_t(len) < pos->packet.size()) - throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); - - HLOGC(grlog.Debug, - log << "group/recv: delivering AHEAD packet %" << pos->mctrl.pktseq << " #" << pos->mctrl.msgno - << ": " << BufferStamp(&pos->packet[0], pos->packet.size())); - memcpy(buf, &pos->packet[0], pos->packet.size()); - fillGroupData((w_mc), pos->mctrl); - m_RcvBaseSeqNo = pos->mctrl.pktseq; - len = pos->packet.size(); - pos->packet.clear(); - - // Update stats as per delivery - m_stats.recv.count(len); - updateAvgPayloadSize(len); - - // We predict to have only one packet ahead, others are pending to be reported by tsbpd. - // This will be "re-enabled" if the later check puts any new packet into ahead. - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); - - return len; - } - } - - // LINK QUALIFICATION NAMES: - // - // HORSE: Correct link, which delivers the very next sequence. - // Not necessarily this link is currently active. - // - // KANGAROO: Got some packets dropped and the sequence number - // of the packet jumps over the very next sequence and delivers - // an ahead packet. - // - // ELEPHANT: Is not ready to read, while others are, or reading - // up to the current latest delivery sequence number does not - // reach this sequence and the link becomes non-readable earlier. - - // The above condition has ruled out one kangaroo and turned it - // into a horse. - - // Below there's a loop that will try to extract packets. Kangaroos - // will be among the polled ones because skipping them risks that - // the elephants will take over the reading. Links already known as - // elephants will be also polled in an attempt to revitalize the - // connection that experienced just a short living choking. - // - // After polling we attempt to read from every link that reported - // read-readiness and read at most up to the sequence equal to the - // current delivery sequence. - - // Links that deliver a packet below that sequence will be retried - // until they deliver no more packets or deliver the packet of - // expected sequence. Links that don't have a record in m_Positions - // and report readiness will be always read, at least to know what - // sequence they currently stand on. - // - // Links that are already known as kangaroos will be polled, but - // no reading attempt will be done. If after the reading series - // it will turn out that we have no more horses, the slowest kangaroo - // will be "upgraded to a horse" (the ahead link with a sequence - // closest to the current delivery sequence will get its sequence - // set as current delivered and its recorded ahead packet returned - // as the read packet). - - // If we find at least one horse, the packet read from that link - // will be delivered. All other link will be just ensured update - // up to this sequence number, or at worst all available packets - // will be read. In this case all kangaroos remain kangaroos, - // until the current delivery sequence m_RcvBaseSeqNo will be lifted - // to the sequence recorded for these links in m_Positions, - // during the next time ahead check, after which they will become - // horses. - - const size_t size = m_Group.size(); - - // Prepare first the list of sockets to be added as connect-pending - // and as read-ready, then unlock the group, and then add them to epoll. - vector aliveMembers; - recv_CollectAliveAndBroken(aliveMembers, broken); - - const vector ready_sockets = recv_WaitForReadReady(aliveMembers, broken); - // m_GlobControlLock lifted, m_GroupLock still locked. - // Now we can safely do this scoped way. - - if (!m_bSynRecving && ready_sockets.empty()) - { - HLOGC(grlog.Debug, - log << "group/rcv $" << m_GroupID << ": Not available AT THIS TIME, NOT READ-READY now."); - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); - throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); - } - - // Ok, now we need to have some extra qualifications: - // 1. If a socket has no registry yet, we read anyway, just - // to notify the current position. We read ONLY ONE PACKET this time, - // we'll worry later about adjusting it to the current group sequence - // position. - // 2. If a socket is already position ahead, DO NOT read from it, even - // if it is ready. - - // The state of things whether we were able to extract the very next - // sequence will be simply defined by the fact that `output` is nonempty. - - int32_t next_seq = m_RcvBaseSeqNo; - - if (m_bClosing) - { - HLOGC(gslog.Debug, log << "grp/sendBroadcast: GROUP CLOSED, ABANDONING"); - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - } - // - // NOTE: Although m_GlobControlLock is lifted here so potentially sockets - // colected in ready_sockets could be closed at any time, all of them are member - // sockets of this group. Therefore the first socket attempted to be closed will - // have to remove the socket from the group, and this will require lock on GroupLock, - // which is still applied here. So this will have to wait for this function to finish - // (or block on swait, in which case the lock is lifted) anyway. - - for (vector::const_iterator si = ready_sockets.begin(); si != ready_sockets.end(); ++si) - { - CUDTSocket* ps = *si; - SRTSOCKET id = ps->m_SocketID; - ReadPos* p = NULL; - pit_t pe = m_Positions.find(id); - if (pe != m_Positions.end()) - { - p = &pe->second; - - // Possible results of comparison: - // x < 0: the sequence is in the past, the socket should be adjusted FIRST - // x = 0: the socket should be ready to get the exactly next packet - // x = 1: the case is already handled by GroupCheckPacketAhead. - // x > 1: AHEAD. DO NOT READ. - const int seqdiff = CSeqNo::seqcmp(p->mctrl.pktseq, m_RcvBaseSeqNo); - if (seqdiff > 1) - { - HLOGC(grlog.Debug, - log << "group/recv: EPOLL: @" << id << " %" << p->mctrl.pktseq << " AHEAD %" << m_RcvBaseSeqNo - << ", not reading."); - continue; - } - } - else - { - // The position is not known, so get the position on which - // the socket is currently standing. - pair ee = m_Positions.insert(make_pair(id, ReadPos(ps->core().m_iRcvLastSkipAck))); - p = &(ee.first->second); - HLOGC(grlog.Debug, - log << "group/recv: EPOLL: @" << id << " %" << p->mctrl.pktseq << " NEW SOCKET INSERTED"); - } - - // Read from this socket stubbornly, until: - // - reading is no longer possible (AGAIN) - // - the sequence difference is >= 1 - - for (;;) - { - SRT_MSGCTRL mctrl = srt_msgctrl_default; - - // Read the data into the user's buffer. This is an optimistic - // prediction that we'll read the right data. This will be overwritten - // by "more correct data" if found more appropriate later. But we have to - // copy these data anyway anywhere, even if they need to fall on the floor later. - int stat; - char extrabuf[SRT_LIVE_MAX_PLSIZE]; - char* msgbuf = NULL; - if (output_size) - { - // We already have the target data in `buf`. Now reading extra data potentially redundant (to be ignored) - // or AHEAD (to be buffered internally by the group) - msgbuf = extrabuf; - stat = ps->core().receiveMessage((extrabuf), SRT_LIVE_MAX_PLSIZE, (mctrl), CUDTUnited::ERH_RETURN); - HLOGC(grlog.Debug, - log << "group/recv: @" << id << " EXTRACTED EXTRA data with %" << mctrl.pktseq - << " #" << mctrl.msgno << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(extrabuf, stat)) - << (CSeqNo::seqcmp(mctrl.pktseq, m_RcvBaseSeqNo) > 1 ? " - TO STORE" : " - TO IGNORE")); - } - else - { - msgbuf = buf; - stat = ps->core().receiveMessage((buf), len, (mctrl), CUDTUnited::ERH_RETURN); - HLOGC(grlog.Debug, - log << "group/recv: @" << id << " EXTRACTED data with %" << mctrl.pktseq << " #" - << mctrl.msgno << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(buf, stat))); - } - if (stat == 0) - { - HLOGC(grlog.Debug, log << "group/recv @" << id << ": SPURIOUS epoll, ignoring"); - // This is returned in case of "again". In case of errors, we have SRT_ERROR. - // Do not treat this as spurious, just stop reading. - break; - } - - if (stat == SRT_ERROR) - { - HLOGC(grlog.Debug, log << "group/recv: @" << id << ": " << srt_getlasterror_str()); - broken.insert(ps); - break; - } - - // NOTE: checks against m_RcvBaseSeqNo and decisions based on it - // must NOT be done if m_RcvBaseSeqNo is SRT_SEQNO_NONE, which - // means that we are about to deliver the very first packet and we - // take its sequence number as a good deal. - - // The order must be: - // - check discrepancy - // - record the sequence - // - check ordering. - // The second one must be done always, but failed discrepancy - // check should exclude the socket from any further checks. - // That's why the common check for m_RcvBaseSeqNo != SRT_SEQNO_NONE can't - // embrace everything below. - - // We need to first qualify the sequence, just for a case - if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, mctrl.pktseq)) - { - // This error should be returned if the link turns out - // to be the only one, or set to the group data. - // err = SRT_ESECFAIL; - LOGC(grlog.Error, - log << "group/recv: @" << id << ": SEQUENCE DISCREPANCY: base=%" << m_RcvBaseSeqNo - << " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL"); - broken.insert(ps); - break; - } - - // Rewrite it to the state for a case when next reading - // would not succeed. Do not insert the buffer here because - // this is only required when the sequence is ahead; for that - // it will be fixed later. - p->mctrl.pktseq = mctrl.pktseq; - - if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) - { - // Now we can safely check it. - const int seqdiff = CSeqNo::seqcmp(mctrl.pktseq, m_RcvBaseSeqNo); - - if (seqdiff <= 0) - { - HLOGC(grlog.Debug, - log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno - << " BEHIND base=%" << m_RcvBaseSeqNo << " - discarding"); - // The sequence is recorded, the packet has to be discarded. - m_stats.recvDiscard.count(stat); - continue; - } - - // Now we have only two possibilities: - // seqdiff == 1: The very next sequence, we want to read and return the packet. - // seqdiff > 1: The packet is ahead - record the ahead packet, but continue with the others. - - if (seqdiff > 1) - { - HLOGC(grlog.Debug, - log << "@" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " AHEAD base=%" - << m_RcvBaseSeqNo); - p->packet.assign(msgbuf, msgbuf + stat); - p->mctrl = mctrl; - break; // Don't read from that socket anymore. - } - } - - // We have seqdiff = 1, or we simply have the very first packet - // which's sequence is taken as a good deal. Update the sequence - // and record output. - - if (output_size) - { - HLOGC(grlog.Debug, - log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " REDUNDANT"); - break; - } - - HLOGC(grlog.Debug, - log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno << " DELIVERING"); - output_size = stat; - fillGroupData((w_mc), mctrl); - - // Update stats as per delivery - m_stats.recv.count(output_size); - updateAvgPayloadSize(output_size); - - // Record, but do not update yet, until all sockets are handled. - next_seq = mctrl.pktseq; - break; - } - } - -#if ENABLE_HEAVY_LOGGING - if (!broken.empty()) - { - std::ostringstream brks; - for (set::iterator b = broken.begin(); b != broken.end(); ++b) - brks << "@" << (*b)->m_SocketID << " "; - LOGC(grlog.Debug, log << "group/recv: REMOVING BROKEN: " << brks.str()); - } -#endif - - vector brokenid; - // Now remove all broken sockets from aheads, if any. - // Even if they have already delivered a packet. - for (set::iterator di = broken.begin(); di != broken.end(); ++di) - { - CUDTSocket* ps = *di; - m_Positions.erase(ps->m_SocketID); - //ps->setBrokenClosed(); - } - - // Force closing - { - InvertedLock ung (m_GroupLock); - for (set::iterator b = broken.begin(); b != broken.end(); ++b) - { - CUDT::uglobal().close(*b); - } - } - - if (broken.size() >= size) // This > is for sanity check - { - // All broken - HLOGC(grlog.Debug, log << "group/recv: All sockets broken"); - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); - - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - } - - // May be required to be re-read. - broken.clear(); - - if (output_size) - { - // We have extracted something, meaning that we have the sequence shift. - // Update it now and don't do anything else with the sockets. - - // Sanity check - if (next_seq == SRT_SEQNO_NONE) - { - LOGP(grlog.Error, "IPE: next_seq not set after output extracted!"); - - // This should never happen, but the only way to keep the code - // safe an recoverable is to use the incremented sequence. By - // leaving the sequence as is there's a risk of hangup. - // Not doing it in case of SRT_SEQNO_NONE as it would make a valid %0. - if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) - m_RcvBaseSeqNo = CSeqNo::incseq(m_RcvBaseSeqNo); - } - else - { - m_RcvBaseSeqNo = next_seq; - } - - const ReadPos* pos = checkPacketAhead(); - if (!pos) - { - // Don't clear the read-readinsess state if you have a packet ahead because - // if you have, the next read call will return it. - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); - } - - HLOGC(grlog.Debug, - log << "group/recv: successfully extracted packet size=" << output_size << " - returning"); - return output_size; - } - - HLOGC(grlog.Debug, log << "group/recv: NOT extracted anything - checking for a need to kick kangaroos"); - - // Check if we have any sockets left :D - - // Here we surely don't have any more HORSES, - // only ELEPHANTS and KANGAROOS. Qualify them and - // attempt to at least take advantage of KANGAROOS. - - // In this position all links are either: - // - updated to the current position - // - updated to the newest possible possition available - // - not yet ready for extraction (not present in the group) - - // If we haven't extracted the very next sequence position, - // it means that we might only have the ahead packets read, - // that is, the next sequence has been dropped by all links. - - if (!m_Positions.empty()) - { - // This might notify both lingering links, which didn't - // deliver the required sequence yet, and links that have - // the sequence ahead. Review them, and if you find at - // least one packet behind, just wait for it to be ready. - // Use again the waiting function because we don't want - // the general waiting procedure to skip others. - set elephants; - - // const because it's `typename decltype(m_Positions)::value_type` - pair* slowest_kangaroo = 0; - - for (pit_t rp = m_Positions.begin(); rp != m_Positions.end(); ++rp) - { - // NOTE that m_RcvBaseSeqNo in this place wasn't updated - // because we haven't successfully extracted anything. - int seqdiff = CSeqNo::seqcmp(rp->second.mctrl.pktseq, m_RcvBaseSeqNo); - if (seqdiff < 0) - { - elephants.insert(rp->first); - } - // If seqdiff == 0, we have a socket ON TRACK. - else if (seqdiff > 0) - { - // If there's already a slowest_kangaroo, seqdiff decides if this one is slower. - // Otherwise it is always slower by having no competition. - seqdiff = slowest_kangaroo - ? CSeqNo::seqcmp(slowest_kangaroo->second.mctrl.pktseq, rp->second.mctrl.pktseq) - : 1; - if (seqdiff > 0) - { - slowest_kangaroo = &*rp; - } - } - } - - // Note that if no "slowest_kangaroo" was found, it means - // that we don't have kangaroos. - if (slowest_kangaroo) - { - // We have a slowest kangaroo. Elephants must be ignored. - // Best case, they will get revived, worst case they will be - // soon broken. - // - // As we already have the packet delivered by the slowest - // kangaroo, we can simply return it. - - // Check how many were skipped and add them to the stats - const int32_t jump = (CSeqNo(slowest_kangaroo->second.mctrl.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1; - if (jump > 0) - { - m_stats.recvDrop.count(stats::BytesPackets(jump * static_cast(avgRcvPacketSize()), jump)); - LOGC(grlog.Warn, - log << "@" << m_GroupID << " GROUP RCV-DROPPED " << jump << " packet(s): seqno %" - << m_RcvBaseSeqNo << " to %" << slowest_kangaroo->second.mctrl.pktseq); - } - - m_RcvBaseSeqNo = slowest_kangaroo->second.mctrl.pktseq; - vector& pkt = slowest_kangaroo->second.packet; - if (size_t(len) < pkt.size()) - throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); - - HLOGC(grlog.Debug, - log << "@" << slowest_kangaroo->first << " KANGAROO->HORSE %" - << slowest_kangaroo->second.mctrl.pktseq << " #" << slowest_kangaroo->second.mctrl.msgno - << ": " << BufferStamp(&pkt[0], pkt.size())); - - memcpy(buf, &pkt[0], pkt.size()); - fillGroupData((w_mc), slowest_kangaroo->second.mctrl); - len = pkt.size(); - pkt.clear(); - - // Update stats as per delivery - m_stats.recv.count(len); - updateAvgPayloadSize(len); - - // It is unlikely to have a packet ahead because usually having one packet jumped-ahead - // clears the possibility of having aheads at all. - // XXX Research if this is possible at all; if it isn't, then don't waste time on - // looking for it. - const ReadPos* pos = checkPacketAhead(); - if (!pos) - { - // Don't clear the read-readinsess state if you have a packet ahead because - // if you have, the next read call will return it. - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); - } - return len; - } - - HLOGC(grlog.Debug, - log << "group/recv: " - << (elephants.empty() ? "NO LINKS REPORTED ANY FRESHER PACKET." : "ALL LINKS ELEPHANTS.") - << " Re-polling."); - } - else - { - HLOGC(grlog.Debug, log << "group/recv: POSITIONS EMPTY - Re-polling."); - } - } -} -#endif // [[using locked(m_GroupLock)]] CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead() diff --git a/test/test_buffer.cpp b/test/test_buffer.cpp index 1dd95c55c..47fa15741 100644 --- a/test/test_buffer.cpp +++ b/test/test_buffer.cpp @@ -30,14 +30,10 @@ class CRcvBufferReadMsg m_unit_queue.reset(new CUnitQueue(m_buff_size_pkts, 1500)); ASSERT_NE(m_unit_queue.get(), nullptr); -#if ENABLE_NEW_RCVBUFFER const bool enable_msg_api = m_use_message_api; const bool enable_peer_rexmit = true; m_rcv_buffer.reset(new CRcvBufferNew(m_init_seqno, m_buff_size_pkts, m_unit_queue.get(), enable_msg_api)); m_rcv_buffer->setPeerRexmitFlag(enable_peer_rexmit); -#else - m_rcv_buffer.reset(new CRcvBuffer(m_unit_queue.get(), m_buff_size_pkts)); -#endif ASSERT_NE(m_rcv_buffer.get(), nullptr); } @@ -77,12 +73,7 @@ class CRcvBufferReadMsg EXPECT_TRUE(packet.getMsgOrderFlag()); } -#if ENABLE_NEW_RCVBUFFER return m_rcv_buffer->insert(unit); -#else - const int offset = CSeqNo::seqoff(m_first_unack_seqno, seqno); - return m_rcv_buffer->addData(unit, offset); -#endif } /// @returns 0 on success, the result of rcv_buffer::insert(..) once it failed @@ -117,47 +108,27 @@ class CRcvBufferReadMsg int ackPackets(int num_pkts) { m_first_unack_seqno = CSeqNo::incseq(m_first_unack_seqno, num_pkts); -#if ENABLE_NEW_RCVBUFFER return 0; -#else - return m_rcv_buffer->ackData(num_pkts); -#endif } int getAvailBufferSize() { -#if ENABLE_NEW_RCVBUFFER return m_rcv_buffer->getAvailSize(m_first_unack_seqno); -#else - return m_rcv_buffer->getAvailBufSize(); -#endif } int readMessage(char* data, size_t len) { -#if ENABLE_NEW_RCVBUFFER return m_rcv_buffer->readMessage(data, len); -#else - return m_rcv_buffer->readMsg(data, len); -#endif } bool hasAvailablePackets() { -#if ENABLE_NEW_RCVBUFFER return m_rcv_buffer->hasAvailablePackets(); -#else - return m_rcv_buffer->isRcvDataAvailable(); -#endif } protected: unique_ptr m_unit_queue; -#if ENABLE_NEW_RCVBUFFER unique_ptr m_rcv_buffer; -#else - unique_ptr m_rcv_buffer; -#endif const int m_buff_size_pkts = 16; const int m_init_seqno = 1000; int m_first_unack_seqno = m_init_seqno; @@ -241,32 +212,20 @@ TEST_F(CRcvBufferReadMsg, OnePacketGap) // BUG. Acknowledging an empty position must not result in a read-readiness. // TODO: Actually we should not acknowledge, but must drop instead. ackPackets(1); -#if ENABLE_NEW_RCVBUFFER // Expected behavior EXPECT_FALSE(hasAvailablePackets()); EXPECT_FALSE(rcv_buffer.isRcvDataReady()); const auto next_packet = m_rcv_buffer->getFirstValidPacketInfo(); EXPECT_EQ(next_packet.seqno, m_init_seqno + 1); -#else // Wrong behavior (BUG) - EXPECT_TRUE(hasAvailablePackets()); - EXPECT_TRUE(rcv_buffer.isRcvDataReady()); -#endif EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 2); -#if ENABLE_NEW_RCVBUFFER - // The new buffer will return 0 as reading is not available. + // The buffer will return 0 as reading is not available. res = rcv_buffer.readBuffer(buff.data(), buff.size()); EXPECT_EQ(res, 0); -#else - cerr << "Expecting IPE from readBuffer(..): \n"; - res = rcv_buffer.readBuffer(buff.data(), buff.size()); - EXPECT_EQ(res, -1); -#endif res = readMessage(buff.data(), buff.size()); EXPECT_EQ(res, 0); -#if ENABLE_NEW_RCVBUFFER // Add a missing packet (can't add before an acknowledged position in the old buffer). EXPECT_EQ(addMessage(1, m_init_seqno), 0); @@ -278,7 +237,6 @@ TEST_F(CRcvBufferReadMsg, OnePacketGap) EXPECT_TRUE(verifyPayload(buff.data(), msg_bytelen, CSeqNo::incseq(m_init_seqno, pktno))); } EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); -#endif // Further read is not possible EXPECT_FALSE(rcv_buffer.isRcvDataReady()); @@ -301,11 +259,7 @@ TEST_F(CRcvBufferReadMsg, OnePacketGapDrop) auto& rcv_buffer = *m_rcv_buffer.get(); EXPECT_FALSE(hasAvailablePackets()); EXPECT_FALSE(rcv_buffer.isRcvDataReady()); -#if ENABLE_NEW_RCVBUFFER rcv_buffer.dropUpTo(CSeqNo::incseq(m_init_seqno)); -#else - rcv_buffer.dropData(1); -#endif EXPECT_TRUE(hasAvailablePackets()); EXPECT_TRUE(rcv_buffer.isRcvDataReady()); @@ -329,16 +283,7 @@ TEST_F(CRcvBufferReadMsg, OnePacket) const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; - // The new receiver buffer allows reading without ACK. -#if !ENABLE_NEW_RCVBUFFER - EXPECT_FALSE(hasAvailablePackets()); - - const int res1 = readMessage(buff.data(), buff.size()); - EXPECT_EQ(res1, 0); - - // Full ACK - ackPackets(msg_pkts); -#endif + // The receiver buffer allows reading without ACK. EXPECT_TRUE(hasAvailablePackets()); const int res2 = readMessage(buff.data(), buff.size()); @@ -365,12 +310,8 @@ TEST_F(CRcvBufferReadMsg, AddData) // The value is reported by SRT receiver like this: // data[ACKD_BUFFERLEFT] = m_pRcvBuffer->getAvailBufSize(); EXPECT_EQ(getAvailBufferSize(), m_buff_size_pkts - 1); -#if ENABLE_NEW_RCVBUFFER - // The new receiver buffer does not need ACK to allow reading. + // The receiver buffer does not need ACK to allow reading. EXPECT_TRUE(hasAvailablePackets()); -#else - EXPECT_FALSE(hasAvailablePackets()); -#endif // Now acknowledge two packets const int ack_pkts = 2; @@ -388,10 +329,7 @@ TEST_F(CRcvBufferReadMsg, AddData) } // Add packet to the position of oackets already read. - // Can't check the old buffer, as it does not handle a negative offset. -#if ENABLE_NEW_RCVBUFFER EXPECT_EQ(addPacket(m_init_seqno), -2); -#endif // Add packet to a non-empty position. EXPECT_EQ(addPacket(CSeqNo::incseq(m_init_seqno, ack_pkts)), -1); @@ -478,8 +416,8 @@ TEST_F(CRcvBufferReadMsg, MsgHalfAck) // Nothing to read (0 for zero bytes read). const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; -#if ENABLE_NEW_RCVBUFFER - // The new receiver buffer does not care about ACK. + + // The receiver buffer does not care about ACK. EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); EXPECT_TRUE(hasAvailablePackets()); @@ -490,32 +428,6 @@ TEST_F(CRcvBufferReadMsg, MsgHalfAck) const ptrdiff_t offset = i * m_payload_sz; EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); } -#else - EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); - EXPECT_FALSE(hasAvailablePackets()); - const int res = readMessage(buff.data(), buff.size()); - EXPECT_EQ(res, 0); - - // ACK half of the message and check read-readiness. - ackPackets(2); - // FIXME: Sadly RCV buffer says the data is ready to be read. - EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); - EXPECT_TRUE(hasAvailablePackets()); - - // Actually must be nothing to read (can't read half a message). - const int res2 = readMessage(buff.data(), buff.size()); - EXPECT_EQ(res2, 0); - - // ACK the remaining half of the message and check read-readiness. - ackPackets(2); - const int res3 = readMessage(buff.data(), buff.size()); - EXPECT_EQ(res3, msg_bytelen); - for (size_t i = 0; i < msg_pkts; ++i) - { - const ptrdiff_t offset = i * m_payload_sz; - EXPECT_TRUE(verifyPayload(buff.data() + offset, m_payload_sz, CSeqNo::incseq(m_init_seqno, i))); - } -#endif EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } @@ -528,13 +440,9 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgNoACK) // Adding one message with the Out-Of-Order flag set, but without acknowledging. addMessage(msg_pkts, m_init_seqno, true); -#if ENABLE_NEW_RCVBUFFER EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); EXPECT_TRUE(hasAvailablePackets()); -#else - EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); - EXPECT_FALSE(hasAvailablePackets()); -#endif + const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; const int res = readMessage(buff.data(), buff.size()); @@ -548,19 +456,7 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgNoACK) EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); -#if ENABLE_NEW_RCVBUFFER - EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); -#else - ackPackets(msg_pkts); - // The old buffer still does not free units. - EXPECT_NE(m_unit_queue->size(), m_unit_queue->capacity()); - // BUG: wrong read-ready state. - EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); - EXPECT_TRUE(hasAvailablePackets()); - // Nothing read, but empty units are freed. - EXPECT_EQ(readMessage(buff.data(), buff.size()), 0); EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); -#endif } // Adding a message with the out-of-order flag set. @@ -571,13 +467,9 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgGap) // Adding one message with the Out-Of-Order flag set, but without acknowledging. addMessage(msg_pkts, CSeqNo::incseq(m_init_seqno, 1), true); -#if ENABLE_NEW_RCVBUFFER EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); EXPECT_TRUE(hasAvailablePackets()); -#else - EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); - EXPECT_FALSE(hasAvailablePackets()); -#endif + const size_t msg_bytelen = msg_pkts * m_payload_sz; array buff; const int res = readMessage(buff.data(), buff.size()); @@ -606,13 +498,8 @@ TEST_F(CRcvBufferReadMsg, OutOfOrderMsgGap) // Only "passack" or EntryState_Read packets remain in the buffer. // They are falsely signalled as read-ready. -#if ENABLE_NEW_RCVBUFFER EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); -#else - EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); // BUG: nothing to read. - EXPECT_TRUE(hasAvailablePackets()); // BUG: nothing to read. -#endif // Adding a packet right after the EntryState_Read packets. const int seqno = CSeqNo::incseq(m_init_seqno, msg_pkts + 1); @@ -643,14 +530,8 @@ TEST_F(CRcvBufferReadMsg, LongMsgReadReady) ackPackets(1); if (!pb_last) { -#if ENABLE_NEW_RCVBUFFER EXPECT_FALSE(m_rcv_buffer->isRcvDataReady()); EXPECT_FALSE(hasAvailablePackets()); -#else - // BUG: The old buffer returns true (read-readiness). - EXPECT_TRUE(m_rcv_buffer->isRcvDataReady()); - EXPECT_TRUE(hasAvailablePackets()); -#endif EXPECT_EQ(readMessage(buff.data(), buff.size()), 0); } } @@ -669,7 +550,6 @@ TEST_F(CRcvBufferReadMsg, LongMsgReadReady) EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } -#if ENABLE_NEW_RCVBUFFER // One message (4 packets) is added to the buffer. Can be read out of order. // Reading should be possible even before the missing packet is dropped. TEST_F(CRcvBufferReadMsg, MsgOutOfOrderDrop) @@ -943,5 +823,3 @@ TEST_F(CRcvBufferReadStream, ReadFractional) EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } - -#endif // ENABEL_NEW_RCVBUFFER