Skip to content

Commit

Permalink
[core] Fixed RCV TL drop of packets dropped by SND (#2214)
Browse files Browse the repository at this point in the history
Moved the TL drop logic from the TSBPD thread to a dedicated function.
  • Loading branch information
maxsharabayko committed Dec 24, 2021
1 parent 5f7bc23 commit 1111cbd
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 28 deletions.
2 changes: 2 additions & 0 deletions docs/API/statistics.md
Expand Up @@ -235,6 +235,8 @@ where `SRTO_PEERLATENCY` is the configured SRT latency, `SRTO_SNDDROPDELAY` adds
The total number of _dropped_ by the SRT receiver and, as a result, not delivered to the upstream application DATA packets (refer to [TLPKTDROP](https://github.com/Haivision/srt-rfc/blob/master/draft-sharabayko-mops-srt.md#too-late-packet-drop-too-late-packet-drop) mechanism). Available for receiver.

This statistic counts

- not arrived packets including those signalled for dropping by the sender, that were dropped in favor of the subsequent existing packets,
- arrived too late packets (retransmitted or original packets arrived out of order),
- arrived in time packets, but decrypted with errors (see also [pktRcvUndecryptTotal](#pktRcvUndecryptTotal) statistic).

Expand Down
9 changes: 4 additions & 5 deletions srtcore/buffer_rcv.cpp
Expand Up @@ -159,7 +159,7 @@ int CRcvBufferNew::insert(CUnit* unit)
return 0;
}

void CRcvBufferNew::dropUpTo(int32_t seqno)
int CRcvBufferNew::dropUpTo(int32_t seqno)
{
// Can drop only when nothing to read, and
// first unacknowledged packet is missing.
Expand All @@ -173,17 +173,15 @@ void CRcvBufferNew::dropUpTo(int32_t seqno)
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
return;
return 0;
}

/*LOGC(rbuflog.Warn, log << "CRcvBufferNew.dropUpTo(): seqno=" << seqno << ", pkts=" << len
<< ". Buffer start " << m_iStartSeqNo << ".");*/

m_iMaxPosInc -= len;
if (m_iMaxPosInc < 0)
m_iMaxPosInc = 0;

// Check that all packets being dropped are missing.
const int iDropCnt = len;
while (len > 0)
{
if (m_entries[m_iStartPos].pUnit != NULL)
Expand All @@ -210,6 +208,7 @@ void CRcvBufferNew::dropUpTo(int32_t seqno)
// because start position was increased, and preceeding packets are invalid.
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
return iDropCnt;
}

void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
Expand Down
3 changes: 2 additions & 1 deletion srtcore/buffer_rcv.h
Expand Up @@ -69,8 +69,9 @@ class CRcvBufferNew

/// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
/// @param [in] seqno drop units up to this sequence number
/// @return number of dropped packets.
///
void dropUpTo(int32_t seqno);
int dropUpTo(int32_t seqno);

/// @brief Drop the whole message from the buffer.
/// If message number is 0, then use sequence numbers to locate sequence range to drop [seqnolo, seqnohi].
Expand Down
51 changes: 29 additions & 22 deletions srtcore/core.cpp
Expand Up @@ -5174,39 +5174,21 @@ void * srt::CUDT::tsbpd(void* param)
rxready = true;
if (info.seq_gap)
{
const int seq_gap_len = CSeqNo::seqoff(self->m_iRcvLastSkipAck, info.seqno);
SRT_ASSERT(seq_gap_len > 0);
/*if (!info.seq_gap)
{
LOGC(brlog.Warn, log << "TSBPD worker: no gap. pktseqno=" << info.seqno
<< ", m_iRcvLastSkipAck=" << self->m_iRcvLastSkipAck
<< ", RBuffer start seqno=" << self->m_pRcvBuffer->getStartSeqNo()
<< ", m_iRcvLastAck=" << self->m_iRcvLastAck
<< ", init seqnoo=" << self->m_iISN);
}*/

// Drop too late packets
self->updateForgotten(seq_gap_len, self->m_iRcvLastSkipAck, info.seqno);
//if (info.seq_gap) // If there is no sequence gap, we are reading ahead of ACK.
//{
self->m_pRcvBuffer->dropUpTo(info.seqno);
//}

self->m_iRcvLastSkipAck = info.seqno;
const int iDropCnt SRT_ATR_UNUSED = self->dropTooLateUpTo(info.seqno);

#if ENABLE_EXPERIMENTAL_BONDING
shall_update_group = true;
#endif

#if ENABLE_LOGGING
const int64_t timediff_us = count_microseconds(tnow - info.tsbpd_time);
// TODO: seq_gap_len is not the actual number of packets dropped.
#if ENABLE_HEAVY_LOGGING
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: DROPSEQ: up to seqno %" << CSeqNo::decseq(info.seqno) << " ("
<< seq_gap_len << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed "
<< iDropCnt << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed "
<< (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms");
#endif
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << seq_gap_len << " packet(s), packet seqno %" << info.seqno
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno
<< " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0')
<< (timediff_us % 1000) << " ms");
#endif
Expand Down Expand Up @@ -5539,6 +5521,30 @@ void * srt::CUDT::tsbpd(void *param)
}
#endif // ENABLE_NEW_RCVBUFFER

int srt::CUDT::dropTooLateUpTo(int seqno)
{
const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno);

// seq_gap_len can be <= 0 if a packet has been dropped by the sender.
if (seq_gap_len > 0)
{
// Remove [from,to-inclusive]
dropFromLossLists(m_iRcvLastSkipAck, CSeqNo::decseq(seqno));
m_iRcvLastSkipAck = seqno;
}

const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno);
if (iDropCnt > 0)
{
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (size_t) iDropCnt));
leaveCS(m_StatsLock);
}
return iDropCnt;
}

void srt::CUDT::updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno)
{
enterCS(m_StatsLock);
Expand Down Expand Up @@ -7898,6 +7904,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// If there is no loss, the ACK is the current largest sequence number plus 1;
// Otherwise it is the smallest sequence number in the receiver loss list.
ScopedLock lock(m_RcvLossLock);
// TODO: Consider the Fresh Loss list as well!!!
ack = m_pRcvLossList->getFirstLostSeq();
}

Expand Down
6 changes: 6 additions & 0 deletions srtcore/core.h
Expand Up @@ -705,6 +705,12 @@ class CUDT
// TSBPD thread main function.
static void* tsbpd(void* param);

/// Drop too late packets. Updaet loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @return The number of packets dropped.
int dropTooLateUpTo(int seqno);

void updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno);

static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
Expand Down

0 comments on commit 1111cbd

Please sign in to comment.