Skip to content

Commit

Permalink
[core] Runtime link stability timeout for main/backup (#1775)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Feb 8, 2021
1 parent 662db72 commit 7656759
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 75 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Expand Up @@ -96,6 +96,7 @@ endforeach()
# SRT_DEBUG_TSBPD_WRAP 1 /* Debug packet timestamp wraparound */
# SRT_DEBUG_TLPKTDROP_DROPSEQ 1
# SRT_DEBUG_SNDQ_HIGHRATE 1
# SRT_DEBUG_BONDING_STATES 1
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */

# option defaults
Expand Down
2 changes: 2 additions & 0 deletions docs/APISocketOptions.md
Expand Up @@ -437,6 +437,8 @@ function will return the group, not this socket ID.
| --------------------- | ----- | -------- | ---------- | ------ | -------- | ------ | --- | ------ |
| `SRTO_GROUPSTABTIMEO` | 1.5.0 | pre | `int32_t` | ms | 80 | 10-... | W | GSD+ |

**Not in use at the moment. Is to be repurposed in SRT v1.4.3!**

This setting is used for groups of type `SRT_GTYPE_BACKUP`. It defines the stability
timeout, which is the maximum interval between two consecutive packets retrieved from
the peer on the currently active link. These two packets can be of any type,
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Expand Up @@ -11175,7 +11175,7 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea
* (keepalive fix)
* duB:
* It seems there is confusion of the direction of the Response here.
* LastRspTime is supposed to be when receiving (data/ctrl) from peer
* lastRspTime is supposed to be when receiving (data/ctrl) from peer
* as shown in processCtrl and processData,
* Here we set because we sent something?
*
Expand Down
6 changes: 5 additions & 1 deletion srtcore/core.h
Expand Up @@ -374,9 +374,12 @@ class CUDT

bool isOPT_TsbPd() const { return m_bOPT_TsbPd; }
int RTT() const { return m_iRTT; }
int RTTVar() const { return m_iRTTVar; }
int32_t sndSeqNo() const { return m_iSndCurrSeqNo; }
int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
bool overrideSndSeqNo(int32_t seq);
srt::sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime; }
srt::sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; }

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
int flowWindowSize() const { return m_iFlowWindowSize; }
Expand All @@ -385,7 +388,8 @@ class CUDT
int64_t maxBandwidth() const { return m_llMaxBW; }
int MSS() const { return m_iMSS; }

uint32_t latency_us() const {return m_iTsbPdDelay_ms*1000; }
uint32_t peerLatency_us() const {return m_iPeerTsbPdDelay_ms * 1000; }
int peerIdleTimeout_ms() const { return m_iOPT_PeerIdleTimeout; }
size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; }
size_t OPT_PayloadSize() const { return m_zOPT_ExpPayloadSize; }
int sndLossLength() { return m_pSndLossList->getLossLength(); }
Expand Down
191 changes: 118 additions & 73 deletions srtcore/group.cpp
Expand Up @@ -2978,6 +2978,111 @@ void CUDTGroup::sendBackup_CheckIdleTime(gli_t w_d)
}
}

