Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Added busy counter for sockets and various fixes for data race problems #2893

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 78 additions & 6 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,11 +1908,29 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ForceDestructor
{
CUDTSocket* ps;
ForceDestructor(): ps(NULL){}
~ForceDestructor()
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
} fod;
#endif

SocketKeeper k(*this, u, ERH_THROW);
IF_HEAVY_LOGGING(fod.ps = k.socket);
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());
int ret = close(k.socket);

return close(s);
return ret;
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -2541,6 +2559,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
}
#endif

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
if (erh == ERH_THROW)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return NULL;
}

s->apiAcquire();
return s;
}

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
s->apiRelease();
return false;
}

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
Expand Down Expand Up @@ -2607,7 +2664,7 @@ void srt::CUDTUnited::checkBrokenSockets()

if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
Expand Down Expand Up @@ -2666,6 +2723,13 @@ void srt::CUDTUnited::checkBrokenSockets()
for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
{
CUDTSocket* ps = j->second;

if (ps->isStillBusy())
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
continue;
}

CUDT& u = ps->core();

// HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
Expand All @@ -2685,7 +2749,7 @@ void srt::CUDTUnited::checkBrokenSockets()
// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
if (closed_ago > seconds_from(1))
{
CRNode* rnode = u.m_pRNode;
Expand Down Expand Up @@ -2735,6 +2799,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
if (rn && rn->m_bOnList)
return;

if (s->isStillBusy())
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
return;
}

LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());

#if ENABLE_BONDING
if (s->m_GroupOf)
{
Expand Down
59 changes: 57 additions & 2 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class CUDTSocket

void construct();

private:
srt::sync::atomic<int> m_iBusy;
public:
void apiAcquire() { ++m_iBusy; }
void apiRelease() { --m_iBusy; }

int isStillBusy()
{
return m_iBusy;
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

Expand All @@ -131,7 +143,8 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
sync::steady_clock::time_point m_tsClosureTimeStamp;
//sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand Down Expand Up @@ -442,8 +455,50 @@ class CUDTUnited
}
}
};

#endif

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);

Expand Down
36 changes: 22 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7121,7 +7121,7 @@

do
{
if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now()))
if (stillConnected() && !timeout && !isRcvBufferReady())
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
Expand Down Expand Up @@ -8723,11 +8723,14 @@
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
//enterCS(m_RcvBufferLock);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

#if ENABLE_BONDING
ScopedLock glock(uglobal().m_GlobControlLock);
ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
//leaveCS(m_RcvBufferLock);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.

#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
Expand Down Expand Up @@ -9993,10 +9996,12 @@
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

if (need_tsbpd && !m_RcvTsbPdThread.joinable())
{
ScopedLock lock(m_RcvTsbPdStartupLock);
if (!need_tsbpd)
return 0;

ScopedLock lock(m_RcvTsbPdStartupLock);
if (!m_RcvTsbPdThread.joinable())
{
if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
return -1;

Expand Down Expand Up @@ -11701,17 +11706,20 @@
// Bound to one call because this requires locking
pg->updateFailedLink();
}
// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted. OTOH
// the socket can be on the path of deletion already, so
// this only makes sure that the socket will be deleted,
// one way or another.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
}

// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
}

Expand Down
35 changes: 32 additions & 3 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@ void* srt::CSndQueue::worker(void* param)
continue;
}

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id());
if (!sk.socket)
{
HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing");
continue;
}

// pack a packet from the socket
CPacket pkt;
steady_clock::time_point next_send_time;
Expand All @@ -588,7 +595,6 @@ void* srt::CSndQueue::worker(void* param)
IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
continue;
}

const sockaddr_any addr = u->m_PeerAddr;
if (!is_zero(next_send_time))
self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
Expand Down Expand Up @@ -931,6 +937,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// Socket deleted already, so stop this and proceed to the next loop.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists");
toRemove.push_back(*i);
continue;
}


if (cst != CONN_RENDEZVOUS && dest_id != 0)
{
if (i->id != dest_id)
Expand Down Expand Up @@ -976,14 +992,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
//
remove(i->id);

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// This actually shall never happen, so it's a kind of paranoid check.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
continue;
}

// Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
// and may crash on next pass.
//
// TODO: maybe lock i->u->m_ConnectionLock?
i->u->m_bConnecting = false;
remove(i->u->m_SocketID);

// DO NOT close the socket here because in this case it might be
// unable to get status from at the right moment. Also only member
Expand All @@ -994,6 +1018,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
CUDT::uglobal().m_EPoll.update_events(
i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

// Make sure that the socket wasn't deleted in the meantime.
// Skip this part if it was. Note also that if the socket was
// decided to be deleted, it's already moved to m_ClosedSockets
// and should have been therefore already processed for deletion.

i->u->completeBrokenConnectionDependencies(i->errorcode);
}

Expand Down