Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed C++11 build (win) #1281

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,9 @@ if (ENABLE_CODE_COVERAGE)
endif()
endif()

if (PTHREAD_LIBRARY AND PTHREAD_INCLUDE_DIR)
if (ENABLE_STDCXX_SYNC)
message(STATUS "Pthread library: C++11")
elseif (PTHREAD_LIBRARY AND PTHREAD_INCLUDE_DIR)
message(STATUS "Pthread library: ${PTHREAD_LIBRARY}")
message(STATUS "Pthread include dir: ${PTHREAD_INCLUDE_DIR}")
elseif (MICROSOFT)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2638,7 +2638,7 @@ void* CUDTUnited::garbageCollect(void* p)
if (empty)
break;

SleepFor(milliseconds_from(1));
this_thread::sleep_for(milliseconds_from(1));
}

THREAD_EXIT();
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8463,7 +8463,7 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)

case UMSG_CGWARNING: // 100 - Delay Warning
// One way packet delay is increasing, so decrease the sending rate
m_tdSendInterval *= 1.125;
m_tdSendInterval = (m_tdSendInterval * 1125) / 1000;
// XXX Note as interesting fact: this is only prepared for handling,
// but nothing in the code is sending this message. Probably predicted
// for a custom congctl. There's a predicted place to call it under
Expand Down
20 changes: 10 additions & 10 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,7 @@ void CSndUList::remove_(const CUDT* u)

//
CSndQueue::CSndQueue()
: m_WorkerThread()
, m_pSndUList(NULL)
: m_pSndUList(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_WindowCond()
Expand Down Expand Up @@ -1055,10 +1054,11 @@ CRcvQueue::CRcvQueue()
CRcvQueue::~CRcvQueue()
{
m_bClosing = true;
if (!pthread_equal(m_WorkerThread, pthread_t()))

if (m_WorkerThread.joinable())
{
HLOGC(mglog.Debug, log << "RcvQueue: EXIT");
pthread_join(m_WorkerThread, NULL);
m_WorkerThread.join();
}
releaseCond(m_BufferCond);

Expand Down Expand Up @@ -1101,13 +1101,13 @@ void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel *c

#if ENABLE_LOGGING
++m_counter;
std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
ThreadName tn(thrname.c_str());
const std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
#else
const std::string thrname = "SRT:RcvQ:w";
#endif

if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
{
m_WorkerThread = pthread_t();
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
}
Expand Down Expand Up @@ -1529,15 +1529,15 @@ void CRcvQueue::stopWorker()
m_bClosing = true;

// Sanity check of the function's affinity.
if (pthread_equal(pthread_self(), m_WorkerThread))
if (this_thread::get_id() == m_WorkerThread.get_id())
{
LOGC(mglog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
return; // do nothing else, this would cause a hangup or crash.
}

HLOGC(mglog.Debug, log << "RcvQueue: EXIT (forced)");
// And we trust the thread that it does.
pthread_join(m_WorkerThread, NULL);
m_WorkerThread.join();
}

int CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
Expand Down
10 changes: 2 additions & 8 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,12 @@ class CUnitQueue

CUnit* getNextAvailUnit();


void makeUnitFree(CUnit * unit);

void makeUnitGood(CUnit * unit);

public:

inline int getIPversion() const { return m_iIPversion; }

inline int getIPversion() const { return m_iIPversion; }

private:
struct CQEntry
Expand Down Expand Up @@ -471,9 +468,6 @@ friend class CUDTUnited;

int recvfrom(int32_t id, CPacket& to_packet);

// Needed for affinity check (debug only)
pthread_t threadId() { return m_WorkerThread; }

void stopWorker();

void setClosing()
Expand All @@ -483,7 +477,7 @@ friend class CUDTUnited;

private:
static void* worker(void* param);
pthread_t m_WorkerThread;
srt::sync::CThread m_WorkerThread;
// Subroutines of worker
EReadStatus worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr_any& sa);
EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& sa);
Expand Down
50 changes: 37 additions & 13 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifdef ENABLE_STDCXX_SYNC
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#else
#include <pthread.h>
#endif
Expand Down Expand Up @@ -75,13 +77,14 @@ class Duration
inline bool operator<(const Duration& rhs) const { return m_duration < rhs.m_duration; }

public: // Assignment operators
inline void operator*=(const double mult) { m_duration = static_cast<int64_t>(m_duration * mult); }
inline void operator*=(const int64_t mult) { m_duration = static_cast<int64_t>(m_duration * mult); }
ethouris marked this conversation as resolved.
Show resolved Hide resolved
inline void operator+=(const Duration& rhs) { m_duration += rhs.m_duration; }
inline void operator-=(const Duration& rhs) { m_duration -= rhs.m_duration; }

inline Duration operator+(const Duration& rhs) const { return Duration(m_duration + rhs.m_duration); }
inline Duration operator-(const Duration& rhs) const { return Duration(m_duration - rhs.m_duration); }
inline Duration operator*(const int& rhs) const { return Duration(m_duration * rhs); }
inline Duration operator*(const int64_t& rhs) const { return Duration(m_duration * rhs); }
inline Duration operator/(const int64_t& rhs) const { return Duration(m_duration / rhs); }

private:
// int64_t range is from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807
Expand Down Expand Up @@ -452,17 +455,6 @@ inline void releaseCond(Condition& cv) { cv.destroy(); }
//
///////////////////////////////////////////////////////////////////////////////

inline void SleepFor(const steady_clock::duration& t)
{
#ifdef ENABLE_STDCXX_SYNC
this_thread::sleep_for(t);
#elif !defined(_WIN32)
usleep(count_microseconds(t)); // microseconds
#else
Sleep(count_milliseconds(t));
#endif
}

// This class is used for condition variable combined with mutex by different ways.
// This should provide a cleaner API around locking with debug-logging inside.
class CSync
Expand Down Expand Up @@ -753,6 +745,23 @@ class CThread
/// is still considered an active thread of execution and is therefore joinable.
bool joinable() const;

struct id
{
id(const pthread_t t)
: value(t)
{}

const pthread_t value;
inline bool operator==(const id& second) const
{
return pthread_equal(value, second.value) != 0;
}
};

/// Returns the id of the current thread.
/// In this implementation the ID is the pthread_t.
const id get_id() const { return id(m_thread); }

public:
/// Blocks the current thread until the thread identified by *this finishes its execution.
/// If that thread has already terminated, then join() returns immediately.
Expand All @@ -768,6 +777,21 @@ class CThread
private:
pthread_t m_thread;
};

namespace this_thread
{
const inline CThread::id get_id() { return CThread::id (pthread_self()); }

inline void sleep_for(const steady_clock::duration& t)
{
#if !defined(_WIN32)
usleep(count_microseconds(t)); // microseconds
#else
Sleep(count_milliseconds(t));
#endif
}
}

#endif

/// StartThread function should be used to do CThread assignments:
Expand Down