Skip to content

Commit

Permalink
[core] Fix MaxBW limitation.
Browse files Browse the repository at this point in the history
Don't reschedule sending (keep pacing) on
- SND drop,
- NAK received
- retransmission timeout.
  • Loading branch information
maxsharabayko committed Jan 26, 2022
1 parent 308cd30 commit 912463b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 78 deletions.
2 changes: 1 addition & 1 deletion srtcore/congctl.cpp
Expand Up @@ -77,7 +77,7 @@ class LiveCC: public SrtCongestionControlBase
{
m_llSndMaxBW = BW_INFINITE; // 1 Gbbps in Bytes/sec BW_INFINITE
m_zMaxPayloadSize = parent->OPT_PayloadSize();
if ( m_zMaxPayloadSize == 0 )
if (m_zMaxPayloadSize == 0)
m_zMaxPayloadSize = parent->maxPayloadSize();
m_zSndAvgPayloadSize = m_zMaxPayloadSize;

Expand Down
136 changes: 64 additions & 72 deletions srtcore/core.cpp
Expand Up @@ -5174,8 +5174,7 @@ void * srt::CUDT::tsbpd(void* param)
rxready = true;
if (info.seq_gap)
{
const int iDropCnt SRT_ATR_UNUSED = self->dropTooLateUpTo(info.seqno);

const int iDropCnt SRT_ATR_UNUSED = self->rcvDropTooLateUpTo(info.seqno);
#if ENABLE_EXPERIMENTAL_BONDING
shall_update_group = true;
#endif
Expand Down Expand Up @@ -5303,7 +5302,7 @@ void * srt::CUDT::tsbpd(void* param)
return NULL;
}

int srt::CUDT::dropTooLateUpTo(int seqno)
int srt::CUDT::rcvDropTooLateUpTo(int seqno)
{
const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno);

Expand Down Expand Up @@ -6327,10 +6326,10 @@ int srt::CUDT::receiveBuffer(char *data, int len)

// [[using maybe_locked(CUDTGroup::m_GroupLock, m_parent->m_GroupOf != NULL)]];
// [[using locked(m_SendLock)]];
bool srt::CUDT::checkNeedDrop()
int srt::CUDT::sndDropTooLate()
{
if (!m_bPeerTLPktDrop)
return false;
return 0;

if (!m_config.bMessageAPI)
{
Expand All @@ -6352,72 +6351,64 @@ bool srt::CUDT::checkNeedDrop()
+ (2 * COMM_SYN_INTERVAL_US / 1000)
: 0;

bool bCongestion = false;
if (threshold_ms && buffdelay_ms > threshold_ms)
{
// protect packet retransmission
enterCS(m_RecvAckLock);
int dbytes;
int32_t first_msgno;
int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms));
if (dpkts > 0)
{
enterCS(m_StatsLock);
m_stats.sndr.dropped.count(stats::BytesPackets(dbytes, dpkts));
leaveCS(m_StatsLock);
if (threshold_ms == 0 || buffdelay_ms <= threshold_ms)
return 0;

IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck);
const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts);
// protect packet retransmission
ScopedLock rcvlck(m_RecvAckLock);
int dbytes;
int32_t first_msgno;
const int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms));
if (dpkts <= 0)
return 0;

m_iSndLastAck = fakeack;
m_iSndLastDataAck = fakeack;
// If some packets were dropped update stats, socket state, loss list and the parent group if any.
enterCS(m_StatsLock);
m_stats.sndr.dropped.count(dbytes);;
leaveCS(m_StatsLock);

int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck);
m_pSndLossList->removeUpTo(minlastack);
/* If we dropped packets not yet sent, advance current position */
// THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1)
if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0)
{
m_iSndCurrSeqNo = minlastack;
}
IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck);
const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts);

HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n="
<< dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);
m_iSndLastAck = fakeack;
m_iSndLastDataAck = fakeack;

#if ENABLE_EXPERIMENTAL_BONDING
// This is done with a presumption that the group
// exists and if this is not NULL, it means that this
// function was called with locked m_GroupLock, as sendmsg2
// function was called from inside CUDTGroup::send, which
// locks the whole function.
//
// XXX This is true only because all existing groups are managed
// groups, that is, sockets cannot be added or removed from group
// manually, nor can send/recv operation be done on a single socket
// from the API call directly. This should be extra verified, if that
// changes in the future.
//
if (m_parent->m_GroupOf)
{
// What's important is that the lock on GroupLock cannot be applied
// here, both because it might be applied already, that is, according
// to the condition defined at this function's header, it is applied
// under this condition. Hence ackMessage can be defined as 100% locked.
m_parent->m_GroupOf->ackMessage(first_msgno);
}
#endif
}
bCongestion = true;
leaveCS(m_RecvAckLock);
}
else if (buffdelay_ms > (m_iPeerTsbPdDelay_ms / 2))
const int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck);
m_pSndLossList->removeUpTo(minlastack);
/* If we dropped packets not yet sent, advance current position */
// THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1)
if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0)
{
HLOGC(aslog.Debug,
log << "cong TIMESPAN " << buffdelay_ms << "ms");
m_iSndCurrSeqNo = minlastack;
}

