Skip to content

Commit

Permalink
[core] Fixed C++11 build (win)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed May 12, 2020
1 parent d1288ec commit 76b1077
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 22 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,9 @@ if (ENABLE_CODE_COVERAGE)
endif()
endif()

if (PTHREAD_LIBRARY AND PTHREAD_INCLUDE_DIR)
if (ENABLE_STDCXX_SYNC)
message(STATUS "Pthread library: C++11")
elseif (PTHREAD_LIBRARY AND PTHREAD_INCLUDE_DIR)
message(STATUS "Pthread library: ${PTHREAD_LIBRARY}")
message(STATUS "Pthread include dir: ${PTHREAD_INCLUDE_DIR}")
elseif (MICROSOFT)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8463,7 +8463,7 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)

case UMSG_CGWARNING: // 100 - Delay Warning
// One way packet delay is increasing, so decrease the sending rate
m_tdSendInterval *= 1.125;
m_tdSendInterval = (m_tdSendInterval * 1125) / 1000;
// XXX Note as interesting fact: this is only prepared for handling,
// but nothing in the code is sending this message. Probably predicted
// for a custom congctl. There's a predicted place to call it under
Expand Down
20 changes: 10 additions & 10 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,7 @@ void CSndUList::remove_(const CUDT* u)

//
CSndQueue::CSndQueue()
: m_WorkerThread()
, m_pSndUList(NULL)
: m_pSndUList(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_WindowCond()
Expand Down Expand Up @@ -1055,10 +1054,11 @@ CRcvQueue::CRcvQueue()
CRcvQueue::~CRcvQueue()
{
m_bClosing = true;
if (!pthread_equal(m_WorkerThread, pthread_t()))

if (m_WorkerThread.joinable())
{
HLOGC(mglog.Debug, log << "RcvQueue: EXIT");
pthread_join(m_WorkerThread, NULL);
m_WorkerThread.join();
}
releaseCond(m_BufferCond);

Expand Down Expand Up @@ -1101,13 +1101,13 @@ void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel *c

#if ENABLE_LOGGING
++m_counter;
std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
ThreadName tn(thrname.c_str());
const std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
#else
const std::string thrname = "SRT:RcvQ:w";
#endif

if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
{
m_WorkerThread = pthread_t();
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
}
Expand Down Expand Up @@ -1529,15 +1529,15 @@ void CRcvQueue::stopWorker()
m_bClosing = true;

// Sanity check of the function's affinity.
if (pthread_equal(pthread_self(), m_WorkerThread))
if (CheckIfThisThread(m_WorkerThread))
{
LOGC(mglog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
return; // do nothing else, this would cause a hangup or crash.
}

HLOGC(mglog.Debug, log << "RcvQueue: EXIT (forced)");
// And we trust the thread that it does.
pthread_join(m_WorkerThread, NULL);
m_WorkerThread.join();
}

int CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
Expand Down
10 changes: 2 additions & 8 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,12 @@ class CUnitQueue

CUnit* getNextAvailUnit();


void makeUnitFree(CUnit * unit);

void makeUnitGood(CUnit * unit);

public:

inline int getIPversion() const { return m_iIPversion; }

inline int getIPversion() const { return m_iIPversion; }

private:
struct CQEntry
Expand Down Expand Up @@ -471,9 +468,6 @@ friend class CUDTUnited;

int recvfrom(int32_t id, CPacket& to_packet);

// Needed for affinity check (debug only)
pthread_t threadId() { return m_WorkerThread; }

void stopWorker();

void setClosing()
Expand All @@ -483,7 +477,7 @@ friend class CUDTUnited;

private:
static void* worker(void* param);
pthread_t m_WorkerThread;
srt::sync::CThread m_WorkerThread;
// Subroutines of worker
EReadStatus worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr_any& sa);
EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& sa);
Expand Down
9 changes: 9 additions & 0 deletions srtcore/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,15 @@ bool StartThread(CThread& th, void* (*f) (void*), void* args, const char* name)
return true;
}

bool CheckIfThisThread(const CThread& th)
{
#if ENABLE_STDCXX_SYNC
return this_thread::get_id() == th.get_id();
#else
return pthread_equal(pthread_self(), th.get_id()) != 0;
#endif
}

} // namespace sync
} // namespace srt

Expand Down
15 changes: 13 additions & 2 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifdef ENABLE_STDCXX_SYNC
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#else
#include <pthread.h>
#endif
Expand Down Expand Up @@ -75,13 +77,14 @@ class Duration
inline bool operator<(const Duration& rhs) const { return m_duration < rhs.m_duration; }

public: // Assignment operators
inline void operator*=(const double mult) { m_duration = static_cast<int64_t>(m_duration * mult); }
inline void operator*=(const int64_t mult) { m_duration = static_cast<int64_t>(m_duration * mult); }
inline void operator+=(const Duration& rhs) { m_duration += rhs.m_duration; }
inline void operator-=(const Duration& rhs) { m_duration -= rhs.m_duration; }

inline Duration operator+(const Duration& rhs) const { return Duration(m_duration + rhs.m_duration); }
inline Duration operator-(const Duration& rhs) const { return Duration(m_duration - rhs.m_duration); }
inline Duration operator*(const int& rhs) const { return Duration(m_duration * rhs); }
inline Duration operator*(const int64_t& rhs) const { return Duration(m_duration * rhs); }
inline Duration operator/(const int64_t& rhs) const { return Duration(m_duration / rhs); }

private:
// int64_t range is from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807
Expand Down Expand Up @@ -753,6 +756,10 @@ class CThread
/// is still considered an active thread of execution and is therefore joinable.
bool joinable() const;

/// Returns the id of the current thread.
/// In this implementation the ID is the pthread_t.
const pthread_t get_id() const { return m_thread; }

public:
/// Blocks the current thread until the thread identified by *this finishes its execution.
/// If that thread has already terminated, then join() returns immediately.
Expand Down Expand Up @@ -786,6 +793,10 @@ bool StartThread(CThread& th, ThreadFunc&& f, void* args, const char* name);
bool StartThread(CThread& th, void* (*f) (void*), void* args, const char* name);
#endif

/// Checks if the thread object passed to this function is the current thread.
/// @param th reference to the CThread object to check
bool CheckIfThisThread(const CThread& th);

////////////////////////////////////////////////////////////////////////////////
//
// CThreadError class - thread local storage wrapper
Expand Down

0 comments on commit 76b1077

Please sign in to comment.