diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 63dab3927..1d0fd8385 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -174,7 +174,6 @@ srt::CUDTUnited::CUDTUnited() : m_Sockets() , m_GlobControlLock() , m_IDLock() - , m_mMultiplexer() , m_MultiplexerLock() , m_pCache(NULL) , m_bClosing(false) @@ -2802,25 +2801,32 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) return; } - CMultiplexer& mx = m->second; + HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << (m->second.m_iRefCount-1)); + if (unrefMuxer(&m->second) == 0) + { + m_mMultiplexer.erase(m); + } +} - mx.m_iRefCount--; - HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount); - if (0 == mx.m_iRefCount) +int srt::CUDTUnited::unrefMuxer(CMultiplexer* pmx) +{ + pmx->m_iRefCount--; + if (0 == pmx->m_iRefCount) { HLOGC(smlog.Debug, - log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port " - << mx.m_pChannel->bindAddressAny().hport()); + log << "MUXER id=" << pmx->m_iID << " lost last socket - deleting muxer bound to port " + << pmx->m_pChannel->bindAddressAny().hport()); // The channel has no access to the queues and // it looks like the multiplexer is the master of all of them. // The queues must be silenced before closing the channel // because this will cause error to be returned in any operation // being currently done in the queues, if any. - mx.m_pSndQueue->setClosing(); - mx.m_pRcvQueue->setClosing(); - mx.destroy(); - m_mMultiplexer.erase(m); + pmx->m_pSndQueue->setClosing(); + pmx->m_pRcvQueue->setClosing(); + pmx->destroy(); } + + return pmx->m_iRefCount; } void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af) @@ -2831,14 +2837,18 @@ void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int w_m.m_iID = s->m_SocketID; } -uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm) +void srt::CUDTUnited::bindSocketToMuxer(CUDTSocket* pw_s, CMultiplexer& fw_sm) +{ + pw_s->core().m_pSndQueue = fw_sm.m_pSndQueue; + pw_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue; + pw_s->m_iMuxID = fw_sm.m_iID; +} + +uint16_t srt::CUDTUnited::getRealEndpoint(CUDTSocket* pw_s, CMultiplexer& fw_sm) { - w_s->core().m_pSndQueue = fw_sm.m_pSndQueue; - w_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue; - w_s->m_iMuxID = fw_sm.m_iID; sockaddr_any sa; fw_sm.m_pChannel->getSockAddr((sa)); - w_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks + pw_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks return sa.hport(); } @@ -3085,7 +3095,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port); // reuse the existing multiplexer ++i->second.m_iRefCount; - installMuxer((s), (i->second)); + bindSocketToMuxer((s), (i->second)); return; } else @@ -3161,7 +3171,8 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons // Rewrite the port here, as it might be only known upon return // from CChannel::open. - m.m_iPort = installMuxer((s), m); + bindSocketToMuxer((s), m); + m.m_iPort = getRealEndpoint((s), m); m_mMultiplexer[m.m_iID] = m; } catch (const CUDTException&) @@ -3261,9 +3272,7 @@ bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls) { // reuse the existing multiplexer ++mux->m_iRefCount; - s->core().m_pSndQueue = mux->m_pSndQueue; - s->core().m_pRcvQueue = mux->m_pRcvQueue; - s->m_iMuxID = mux->m_iID; + bindSocketToMuxer((s), *mux); return true; } diff --git a/srtcore/api.h b/srtcore/api.h index 9ba77d23a..ec5791e0f 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -449,7 +449,8 @@ class CUDTUnited // Utility functions for updateMux void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af); - uint16_t installMuxer(CUDTSocket* w_s, CMultiplexer& sm); + void bindSocketToMuxer(CUDTSocket* w_s, CMultiplexer& sm); + uint16_t getRealEndpoint(CUDTSocket* w_s, CMultiplexer& sm); /// @brief Checks if channel configuration matches the socket configuration. /// @param cfgMuxer multiplexer configuration. @@ -485,6 +486,7 @@ class CUDTUnited void checkBrokenSockets(); void removeSocket(const SRTSOCKET u); + int unrefMuxer(CMultiplexer* mux); CEPoll m_EPoll; // handling epoll data structures and events diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 4282965b4..2de5e242e 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -296,6 +296,7 @@ void srt::CSndUList::signalInterrupt() const m_ListCond.notify_one(); } +// [[using locked(this->m_ListLock)]] void srt::CSndUList::realloc_() { CSNode** temp = NULL; @@ -315,6 +316,7 @@ void srt::CSndUList::realloc_() m_pHeap = temp; } +// [[using locked(this->m_ListLock)]] void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u) { // increase the heap array size if necessary @@ -324,6 +326,7 @@ void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u) insert_norealloc_(ts, u); } +// [[using locked(this->m_ListLock)]] void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u) { CSNode* n = u->m_pSNode; @@ -365,6 +368,7 @@ void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const } } +// [[using locked(this->m_ListLock)]] void srt::CSndUList::remove_(const CUDT* u) { CSNode* n = u->m_pSNode; @@ -550,7 +554,7 @@ void* srt::CSndQueue::worker(void* param) if (currtime < next_time) { THREAD_PAUSED(); - self->m_pTimer->sleep_until(next_time); + self->m_pTimer->sleep_until(next_time, self->m_bClosing); THREAD_RESUMED(); IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSleepTo++); } @@ -570,7 +574,7 @@ void* srt::CSndQueue::worker(void* param) << UST(Opened)); #undef UST - if (!u->m_bConnected || u->m_bBroken) + if (!u->m_bConnected || u->m_bBroken || self->m_bClosing) { IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++); continue; diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index a7cebb909..c06ceffaf 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -191,15 +191,34 @@ srt::sync::CTimer::~CTimer() } -bool srt::sync::CTimer::sleep_until(TimePoint tp) +bool srt::sync::CTimer::sleep_until(TimePoint tp, const bool& forced) { // The class member m_sched_time can be used to interrupt the sleep. // Refer to Timer::interrupt(). - enterCS(m_event.mutex()); + UniqueLock lk (m_event.mutex()); m_tsSchedTime = tp; - leaveCS(m_event.mutex()); + return sleep_internal(lk, forced); +} -#if USE_BUSY_WAITING +#if !USE_BUSY_WAITING // Non-busy-waiting (build-default) version. +bool srt::sync::CTimer::sleep_internal(UniqueLock& lk, const bool& forced) +{ + TimePoint cur_tp = steady_clock::now(); + + while (cur_tp < m_tsSchedTime) + { + m_event.cond().wait_until(lk, m_tsSchedTime); + cur_tp = steady_clock::now(); + if (forced) + return false; + } + return cur_tp >= m_tsSchedTime; +} + +#else + +bool srt::sync::CTimer::sleep_internal(UniqueLock& lk, const bool& forced) +{ #if defined(_WIN32) // 10 ms on Windows: bad accuracy of timers const steady_clock::duration @@ -209,27 +228,23 @@ bool srt::sync::CTimer::sleep_until(TimePoint tp) const steady_clock::duration td_threshold = milliseconds_from(1); #endif -#endif // USE_BUSY_WAITING TimePoint cur_tp = steady_clock::now(); while (cur_tp < m_tsSchedTime) { -#if USE_BUSY_WAITING steady_clock::duration td_wait = m_tsSchedTime - cur_tp; if (td_wait <= 2 * td_threshold) break; td_wait -= td_threshold; - m_event.lock_wait_for(td_wait); -#else - m_event.lock_wait_until(m_tsSchedTime); -#endif // USE_BUSY_WAITING + m_event.cond().wait_for(lk, td_wait); cur_tp = steady_clock::now(); + if (forced) + return false; } -#if USE_BUSY_WAITING while (cur_tp < m_tsSchedTime) { #ifdef IA32 @@ -248,11 +263,10 @@ bool srt::sync::CTimer::sleep_until(TimePoint tp) cur_tp = steady_clock::now(); } -#endif // USE_BUSY_WAITING return cur_tp >= m_tsSchedTime; } - +#endif void srt::sync::CTimer::interrupt() { diff --git a/srtcore/sync.h b/srtcore/sync.h index 87be6f458..608b9ff53 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -711,7 +711,7 @@ class CTimer /// /// @return true if the specified time was reached /// false should never happen - bool sleep_until(steady_clock::time_point tp); + bool sleep_until(steady_clock::time_point tp, const bool& forced); /// Resets target wait time and interrupts waiting /// in sleep_until(..) @@ -723,6 +723,9 @@ class CTimer void tick(); private: + + bool sleep_internal(UniqueLock& ulk, const bool& forced); + CEvent m_event; steady_clock::time_point m_tsSchedTime; }; diff --git a/test/test_timer.cpp b/test/test_timer.cpp index df2fc2e48..aa0dcf63c 100644 --- a/test/test_timer.cpp +++ b/test/test_timer.cpp @@ -20,13 +20,14 @@ TEST(CTimer, DISABLED_SleeptoAccuracy) const uint64_t sleep_intervals_us[] = { 1, 5, 10, 50, 100, 250, 500, 1000, 5000, 10000 }; CTimer timer; + bool dummy_forced = false; for (uint64_t interval_us : sleep_intervals_us) { for (int i = 0; i < num_samples; i++) { steady_clock::time_point currtime = steady_clock::now(); - timer.sleep_until(currtime + microseconds_from(interval_us)); + timer.sleep_until(currtime + microseconds_from(interval_us), dummy_forced); steady_clock::time_point new_time = steady_clock::now(); sleeps_us[i] = count_microseconds(new_time - currtime);