diff --git a/srtcore/api.cpp b/srtcore/api.cpp
index 56c581fec..31cbee673 100644
--- a/srtcore/api.cpp
+++ b/srtcore/api.cpp
@@ -1908,11 +1908,29 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
         return 0;
     }
 #endif
-    CUDTSocket* s = locateSocket(u);
-    if (!s)
-        throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
+#if ENABLE_HEAVY_LOGGING
+    // Wrapping the log into a destructor so that it
+    // is printed AFTER the destructor of SocketKeeper.
+    struct ForceDestructor
+    {
+        CUDTSocket* ps;
+        ForceDestructor(): ps(NULL){}
+        ~ForceDestructor()
+        {
+            if (ps) // Could be not acquired by SocketKeeper, occasionally
+            {
+                HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
+            }
+        }
+    } fod;
+#endif
+
+    SocketKeeper k(*this, u, ERH_THROW);
+    IF_HEAVY_LOGGING(fod.ps = k.socket);
+    HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());
+    int ret = close(k.socket);
 
-    return close(s);
+    return ret;
 }
 
 #if ENABLE_BONDING
@@ -2541,6 +2559,49 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
 }
 #endif
 
+// Finds the socket with ID `u` and marks it busy (apiAcquire) while still
+// holding m_GlobControlLock. When the ID is unknown, throws CUDTException
+// if erh == ERH_THROW, otherwise returns NULL. A non-NULL result must be
+// balanced with apiRelease(); SocketKeeper does that automatically.
+srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
+{
+    ScopedLock cg(m_GlobControlLock);
+
+    CUDTSocket* s = locateSocket_LOCKED(u);
+    if (!s)
+    {
+        if (erh == ERH_THROW)
+            throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
+        return NULL;
+    }
+
+    s->apiAcquire();
+    return s;
+}
+
+bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
+{
+    // Note that before using this function you must be certain
+    // that the socket isn't broken already and it still has at least
+    // one more GC cycle to live. In other words, you must be certain
+    // that this pointer passed here isn't dangling and was obtained
+    // directly from m_Sockets, or even better, has been acquired
+    // by some other functionality already, which is only about to
+    // be released earlier than you need.
+    ScopedLock cg(m_GlobControlLock);
+    s->apiAcquire();
+    // Keep the lock so that no one changes anything in the meantime.
+    // If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
+    // this socket is no longer present in the m_Sockets container
+    if (s->m_Status >= SRTS_BROKEN)
+    {
+        s->apiRelease();
+        return false;
+    }
+
+    return true;
+}
+
 srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
 {
     ScopedLock cg(m_GlobControlLock);
@@ -2607,7 +2668,7 @@ void srt::CUDTUnited::checkBrokenSockets()
 
         if (s->m_Status == SRTS_LISTENING)
         {
-            const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
+            const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
             // A listening socket should wait an extra 3 seconds
             // in case a client is connecting.
             if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
@@ -2666,6 +2727,13 @@ void srt::CUDTUnited::checkBrokenSockets()
     for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
     {
         CUDTSocket* ps = j->second;
+
+        if (ps->isStillBusy())
+        {
+            HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
+            continue;
+        }
+
         CUDT& u = ps->core();
 
         // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
@@ -2685,7 +2753,7 @@ void srt::CUDTUnited::checkBrokenSockets()
         // timeout 1 second to destroy a socket AND it has been removed from
         // RcvUList
         const steady_clock::time_point now = steady_clock::now();
-        const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
+        const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
         if (closed_ago > seconds_from(1))
         {
             CRNode* rnode = u.m_pRNode;
@@ -2735,6 +2803,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
     if (rn && rn->m_bOnList)
         return;
 
+    if (s->isStillBusy())
+    {
+        HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
+        return;
+    }
+
+    HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());
+
 #if ENABLE_BONDING
     if (s->m_GroupOf)
     {
diff --git a/srtcore/api.h b/srtcore/api.h
index 9ba77d23a..c194b7333 100644
--- a/srtcore/api.h
+++ b/srtcore/api.h
@@ -123,6 +123,24 @@ class CUDTSocket
 
     void construct();
 
+private:
+    // Number of parties currently holding this socket through apiAcquire().
+    // Deletion of the socket is postponed while this value is nonzero
+    // (see isStillBusy() checks in CUDTUnited).
+    srt::sync::atomic<int> m_iBusy;
+public:
+    // Marks the socket as held by one more party; pair it with apiRelease().
+    void apiAcquire() { ++m_iBusy; }
+    // Drops one hold previously taken with apiAcquire().
+    void apiRelease() { --m_iBusy; }
+
+    // Returns the current number of holds (0 means nobody keeps the socket).
+    int isStillBusy()
+    {
+        return m_iBusy;
+    }
+
+
     SRT_ATTR_GUARDED_BY(m_ControlLock)
     sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state
 
@@ -131,7 +149,8 @@ class CUDTSocket
     /// of sockets in order to prevent other methods from accessing invalid address.
     /// A timer is started and the socket will be removed after approximately
     /// 1 second (see CUDTUnited::checkBrokenSockets()).
-    sync::steady_clock::time_point m_tsClosureTimeStamp;
+    //sync::steady_clock::time_point m_tsClosureTimeStamp;
+    sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;
 
     sockaddr_any m_SelfAddr; //< local address of the socket
     sockaddr_any m_PeerAddr; //< peer address of the socket
@@ -442,8 +461,58 @@ class CUDTUnited
             }
         }
     };
-
 #endif
+
+    CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
+    bool acquireSocket(CUDTSocket* s);
+
+public:
+    // Scope guard that keeps a socket marked as busy (CUDTSocket::apiAcquire)
+    // until the guard is destroyed. `socket` is NULL when nothing was acquired.
+    struct SocketKeeper
+    {
+        CUDTSocket* socket;
+
+        SocketKeeper(): socket(NULL) {}
+
+        // This is intended for API functions to lock the socket's existence
+        // for the lifetime of their call.
+        SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }
+
+        // This is intended for TSBPD thread that should lock the socket's
+        // existence until it exits.
+        SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
+        {
+            acquire(glob, s);
+        }
+
+        // Note: acquire doesn't check if the keeper already keeps anything.
+        // This is only for a use together with an empty constructor.
+        bool acquire(CUDTUnited& glob, CUDTSocket* s)
+        {
+            bool caught = glob.acquireSocket(s);
+            socket = caught ? s : NULL;
+            return caught;
+        }
+
+        ~SocketKeeper()
+        {
+            if (socket)
+            {
+                SRT_ASSERT(socket->isStillBusy() > 0);
+                socket->apiRelease();
+            }
+        }
+
+    private:
+        // Not copyable: a copy would call apiRelease() twice for one acquire.
+        // Declared only, intentionally left undefined.
+        SocketKeeper(const SocketKeeper&);
+        SocketKeeper& operator=(const SocketKeeper&);
+    };
+
+private:
+
     void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
     bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);
 
