Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINT] Refactoring around the ULists and Multiplexer #2693

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 30 additions & 21 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ srt::CUDTUnited::CUDTUnited()
: m_Sockets()
, m_GlobControlLock()
, m_IDLock()
, m_mMultiplexer()
, m_MultiplexerLock()
, m_pCache(NULL)
, m_bClosing(false)
Expand Down Expand Up @@ -2802,25 +2801,32 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
return;
}

CMultiplexer& mx = m->second;
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << (m->second.m_iRefCount-1));
if (unrefMuxer(&m->second) == 0)
{
m_mMultiplexer.erase(m);
}
}

mx.m_iRefCount--;
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
if (0 == mx.m_iRefCount)
int srt::CUDTUnited::unrefMuxer(CMultiplexer* pmx)
{
pmx->m_iRefCount--;
if (0 == pmx->m_iRefCount)
{
HLOGC(smlog.Debug,
log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
log << "MUXER id=" << pmx->m_iID << " lost last socket - deleting muxer bound to port "
<< pmx->m_pChannel->bindAddressAny().hport());
// The channel has no access to the queues and
// it looks like the multiplexer is the master of all of them.
// The queues must be silenced before closing the channel
// because this will cause error to be returned in any operation
// being currently done in the queues, if any.
mx.m_pSndQueue->setClosing();
mx.m_pRcvQueue->setClosing();
mx.destroy();
m_mMultiplexer.erase(m);
pmx->m_pSndQueue->setClosing();
pmx->m_pRcvQueue->setClosing();
pmx->destroy();
}

return pmx->m_iRefCount;
}

void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
Expand All @@ -2831,14 +2837,18 @@ void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int
w_m.m_iID = s->m_SocketID;
}

uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
void srt::CUDTUnited::bindSocketToMuxer(CUDTSocket* pw_s, CMultiplexer& fw_sm)
{
pw_s->core().m_pSndQueue = fw_sm.m_pSndQueue;
pw_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue;
pw_s->m_iMuxID = fw_sm.m_iID;
}

uint16_t srt::CUDTUnited::getRealEndpoint(CUDTSocket* pw_s, CMultiplexer& fw_sm)
{
w_s->core().m_pSndQueue = fw_sm.m_pSndQueue;
w_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue;
w_s->m_iMuxID = fw_sm.m_iID;
sockaddr_any sa;
fw_sm.m_pChannel->getSockAddr((sa));
w_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks
pw_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks
return sa.hport();
}

Expand Down Expand Up @@ -3085,7 +3095,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
// reuse the existing multiplexer
++i->second.m_iRefCount;
installMuxer((s), (i->second));
bindSocketToMuxer((s), (i->second));
return;
}
else
Expand Down Expand Up @@ -3161,7 +3171,8 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons

// Rewrite the port here, as it might be only known upon return
// from CChannel::open.
m.m_iPort = installMuxer((s), m);
bindSocketToMuxer((s), m);
m.m_iPort = getRealEndpoint((s), m);
m_mMultiplexer[m.m_iID] = m;
}
catch (const CUDTException&)
Expand Down Expand Up @@ -3261,9 +3272,7 @@ bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
{
// reuse the existing multiplexer
++mux->m_iRefCount;
s->core().m_pSndQueue = mux->m_pSndQueue;
s->core().m_pRcvQueue = mux->m_pRcvQueue;
s->m_iMuxID = mux->m_iID;
bindSocketToMuxer((s), *mux);
return true;
}

Expand Down
4 changes: 3 additions & 1 deletion srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ class CUDTUnited

// Utility functions for updateMux
void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af);
uint16_t installMuxer(CUDTSocket* w_s, CMultiplexer& sm);
void bindSocketToMuxer(CUDTSocket* w_s, CMultiplexer& sm);
uint16_t getRealEndpoint(CUDTSocket* w_s, CMultiplexer& sm);

/// @brief Checks if channel configuration matches the socket configuration.
/// @param cfgMuxer multiplexer configuration.
Expand Down Expand Up @@ -485,6 +486,7 @@ class CUDTUnited

void checkBrokenSockets();
void removeSocket(const SRTSOCKET u);
int unrefMuxer(CMultiplexer* mux);

CEPoll m_EPoll; // handling epoll data structures and events

Expand Down
8 changes: 6 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ void srt::CSndUList::signalInterrupt() const
m_ListCond.notify_one();
}

// [[using locked(this->m_ListLock)]]
void srt::CSndUList::realloc_()
{
CSNode** temp = NULL;
Expand All @@ -315,6 +316,7 @@ void srt::CSndUList::realloc_()
m_pHeap = temp;
}

// [[using locked(this->m_ListLock)]]
void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
{
// increase the heap array size if necessary
Expand All @@ -324,6 +326,7 @@ void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
insert_norealloc_(ts, u);
}

// [[using locked(this->m_ListLock)]]
void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u)
{
CSNode* n = u->m_pSNode;
Expand Down Expand Up @@ -365,6 +368,7 @@ void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const
}
}

