Skip to content

Commit

Permalink
[core] Small refactoring of processCtrlAck()
Browse files Browse the repository at this point in the history
mainly to avoid confusion between ack_seqno and ackdata_seqno
  • Loading branch information
maxsharabayko authored and rndi committed Oct 15, 2019
1 parent 3151fcc commit 94eb4be
Showing 1 changed file with 62 additions and 98 deletions.
160 changes: 62 additions & 98 deletions srtcore/core.cpp
Expand Up @@ -7129,87 +7129,90 @@ void CUDT::sendCtrl(UDTMessageType pkttype, const void* lparam, void* rparam, in

void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
{
int32_t ack;
int32_t* ackdata = (int32_t*)ctrlpkt.m_pcData;
const int32_t* ackdata = (const int32_t*)ctrlpkt.m_pcData;
const int32_t ackdata_seqno = ackdata[ACKD_RCVLASTACK];

// process a lite ACK
// Process a lite ACK
if (ctrlpkt.getLength() == (size_t)SEND_LITE_ACK)
{
ack = *ackdata;
if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno);
HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack << " [ACK=" << m_iSndLastAck << "] (FLW: " << m_iFlowWindowSize << ") [LITE]");

m_iSndLastAck = ack;
m_iSndLastAck = ackdata_seqno;
m_ullLastRspAckTime_tk = currtime_tk;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}

return;
}

// read ACK seq. no.
ack = ctrlpkt.getAckSeqNo();

// send ACK acknowledgement
// number of ACK2 can be much less than number of ACK
uint64_t now = CTimer::getTime();
if ((now - m_ullSndLastAck2Time > (uint64_t)COMM_SYN_INTERVAL_US) || (ack == m_iSndLastAck2))
// Decide to send ACKACK or not
{
sendCtrl(UMSG_ACKACK, &ack);
m_iSndLastAck2 = ack;
m_ullSndLastAck2Time = now;
}
// Sequence number of the ACK packet
const int32_t ack_seqno = ctrlpkt.getAckSeqNo();

// Got data ACK
ack = ackdata[ACKD_RCVLASTACK];
// Send ACK acknowledgement (UMSG_ACKACK).
// There can be less ACKACK packets in the stream, than the number of ACK packets.
// Only send ACKACK every syn interval or if ACK packet with the sequence number
// already acknowledged (with ACKACK) has come again, which probably means ACKACK was lost.
const uint64_t now = CTimer::getTime();
if ((now - m_ullSndLastAck2Time > (uint64_t)COMM_SYN_INTERVAL_US) || (ack_seqno == m_iSndLastAck2))
{
sendCtrl(UMSG_ACKACK, &ack_seqno);
m_iSndLastAck2 = ack_seqno;
m_ullSndLastAck2Time = now;
}
}

// New code, with TLPKTDROP
//
// Begin of the new code with TLPKTDROP.
//

// protect packet retransmission
// Protect packet retransmission
CGuard::enterCS(m_AckLock);

// check the validation of the ack
if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
// Check the validation of the ack
if (CSeqNo::seqcmp(ackdata_seqno, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
{
CGuard::leaveCS(m_AckLock);
//this should not happen: attack or bug
LOGC(glog.Error, log << CONID() << "ATTACK/IPE: incoming ack seq " << ack << " exceeds current "
<< m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ack) - 1) << "!");
// this should not happen: attack or bug
LOGC(glog.Error, log << CONID() << "ATTACK/IPE: incoming ack seq " << ackdata_seqno << " exceeds current "
<< m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ackdata_seqno) - 1) << "!");
m_bBroken = true;
m_iBrokenCounter = 0;
return;
}

if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ack;
m_iSndLastAck = ackdata_seqno;
m_ullLastRspAckTime_tk = currtime_tk;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}

/*
* We must not ignore full ack received by peer
* if data has been artificially acked by late packet drop.
* Therefore, a distinct ack state is used for received Ack (iSndLastFullAck)
* and ack position in send buffer (m_iSndLastDataAck).
* Otherwise, when severe congestion causing packet drops (and m_iSndLastDataAck update)
* occures, we drop received acks (as duplicates) and do not update stats like RTT,
* which may go crazy and stay there, preventing proper stream recovery.
*/
* We must not ignore full ack received by peer
* if data has been artificially acked by late packet drop.
* Therefore, a distinct ack state is used for received Ack (iSndLastFullAck)
* and ack position in send buffer (m_iSndLastDataAck).
* Otherwise, when severe congestion causing packet drops (and m_iSndLastDataAck update)
* occures, we drop received acks (as duplicates) and do not update stats like RTT,
* which may go crazy and stay there, preventing proper stream recovery.
*/

