diff --git a/srtcore/group.cpp b/srtcore/group.cpp
index 2bcc1e142..bd34eb99e 100644
--- a/srtcore/group.cpp
+++ b/srtcore/group.cpp
@@ -1918,6 +1918,188 @@ struct FLookupSocketWithEvent_LOCKED
     }
 };
 
+void CUDTGroup::recv_CollectAliveAndBroken(vector<CUDTSocket*>& alive, set<CUDTSocket*>& broken)
+{
+#if ENABLE_HEAVY_LOGGING
+    std::ostringstream ds;
+    ds << "E(" << m_RcvEID << ") ";
+#define HCLOG(expr) expr
+#else
+#define HCLOG(x) if (false) {}
+#endif
+
+    alive.reserve(m_Group.size());
+
+    HLOGC(grlog.Debug, log << "group/recv: Reviewing member sockets for polling");
+    for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
+    {
+        if (gi->laststatus == SRTS_CONNECTING)
+        {
+            HCLOG(ds << "@" << gi->id << "<pending> ");
+            continue; // don't read over a failed or pending socket
+        }
+
+        if (gi->laststatus >= SRTS_BROKEN)
+        {
+            broken.insert(gi->ps);
+        }
+
+        if (broken.count(gi->ps))
+        {
+            HCLOG(ds << "@" << gi->id << "<broken> ");
+            continue;
+        }
+
+        if (gi->laststatus != SRTS_CONNECTED)
+        {
+            HCLOG(ds << "@" << gi->id << "<" << SockStatusStr(gi->laststatus) << "> ");
+            // Sockets in this state are ignored. We are waiting until it
+            // achieves CONNECTING state, then it's added to write.
+            // Or gets broken and closed in the next step.
+            continue;
+        }
+
+        // Don't skip packets that are ahead because if we have a situation
+        // that all links are either "elephants" (do not report read readiness)
+        // and "kangaroos" (have already delivered an ahead packet) then
+        // omiting kangaroos will result in only elephants to be polled for
+        // reading. Due to the strict timing requirements and ensurance that
+        // TSBPD on every link will result in exactly the same delivery time
+        // for a packet of given sequence, having an elephant and kangaroo in
+        // one cage means that the elephant is simply a broken or half-broken
+        // link (the data are not delivered, but it will get repaired soon,
+        // enough for SRT to maintain the connection, but it will still drop
+        // packets that didn't arrive in time), in both cases it may
+        // potentially block the reading for an indefinite time, while
+        // simultaneously a kangaroo might be a link that got some packets
+        // dropped, but then it's still capable to deliver packets on time.
+
+        // Note that gi->id might be a socket that was previously being polled
+        // on write, when it's attempting to connect, but now it's connected.
+        // This will update the socket with the new event set.
+
+        alive.push_back(gi->ps);
+        HCLOG(ds << "@" << gi->id << "[READ] ");
+    }
+
+    HLOGC(grlog.Debug, log << "group/recv: " << ds.str() << " --> EPOLL/SWAIT");
+#undef HCLOG
+}
+
+vector<CUDTSocket*> CUDTGroup::recv_WaitForReadReady(const vector<CUDTSocket*>& aliveMembers, set<CUDTSocket*>& w_broken)
+{
+    if (aliveMembers.empty())
+    {
+        LOGC(grlog.Error, log << "group/recv: all links broken");
+        throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
+    }
+
+    for (vector<CUDTSocket*>::const_iterator i = aliveMembers.begin(); i != aliveMembers.end(); ++i)
+    {
+        // NOT using the official srt_epoll_add_usock because this will do socket dispatching,
+        // which requires lock on m_GlobControlLock, while this lock cannot be applied without
+        // first unlocking m_GroupLock.
+        const int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
+        CUDT::s_UDTUnited.epoll_add_usock_INTERNAL(m_RcvEID, *i, &read_modes);
+    }
+
+    // Here we need to make an additional check.
+    // There might be a possibility that all sockets that
+    // were added to the reader group, are ahead. At least
+    // surely we don't have a situation that any link contains
+    // an ahead-read subsequent packet, because GroupCheckPacketAhead
+    // already handled that case.
+    //
+    // What we can have is that every link has:
+    // - no known seq position yet (is not registered in the position map yet)
+    // - the position equal to the latest delivered sequence
+    // - the ahead position
+
+    // Now the situation is that we don't have any packets
+    // waiting for delivery so we need to wait for any to report one.
+
+    // The non-blocking mode would need to simply check the readiness
+    // with only immediate report, and read-readiness would have to
+    // be done in background.
+
+    // In blocking mode, use m_iRcvTimeOut, which's default value -1
+    // means to block indefinitely, also in swait().
+    // In non-blocking mode use 0, which means to always return immediately.
+    int timeout = m_bSynRecving ? m_iRcvTimeOut : 0;
+    int nready = 0;
+    // Poll on this descriptor until reading is available, indefinitely.
+    CEPoll::fmap_t sready;
+
+    // GlobControlLock is required for dispatching the sockets.
+    // Therefore it must be applied only when GroupLock is off.
+    {
+        // This call may wait indefinite time, so GroupLock must be unlocked.
+        InvertedLock ung (m_GroupLock);
+        THREAD_PAUSED();
+        nready = m_pGlobal->m_EPoll.swait(*m_RcvEpolld, sready, timeout, false /*report by retval*/);
+        THREAD_RESUMED();
+
+        // HERE GlobControlLock is locked first, then GroupLock is applied back
+        enterCS(CUDT::s_UDTUnited.m_GlobControlLock);
+    }
+    // BOTH m_GlobControlLock AND m_GroupLock are locked here.
+
+    HLOGC(grlog.Debug, log << "group/recv: " << nready << " RDY: " << DisplayEpollResults(sready));
+
+    if (nready == 0)
+    {
+        // GlobControlLock is applied manually, so unlock manually.
+        // GroupLock will be unlocked as per scope.
+        leaveCS(CUDT::s_UDTUnited.m_GlobControlLock);
+        // This can only happen when 0 is passed as timeout and none is ready.
+        // And 0 is passed only in non-blocking mode.
So this is none ready in
+        // non-blocking mode.
+        throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
+    }
+
+    // Handle sockets of pending connection and with errors.
+
+    // Nice to have something like:
+
+    // broken = FilterIf(sready, [] (auto s)
+    // { return s.second == SRT_EPOLL_ERR && (auto cs = g->locateSocket(s.first, ERH_RETURN))
+    //      ? {cs, true}
+    //      : {nullptr, false}
+    // });
+
+    FilterIf(
+        /*FROM*/ sready.begin(),
+        sready.end(),
+        /*TO*/ std::inserter(w_broken, w_broken.begin()),
+        /*VIA*/ FLookupSocketWithEvent_LOCKED(m_pGlobal, SRT_EPOLL_ERR));
+
+
+    // If this set is empty, it won't roll even once, therefore output
+    // will be surely empty. This will be checked then same way as when
+    // reading from every socket resulted in error.
+    vector<CUDTSocket*> readReady;
+    readReady.reserve(sready.size());
+    for (CEPoll::fmap_t::const_iterator i = sready.begin(); i != sready.end(); ++i)
+    {
+        if (i->second & SRT_EPOLL_ERR)
+            continue; // broken already
+
+        if ((i->second & SRT_EPOLL_IN) == 0)
+            continue; // not ready for reading
+
+        // Check if this socket is in aheads
+        // If so, don't read from it, wait until the ahead is flushed.
+        SRTSOCKET id = i->first;
+        CUDTSocket* ps = m_pGlobal->locateSocket_LOCKED(id);
+        if (ps)
+            readReady.push_back(ps);
+    }
+
+    leaveCS(CUDT::s_UDTUnited.m_GlobControlLock);
+
+    return readReady;
+}
+
 void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequence)
 {
     bool ready = false;
@@ -2097,195 +2279,16 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
     // during the next time ahead check, after which they will become
     // horses.
 
-#if ENABLE_HEAVY_LOGGING
-    std::ostringstream ds;
-    ds << "E(" << m_RcvEID << ") ";
-#define HCLOG(expr) expr
-#else
-#define HCLOG(x) \
-    if (false)   \
-    {            \
-    }
-#endif
-
-    bool still_alive = false;
-    size_t size = 0;
+    const size_t size = m_Group.size();
 
-    // You can't lock the whole group for that
-    // action because this will result in a deadlock.
     // Prepare first the list of sockets to be added as connect-pending
     // and as read-ready, then unlock the group, and then add them to epoll.
-    vector<CUDTSocket*> read_ready;
-    vector<SRTSOCKET> connect_pending;
-
-    {
-        HLOGC(grlog.Debug, log << "group/recv: Reviewing member sockets to epoll-add");
-        for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
-        {
-            ++size; // list::size loops over all elements anyway, so this hits two birds with one stone
-            if (gi->laststatus == SRTS_CONNECTING)
-            {
-                HCLOG(ds << "@" << gi->id << "<pending> ");
-                /*
-                connect_pending.push_back(gi->id);
-                */
-
-                continue; // don't read over a failed or pending socket
-            }
-
-            if (gi->laststatus >= SRTS_BROKEN)
-            {
-                broken.insert(gi->ps);
-            }
-
-            if (broken.count(gi->ps))
-            {
-                HCLOG(ds << "@" << gi->id << "<broken> ");
-                continue;
-            }
-
-            if (gi->laststatus != SRTS_CONNECTED)
-            {
-                HCLOG(ds << "@" << gi->id << "<" << SockStatusStr(gi->laststatus) << "> ");
-                // Sockets in this state are ignored. We are waiting until it
-                // achieves CONNECTING state, then it's added to write.
-                // Or gets broken and closed in the next step.
-                continue;
-            }
-
-            still_alive = true;
-
-            // Don't skip packets that are ahead because if we have a situation
-            // that all links are either "elephants" (do not report read readiness)
-            // and "kangaroos" (have already delivered an ahead packet) then
-            // omiting kangaroos will result in only elephants to be polled for
-            // reading. Due to the strict timing requirements and ensurance that
-            // TSBPD on every link will result in exactly the same delivery time
-            // for a packet of given sequence, having an elephant and kangaroo in
-            // one cage means that the elephant is simply a broken or half-broken
-            // link (the data are not delivered, but it will get repaired soon,
-            // enough for SRT to maintain the connection, but it will still drop
-            // packets that didn't arrive in time), in both cases it may
-            // potentially block the reading for an indefinite time, while
-            // simultaneously a kangaroo might be a link that got some packets
-            // dropped, but then it's still capable to deliver packets on time.
-
-            // Note that gi->id might be a socket that was previously being polled
-            // on write, when it's attempting to connect, but now it's connected.
-            // This will update the socket with the new event set.
-
-            read_ready.push_back(gi->ps);
-            HCLOG(ds << "@" << gi->id << "[READ] ");
-        }
-    }
-
-    int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
-
-    /* Done at the connecting stage so that it won't be missed.
-
-    int connect_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
-    for (vector<SRTSOCKET>::iterator i = connect_pending.begin(); i != connect_pending.end(); ++i)
-    {
-        XXX This is wrong code; this should use the internal function and pass CUDTSocket*
-        epoll_add_usock_INTERNAL(m_RcvEID, i->second, &connect_modes);
-    }
-
-    AND this below additionally for sockets that were so far pending connection,
-    will be now "upgraded" to readable sockets. The epoll adding function for a
-    socket that already is in the eid container will only change the poll flags,
-    but will not re-add it, that is, event reports that are both in old and new
-    flags will survive the operation.
-
-    */
-
-    for (vector<CUDTSocket*>::iterator i = read_ready.begin(); i != read_ready.end(); ++i)
-    {
-        // NOT using the official srt_epoll_add_usock because this will do socket dispatching,
-        // which requires lock on m_GlobControlLock, while this lock cannot be applied without
-        // first unlocking m_GroupLock.
-        CUDT::s_UDTUnited.epoll_add_usock_INTERNAL(m_RcvEID, *i, &read_modes);
-    }
-
-    HLOGC(grlog.Debug, log << "group/recv: " << ds.str() << " --> EPOLL/SWAIT");
-#undef HCLOG
-
-    if (!still_alive)
-    {
-        LOGC(grlog.Error, log << "group/recv: all links broken");
-        throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
-    }
-
-    // Here we need to make an additional check.
-    // There might be a possibility that all sockets that
-    // were added to the reader group, are ahead. At least
-    // surely we don't have a situation that any link contains
-    // an ahead-read subsequent packet, because GroupCheckPacketAhead
-    // already handled that case.
-    //
-    // What we can have is that every link has:
-    // - no known seq position yet (is not registered in the position map yet)
-    // - the position equal to the latest delivered sequence
-    // - the ahead position
-
-    // Now the situation is that we don't have any packets
-    // waiting for delivery so we need to wait for any to report one.
-
-    // XXX We support blocking mode only at the moment.
-    // The non-blocking mode would need to simply check the readiness
-    // with only immediate report, and read-readiness would have to
-    // be done in background.
-
-    // Poll on this descriptor until reading is available, indefinitely.
-    CEPoll::fmap_t sready;
-
-    // In blocking mode, use m_iRcvTimeOut, which's default value -1
-    // means to block indefinitely, also in swait().
-    // In non-blocking mode use 0, which means to always return immediately.
-    int timeout = m_bSynRecving ? m_iRcvTimeOut : 0;
-    int nready = 0;
-
-    // GlobControlLock is required for dispatching the sockets.
-    // Therefore it must be applied only when GroupLock is off.
-    {
-        // This call may wait indefinite time, so GroupLock must be unlocked.
-        InvertedLock ung (m_GroupLock);
-        THREAD_PAUSED();
-        nready = m_pGlobal->m_EPoll.swait(*m_RcvEpolld, sready, timeout, false /*report by retval*/);
-        THREAD_RESUMED();
-
-        // HERE GlobControlLock is locked first, then GroupLock is applied back
-        enterCS(CUDT::s_UDTUnited.m_GlobControlLock);
-    }
-    // BOTH m_GlobControlLock AND m_GroupLock are locked here.
-
-    HLOGC(grlog.Debug, log << "group/recv: " << nready << " RDY: " << DisplayEpollResults(sready));
-
-    if (nready == 0)
-    {
-        // GlobControlLock is applied manually, so unlock manually.
-        // GroupLock will be unlocked as per scope.
-        leaveCS(CUDT::s_UDTUnited.m_GlobControlLock);
-        // This can only happen when 0 is passed as timeout and none is ready.
-        // And 0 is passed only in non-blocking mode. So this is none ready in
-        // non-blocking mode.
-        throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
-    }
-
-    // Handle sockets of pending connection and with errors.
-
-    // Nice to have something like:
-
-    // broken = FilterIf(sready, [] (auto s)
-    // { return s.second == SRT_EPOLL_ERR && (auto cs = g->locateSocket(s.first, ERH_RETURN))
-    //      ? {cs, true}
-    //      : {nullptr, false}
-    // });
+    vector<CUDTSocket*> aliveMembers;
+    recv_CollectAliveAndBroken(aliveMembers, broken);
 
-    FilterIf(
-        /*FROM*/ sready.begin(),
-        sready.end(),
-        /*TO*/ std::inserter(broken, broken.begin()),
-        /*VIA*/ FLookupSocketWithEvent_LOCKED(m_pGlobal, SRT_EPOLL_ERR));
+    const vector<CUDTSocket*> ready_sockets = recv_WaitForReadReady(aliveMembers, broken);
+    // m_GlobControlLock lifted, m_GroupLock still locked.
+    // Now we can safely do this scoped way.
 
     // Ok, now we need to have some extra qualifications:
     // 1. If a socket has no registry yet, we read anyway, just
@@ -2300,32 +2303,6 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
 
     int32_t next_seq = m_RcvBaseSeqNo;
 
-    // If this set is empty, it won't roll even once, therefore output
-    // will be surely empty. This will be checked then same way as when
-    // reading from every socket resulted in error.
-
-    vector<CUDTSocket*> ready_sockets;
-
-    for (CEPoll::fmap_t::const_iterator i = sready.begin(); i != sready.end(); ++i)
-    {
-        if (i->second & SRT_EPOLL_ERR)
-            continue; // broken already
-
-        if ((i->second & SRT_EPOLL_IN) == 0)
-            continue; // not ready for reading
-
-        // Check if this socket is in aheads
-        // If so, don't read from it, wait until the ahead is flushed.
-        SRTSOCKET id = i->first;
-        CUDTSocket* ps = m_pGlobal->locateSocket_LOCKED(id);
-        if (ps)
-            ready_sockets.push_back(ps);
-    }
-    leaveCS(CUDT::s_UDTUnited.m_GlobControlLock);
-
-    // m_GlobControlLock lifted, m_GroupLock still locked.
-    // Now we can safely do this scoped way.
-
     if (m_bClosing)
     {
         HLOGC(gslog.Debug, log << "grp/sendBroadcast: GROUP CLOSED, ABANDONING");
@@ -2339,7 +2316,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
     // which is still applied here. So this will have to wait for this function to finish
     // (or block on swait, in which case the lock is lifted) anyway.
 
-    for (vector<CUDTSocket*>::iterator si = ready_sockets.begin(); si != ready_sockets.end(); ++si)
+    for (vector<CUDTSocket*>::const_iterator si = ready_sockets.begin(); si != ready_sockets.end(); ++si)
     {
         CUDTSocket* ps = *si;
         SRTSOCKET id = ps->m_SocketID;
@@ -2354,7 +2331,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
         // x = 0: the socket should be ready to get the exactly next packet
         // x = 1: the case is already handled by GroupCheckPacketAhead.
         // x > 1: AHEAD. DO NOT READ.
-        int seqdiff = CSeqNo::seqcmp(p->mctrl.pktseq, m_RcvBaseSeqNo);
+        const int seqdiff = CSeqNo::seqcmp(p->mctrl.pktseq, m_RcvBaseSeqNo);
         if (seqdiff > 1)
         {
             HLOGC(grlog.Debug,
@@ -3548,6 +3525,18 @@ void CUDTGroup::sendBackup_RetryWaitBlocked(const vector<gli_t>& unstableLinks,
                                             SRT_MSGCTRL& w_mc,
                                             CUDTException& w_cx)
 {
+#if ENABLE_HEAVY_LOGGING
+    // Potential problem to be checked in developer mode
+    for (vector<gli_t>::iterator p = w_parallel.begin(); p != w_parallel.end(); ++p)
+    {
+        if (std::find(unstableLinks.begin(), unstableLinks.end(), *p) != unstableLinks.end())
+        {
+            LOGC(gslog.Debug,
+                 log << "grp/sendBackup: IPE: parallel links enclose unstable link @" << (*p)->ps->m_SocketID);
+        }
+    }
+#endif
+
     // In contradiction to broadcast sending, backup sending must check
     // the blocking state in total first. We need this information through
     // epoll because we didn't use all sockets to send the data hence the
@@ -3759,22 +3748,8 @@ void CUDTGroup::sendBackup_RetryWaitBlocked(const vector<gli_t>& unstableLinks,
 }
 
 // [[using locked(this->m_GroupLock)]]
-void CUDTGroup::sendBackup_SilenceRedundantLinks(const vector<gli_t>& unstableLinks,
-                                                 vector<gli_t>& w_parallel)
+void CUDTGroup::sendBackup_SilenceRedundantLinks(vector<gli_t>& w_parallel)
 {
-#if ENABLE_HEAVY_LOGGING
-    // Potential problem to be checked in developer mode
-    for (vector<gli_t>::iterator p = w_parallel.begin(); p != w_parallel.end(); ++p)
-    {
-        if (std::find(unstableLinks.begin(), unstableLinks.end(), *p) != unstableLinks.end())
-        {
-            LOGC(gslog.Debug,
-                 log << "grp/sendBackup: IPE: parallel links enclose unstable link @" << (*p)->ps->m_SocketID);
-        }
-    }
-#endif
-
-
     // The most important principle is to keep the data being sent constantly,
     // even if it means temporarily full redundancy. However, if you are certain
     // that you have multiple stable links running at the moment, SILENCE all but
@@ -4101,7 +4076,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
 
     sendBackup_RetryWaitBlocked(unstableLinks, (parallel), (final_stat), (none_succeeded), (w_mc), (cx));
 
-    sendBackup_SilenceRedundantLinks(unstableLinks, (parallel));
+    sendBackup_SilenceRedundantLinks((parallel));
 
     // (closing condition checked inside this call)
 
     if (none_succeeded)
diff --git a/srtcore/group.h b/srtcore/group.h
index e1853babe..afb05f0f9 100644
--- a/srtcore/group.h
+++ b/srtcore/group.h
@@ -326,8 +326,7 @@ class CUDTGroup
                                      bool& w_none_succeeded,
                                      SRT_MSGCTRL& w_mc,
                                      CUDTException& w_cx);
-    void sendBackup_SilenceRedundantLinks(const std::vector<gli_t>& unstable,
-                                          std::vector<gli_t>& w_parallel);
+    void sendBackup_SilenceRedundantLinks(std::vector<gli_t>& w_parallel);
 
     void send_CheckValidSockets();
 
@@ -666,6 +665,17 @@ class CUDTGroup
 
     ReadPos* checkPacketAhead();
 
+    void recv_CollectAliveAndBroken(std::vector<CUDTSocket*>& w_alive, std::set<CUDTSocket*>& w_broken);
+
+    /// The function polls alive member sockets and retrieves a list of read-ready.
+    /// [acquires lock for CUDT::s_UDTUnited.m_GlobControlLock]
+    /// [[using locked(m_GroupLock)]] temporally unlocks-locks internally
+    ///
+    /// @returns list of read-ready sockets
+    /// @throws CUDTException(MJ_CONNECTION, MN_NOCONN, 0)
+    /// @throws CUDTException(MJ_AGAIN, MN_RDAVAIL, 0)
+    std::vector<CUDTSocket*> recv_WaitForReadReady(const std::vector<CUDTSocket*>& aliveMembers, std::set<CUDTSocket*>& w_broken);
+
     // This is the sequence number of a packet that has been previously
     // delivered. Initially it should be set to SRT_SEQNO_NONE so that the sequence read
     // from the first delivering socket will be taken as a good deal.