Skip to content

Commit

Permalink
[core] Refact: moved code to processCtrlAckAck
Browse files Browse the repository at this point in the history
dedicated function
  • Loading branch information
maxsharabayko committed Apr 9, 2021
1 parent 8bf8c55 commit bd55e29
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 76 deletions.
146 changes: 73 additions & 73 deletions srtcore/core.cpp
Expand Up @@ -8034,6 +8034,77 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point
leaveCS(m_StatsLock);
}

void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival)
{
int32_t ack = 0;

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, tsArrival);

if (rtt == -1)
{
if (ctrlpkt.getAckSeqNo() > (m_iAckSeqNo - static_cast<int>(ACK_WND_SIZE)) && ctrlpkt.getAckSeqNo() <= m_iAckSeqNo)
{
LOGC(inlog.Warn,
log << CONID() << "ACKACK out of order, skipping RTT calculation "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
return;
}

LOGC(inlog.Error,
log << CONID() << "IPE: ACK record not found, can't estimate RTT "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
return;
}

if (rtt <= 0)
{
LOGC(inlog.Error,
log << CONID() << "IPE: invalid RTT estimate " << rtt
<< ", possible time shift. Clock: " << SRT_SYNC_CLOCK_STR);
return;
}

// If increasing delay is detected
// sendCtrl(UMSG_CGWARNING);

// Calculate RTT (EWMA) on the receiver side
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);

updateCC(TEV_ACKACK, EventVariant(ack));

// This function will put a lock on m_RecvLock by itself, as needed.
// It must be done inside because this function reads the current time
// and if waiting for the lock has caused a delay, the time will be
// inaccurate. Additionally it won't lock if TSBPD mode is off, and
// won't update anything. Note that if you set TSBPD mode and use
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
steady_clock::duration udrift(0);
steady_clock::time_point newtimebase;
const bool drift_updated ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), m_RecvLock,
(udrift), (newtimebase));
#if ENABLE_EXPERIMENTAL_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
ScopedLock glock(s_UDTUnited.m_GlobControlLock);
if (m_parent->m_GroupOf)
{
m_parent->m_GroupOf->synchronizeDrift(this, udrift, newtimebase);
}
}
#endif
}

// Update last ACK that has been received by the sender
if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
m_iRcvLastAckAck = ack;
}

void CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
{
const int32_t* losslist = (int32_t*)(ctrlpkt.m_pcData);
Expand Down Expand Up @@ -8184,7 +8255,6 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)
m_iEXPCount = 1;
const steady_clock::time_point currtime = steady_clock::now();
m_tsLastRspTime = currtime;
bool using_rexmit_flag = m_bPeerRexmitFlag;

HLOGC(inlog.Debug,
log << CONID() << "incoming UMSG:" << ctrlpkt.getType() << " ("
Expand All @@ -8197,77 +8267,8 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)
break;

case UMSG_ACKACK: // 110 - Acknowledgement of Acknowledgement
{
int32_t ack = 0;

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, currtime);

if (rtt == -1)
{
if (ctrlpkt.getAckSeqNo() > (m_iAckSeqNo - static_cast<int>(ACK_WND_SIZE)) && ctrlpkt.getAckSeqNo() <= m_iAckSeqNo)
{
LOGC(inlog.Warn,
log << CONID() << "ACKACK out of order, skipping RTT calculation "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
break;
}

LOGC(inlog.Error,
log << CONID() << "IPE: ACK record not found, can't estimate RTT "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
break;
}

if (rtt <= 0)
{
LOGC(inlog.Error,
log << CONID() << "IPE: invalid RTT estimate " << rtt
<< ", possible time shift. Clock: " << SRT_SYNC_CLOCK_STR);
break;
}

// If increasing delay is detected
// sendCtrl(UMSG_CGWARNING);

// Calculate RTT (EWMA) on the receiver side
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);

updateCC(TEV_ACKACK, EventVariant(ack));

// This function will put a lock on m_RecvLock by itself, as needed.
// It must be done inside because this function reads the current time
// and if waiting for the lock has caused a delay, the time will be
// inaccurate. Additionally it won't lock if TSBPD mode is off, and
// won't update anything. Note that if you set TSBPD mode and use
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
steady_clock::duration udrift(0);
steady_clock::time_point newtimebase;
const bool drift_updated ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), m_RecvLock,
(udrift), (newtimebase));
#if ENABLE_EXPERIMENTAL_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
ScopedLock glock (s_UDTUnited.m_GlobControlLock);
if (m_parent->m_GroupOf)
{
m_parent->m_GroupOf->synchronizeDrift(this, udrift, newtimebase);
}
}
#endif
}

// Update last ACK that has been received by the sender
if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
m_iRcvLastAckAck = ack;

processCtrlAckAck(ctrlpkt, currtime);
break;
}

case UMSG_LOSSREPORT: // 011 - Loss Report
processCtrlLossReport(ctrlpkt);
Expand All @@ -8284,9 +8285,7 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)
break;

case UMSG_KEEPALIVE: // 001 - Keep-alive

handleKeepalive(ctrlpkt.m_pcData, ctrlpkt.getLength());

break;

case UMSG_HANDSHAKE: // 000 - Handshake
Expand Down Expand Up @@ -8416,6 +8415,7 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)

case UMSG_DROPREQ: // 111 - Msg drop request
{
const bool using_rexmit_flag = m_bPeerRexmitFlag;
UniqueLock rlock(m_RecvLock);
m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq(using_rexmit_flag), using_rexmit_flag);
// When the drop request was received, it means that there are
Expand Down
18 changes: 15 additions & 3 deletions srtcore/core.h
Expand Up @@ -943,13 +943,25 @@ class CUDT
///
/// @returns the nmber of packets sent.
int sendCtrlAck(CPacket& ctrlpkt, int size);
void sendLossReport(const std::vector< std::pair<int32_t, int32_t> >& losslist);

void processCtrl(const CPacket& ctrlpkt);
void sendLossReport(const std::vector< std::pair<int32_t, int32_t> >& losslist);
void processCtrlAck(const CPacket& ctrlpkt, const time_point &currtime);

/// @brief Process incoming control ACK packet.
/// @param ctrlpkt incoming packet
/// @param currtime current clock time
void processCtrlAck(const CPacket& ctrlpkt, const time_point& currtime);

/// @brief Process incoming control ACKACK packet.
/// @param ctrlpkt incoming packet
/// @param tsArrival time when packet has arrived (used to calculate RTT)
void processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival);

/// @brief Process incoming loss report (NAK) packet.
/// @param ctrlpkt incoming NAK packet
void processCtrlLossReport(const CPacket& ctrlpkt);

///
/// @brief Update sender's loss list on an incoming acknowledgement.
/// @param ackdata_seqno sequence number of a data packet being acknowledged
void updateSndLossListOnACK(int32_t ackdata_seqno);

Expand Down

0 comments on commit bd55e29

Please sign in to comment.