if (CSeqNo::seqoff(m_iSndLastFullAck, ack) <= 0)
if (CSeqNo::seqoff(m_iSndLastFullAck, ackdata_seqno) <= 0)
{
// discard it if it is a repeated ACK
CGuard::leaveCS(m_AckLock);
return;
}
m_iSndLastFullAck = ack;
m_iSndLastFullAck = ackdata_seqno;

int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);
int offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno);
// IF distance between m_iSndLastDataAck and ack is nonempty...
if (offset > 0) {
// acknowledge the sending buffer (remove data that predate 'ack')
Expand All @@ -7223,63 +7226,21 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
m_stats.sndDurationCounter = currtime;
CGuard::leaveCS(m_StatsLock);

HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack
HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno
<< " [ACK=" << m_iSndLastAck << "] BUFr=" << m_iFlowWindowSize
<< " RTT=" << ackdata[ACKD_RTT] << " RTT*=" << ackdata[ACKD_RTTVAR]
<< " BW=" << ackdata[ACKD_BANDWIDTH] << " Vrec=" << ackdata[ACKD_RCVSPEED]);
// update sending variables
m_iSndLastDataAck = ack;
m_iSndLastDataAck = ackdata_seqno;

// remove any loss that predates 'ack' (not to be considered loss anymore)
m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
}

/* OLD CODE without TLPKTDROP
// check the validation of the ack
if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
{
//this should not happen: attack or bug
m_bBroken = true;
m_iBrokenCounter = 0;
break;
}

if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
{
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ack;
m_ullLastRspAckTime_tk = currtime_tk;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}
// protect packet retransmission
CGuard::enterCS(m_AckLock);
int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);
if (offset <= 0)
{
// discard it if it is a repeated ACK
CGuard::leaveCS(m_AckLock);
break;
}
// acknowledge the sending buffer
m_pSndBuffer->ackData(offset);
// record total time used for sending
int64_t currtime = currtime_tk/m_ullCPUFrequency;
m_llSndDuration += currtime - m_llSndDurationCounter;
m_llSndDurationTotal += currtime - m_llSndDurationCounter;
m_llSndDurationCounter = currtime;
// update sending variables
m_iSndLastDataAck = ack;
m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
#endif SRT_ENABLE_TLPKTDROP */
//
// END of the new code with TLPKTDROP
//

CGuard::leaveCS(m_AckLock);
if (m_bSynSending)
Expand All @@ -7295,13 +7256,15 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING
bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE);
acksize = acksize / ACKD_FIELD_SIZE; // ACTUAL VALUE
bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE);
acksize = acksize / ACKD_FIELD_SIZE; // ACTUAL VALUE

if (wrongsize)
{
// Issue a log, but don't do anything but skipping the "odd" bytes from the payload.
LOGC(mglog.Error, log << CONID() << "Received UMSG_ACK payload is not evened up to 4-byte based field size - cutting to " << acksize << " fields");
LOGC(mglog.Error,
log << CONID() << "Received UMSG_ACK payload is not evened up to 4-byte based field size - cutting to "
<< acksize << " fields");
}

// Start with checking the base size.
Expand All @@ -7314,11 +7277,11 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
// This check covers fields up to ACKD_BUFFERLEFT.

// Update RTT
//m_iRTT = ackdata[ACKD_RTT];
//m_iRTTVar = ackdata[ACKD_RTTVAR];
// m_iRTT = ackdata[ACKD_RTT];
// m_iRTTVar = ackdata[ACKD_RTTVAR];
// XXX These ^^^ commented-out were blocked in UDT;
// the current RTT calculations are exactly the same as in UDT4.
int rtt = ackdata[ACKD_RTT];
const int rtt = ackdata[ACKD_RTT];

m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
Expand All @@ -7345,8 +7308,9 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
int bandwidth = ackdata[ACKD_BANDWIDTH];
int bytesps;

/* SRT v1.0.2 Bytes-based stats: bandwidth (pcData[ACKD_XMRATE]) and delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */
/* SRT v1.0.3 Bytes-based stats: only delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */
/* SRT v1.0.2 Bytes-based stats: bandwidth (pcData[ACKD_XMRATE]) and delivery rate (pcData[ACKD_RCVRATE]) in
* bytes/sec instead of pkts/sec */
/* SRT v1.0.3 Bytes-based stats: only delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */
if (acksize > ACKD_TOTAL_SIZE_UDTBASE)
bytesps = ackdata[ACKD_RCVRATE];
else
Expand All @@ -7366,7 +7330,7 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
}

checkSndTimers(REGEN_KM);
updateCC(TEV_ACK, ack);
updateCC(TEV_ACK, ackdata_seqno);

CGuard::enterCS(m_StatsLock);
++m_stats.recvACK;
Expand Down

0 comments on commit 94eb4be

Please sign in to comment.