// [[using locked(this->m_ListLock)]]
void srt::CSndUList::remove_(const CUDT* u)
{
CSNode* n = u->m_pSNode;
Expand Down Expand Up @@ -550,7 +554,7 @@ void* srt::CSndQueue::worker(void* param)
if (currtime < next_time)
{
THREAD_PAUSED();
self->m_pTimer->sleep_until(next_time);
self->m_pTimer->sleep_until(next_time, self->m_bClosing);
THREAD_RESUMED();
IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSleepTo++);
}
Expand All @@ -570,7 +574,7 @@ void* srt::CSndQueue::worker(void* param)
<< UST(Opened));
#undef UST

if (!u->m_bConnected || u->m_bBroken)
if (!u->m_bConnected || u->m_bBroken || self->m_bClosing)
{
IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
continue;
Expand Down
40 changes: 27 additions & 13 deletions srtcore/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,34 @@ srt::sync::CTimer::~CTimer()
}


bool srt::sync::CTimer::sleep_until(TimePoint<steady_clock> tp)
bool srt::sync::CTimer::sleep_until(TimePoint<steady_clock> tp, const bool& forced)
{
// The class member m_sched_time can be used to interrupt the sleep.
// Refer to Timer::interrupt().
enterCS(m_event.mutex());
UniqueLock lk (m_event.mutex());
m_tsSchedTime = tp;
leaveCS(m_event.mutex());
return sleep_internal(lk, forced);
}

#if USE_BUSY_WAITING
#if !USE_BUSY_WAITING // Non-busy-waiting (build-default) version.
bool srt::sync::CTimer::sleep_internal(UniqueLock& lk, const bool& forced)
{
TimePoint<steady_clock> cur_tp = steady_clock::now();

while (cur_tp < m_tsSchedTime)
{
m_event.cond().wait_until(lk, m_tsSchedTime);
cur_tp = steady_clock::now();
if (forced)
return false;
}
return cur_tp >= m_tsSchedTime;
}

#else

bool srt::sync::CTimer::sleep_internal(UniqueLock& lk, const bool& forced)
{
#if defined(_WIN32)
// 10 ms on Windows: bad accuracy of timers
const steady_clock::duration
Expand All @@ -209,27 +228,23 @@ bool srt::sync::CTimer::sleep_until(TimePoint<steady_clock> tp)
const steady_clock::duration
td_threshold = milliseconds_from(1);
#endif
#endif // USE_BUSY_WAITING

TimePoint<steady_clock> cur_tp = steady_clock::now();

while (cur_tp < m_tsSchedTime)
{
#if USE_BUSY_WAITING
steady_clock::duration td_wait = m_tsSchedTime - cur_tp;
if (td_wait <= 2 * td_threshold)
break;

td_wait -= td_threshold;
m_event.lock_wait_for(td_wait);
#else
m_event.lock_wait_until(m_tsSchedTime);
#endif // USE_BUSY_WAITING
m_event.cond().wait_for(lk, td_wait);

cur_tp = steady_clock::now();
if (forced)
return false;
}

#if USE_BUSY_WAITING
while (cur_tp < m_tsSchedTime)
{
#ifdef IA32
Expand All @@ -248,11 +263,10 @@ bool srt::sync::CTimer::sleep_until(TimePoint<steady_clock> tp)

cur_tp = steady_clock::now();
}
#endif // USE_BUSY_WAITING

return cur_tp >= m_tsSchedTime;
}

#endif

void srt::sync::CTimer::interrupt()
{
Expand Down
5 changes: 4 additions & 1 deletion srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ class CTimer
///
/// @return true if the specified time was reached
/// false should never happen
bool sleep_until(steady_clock::time_point tp);
bool sleep_until(steady_clock::time_point tp, const bool& forced);

/// Resets target wait time and interrupts waiting
/// in sleep_until(..)
Expand All @@ -723,6 +723,9 @@ class CTimer
void tick();

private:

bool sleep_internal(UniqueLock& ulk, const bool& forced);

CEvent m_event;
steady_clock::time_point m_tsSchedTime;
};
Expand Down
3 changes: 2 additions & 1 deletion test/test_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ TEST(CTimer, DISABLED_SleeptoAccuracy)
const uint64_t sleep_intervals_us[] = { 1, 5, 10, 50, 100, 250, 500, 1000, 5000, 10000 };

CTimer timer;
bool dummy_forced = false;

for (uint64_t interval_us : sleep_intervals_us)
{
for (int i = 0; i < num_samples; i++)
{
steady_clock::time_point currtime = steady_clock::now();
timer.sleep_until(currtime + microseconds_from(interval_us));
timer.sleep_until(currtime + microseconds_from(interval_us), dummy_forced);

steady_clock::time_point new_time = steady_clock::now();
sleeps_us[i] = count_microseconds(new_time - currtime);
Expand Down