From 3151fcc5f4d4024c79761643a7556fb2fdfbc471 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 15 Aug 2019 16:42:10 +0200 Subject: [PATCH] [core] Added processCtrlAck() function. The code from the corresponding section of CUDT::processCtrl(...) is directly copied to the new function without changes. --- srtcore/core.cpp | 448 +++++++++++++++++++++++---------------------- srtcore/core.h | 4 +- srtcore/packet.cpp | 2 +- srtcore/packet.h | 2 +- 4 files changed, 230 insertions(+), 226 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 50cd861dd..c1612b97d 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6809,7 +6809,7 @@ static void DebugAck(string hdr, int prev, int ack) static inline void DebugAck(string, int, int) {} #endif -void CUDT::sendCtrl(UDTMessageType pkttype, void* lparam, void* rparam, int size) +void CUDT::sendCtrl(UDTMessageType pkttype, const void* lparam, void* rparam, int size) { CPacket ctrlpkt; uint64_t currtime_tk; @@ -7127,268 +7127,270 @@ void CUDT::sendCtrl(UDTMessageType pkttype, void* lparam, void* rparam, int size m_ullLastSndTime_tk = currtime_tk; } -void CUDT::processCtrl(CPacket& ctrlpkt) +void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk) { - // Just heard from the peer, reset the expiration count. - m_iEXPCount = 1; - uint64_t currtime_tk; - CTimer::rdtsc(currtime_tk); - m_ullLastRspTime_tk = currtime_tk; - bool using_rexmit_flag = m_bPeerRexmitFlag; - - HLOGC(mglog.Debug, log << CONID() << "incoming UMSG:" << ctrlpkt.getType() << " (" - << MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType()) << ") socket=%" << ctrlpkt.m_iID); + int32_t ack; + int32_t* ackdata = (int32_t*)ctrlpkt.m_pcData; - switch (ctrlpkt.getType()) - { - case UMSG_ACK: //010 - Acknowledgement - { - int32_t ack; - int32_t* ackdata = (int32_t*)ctrlpkt.m_pcData; - - // process a lite ACK - if (ctrlpkt.getLength() == (size_t)SEND_LITE_ACK) - { - ack = *ackdata; - if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) - { + // process a lite ACK + if (ctrlpkt.getLength() == (size_t)SEND_LITE_ACK) + { + ack = *ackdata; + if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) + { m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack); HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack << " [ACK=" << m_iSndLastAck << "] (FLW: " << m_iFlowWindowSize << ") [LITE]"); m_iSndLastAck = ack; m_ullLastRspAckTime_tk = currtime_tk; m_iReXmitCount = 1; // Reset re-transmit count since last ACK - } + } - break; - } + return; + } - // read ACK seq. no. - ack = ctrlpkt.getAckSeqNo(); + // read ACK seq. no. + ack = ctrlpkt.getAckSeqNo(); - // send ACK acknowledgement - // number of ACK2 can be much less than number of ACK - uint64_t now = CTimer::getTime(); - if ((now - m_ullSndLastAck2Time > (uint64_t)COMM_SYN_INTERVAL_US) || (ack == m_iSndLastAck2)) - { - sendCtrl(UMSG_ACKACK, &ack); - m_iSndLastAck2 = ack; - m_ullSndLastAck2Time = now; - } + // send ACK acknowledgement + // number of ACK2 can be much less than number of ACK + uint64_t now = CTimer::getTime(); + if ((now - m_ullSndLastAck2Time > (uint64_t)COMM_SYN_INTERVAL_US) || (ack == m_iSndLastAck2)) + { + sendCtrl(UMSG_ACKACK, &ack); + m_iSndLastAck2 = ack; + m_ullSndLastAck2Time = now; + } - // Got data ACK - ack = ackdata[ACKD_RCVLASTACK]; + // Got data ACK + ack = ackdata[ACKD_RCVLASTACK]; - // New code, with TLPKTDROP + // New code, with TLPKTDROP - // protect packet retransmission - CGuard::enterCS(m_AckLock); + // protect packet retransmission + CGuard::enterCS(m_AckLock); - // check the validation of the ack - if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) - { - CGuard::leaveCS(m_AckLock); - //this should not happen: attack or bug - LOGC(glog.Error, log << CONID() << "ATTACK/IPE: incoming ack seq " << ack << " exceeds current " - << m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ack)-1) << "!"); - m_bBroken = true; - m_iBrokenCounter = 0; - break; - } + // check the validation of the ack + if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) + { + CGuard::leaveCS(m_AckLock); + //this should not happen: attack or bug + LOGC(glog.Error, log << CONID() << "ATTACK/IPE: incoming ack seq " << ack << " exceeds current " + << m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ack) - 1) << "!"); + m_bBroken = true; + m_iBrokenCounter = 0; + return; + } - if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) - { - // Update Flow Window Size, must update before and together with m_iSndLastAck - m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; - m_iSndLastAck = ack; - m_ullLastRspAckTime_tk = currtime_tk; - m_iReXmitCount = 1; // Reset re-transmit count since last ACK - } + if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) + { + // Update Flow Window Size, must update before and together with m_iSndLastAck + m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; + m_iSndLastAck = ack; + m_ullLastRspAckTime_tk = currtime_tk; + m_iReXmitCount = 1; // Reset re-transmit count since last ACK + } - /* - * We must not ignore full ack received by peer - * if data has been artificially acked by late packet drop. - * Therefore, a distinct ack state is used for received Ack (iSndLastFullAck) - * and ack position in send buffer (m_iSndLastDataAck). - * Otherwise, when severe congestion causing packet drops (and m_iSndLastDataAck update) - * occures, we drop received acks (as duplicates) and do not update stats like RTT, - * which may go crazy and stay there, preventing proper stream recovery. - */ - - if (CSeqNo::seqoff(m_iSndLastFullAck, ack) <= 0) - { - // discard it if it is a repeated ACK - CGuard::leaveCS(m_AckLock); - break; - } - m_iSndLastFullAck = ack; + /* + * We must not ignore full ack received by peer + * if data has been artificially acked by late packet drop. + * Therefore, a distinct ack state is used for received Ack (iSndLastFullAck) + * and ack position in send buffer (m_iSndLastDataAck). + * Otherwise, when severe congestion causing packet drops (and m_iSndLastDataAck update) + * occures, we drop received acks (as duplicates) and do not update stats like RTT, + * which may go crazy and stay there, preventing proper stream recovery. + */ + + if (CSeqNo::seqoff(m_iSndLastFullAck, ack) <= 0) + { + // discard it if it is a repeated ACK + CGuard::leaveCS(m_AckLock); + return; + } + m_iSndLastFullAck = ack; + + int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); + // IF distance between m_iSndLastDataAck and ack is nonempty... + if (offset > 0) { + // acknowledge the sending buffer (remove data that predate 'ack') + m_pSndBuffer->ackData(offset); + + const int64_t currtime = CTimer::getTime(); + // record total time used for sending + CGuard::enterCS(m_StatsLock); + m_stats.sndDuration += currtime - m_stats.sndDurationCounter; + m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter; + m_stats.sndDurationCounter = currtime; + CGuard::leaveCS(m_StatsLock); + + HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack + << " [ACK=" << m_iSndLastAck << "] BUFr=" << m_iFlowWindowSize + << " RTT=" << ackdata[ACKD_RTT] << " RTT*=" << ackdata[ACKD_RTTVAR] + << " BW=" << ackdata[ACKD_BANDWIDTH] << " Vrec=" << ackdata[ACKD_RCVSPEED]); + // update sending variables + m_iSndLastDataAck = ack; + + // remove any loss that predates 'ack' (not to be considered loss anymore) + m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); + } + + /* OLD CODE without TLPKTDROP + + // check the validation of the ack + if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) + { + //this should not happen: attack or bug + m_bBroken = true; + m_iBrokenCounter = 0; + break; + } - int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); - // IF distance between m_iSndLastDataAck and ack is nonempty... - if (offset > 0) { - // acknowledge the sending buffer (remove data that predate 'ack') + if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) + { + // Update Flow Window Size, must update before and together with m_iSndLastAck + m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; + m_iSndLastAck = ack; + m_ullLastRspAckTime_tk = currtime_tk; + m_iReXmitCount = 1; // Reset re-transmit count since last ACK + } + + // protect packet retransmission + CGuard::enterCS(m_AckLock); + + int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); + if (offset <= 0) + { + // discard it if it is a repeated ACK + CGuard::leaveCS(m_AckLock); + break; + } + + // acknowledge the sending buffer m_pSndBuffer->ackData(offset); - const int64_t currtime = CTimer::getTime(); // record total time used for sending - CGuard::enterCS(m_StatsLock); - m_stats.sndDuration += currtime - m_stats.sndDurationCounter; - m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter; - m_stats.sndDurationCounter = currtime; - CGuard::leaveCS(m_StatsLock); - - HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack - << " [ACK=" << m_iSndLastAck << "] BUFr=" << m_iFlowWindowSize - << " RTT=" << ackdata[ACKD_RTT] << " RTT*=" << ackdata[ACKD_RTTVAR] - << " BW=" << ackdata[ACKD_BANDWIDTH] << " Vrec=" << ackdata[ACKD_RCVSPEED]); + int64_t currtime = currtime_tk/m_ullCPUFrequency; + + m_llSndDuration += currtime - m_llSndDurationCounter; + m_llSndDurationTotal += currtime - m_llSndDurationCounter; + m_llSndDurationCounter = currtime; + // update sending variables m_iSndLastDataAck = ack; - - // remove any loss that predates 'ack' (not to be considered loss anymore) m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); - } - -/* OLD CODE without TLPKTDROP - - // check the validation of the ack - if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) - { - //this should not happen: attack or bug - m_bBroken = true; - m_iBrokenCounter = 0; - break; - } - if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) - { - // Update Flow Window Size, must update before and together with m_iSndLastAck - m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; - m_iSndLastAck = ack; - m_ullLastRspAckTime_tk = currtime_tk; - m_iReXmitCount = 1; // Reset re-transmit count since last ACK - } + #endif SRT_ENABLE_TLPKTDROP */ - // protect packet retransmission - CGuard::enterCS(m_AckLock); - - int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); - if (offset <= 0) - { - // discard it if it is a repeated ACK - CGuard::leaveCS(m_AckLock); - break; - } + CGuard::leaveCS(m_AckLock); + if (m_bSynSending) + { + CGuard lk(m_SendBlockLock); + pthread_cond_signal(&m_SendBlockCond); + } - // acknowledge the sending buffer - m_pSndBuffer->ackData(offset); + // acknowledde any waiting epolls to write + s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); - // record total time used for sending - int64_t currtime = currtime_tk/m_ullCPUFrequency; + // insert this socket to snd list if it is not on the list yet + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); - m_llSndDuration += currtime - m_llSndDurationCounter; - m_llSndDurationTotal += currtime - m_llSndDurationCounter; - m_llSndDurationCounter = currtime; + size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING + bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE); + acksize = acksize / ACKD_FIELD_SIZE; // ACTUAL VALUE - // update sending variables - m_iSndLastDataAck = ack; - m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); + if (wrongsize) + { + // Issue a log, but don't do anything but skipping the "odd" bytes from the payload. + LOGC(mglog.Error, log << CONID() << "Received UMSG_ACK payload is not evened up to 4-byte based field size - cutting to " << acksize << " fields"); + } -#endif SRT_ENABLE_TLPKTDROP */ + // Start with checking the base size. + if (acksize < ACKD_TOTAL_SIZE_SMALL) + { + LOGC(mglog.Error, log << CONID() << "Invalid ACK size " << acksize << " fields - less than minimum required!"); + // Ack is already interpreted, just skip further parts. + return; + } + // This check covers fields up to ACKD_BUFFERLEFT. + + // Update RTT + //m_iRTT = ackdata[ACKD_RTT]; + //m_iRTTVar = ackdata[ACKD_RTTVAR]; + // XXX These ^^^ commented-out were blocked in UDT; + // the current RTT calculations are exactly the same as in UDT4. + int rtt = ackdata[ACKD_RTT]; + + m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT)); + m_iRTT = avg_iir<8>(m_iRTT, rtt); + + /* Version-dependent fields: + * Original UDT (total size: ACKD_TOTAL_SIZE_SMALL): + * ACKD_RCVLASTACK + * ACKD_RTT + * ACKD_RTTVAR + * ACKD_BUFFERLEFT + * Additional UDT fields, not always attached: + * ACKD_RCVSPEED + * ACKD_BANDWIDTH + * SRT extension version 1.0.2 (bstats): + * ACKD_RCVRATE + * SRT extension version 1.0.4: + * ACKD_XMRATE + */ - CGuard::leaveCS(m_AckLock); - if (m_bSynSending) - { - CGuard lk(m_SendBlockLock); - pthread_cond_signal(&m_SendBlockCond); - } + if (acksize > ACKD_TOTAL_SIZE_SMALL) + { + // This means that ACKD_RCVSPEED and ACKD_BANDWIDTH fields are available. + int pktps = ackdata[ACKD_RCVSPEED]; + int bandwidth = ackdata[ACKD_BANDWIDTH]; + int bytesps; - // acknowledde any waiting epolls to write - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); + /* SRT v1.0.2 Bytes-based stats: bandwidth (pcData[ACKD_XMRATE]) and delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */ + /* SRT v1.0.3 Bytes-based stats: only delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */ + if (acksize > ACKD_TOTAL_SIZE_UDTBASE) + bytesps = ackdata[ACKD_RCVRATE]; + else + bytesps = pktps * m_iMaxSRTPayloadSize; - // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + m_iBandwidth = avg_iir<8>(m_iBandwidth, bandwidth); + m_iDeliveryRate = avg_iir<8>(m_iDeliveryRate, pktps); + m_iByteDeliveryRate = avg_iir<8>(m_iByteDeliveryRate, bytesps); + // XXX not sure if ACKD_XMRATE is of any use. This is simply + // calculated as ACKD_BANDWIDTH * m_iMaxSRTPayloadSize. - size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING - bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE); - acksize = acksize / ACKD_FIELD_SIZE; // ACTUAL VALUE + // Update Estimated Bandwidth and packet delivery rate + // m_iRcvRate = m_iDeliveryRate; + // ^^ This has been removed because with the SrtCongestion class + // instead of reading the m_iRcvRate local field this will read + // cudt->deliveryRate() instead. + } - if ( wrongsize ) - { - // Issue a log, but don't do anything but skipping the "odd" bytes from the payload. - LOGC(mglog.Error, log << CONID() << "Received UMSG_ACK payload is not evened up to 4-byte based field size - cutting to " << acksize << " fields"); - } + checkSndTimers(REGEN_KM); + updateCC(TEV_ACK, ack); - // Start with checking the base size. - if ( acksize < ACKD_TOTAL_SIZE_SMALL ) - { - LOGC(mglog.Error, log << CONID() << "Invalid ACK size " << acksize << " fields - less than minimum required!"); - // Ack is already interpreted, just skip further parts. - break; - } - // This check covers fields up to ACKD_BUFFERLEFT. - - // Update RTT - //m_iRTT = ackdata[ACKD_RTT]; - //m_iRTTVar = ackdata[ACKD_RTTVAR]; - // XXX These ^^^ commented-out were blocked in UDT; - // the current RTT calculations are exactly the same as in UDT4. - int rtt = ackdata[ACKD_RTT]; - - m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT)); - m_iRTT = avg_iir<8>(m_iRTT, rtt); - - /* Version-dependent fields: - * Original UDT (total size: ACKD_TOTAL_SIZE_SMALL): - * ACKD_RCVLASTACK - * ACKD_RTT - * ACKD_RTTVAR - * ACKD_BUFFERLEFT - * Additional UDT fields, not always attached: - * ACKD_RCVSPEED - * ACKD_BANDWIDTH - * SRT extension version 1.0.2 (bstats): - * ACKD_RCVRATE - * SRT extension version 1.0.4: - * ACKD_XMRATE - */ - - if (acksize > ACKD_TOTAL_SIZE_SMALL) - { - // This means that ACKD_RCVSPEED and ACKD_BANDWIDTH fields are available. - int pktps = ackdata[ACKD_RCVSPEED]; - int bandwidth = ackdata[ACKD_BANDWIDTH]; - int bytesps; - - /* SRT v1.0.2 Bytes-based stats: bandwidth (pcData[ACKD_XMRATE]) and delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */ - /* SRT v1.0.3 Bytes-based stats: only delivery rate (pcData[ACKD_RCVRATE]) in bytes/sec instead of pkts/sec */ - if (acksize > ACKD_TOTAL_SIZE_UDTBASE) - bytesps = ackdata[ACKD_RCVRATE]; - else - bytesps = pktps * m_iMaxSRTPayloadSize; - - m_iBandwidth = avg_iir<8>(m_iBandwidth, bandwidth); - m_iDeliveryRate = avg_iir<8>(m_iDeliveryRate, pktps); - m_iByteDeliveryRate = avg_iir<8>(m_iByteDeliveryRate, bytesps); - // XXX not sure if ACKD_XMRATE is of any use. This is simply - // calculated as ACKD_BANDWIDTH * m_iMaxSRTPayloadSize. - - // Update Estimated Bandwidth and packet delivery rate - // m_iRcvRate = m_iDeliveryRate; - // ^^ This has been removed because with the SrtCongestion class - // instead of reading the m_iRcvRate local field this will read - // cudt->deliveryRate() instead. - } + CGuard::enterCS(m_StatsLock); + ++m_stats.recvACK; + ++m_stats.recvACKTotal; + CGuard::leaveCS(m_StatsLock); +} - checkSndTimers(REGEN_KM); - updateCC(TEV_ACK, ack); +void CUDT::processCtrl(CPacket& ctrlpkt) +{ + // Just heard from the peer, reset the expiration count. + m_iEXPCount = 1; + uint64_t currtime_tk; + CTimer::rdtsc(currtime_tk); + m_ullLastRspTime_tk = currtime_tk; + bool using_rexmit_flag = m_bPeerRexmitFlag; - CGuard::enterCS(m_StatsLock); - ++ m_stats.recvACK; - ++ m_stats.recvACKTotal; - CGuard::leaveCS(m_StatsLock); + HLOGC(mglog.Debug, log << CONID() << "incoming UMSG:" << ctrlpkt.getType() << " (" + << MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType()) << ") socket=%" << ctrlpkt.m_iID); - break; - } + switch (ctrlpkt.getType()) + { + case UMSG_ACK: //010 - Acknowledgement + processCtrlAck(ctrlpkt, currtime_tk); + break; case UMSG_ACKACK: //110 - Acknowledgement of Acknowledgement { diff --git a/srtcore/core.h b/srtcore/core.h index 9c7a5a61e..5f9899744 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -724,9 +724,11 @@ class CUDT bool createCrypter(HandshakeSide side, bool bidi); private: // Generation and processing of packets - void sendCtrl(UDTMessageType pkttype, void* lparam = NULL, void* rparam = NULL, int size = 0); + void sendCtrl(UDTMessageType pkttype, const void* lparam = NULL, void* rparam = NULL, int size = 0); + void processCtrl(CPacket& ctrlpkt); void sendLossReport(const std::vector< std::pair >& losslist); + void processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk); /// Pack a packet from a list of lost packets. /// diff --git a/srtcore/packet.cpp b/srtcore/packet.cpp index 81f7c2050..0b9143958 100644 --- a/srtcore/packet.cpp +++ b/srtcore/packet.cpp @@ -224,7 +224,7 @@ void CPacket::setLength(size_t len) m_PacketVector[PV_DATA].setLength(len); } -void CPacket::pack(UDTMessageType pkttype, void* lparam, void* rparam, int size) +void CPacket::pack(UDTMessageType pkttype, const void* lparam, void* rparam, int size) { // Set (bit-0 = 1) and (bit-1~15 = type) setControl(pkttype); diff --git a/srtcore/packet.h b/srtcore/packet.h index 003188398..e80e100af 100644 --- a/srtcore/packet.h +++ b/srtcore/packet.h @@ -243,7 +243,7 @@ friend class CRcvQueue; /// @param rparam [in] pointer to the second data structure, explained by the packet type. /// @param size [in] size of rparam, in number of bytes; - void pack(UDTMessageType pkttype, void* lparam = NULL, void* rparam = NULL, int size = 0); + void pack(UDTMessageType pkttype, const void* lparam = NULL, void* rparam = NULL, int size = 0); /// Read the packet vector. /// @return Pointer to the packet vector.