bCongestion = true;
HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n="
<< dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);

#if ENABLE_EXPERIMENTAL_BONDING
// This is done with a presumption that the group
// exists and if this is not NULL, it means that this
// function was called with locked m_GroupLock, as sendmsg2
// function was called from inside CUDTGroup::send, which
// locks the whole function.
//
// XXX This is true only because all existing groups are managed
// groups, that is, sockets cannot be added or removed from group
// manually, nor can send/recv operation be done on a single socket
// from the API call directly. This should be extra verified, if that
// changes in the future.
//
if (m_parent->m_GroupOf)
{
// What's important is that the lock on GroupLock cannot be applied
// here, both because it might be applied already, that is, according
// to the condition defined at this function's header, it is applied
// under this condition. Hence ackMessage can be defined as 100% locked.
m_parent->m_GroupOf->ackMessage(first_msgno);
}
return bCongestion;
#endif

return dpkts;
}

int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64_t srctime)
Expand Down Expand Up @@ -6520,9 +6511,9 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)
m_iReXmitCount = 1;
}

// checkNeedDrop(...) may lock m_RecvAckLock
// sndDropTooLate(...) may lock m_RecvAckLock
// to modify m_pSndBuffer and m_pSndLossList
const bool bCongestion = checkNeedDrop();
const int iPktsTLDropped SRT_ATR_UNUSED = sndDropTooLate();

int minlen = 1; // Minimum sender buffer space required for STREAM API
if (m_config.bMessageAPI)
Expand Down Expand Up @@ -6701,12 +6692,13 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)
}
}

// insert this socket to the snd list if it is not on the list yet
// Insert this socket to the snd list if it is not on the list already.
// m_pSndUList->pop may lock CSndUList::m_ListLock and then m_RecvAckLock
m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion));
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

#ifdef SRT_ENABLE_ECN
if (bCongestion)
// IF there was a packet drop on the sender side, report congestion to the app.
if (iPktsTLDropped > 0)
{
LOGC(aslog.Error, log << "sendmsg2: CONGESTION; reporting error");
throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0);
Expand Down Expand Up @@ -8192,7 +8184,7 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)

// Guard access to m_iSndAckedMsgNo field
// Note: This can't be done inside CUDTGroup::ackMessage
// because this function is also called from CUDT::checkNeedDrop
// because this function is also called from CUDT::sndDropTooLate
// called from CUDT::sendmsg2 called from CUDTGroup::send, which
// applies the lock on m_GroupLock already.
ScopedLock glk (*m_parent->m_GroupOf->exp_groupLock());
Expand Down Expand Up @@ -8696,7 +8688,7 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
}

// the lost packet (retransmission) should be sent out immediately
m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE);
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

enterCS(m_StatsLock);
m_stats.sndr.recvdNak.count(1);
Expand Down Expand Up @@ -11166,8 +11158,8 @@ void srt::CUDT::checkRexmitTimer(const steady_clock::time_point& currtime)
const ECheckTimerStage stage = is_fastrexmit ? TEV_CHT_FASTREXMIT : TEV_CHT_REXMIT;
updateCC(TEV_CHECKTIMER, EventVariant(stage));

// immediately restart transmission
m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE);
// schedule sending if not scheduled already
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
}

void srt::CUDT::checkTimers()
Expand Down
9 changes: 6 additions & 3 deletions srtcore/core.h
Expand Up @@ -543,7 +543,10 @@ class CUDT

void updateIdleLinkFrom(CUDT* source);

bool checkNeedDrop();
/// @brief Drop packets too late to be delivered if any.
/// @returns the number of packets actually dropped.
SRT_ATTR_REQUIRES(m_RecvAckLock, m_StatsLock)
int sndDropTooLate();

/// Connect to a UDT entity as per hs request. This will update
/// required data in the entity, then update them also in the hs structure,
Expand Down Expand Up @@ -706,11 +709,11 @@ class CUDT
static void* tsbpd(void* param);

#if ENABLE_NEW_RCVBUFFER
/// Drop too late packets. Updaet loss lists and ACK positions.
/// Drop too late packets (receiver side). Updaet loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @return The number of packets dropped.
int dropTooLateUpTo(int seqno);
int rcvDropTooLateUpTo(int seqno);
#endif

void updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno);
Expand Down
3 changes: 1 addition & 2 deletions srtcore/stats.h
Expand Up @@ -91,8 +91,7 @@ class BytesPackets

uint64_t bytesWithHdr() const
{
static const int PKT_HDR_SIZE = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
return m_bytes + m_packets * PKT_HDR_SIZE;
return m_bytes + m_packets * CPacket::SRT_DATA_HDR_SIZE;
}

private:
Expand Down

0 comments on commit 912463b

Please sign in to comment.