Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][apps] Implemented the socket close reason feature #2747

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
151 changes: 137 additions & 14 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
}

// Marks the underlying core object as broken, resets its broken counter,
// closes it and finally marks this socket as closed.
// `reason` is the close reason code forwarded to closeInternal().
// [[using locked(m_GlobControlLock)]]
void srt::CUDTSocket::breakSocket_LOCKED()
void srt::CUDTSocket::breakSocket_LOCKED(int reason)
{
// This function is intended to be called from GC,
// under a lock of m_GlobControlLock.
m_UDT.m_bBroken = true;
m_UDT.m_iBrokenCounter = 0;
HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
m_UDT.closeInternal();
// Close the core, passing along the caller-supplied reason.
m_UDT.closeInternal(reason);
setClosed();
}

Expand Down Expand Up @@ -813,7 +813,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
#endif

SRTSOCKET id = ns->m_SocketID;
ns->core().closeInternal();
ns->core().closeInternal(SRT_CLS_LATE);
ns->setClosed();

// The mapped socket should be now unmapped to preserve the situation that
Expand Down Expand Up @@ -911,6 +911,37 @@ SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
return i->second->getStatus();
}

int srt::CUDTUnited::getCloseReason(const SRTSOCKET u, SRT_CLOSE_INFO& info)
{
// protects the m_Sockets structure
ScopedLock cg(m_GlobControlLock);

// We need to search for the socket in:
// m_Sockets, if it is somehow still alive,
// m_ClosedSockets, if it's when it should be,
// m_ClosedDatabase, if it has been already garbage-collected and deleted.

sockets_t::const_iterator i = m_Sockets.find(u);
if (i != m_Sockets.end())
{
i->second->core().copyCloseInfo((info));
return 0;
}

i = m_ClosedSockets.find(u);
if (i != m_ClosedSockets.end())
{
i->second->core().copyCloseInfo((info));
}

map<SRTSOCKET, CloseInfo>::iterator c = m_ClosedDatabase.find(u);
if (c == m_ClosedDatabase.end())
return -1;

info = c->second.info;
return 0;
}

int srt::CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
{
ScopedLock cg(s->m_ControlLock);
Expand Down Expand Up @@ -1811,7 +1842,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
continue;

// This will also automatically remove it from the group and all eids
close(s);
close(s, SRT_CLS_INTERNAL);
}

// There's no possibility to report a problem on every connection
Expand Down Expand Up @@ -1895,7 +1926,7 @@ int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, i
return 0;
}

int srt::CUDTUnited::close(const SRTSOCKET u)
int srt::CUDTUnited::close(const SRTSOCKET u, int reason)
{
#if ENABLE_BONDING
if (u & SRTGROUP_MASK)
Expand All @@ -1910,7 +1941,7 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);

return close(s);
return close(s, reason);
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -1961,7 +1992,54 @@ void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
}
#endif

int srt::CUDTUnited::close(CUDTSocket* s)
// [[using locked(m_GlobControlLock)]]
/// Stores (or overwrites) the close record of socket `s` in m_ClosedDatabase
/// and then trims the database to at most MAX_CLOSE_RECORD_SIZE records,
/// removing the oldest ones first.
///
/// @param s  Socket whose agent/peer close reasons and close timestamp
///           are to be recorded.
void srt::CUDTUnited::recordCloseReason(CUDTSocket* s)
{
    CloseInfo ci;
    ci.info.agent = SRT_CLOSE_REASON(s->core().m_AgentCloseReason.load());
    ci.info.peer = SRT_CLOSE_REASON(s->core().m_PeerCloseReason.load());
    ci.info.time = s->core().m_CloseTimeStamp.load().time_since_epoch().count();

    m_ClosedDatabase[s->m_SocketID] = ci;

    // As a DoS attack prevention, do not keep more than
    // MAX_CLOSE_RECORD_SIZE records. In normal operation this shouldn't be
    // necessary, but a record of a dead socket must be kept for some time so
    // that the application can still obtain it after the socket has been
    // physically removed. Without a limit, an application forced to create
    // and close SRT sockets very quickly could grow this database without
    // bound. Hence remove the oldest records, recognized by the `time`
    // field, whenever the limit is exceeded.
    if (m_ClosedDatabase.size() <= MAX_CLOSE_RECORD_SIZE)
        return;

    // Index the records by close time. A 64-bit key is used so that the
    // timestamp is not truncated (truncation would break the ordering), and
    // a multimap is used so that records sharing the same timestamp are all
    // retained as eviction candidates instead of overwriting one another.
    // NOTE(review): assumes info.time fits in int64_t - confirm against
    // the SRT_CLOSE_INFO declaration.
    typedef std::multimap<int64_t, SRTSOCKET> time_index_t;
    time_index_t by_time;

    for (map<SRTSOCKET, CloseInfo>::const_iterator x = m_ClosedDatabase.begin();
            x != m_ClosedDatabase.end(); ++x)
    {
        by_time.insert(std::make_pair(static_cast<int64_t>(x->second.info.time), x->first));
    }

    // Collect first, erase afterwards: the index holds socket IDs (keys),
    // so erasing from m_ClosedDatabase does not disturb the iteration.
    const size_t ntodel = m_ClosedDatabase.size() - MAX_CLOSE_RECORD_SIZE;
    time_index_t::const_iterator y = by_time.begin();
    for (size_t i = 0; i < ntodel && y != by_time.end(); ++i, ++y)
    {
        m_ClosedDatabase.erase(y->second);
    }
}

