Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix hang up on not enough space in the RCV buffer. #2745

Merged
merged 10 commits into from Aug 8, 2023
51 changes: 29 additions & 22 deletions srtcore/core.cpp
Expand Up @@ -7904,7 +7904,6 @@
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
int nbsent = 0;
int local_prevack = 0;

#if ENABLE_HEAVY_LOGGING
struct SaveBack
{
Expand All @@ -7927,21 +7926,22 @@
// The TSBPD thread may change the first lost sequence record (TLPKTDROP).
// To avoid it the m_RcvBufferLock has to be acquired.
UniqueLock bufflock(m_RcvBufferLock);

// The full ACK should be sent to indicate there is now available space in the RCV buffer
// since the last full ACK. It should unblock the sender to proceed further.
const bool bNeedFullAck = (m_bBufferWasFull && getAvailRcvBufferSizeNoLock() > 0);
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
int32_t ack; // First unacknowledged packet sequence number (acknowledge up to ack).
if (!getFirstNoncontSequence((ack), (reason)))
return nbsent;

if (m_iRcvLastAckAck == ack)
if (m_iRcvLastAckAck == ack && !bNeedFullAck)
{
HLOGC(xtlog.Debug,
log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK");
return nbsent;
HLOGC(xtlog.Debug,
log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK");
return nbsent;
}

// send out a lite ACK
// to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
if (size == SEND_LITE_ACK)
if (size == SEND_LITE_ACK && !bNeedFullAck)
{
bufflock.unlock();
ctrlpkt.pack(UMSG_ACK, NULL, &ack, size);
Expand Down Expand Up @@ -8079,7 +8079,7 @@
CGlobEvent::triggerEvent();
}
}
else if (ack == m_iRcvLastAck)
else if (ack == m_iRcvLastAck && !bNeedFullAck)
{
// If the ACK was just sent already AND elapsed time did not exceed RTT,
if ((steady_clock::now() - m_tsLastAckTime) <
Expand All @@ -8090,7 +8090,7 @@
return nbsent;
}
}
else
else if (!bNeedFullAck)
{
// Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?)
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck);
Expand All @@ -8101,8 +8101,8 @@
// [[using locked(m_RcvBufferLock)]];

// Send out the ACK only if has not been received by the sender before
if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)
if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0 || bNeedFullAck)
{

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
// NOTE: The BSTATS feature turns on extra fields above size 6
// also known as ACKD_TOTAL_SIZE_VER100.
int32_t data[ACKD_TOTAL_SIZE];
Expand All @@ -8117,10 +8117,7 @@
data[ACKD_RTT] = m_iSRTT;
data[ACKD_RTTVAR] = m_iRTTVar;
data[ACKD_BUFFERLEFT] = (int) getAvailRcvBufferSizeNoLock();
// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
if (data[ACKD_BUFFERLEFT] < 2)
data[ACKD_BUFFERLEFT] = 2;

m_bBufferWasFull = data[ACKD_BUFFERLEFT] == 0;
if (steady_clock::now() - m_tsLastAckTime > m_tdACKInterval)
{
int rcvRate;
Expand Down Expand Up @@ -8295,7 +8292,6 @@
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}

return;
}

Expand Down Expand Up @@ -8336,14 +8332,25 @@
return;
}

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
const int cwnd1 = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
if (bWasStuck && cwnd > getFlightSpan())
{
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
HLOGC(gglog.Debug,
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
}
}

/*
* We must not ignore full ack received by peer
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Expand Up @@ -934,7 +934,7 @@ class CUDT
int32_t m_iAckSeqNo; // Last ACK sequence number
sync::atomic<int32_t> m_iRcvCurrSeqNo; // (RCV) Largest received sequence number. RcvQTh, TSBPDTh.
int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter)

bool m_bBufferWasFull; // Indicate that RX buffer was full last time a ack was sent
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
int32_t m_iPeerISN; // Initial Sequence Number of the peer side

uint32_t m_uPeerSrtVersion;
Expand Down