Skip to content

Commit

Permalink
[core] Improved RTT estimation (#1957)
Browse files Browse the repository at this point in the history
* Receiver: use the first RTT estimation without smoothing.
* Sender: take RTT value from ACK in case of unidirectional transmission.
  • Loading branch information
mbakholdina committed Apr 27, 2021
1 parent 291e010 commit d898f1c
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 28 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Expand Up @@ -97,6 +97,7 @@ endforeach()
# SRT_DEBUG_TLPKTDROP_DROPSEQ 1
# SRT_DEBUG_SNDQ_HIGHRATE 1
# SRT_DEBUG_BONDING_STATES 1
# SRT_DEBUG_RTT 1 /* RTT trace */
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */

# option defaults
Expand Down
190 changes: 171 additions & 19 deletions srtcore/core.cpp
Expand Up @@ -241,7 +241,6 @@ void CUDT::construct()
// Will be reset to 0 for HSv5, this value is important for HSv4.
m_iSndHsRetryCnt = SRT_MAX_HSRETRY + 1;

// Initial status
m_bOpened = false;
m_bListening = false;
m_bConnecting = false;
Expand Down Expand Up @@ -932,8 +931,10 @@ void CUDT::open()
m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
m_pRNode->m_bOnList = false;

m_iRTT = 10 * COMM_SYN_INTERVAL_US;
m_iRTTVar = m_iRTT >> 1;
// Set initial values of smoothed RTT and RTT variance.
m_iRTT = INITIAL_RTT;
m_iRTTVar = INITIAL_RTTVAR;
m_bIsFirstRTTReceived = false;

// set minimum NAK and EXP timeout to 300ms
m_tdMinNakInterval = milliseconds_from(300);
Expand Down Expand Up @@ -1867,6 +1868,74 @@ void SrtExtractHandshakeExtensions(const char* bufbegin, size_t buflength,
}
}

#if SRT_DEBUG_RTT
class RttTracer
{
public:
RttTracer()
{
}

~RttTracer()
{
srt::sync::ScopedLock lck(m_mtx);
m_fout.close();
}

void trace(const srt::sync::steady_clock::time_point& currtime,
const std::string& event, int rtt_sample, int rttvar_sample,
bool is_smoothed_rtt_reset, int64_t recvTotal,
int smoothed_rtt, int rttvar)
{
srt::sync::ScopedLock lck(m_mtx);
create_file();

m_fout << srt::sync::FormatTimeSys(currtime) << ",";
m_fout << srt::sync::FormatTime(currtime) << ",";
m_fout << event << ",";
m_fout << rtt_sample << ",";
m_fout << rttvar_sample << ",";
m_fout << is_smoothed_rtt_reset << ",";
m_fout << recvTotal << ",";
m_fout << smoothed_rtt << ",";
m_fout << rttvar << "\n";
m_fout.flush();
}

private:
void print_header()
{
m_fout << "Timepoint_SYST,Timepoint_STDY,Event,usRTTSample,"
"usRTTVarSample,IsSmoothedRTTReset,pktsRecvTotal,"
"usSmoothedRTT,usRTTVar\n";
}

void create_file()
{
if (m_fout.is_open())
return;

std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now());
str_tnow.resize(str_tnow.size() - 7); // remove trailing ' [SYST]' part
while (str_tnow.find(':') != std::string::npos) {
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "rtt_trace_" + str_tnow + "_" + SRT_SYNC_CLOCK_STR + ".csv";
m_fout.open(fname, std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

print_header();
}

private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
};

RttTracer s_rtt_trace;
#endif


bool CUDT::processSrtMsg(const CPacket *ctrlpkt)
{
Expand Down Expand Up @@ -3261,8 +3330,8 @@ void CUDT::synchronizeWithGroup(CUDTGroup* gp)
}
}
}

#endif

void CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
{
ScopedLock cg (m_ConnectionLock);
Expand Down Expand Up @@ -4485,10 +4554,15 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE
if (m_pCache->lookup(&ib) >= 0)
{
m_iRTT = ib.m_iRTT;
m_iRTTVar = m_iRTT >> 1;
m_iRTTVar = ib.m_iRTT / 2;
m_iBandwidth = ib.m_iBandwidth;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Connect", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, m_iRTTVar);
#endif

SRT_REJECT_REASON rr = setupCC();
if (rr != SRT_REJ_UNKNOWN)
{
Expand Down Expand Up @@ -5384,10 +5458,15 @@ void CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer,
if (m_pCache->lookup(&ib) >= 0)
{
m_iRTT = ib.m_iRTT;
m_iRTTVar = m_iRTT >> 1;
m_iRTTVar = ib.m_iRTT / 2;
m_iBandwidth = ib.m_iBandwidth;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Accept", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, m_iRTTVar);
#endif

m_PeerAddr = peer;

// This should extract the HSREQ and KMREQ portion in the handshake packet.
Expand Down Expand Up @@ -5851,6 +5930,11 @@ bool CUDT::closeInternal()
ib.m_iBandwidth = m_iBandwidth;
m_pCache->update(&ib);

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Cache", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, -1);
#endif

m_bConnected = false;
}

Expand Down Expand Up @@ -7998,15 +8082,63 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point
}
// This check covers fields up to ACKD_BUFFERLEFT.

// Update RTT
// m_iRTT = ackdata[ACKD_RTT];
// m_iRTTVar = ackdata[ACKD_RTTVAR];
// XXX These ^^^ commented-out were blocked in UDT;
// the current RTT calculations are exactly the same as in UDT4.
const int rtt = ackdata[ACKD_RTT];
// Extract RTT estimate and RTTVar from the ACK packet.
const int rtt = ackdata[ACKD_RTT];
const int rttvar = ackdata[ACKD_RTTVAR];

// Update the values of smoothed RTT and the variation in RTT samples
// on subsequent RTT estimates extracted from the ACK packets
// (during transmission).
if (m_bIsFirstRTTReceived)
{
// Suppose transmission is bidirectional if sender is also receiving
// data packets.
enterCS(m_StatsLock);
const bool bPktsReceived = m_stats.recvTotal != 0;
leaveCS(m_StatsLock);

m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
if (bPktsReceived) // Transmission is bidirectional.
{
// RTT value extracted from the ACK packet (rtt) is already smoothed
// RTT obtained at the receiver side. Apply EWMA anyway for the second
// time on the sender side. Ignore initial values which might arrive
// after the smoothed RTT on the sender side has been
// reset to the very first RTT sample received from the receiver.
// TODO: The case of bidirectional transmission requires further
// improvements and testing. Double smoothing is applied here to be
// consistent with the previous behavior.

if (rtt != INITIAL_RTT && rttvar != INITIAL_RTTVAR)
{
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
}
}
else // Transmission is unidirectional.
{
// Simply take the values of smoothed RTT and RTT variance from
// the ACK packet.
m_iRTT = rtt;
m_iRTTVar = rttvar;
}
}
// Reset the value of smoothed RTT to the first real RTT estimate extracted
// from an ACK after initialization (at the beginning of transmission).
// In case of resumed connection over the same network, the very first RTT
// value sent within an ACK will be taken from cache and equal to previous
// connection's final smoothed RTT value. The reception of such a value
// will also trigger the smoothed RTT reset at the sender side.
else if (rtt != INITIAL_RTT && rttvar != INITIAL_RTTVAR)
{
m_iRTT = rtt;
m_iRTTVar = rttvar;
m_bIsFirstRTTReceived = true;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(currtime, "ACK", rtt, rttvar, m_bIsFirstRTTReceived,
m_stats.recvTotal, m_iRTT, m_iRTTVar);
#endif

/* Version-dependent fields:
* Original UDT (total size: ACKD_TOTAL_SIZE_SMALL):
Expand Down Expand Up @@ -8062,7 +8194,7 @@ void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival
{
int32_t ack = 0;

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair
// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair.
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, tsArrival);

if (rtt == -1)
Expand Down Expand Up @@ -8091,12 +8223,32 @@ void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival
return;
}

// If increasing delay is detected
// If increasing delay is detected.
// sendCtrl(UMSG_CGWARNING);

// Calculate RTT (EWMA) on the receiver side
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
// Update the values of smoothed RTT and the variation in RTT samples
// on subsequent RTT samples (during transmission).
if (m_bIsFirstRTTReceived)
{
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
}
// Reset the value of smoothed RTT on the first RTT sample after initialization
// (at the beginning of transmission).
// In case of resumed connection over the same network, the initial RTT
// value will be taken from cache and equal to previous connection's
// final smoothed RTT value.
else
{
m_iRTT = rtt;
m_iRTTVar = rtt / 2;
m_bIsFirstRTTReceived = true;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(tsArrival, "ACKACK", rtt, -1, m_bIsFirstRTTReceived,
-1, m_iRTT, m_iRTTVar);
#endif

updateCC(TEV_ACKACK, EventVariant(ack));

Expand Down
26 changes: 17 additions & 9 deletions srtcore/core.h
Expand Up @@ -267,13 +267,15 @@ class CUDT
//
// NOTE: Use notation with X*1000*1000*... instead of
// million zeros in a row.
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
static const size_t ACK_WND_SIZE = 1024;
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
static const size_t ACK_WND_SIZE = 1024;
static const int INITIAL_RTT = 10 * COMM_SYN_INTERVAL_US;
static const int INITIAL_RTTVAR = INITIAL_RTT / 2;

int handshakeVersion()
{
Expand Down Expand Up @@ -731,8 +733,14 @@ class CUDT

int m_iEXPCount; // Expiration counter
int m_iBandwidth; // Estimated bandwidth, number of packets per second
int m_iRTT; // RTT, in microseconds
int m_iRTTVar; // RTT variance
int m_iRTT; // Smoothed RTT (an exponentially-weighted moving average (EWMA)
// of an endpoint's RTT samples), in microseconds
int m_iRTTVar; // The variation in the RTT samples (RTT variance), in microseconds
bool m_bIsFirstRTTReceived; // True if the first RTT sample was obtained from the ACK/ACKACK pair
// at the receiver side or received by the sender from an ACK packet.
// It's used to reset the initial value of smoothed RTT (m_iRTT)
// at the beginning of transmission (including the one taken from
// cache). False by default.
int m_iDeliveryRate; // Packet arrival rate at the receiver side
int m_iByteDeliveryRate; // Byte arrival rate at the receiver side

Expand Down

0 comments on commit d898f1c

Please sign in to comment.