int srt::CUDTUnited::close(CUDTSocket* s, int reason)
{
HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSE. Acquiring control lock");
ScopedLock socket_cg(s->m_ControlLock);
Expand Down Expand Up @@ -1994,6 +2072,8 @@ int srt::CUDTUnited::close(CUDTSocket* s)

// broadcast all "accept" waiting
CSync::lock_notify_all(s->m_AcceptCond, s->m_AcceptLock);

s->core().setAgentCloseReason(reason);
}
else
{
Expand All @@ -2003,7 +2083,7 @@ int srt::CUDTUnited::close(CUDTSocket* s)
// may block INDEFINITELY. As long as it's acceptable to block the
// call to srt_close(), and all functions in all threads where this
// very socket is used, this shall not block the central database.
s->core().closeInternal();
s->core().closeInternal(reason);

// synchronize with garbage collection.
HLOGC(smlog.Debug,
Expand Down Expand Up @@ -2038,6 +2118,8 @@ int srt::CUDTUnited::close(CUDTSocket* s)
}
#endif

recordCloseReason(s);

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets[s->m_SocketID] = s;
HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
Expand Down Expand Up @@ -2639,6 +2721,12 @@ void srt::CUDTUnited::checkBrokenSockets()

HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);

// Note that this will not override the value that has been already
// set by some other functionality, only set it when not yet set.
s->core().setAgentCloseReason(SRT_CLS_INTERNAL);

recordCloseReason(s);

// close broken connections and start removal timer
s->setClosed();
tbc.push_back(i->first);
Expand Down Expand Up @@ -2757,7 +2845,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)

CUDTSocket* as = si->second;

as->breakSocket_LOCKED();
as->breakSocket_LOCKED(SRT_CLS_DEADLSN);
m_ClosedSockets[*q] = as;
m_Sockets.erase(*q);
}
Expand All @@ -2783,7 +2871,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
m_ClosedSockets.erase(i);

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
s->core().closeInternal();
s->core().closeInternal(SRT_CLS_INTERNAL);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
Expand Down Expand Up @@ -2823,6 +2911,31 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
}
}

void srt::CUDTUnited::checkTemporaryDatabases()
{
ScopedLock cg(m_GlobControlLock);

// It's not very efficient to collect first the keys of all
// elements to remove and then remove from the map by key.

// In C++20 this is possible by doing
// m_ClosedDatabase.erase_if([](auto& c) { return --c.generation <= 0; });
// but nothing equivalent in the earlier standards.

vector<SRTSOCKET> expired;

for (map<SRTSOCKET, CloseInfo>::iterator c = m_ClosedDatabase.begin();
c != m_ClosedDatabase.end(); ++c)
{
--c->second.generation;
if (c->second.generation <= 0)
expired.push_back(c->first);
}

for (vector<SRTSOCKET>::iterator i = expired.begin(); i != expired.end(); ++i)
m_ClosedDatabase.erase(*i);
}

