Skip to content

Commit

Permalink
Merged and fixed #2674
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed May 8, 2023
2 parents afc1841 + 3247614 commit 83aca36
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 14 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ endforeach()
# SRT_DEBUG_BONDING_STATES 1
# SRT_DEBUG_RTT 1 /* RTT trace */
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */
# SRT_ENABLE_FREQUENT_LOG_TRACE 0 : set to 1 to enable printing reason for suppressed freq logs

# option defaults
set(ENABLE_HEAVY_LOGGING_DEFAULT OFF)
Expand Down
16 changes: 16 additions & 0 deletions srtcore/atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ class atomic {
#endif
}

T operator|=(T i) {
#if defined(ATOMIC_USE_SRT_SYNC_MUTEX) && (ATOMIC_USE_SRT_SYNC_MUTEX == 1)
ScopedLock lg_(mutex_);
const T t = value_ |= i;
return t;
#elif defined(ATOMIC_USE_GCC_INTRINSICS)
return __atomic_or_fetch(&value_, i, __ATOMIC_SEQ_CST);
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
return msvc::interlocked<T>::or_fetch(&value_, i);
#elif defined(ATOMIC_USE_CPP11_ATOMIC)
return value_ |= i;
#else
#error "Implement Me."
#endif
}

/// @brief Performs an atomic compare-and-swap (CAS) operation.
///
/// The value of the atomic object is only updated to the new value if the
Expand Down
5 changes: 5 additions & 0 deletions srtcore/atomic_msvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ struct interlocked<T, 1> {
return static_cast<T>(_InterlockedExchange8(
reinterpret_cast<volatile char*>(x), static_cast<const char>(new_val)));
}

static inline T or_fetch(T volatile* x, const T val) {
return static_cast<T>(_InterlockedOr8(
reinterpret_cast<volatile char*>(x), static_cast<const char>(new_val)));
}
};

template <typename T>
Expand Down
84 changes: 72 additions & 12 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ void srt::CUDT::open()
#endif

m_iReXmitCount = 1;
memset(&m_aSuppressedMsg, 0, sizeof m_aSuppressedMsg);
m_iPktCount = 0;
m_iLightACKCount = 1;
m_tsNextSendTime = steady_clock::time_point();
Expand Down Expand Up @@ -5421,9 +5422,19 @@ void * srt::CUDT::tsbpd(void* param)
<< iDropCnt << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed "
<< (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms");
#endif
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno
<< " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0')
<< (timediff_us % 1000) << " ms");
string why;
if (self->frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why)))
{
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno
<< " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0')
<< (timediff_us % 1000) << " ms " << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{
LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why);
}
#endif
#endif

tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
Expand Down Expand Up @@ -5908,13 +5919,44 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
addressAndSend((rsppkt));
}

bool srt::CUDT::frequentLogAllowed(const time_point& tnow) const
bool srt::CUDT::frequentLogAllowed(size_t logid, const time_point& tnow, std::string& w_why)
{
#ifndef SRT_LOG_SLOWDOWN_FREQ_MS
#define SRT_LOG_SLOWDOWN_FREQ_MS 1000
#endif

return (m_tsLogSlowDown + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow;
bool is_suppressed = IsSet(m_LogSlowDownExpired, BIT(logid));
bool isnow = (m_tsLogSlowDown.load() + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow;
if (isnow)
{
// Theoretically this should prevent other calls of this function to take
// set their values simultaneously, but if it happened that the time is
// also set, this section will not fire for the other log, if it didn't do
// the check yet.
m_LogSlowDownExpired.store(uint8_t(BIT(logid))); // Clear all other bits

// Note: it may happen that two threads could intermix one another between
// the check and setting up, but this will at worst case set the slightly
// later time again.
m_tsLogSlowDown.store(tnow);

is_suppressed = false;

int supr = m_aSuppressedMsg[logid];

if (supr > 0)
w_why = Sprint("++SUPPRESSED: ", supr);
m_aSuppressedMsg[logid] = 0;
}
else
{
w_why = Sprint("Too early - last one was ", FormatDuration<DUNIT_MS>(tnow - m_tsLogSlowDown.load()));
// Set YOUR OWN bit, atomically.
m_LogSlowDownExpired |= uint8_t(BIT(logid));
++m_aSuppressedMsg[logid];
}

return !is_suppressed;
}

// This function is required to be called when a caller receives an INDUCTION
Expand Down Expand Up @@ -8986,14 +9028,25 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)

if (iDropCnt > 0)
{
LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %"
<< dropdata[0] << "-%" << dropdata[1] << ", #" << msgno << " (SND DROP REQUEST).");
ScopedLock lg (m_StatsLock);
const steady_clock::time_point tnow = steady_clock::now();
string why;
if (frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why)))
{
LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %"
<< dropdata[0] << "-%" << dropdata[1] << ", #" << msgno
<< " (SND DROP REQUEST). " << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{
LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why);
}
#endif

enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
leaveCS(m_StatsLock);
}
}
// When the drop request was received, it means that there are
Expand Down Expand Up @@ -10098,12 +10151,19 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
ScopedLock lg(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1));
if (frequentLogAllowed(tnow))
string why;
if (frequentLogAllowed(FREQLOGFA_ENCRYPTION_FAILURE, tnow, (why)))
{
LOGC(qrlog.Warn, log << CONID() << "Decryption failed (seqno %" << u->m_Packet.getSeqNo() << "), dropped "
<< iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << ".");
m_tsLogSlowDown = tnow;
<< iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << "." << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{

LOGC(qrlog.Warn, log << "SUPPRESSED: Decryption failed LOG: " << why);
}
#endif
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ modified by

#include <haicrypt.h>

#ifndef SRT_ENABLE_FREQUENT_LOG_TRACE
#define SRT_ENABLE_FREQUENT_LOG_TRACE 0
#endif


// TODO: Utility function - to be moved to utilities.h?
template <class T>
Expand Down Expand Up @@ -915,14 +919,20 @@ class CUDT
SRT_ATTR_GUARDED_BY(m_RecvAckLock)
int32_t m_iReXmitCount; // Re-Transmit Count since last ACK

time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown.
static const size_t
MAX_FREQLOGFA = 2,
FREQLOGFA_ENCRYPTION_FAILURE = 0,
FREQLOGFA_RCV_DROPPED = 1;
atomic_time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown.
// The "slow down" group of logs are those that can be printed too often otherwise, but can't be turned off (warnings and errors).
// Currently only used by decryption failure message, therefore no mutex protection needed.
sync::atomic<uint8_t> m_LogSlowDownExpired; // Can't use bitset because atomic
sync::atomic<int> m_aSuppressedMsg[MAX_FREQLOGFA];

/// @brief Check if a frequent log can be shown.
/// @param tnow current time
/// @return true if it is ok to print a frequent log message.
bool frequentLogAllowed(const time_point& tnow) const;
bool frequentLogAllowed(size_t logid, const time_point& tnow, std::string& why);

private: // Receiving related data
CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
Expand Down

0 comments on commit 83aca36

Please sign in to comment.