diff --git a/srtcore/core.cpp b/srtcore/core.cpp
index 1612830e7..28b3c91b0 100644
--- a/srtcore/core.cpp
+++ b/srtcore/core.cpp
@@ -7121,7 +7121,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
 
     do
     {
-        if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now()))
+        if (stillConnected() && !timeout && !isRcvBufferReady())
         {
             /* Kick TsbPd thread to schedule next wakeup (if running) */
             if (m_bTsbPd)
@@ -8723,11 +8723,14 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
     // srt_recvfile (which doesn't make any sense), you'll have a deadlock.
     if (m_config.bDriftTracer)
     {
+        //enterCS(m_RcvBufferLock);
+
 #if ENABLE_BONDING
-        ScopedLock glock(uglobal().m_GlobControlLock);
+        ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
         const bool drift_updated =
 #endif
             m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
+        //leaveCS(m_RcvBufferLock);
 
 #if ENABLE_BONDING
         if (drift_updated && m_parent->m_GroupOf)
@@ -9993,10 +9996,12 @@ int srt::CUDT::checkLazySpawnTsbPdThread()
 {
     const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;
 
-    if (need_tsbpd && !m_RcvTsbPdThread.joinable())
-    {
-        ScopedLock lock(m_RcvTsbPdStartupLock);
+    if (!need_tsbpd)
+        return 0;
 
+    ScopedLock lock(m_RcvTsbPdStartupLock);
+    if (!m_RcvTsbPdThread.joinable())
+    {
         if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
             return -1;
 
@@ -11701,17 +11706,20 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode)
             // Bound to one call because this requires locking
             pg->updateFailedLink();
         }
+        // Sockets that never succeeded to connect must be deleted
+        // explicitly, otherwise they will never be deleted. OTOH
+        // the socket can be on the path of deletion already, so
+        // this only makes sure that the socket will be deleted,
+        // one way or another.
+        if (pending_broken)
+        {
+            // XXX This somehow can cause a deadlock
+            // uglobal()->close(m_parent);
+            LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
+            m_parent->setBrokenClosed();
+        }
     }
 
-    // Sockets that never succeeded to connect must be deleted
-    // explicitly, otherwise they will never be deleted.
-    if (pending_broken)
-    {
-        // XXX This somehow can cause a deadlock
-        // uglobal()->close(m_parent);
-        LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
-        m_parent->setBrokenClosed();
-    }
 #endif
 }
 
diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp
index 98999a81f..558b1b187 100644
--- a/srtcore/queue.cpp
+++ b/srtcore/queue.cpp
@@ -576,6 +576,13 @@ void* srt::CSndQueue::worker(void* param)
             continue;
         }
 
+        CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id());
+        if (!sk.socket)
+        {
+            HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing");
+            continue;
+        }
+
         // pack a packet from the socket
         CPacket pkt;
         steady_clock::time_point next_send_time;
@@ -588,7 +595,6 @@ void* srt::CSndQueue::worker(void* param)
             IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
             continue;
         }
-
         const sockaddr_any addr = u->m_PeerAddr;
         if (!is_zero(next_send_time))
             self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
@@ -931,6 +937,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
         EReadStatus read_st = rst;
         EConnectStatus conn_st = cst;
 
+        CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
+        if (!sk.socket)
+        {
+            // Socket deleted already, so stop this and proceed to the next loop.
+            LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists");
+            toRemove.push_back(*i);
+            continue;
+        }
+
+
         if (cst != CONN_RENDEZVOUS && dest_id != 0)
         {
             if (i->id != dest_id)
@@ -976,14 +992,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
     for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
     {
         HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
-        //
+        remove(i->id);
+
+        CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
+        if (!sk.socket)
+        {
+            // This actually shall never happen, so it's a kind of paranoid check.
+            LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
+            continue;
+        }
+
         // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
         // because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
         // and may crash on next pass.
         //
         // TODO: maybe lock i->u->m_ConnectionLock?
         i->u->m_bConnecting = false;
-        remove(i->u->m_SocketID);
 
         // DO NOT close the socket here because in this case it might be
         // unable to get status from at the right moment. Also only member
@@ -994,6 +1018,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
         CUDT::uglobal().m_EPoll.update_events(
             i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
 
+        // No extra existence check is needed here: the SocketKeeper acquired
+        // earlier in this iteration keeps the socket marked busy until the
+        // iteration ends. If the socket was already decided to be deleted,
+        // it is in m_ClosedSockets, where busy sockets are skipped for now.
+
         i->u->completeBrokenConnectionDependencies(i->errorcode);
     }
 