void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
{
w_m.m_mcfg = s->core().m_config;
Expand Down Expand Up @@ -3278,25 +3391,35 @@ void* srt::CUDTUnited::garbageCollect(void* p)

UniqueLock gclock(self->m_GCStopLock);

// START LIBRARY RUNNING LOOP
while (!self->m_bClosing)
{
INCREMENT_THREAD_ITERATIONS();
self->checkBrokenSockets();
self->checkTemporaryDatabases();

HLOGC(inlog.Debug, log << "GC: sleep 1 s");
self->m_GCStopCond.wait_for(gclock, seconds_from(1));
}
// END.

// All the below code does the library cleanup, which should
// happen as a result of an application calling `srt_cleanup()`.

// remove all sockets and multiplexers
HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");

{
ScopedLock glock(self->m_GlobControlLock);

// Do not do generative expiry removal - there's no chance
// anyone can extract the close reason information since this point on.
self->m_ClosedDatabase.clear();

for (sockets_t::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++i)
{
CUDTSocket* s = i->second;
s->breakSocket_LOCKED();
s->breakSocket_LOCKED(SRT_CLS_CLEANUP);

#if ENABLE_BONDING
if (s->m_GroupOf)
Expand Down Expand Up @@ -3705,11 +3828,11 @@ int srt::CUDT::connect(SRTSOCKET u, const sockaddr* name, int namelen, int32_t f
}
}

int srt::CUDT::close(SRTSOCKET u)
int srt::CUDT::close(SRTSOCKET u, int reason)
{
try
{
return uglobal().close(u);
return uglobal().close(u, reason);
}
catch (const CUDTException& e)
{
Expand Down Expand Up @@ -4366,7 +4489,7 @@ int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)

// Closes the socket `u` by forwarding to srt::CUDT::close() and returns
// that call's result.
int close(SRTSOCKET u)
{
return srt::CUDT::close(u);
// This form of the call passes SRT_CLS_API as the close reason.
return srt::CUDT::close(u, SRT_CLS_API);
}

int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
Expand Down
27 changes: 24 additions & 3 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class CUDTSocket
/// from within the GC thread only (that is, only when
/// the socket should be no longer visible in the
/// connection, including for sending remaining data).
void breakSocket_LOCKED();
void breakSocket_LOCKED(int reason);

/// This makes the socket no longer capable of performing any transmission
/// operation, but continues to be responsive in the connection in order
Expand Down Expand Up @@ -233,6 +233,8 @@ class CUDTUnited

// Public constants
static const int32_t MAX_SOCKET_VAL = SRTGROUP_MASK - 1; // maximum value for a regular socket
static const int MAX_CLOSE_RECORD_TTL = 10;
static const size_t MAX_CLOSE_RECORD_SIZE = 10;

public:
enum ErrorHandling
Expand Down Expand Up @@ -295,8 +297,8 @@ class CUDTUnited
int groupConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG targets[], int arraysize);
int singleMemberConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG* target);
#endif
int close(const SRTSOCKET u);
int close(CUDTSocket* s);
int close(const SRTSOCKET u, int reason);
int close(CUDTSocket* s, int reason);
void getpeername(const SRTSOCKET u, sockaddr* name, int* namelen);
void getsockname(const SRTSOCKET u, sockaddr* name, int* namelen);
int select(UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout);
Expand Down Expand Up @@ -488,6 +490,25 @@ class CUDTUnited

CEPoll m_EPoll; // handling epoll data structures and events

/// A single entry of the closed-socket record database: the close
/// information of a socket plus a countdown controlling its lifetime.
struct CloseInfo
{
    /// Close information reported for the socket.
    SRT_CLOSE_INFO info;

    /// Remaining number of GC rolls before the record is removed.
    /// As GC rolls every 1 second, this is roughly the number of
    /// seconds the record stays available AFTER the socket is closed.
    int generation;

    CloseInfo()
        : generation(MAX_CLOSE_RECORD_TTL)
    {
    }
};
std::map<SRTSOCKET, CloseInfo> m_ClosedDatabase;

void checkTemporaryDatabases();
void recordCloseReason(CUDTSocket* s);

public:
int getCloseReason(const SRTSOCKET u, SRT_CLOSE_INFO& info);

private:
CUDTUnited(const CUDTUnited&);
CUDTUnited& operator=(const CUDTUnited&);
Expand Down
6 changes: 6 additions & 0 deletions srtcore/atomic_clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class AtomicClock
dur.store(uint64_t(d.time_since_epoch().count()));
}

/// Forwards a compare-and-exchange request to the underlying `dur` value,
/// using the tick counts of the given time points.
/// The outcome of the underlying call is not reported to the caller.
///
/// @param exp    Time point whose tick count is the expected current value.
/// @param toset  Time point whose tick count is the value to set.
void compare_exchange(const time_point_type& exp, const time_point_type& toset)
{
    // Kept as a named local so that it can be passed to `dur` exactly as before.
    uint64_t expected_ticks = exp.time_since_epoch().count();
    dur.compare_exchange(expected_ticks, toset.time_since_epoch().count());
}

AtomicClock& operator=(const time_point_type& s)
{
dur = s.time_since_epoch().count();
Expand Down