#if SRT_DEBUG_BONDING_STATES
class StabilityTracer
{
public:
StabilityTracer()
{
}

~StabilityTracer()
{
srt::sync::ScopedLock lck(m_mtx);
m_fout.close();
}

void trace(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint32_t activation_period_us,
int64_t stability_tmo_us, const std::string& state, uint16_t weight)
{
srt::sync::ScopedLock lck(m_mtx);
create_file();

m_fout << srt::sync::FormatTime(currtime) << ",";
m_fout << u.id() << ",";
m_fout << weight << ",";
m_fout << u.peerLatency_us() << ",";
m_fout << u.RTT() << ",";
m_fout << u.RTTVar() << ",";
m_fout << stability_tmo_us << ",";
m_fout << count_microseconds(currtime - u.lastRspTime()) << ",";
m_fout << state << ",";
m_fout << (srt::sync::is_zero(u.freshActivationStart()) ? -1 : (count_microseconds(currtime - u.freshActivationStart()))) << ",";
m_fout << activation_period_us << "\n";
m_fout.flush();
}

private:
void print_header()
{
//srt::sync::ScopedLock lck(m_mtx);
m_fout << "Timepoint,SocketID,weight,usLatency,usRTT,usRTTVar,usStabilityTimeout,usSinceLastResp,State,usSinceActivation,usActivationPeriod\n";
}

void create_file()
{
if (m_fout)
return;

std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now());
str_tnow.resize(str_tnow.size() - 6); // remove trailing ' [SYS]' part
while (str_tnow.find(':') != std::string::npos) {
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "stability_trace_" + str_tnow + ".csv";
m_fout.open(fname, std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

print_header();
}

private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
};

StabilityTracer s_stab_trace;
#endif

/// TODO: Remove 'weight' parameter? Only needed for logging.
/// @retval 1 - link is identified as stable
/// @retval 0 - link state remains unchanged (too early to identify, still in activation phase)
/// @retval -1 - link is identified as unstable
static int sendBackup_CheckRunningLinkStable(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint16_t weight)
{
const uint32_t latency_us = u.peerLatency_us();
const int32_t min_stability_us = 60000; // Minimum Link Stability Timeout: 60ms.
const int64_t initial_stabtout_us = max<int64_t>(min_stability_us, latency_us);
const int64_t activation_period_us = initial_stabtout_us + 5 * CUDT::COMM_SYN_INTERVAL_US;

// RTT and RTTVar values are still being refined during activation period,
// therefore the dymanic timeout should not be used in activation phase.
const bool is_activation_phase = !is_zero(u.freshActivationStart())
&& (count_microseconds(currtime - u.freshActivationStart()) <= activation_period_us);

const int64_t stability_tout_us = is_activation_phase
? initial_stabtout_us // activation phase
: min<int64_t>(max<int64_t>(min_stability_us, 2 * u.RTT() + 4 * u.RTTVar()), latency_us);

const steady_clock::time_point last_rsp = max(u.freshActivationStart(), u.lastRspTime());
const steady_clock::duration td_response = currtime - last_rsp;
if (count_microseconds(td_response) > stability_tout_us)
{
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION-UNSTABLE" : "UNSTABLE", weight);
#endif
return -1;
}

// u.lastRspTime() > currtime is alwats true due to the very first check above in this function
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION" : "STABLE", weight);
#endif
return is_activation_phase ? 0 : 1;
}


// [[using locked(this->m_GroupLock)]]
bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point currtime)
{
Expand All @@ -2994,8 +3099,6 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point
// negative value is relatively easy, while introducing a mutex would only add a
// deadlock risk and performance degradation.

bool is_stable = true;

HLOGC(gslog.Debug,
log << "grp/sendBackup: CHECK STABLE: @" << d->id
<< ": TIMEDIFF {response= " << FormatDuration<DUNIT_MS>(currtime - u.m_tsLastRspTime)
Expand All @@ -3005,89 +3108,33 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point
<< (!is_zero(u.m_tsUnstableSince) ? FormatDuration<DUNIT_MS>(currtime - u.m_tsUnstableSince) : "NEVER")
<< "}");

if (currtime > u.m_tsLastRspTime)
{
// The last response predates the start of this function, look at the difference
const steady_clock::duration td_responsive = currtime - u.m_tsLastRspTime;
bool check_stability = true;

if (!is_zero(u.m_tsFreshActivation) && u.m_tsFreshActivation < currtime)
{
// The link is temporary-activated. Calculate then since the activation time.

// Check the last received ACK time first. This time is initialized with 'now'
// at the CUDT::open call, so you can't count on the trap zero time here, but
// it's still possible to check if activation time predates the ACK time. Things
// are here in the following possible order:
//
// - ACK time (old because defined at open)
// - Response time (old because the time of received handshake or keepalive counts)
// ... long time nothing ...
// - Activation time.
//
// If we have this situation, we have to wait for at least one ACK that is
// newer than activation time. However, if in this situation we have a fresh
// response, that is:
//
// - ACK time
// ...
// - Activation time
// - Response time (because a Keepalive had a caprice to come accidentally after sending)
//
// We still wait for a situation that there's at least one ACK that is newer than activation.
const int is_stable = sendBackup_CheckRunningLinkStable(u, currtime, d->weight);

// As we DO have activation time, we need to check if there's at least
// one ACK newer than activation, that is, td_acked < td_active
if (u.m_tsLastRspAckTime < u.m_tsFreshActivation)
{
check_stability = false;
HLOGC(gslog.Debug,
log << "grp/sendBackup: link @" << d->id
<< " activated after ACK, "
"not checking for stability");
}
else
{
u.m_tsFreshActivation = steady_clock::time_point();
}
}

if (check_stability && count_microseconds(td_responsive) > m_uOPT_StabilityTimeout)
{
if (is_zero(u.m_tsUnstableSince))
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: socket NEW UNSTABLE: @" << d->id << " last heard "
<< FormatDuration(td_responsive) << " > " << m_uOPT_StabilityTimeout
<< " (stability timeout)");
// The link seems to have missed two ACKs already.
// Qualify this link as unstable
// Notify that it has been seen so since now
u.m_tsUnstableSince = currtime;
}

is_stable = false;
}
}

if (is_stable)
if (is_stable >= 0)
{
// If stability is ok, but unstable-since was set before, reset it.
HLOGC(gslog.Debug,
log << "grp/sendBackup: link STABLE: @" << d->id
<< (!is_zero(u.m_tsUnstableSince) ? " - RESTORED" : " - CONTINUED")
<< ", state RUNNING - will send a payload");

u.m_tsUnstableSince = steady_clock::time_point();

// For some cases
if (is_stable > 0)
u.m_tsFreshActivation = steady_clock::time_point();
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: link UNSTABLE for " << FormatDuration(currtime - u.m_tsUnstableSince) << " : @"
<< d->id << " - will send a payload");
if (is_zero(u.m_tsUnstableSince))
{
u.m_tsUnstableSince = currtime;
}
}

return is_stable;
return is_stable >= 0;
}

// [[using locked(this->m_GroupLock)]]
Expand Down Expand Up @@ -3810,12 +3857,10 @@ void CUDTGroup::sendBackup_SilenceRedundantLinks(vector<gli_t>& w_parallel)
}
CUDT& ce = d->ps->core();
steady_clock::duration td(0);
if (!is_zero(ce.m_tsFreshActivation) &&
count_microseconds(td = currtime - ce.m_tsFreshActivation) < ce.m_uOPT_StabilityTimeout)
if (!is_zero(ce.m_tsFreshActivation) && sendBackup_CheckRunningLinkStable(ce, currtime, d->weight) != 1)
{
HLOGC(gslog.Debug,
log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td) << " < "
<< ce.m_uOPT_StabilityTimeout << "(stability timeout)");
log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td));
continue;
}

Expand Down

0 comments on commit 7656759

Please sign in to comment.