Skip to content

Commit

Permalink
COMMON: Replace the use of XRootD atomics with std::atomic in RWMutex
Browse files Browse the repository at this point in the history
  • Loading branch information
esindril committed Jul 4, 2018
1 parent 66d15f5 commit dcc1625
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 101 deletions.
188 changes: 97 additions & 91 deletions common/RWMutex.cc
Expand Up @@ -29,14 +29,14 @@
EOSCOMMONNAMESPACE_BEGIN

#ifdef EOS_INSTRUMENTED_RWMUTEX
size_t RWMutex::mRdCumulatedWait_static = 0;
size_t RWMutex::mWrCumulatedWait_static = 0;
size_t RWMutex::mRdMaxWait_static = 0;
size_t RWMutex::mWrMaxWait_static = 0;
size_t RWMutex::mRdMinWait_static = 1e12;
size_t RWMutex::mWrMinWait_static = 1e12;
size_t RWMutex::mRdLockCounterSample_static = 0;
size_t RWMutex::mWrLockCounterSample_static = 0;
std::atomic<uint64_t> RWMutex::mRdCumulatedWait_static {0};
std::atomic<uint64_t> RWMutex::mWrCumulatedWait_static {0};
std::atomic<uint64_t> RWMutex::mRdLockCounterSample_static {0};
std::atomic<uint64_t> RWMutex::mWrLockCounterSample_static {0};
std::atomic<uint64_t> RWMutex::mRdMaxWait_static {0};
std::atomic<uint64_t> RWMutex::mWrMaxWait_static {0};
std::atomic<uint64_t> RWMutex::mRdMinWait_static {std::numeric_limits<uint64_t>::max()};
std::atomic<uint64_t> RWMutex::mWrMinWait_static {std::numeric_limits<uint64_t>::max()};
size_t RWMutex::timingCompensation = 0;
size_t RWMutex::timingLatency = 0;
size_t RWMutex::orderCheckingLatency = 0;
Expand All @@ -61,54 +61,54 @@ pthread_rwlock_t RWMutex::mOrderChkLock;
#define EOS_RWMUTEX_CHECKORDER_LOCK if(sEnableGlobalOrderCheck) CheckAndLockOrder();
#define EOS_RWMUTEX_CHECKORDER_UNLOCK if(sEnableGlobalOrderCheck) CheckAndUnlockOrder();

#define EOS_RWMUTEX_TIMER_START \
bool issampled = false; size_t tstamp = 0; \
if(mEnableTiming || sEnableGlobalTiming) { \
issampled=mEnableSampling ? (!((++mCounter)%mSamplingModulo)) : true; \
if( issampled ) tstamp = Timing::GetNowInNs(); \
#define EOS_RWMUTEX_TIMER_START \
bool issampled = false; uint64_t tstamp = 0; \
if (mEnableTiming || sEnableGlobalTiming) { \
issampled = mEnableSampling ? (!((++mCounter)%mSamplingModulo)) : true; \
if (issampled) tstamp = Timing::GetNowInNs(); \
}

// what = mRd or mWr
#define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) \
(what##LockCounter)++; \
if(issampled) { \
tstamp = Timing::GetNowInNs() - tstamp; \
if(mEnableTiming) { \
AtomicInc(what##LockCounterSample); \
AtomicAdd(what##CumulatedWait, tstamp); \
bool needloop=true; \
do {size_t mymax = AtomicGet(what##MaxWait); \
if (tstamp > mymax) \
needloop = !AtomicCAS(what##MaxWait, mymax, tstamp); \
else needloop = false; } \
while(needloop); \
do {size_t mymin = AtomicGet(what##MinWait); \
if (tstamp < mymin) \
needloop = !AtomicCAS(what##MinWait, mymin, tstamp); \
else needloop = false; } \
while(needloop); \
} \
if(sEnableGlobalTiming) { \
AtomicInc(what##LockCounterSample_static); \
AtomicAdd(what##CumulatedWait_static,tstamp); \
bool needloop = true; \
do {size_t mymax = AtomicGet(what##MaxWait_static); \
if (tstamp > mymax) \
needloop = !AtomicCAS(what##MaxWait_static, mymax, tstamp); \
else needloop = false; } \
while(needloop); \
do {size_t mymin = AtomicGet(what##MinWait_static); \
if (tstamp < mymin) \
needloop = !AtomicCAS(what##MinWait_static, mymin, tstamp); \
else needloop = false; } \
while(needloop); \
} \
#define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) \
++(what##LockCounter); \
if(issampled) { \
tstamp = Timing::GetNowInNs() - tstamp; \
if(mEnableTiming) { \
++(what##LockCounterSample); \
what##CumulatedWait += tstamp; \
bool needloop=true; \
do {size_t mymax = what##MaxWait.load(); \
if (tstamp > mymax) \
needloop = !(what##MaxWait).compare_exchange_strong(mymax, tstamp); \
else needloop = false; \
} while(needloop); \
do {size_t mymin = what##MinWait.load(); \
if (tstamp < mymin) \
needloop = !(what##MinWait).compare_exchange_strong(mymin, tstamp); \
else needloop = false; \
} while(needloop); \
} \
if(sEnableGlobalTiming) { \
++(what##LockCounterSample_static); \
what##CumulatedWait_static += tstamp; \
bool needloop = true; \
do {size_t mymax = what##MaxWait_static.load(); \
if (tstamp > mymax) \
needloop = !(what##MaxWait_static).compare_exchange_strong(mymax, tstamp); \
else needloop = false; \
} while(needloop); \
do {size_t mymin = what##MinWait_static.load(); \
if (tstamp < mymin) \
needloop = !(what##MinWait_static).compare_exchange_strong(mymin, tstamp); \
else needloop = false; \
} while(needloop); \
} \
}
#else
#define EOS_RWMUTEX_CHECKORDER_LOCK
#define EOS_RWMUTEX_CHECKORDER_UNLOCK
#define EOS_RWMUTEX_TIMER_START
#define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) ++what##LockCounter;
#define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) ++(what##LockCounter);
#endif

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -362,8 +362,7 @@ RWMutex::LockWrite()
} else {
// fprintf(stderr,"==== WRITE LOCK PENDING ==== TID=%llu OBJECT=%llx\n",
// (unsigned long long)XrdSysThread::ID(), (unsigned long long)this);
XrdSysTimer msSleep;
msSleep.Wait(500);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
} else {
// fprintf(stderr,"=== WRITE LOCK ACQUIRED ==== TID=%llu OBJECT=%llx\n",
Expand Down Expand Up @@ -476,10 +475,14 @@ void
RWMutex::ResetTimingStatistics()
{
// Might need a mutex or at least a flag!!!
mRdCumulatedWait = mWrCumulatedWait = 0;
mRdMaxWait = mWrMaxWait = std::numeric_limits<size_t>::min();
mRdMinWait = mWrMinWait = std::numeric_limits<long long>::max();
mRdLockCounterSample = mWrLockCounterSample = 0;
mRdMaxWait.store(std::numeric_limits<uint64_t>::min());
mWrMaxWait.store(std::numeric_limits<uint64_t>::min());
mRdMinWait.store(std::numeric_limits<uint64_t>::max());
mWrMinWait.store(std::numeric_limits<uint64_t>::max());
mRdLockCounterSample.store(0);
mWrLockCounterSample.store(0);
mRdCumulatedWait.store(0);
mWrCumulatedWait.store(0);
}

//-----------------------------------------------------------------------------
Expand All @@ -488,11 +491,14 @@ RWMutex::ResetTimingStatistics()
void
RWMutex::ResetTimingStatisticsGlobal()
{
// Might need a mutex or at least a flag!!!
mRdCumulatedWait_static = mWrCumulatedWait_static = 0;
mRdMaxWait_static = mWrMaxWait_static = std::numeric_limits<size_t>::min();
mRdMinWait_static = mWrMinWait_static = std::numeric_limits<long long>::max();
mRdLockCounterSample_static = mWrLockCounterSample_static = 0;
mRdMaxWait_static.store(std::numeric_limits<uint64_t>::min());
mWrMaxWait_static.store(std::numeric_limits<uint64_t>::min());
mRdMinWait_static.store(std::numeric_limits<uint64_t>::max());
mWrMinWait_static.store(std::numeric_limits<uint64_t>::max());
mRdLockCounterSample_static.store(0);
mWrLockCounterSample_static.store(0);
mRdCumulatedWait_static.store(0);
mWrCumulatedWait_static.store(0);
}

#ifdef __APPLE__
Expand Down Expand Up @@ -922,13 +928,13 @@ void
RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate)
{
size_t compensation = (compensate ? timingCompensation : 0);
stats.readLockCounterSample = AtomicGet(mRdLockCounterSample);
stats.writeLockCounterSample = AtomicGet(mWrLockCounterSample);
stats.readLockCounterSample.store(mRdLockCounterSample.load());
stats.writeLockCounterSample.store(mWrLockCounterSample.load());
stats.averagewaitread = 0;

if (AtomicGet(mRdLockCounterSample) != 0) {
double avg = (double(AtomicGet(mRdCumulatedWait)) /
AtomicGet(mRdLockCounterSample) - compensation);
if (mRdLockCounterSample.load() != 0) {
double avg = (double(mRdCumulatedWait.load()) /
mRdLockCounterSample.load() - compensation);

if (avg > 0) {
stats.averagewaitread = avg;
Expand All @@ -937,17 +943,17 @@ RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate)

stats.averagewaitwrite = 0;

if (AtomicGet(mWrLockCounterSample) != 0) {
double avg = (double(AtomicGet(mWrCumulatedWait)) / AtomicGet(
mWrLockCounterSample) - compensation);
if (mWrLockCounterSample.load() != 0) {
double avg = (double(mWrCumulatedWait.load()) /
mWrLockCounterSample.load() - compensation);

if (avg > 0) {
stats.averagewaitwrite = avg;
}
}

if (AtomicGet(mRdMinWait) != std::numeric_limits<size_t>::max()) {
long long compensated = AtomicGet(mRdMinWait) - compensation;
if (mRdMinWait.load() != std::numeric_limits<size_t>::max()) {
long long compensated = mRdMinWait.load() - compensation;

if (compensated > 0) {
stats.minwaitread = compensated;
Expand All @@ -958,8 +964,8 @@ RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate)
stats.minwaitread = std::numeric_limits<long long>::max();
}

if (AtomicGet(mRdMaxWait) != std::numeric_limits<size_t>::min()) {
long long compensated = AtomicGet(mRdMaxWait) - compensation;
if (mRdMaxWait.load() != std::numeric_limits<size_t>::min()) {
long long compensated = mRdMaxWait.load() - compensation;

if (compensated > 0) {
stats.maxwaitread = compensated;
Expand All @@ -970,8 +976,8 @@ RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate)
stats.maxwaitread = std::numeric_limits<size_t>::min();
}

if (AtomicGet(mWrMinWait) != std::numeric_limits<size_t>::max()) {
long long compensated = AtomicGet(mWrMinWait) - compensation;
if (mWrMinWait.load() != std::numeric_limits<size_t>::max()) {
long long compensated = mWrMinWait.load() - compensation;

if (compensated > 0) {
stats.minwaitwrite = compensated;
Expand All @@ -982,8 +988,8 @@ RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate)
stats.minwaitwrite = std::numeric_limits<long long>::max();
}

if (AtomicGet(mWrMaxWait) != std::numeric_limits<size_t>::min()) {
long long compensated = AtomicGet(mWrMaxWait) - compensation;
if (mWrMaxWait.load() != std::numeric_limits<size_t>::min()) {
long long compensated = mWrMaxWait.load() - compensation;

if (compensated > 0) {
stats.maxwaitwrite = compensated;
Expand Down Expand Up @@ -1097,13 +1103,13 @@ void
RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate)
{
size_t compensation = compensate ? timingCompensation : 0;
stats.readLockCounterSample = AtomicGet(mRdLockCounterSample_static);
stats.writeLockCounterSample = AtomicGet(mWrLockCounterSample_static);
stats.readLockCounterSample.store(mRdLockCounterSample_static.load());
stats.writeLockCounterSample.store(mWrLockCounterSample_static.load());
stats.averagewaitread = 0;

if (AtomicGet(mRdLockCounterSample_static) != 0) {
double avg = (double(AtomicGet(mRdCumulatedWait_static)) / AtomicGet(
mRdLockCounterSample_static) - compensation);
if (mRdLockCounterSample_static.load() != 0) {
double avg = (double(mRdCumulatedWait_static.load()) /
mRdLockCounterSample_static.load() - compensation);

if (avg > 0) {
stats.averagewaitread = avg;
Expand All @@ -1112,17 +1118,17 @@ RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate)

stats.averagewaitwrite = 0;

if (AtomicGet(mWrLockCounterSample_static) != 0) {
double avg = (double(AtomicGet(mWrCumulatedWait_static)) / AtomicGet(
mWrLockCounterSample_static) - compensation);
if (mWrLockCounterSample_static.load() != 0) {
double avg = (double(mWrCumulatedWait_static.load()) /
mWrLockCounterSample_static.load() - compensation);

if (avg > 0) {
stats.averagewaitwrite = avg;
}
}

if (AtomicGet(mRdMinWait_static) != std::numeric_limits<size_t>::max()) {
long long compensated = AtomicGet(mRdMinWait_static) - compensation;
if (mRdMinWait_static.load() != std::numeric_limits<size_t>::max()) {
long long compensated = mRdMinWait_static.load() - compensation;

if (compensated > 0) {
stats.minwaitread = compensated;
Expand All @@ -1133,8 +1139,8 @@ RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate)
stats.minwaitread = std::numeric_limits<long long>::max();
}

if (AtomicGet(mWrMaxWait_static) != std::numeric_limits<size_t>::min()) {
long long compensated = AtomicGet(mWrMaxWait_static) - compensation;
if (mWrMaxWait_static.load() != std::numeric_limits<size_t>::min()) {
long long compensated = mWrMaxWait_static.load() - compensation;

if (compensated > 0) {
stats.maxwaitread = compensated;
Expand All @@ -1145,8 +1151,8 @@ RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate)
stats.maxwaitread = std::numeric_limits<size_t>::min();
}

if (AtomicGet(mWrMinWait_static) != std::numeric_limits<size_t>::max()) {
long long compensated = AtomicGet(mWrMinWait_static) - compensation;
if (mWrMinWait_static.load() != std::numeric_limits<size_t>::max()) {
long long compensated = mWrMinWait_static.load() - compensation;

if (compensated > 0) {
stats.minwaitwrite = compensated;
Expand All @@ -1157,8 +1163,8 @@ RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate)
stats.minwaitwrite = std::numeric_limits<long long>::max();
}

if (AtomicGet(mWrMaxWait_static) != std::numeric_limits<size_t>::min()) {
long long compensated = AtomicGet(mWrMaxWait_static) - compensation;
if (mWrMaxWait_static.load() != std::numeric_limits<size_t>::min()) {
long long compensated = mWrMaxWait_static.load() - compensation;

if (compensated > 0) {
stats.maxwaitwrite = compensated;
Expand Down
20 changes: 10 additions & 10 deletions common/RWMutex.hh
Expand Up @@ -56,10 +56,9 @@
#include "common/Timing.hh"
#include "common/IRWMutex.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include <stdio.h>
#include <stdint.h>
#include <atomic>
#ifdef EOS_INSTRUMENTED_RWMUTEX
#include <map>
#include <vector>
Expand Down Expand Up @@ -195,7 +194,7 @@ public:
double averagewaitwrite;
double minwaitwrite, maxwaitwrite;
double minwaitread, maxwaitread;
size_t readLockCounterSample, writeLockCounterSample;
std::atomic<uint64_t> readLockCounterSample, writeLockCounterSample;
};

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -483,9 +482,9 @@ private:
int mSamplingModulo;
std::atomic<bool> mEnableTiming, mEnableSampling;
//! Specific type of counters
size_t mRdCumulatedWait, mWrCumulatedWait;
size_t mRdMaxWait, mWrMaxWait, mRdMinWait, mWrMinWait;
size_t mRdLockCounterSample, mWrLockCounterSample;
std::atomic<uint64_t> mRdMaxWait, mWrMaxWait, mRdMinWait, mWrMinWait;
std::atomic<uint64_t> mRdCumulatedWait, mWrCumulatedWait;
std::atomic<uint64_t> mRdLockCounterSample, mWrLockCounterSample;

std::map<std::thread::id, int> mThreadsRdLock; ///< Threads holding a read lock
std::set<std::thread::id> mThreadsWrLock; ///< Threads holding a write lock
Expand All @@ -497,10 +496,11 @@ private:
static bool sEnableGlobalTiming;
static int sSamplingModulo;
static size_t timingCompensation, timingLatency, lockUnlockDuration;
static size_t mRdCumulatedWait_static, mWrCumulatedWait_static;
static size_t mRdMaxWait_static, mWrMaxWait_static;
static size_t mRdMinWait_static, mWrMinWait_static;
static size_t mRdLockCounterSample_static, mWrLockCounterSample_static;
static std::atomic<uint64_t> mRdMaxWait_static, mWrMaxWait_static;
static std::atomic<uint64_t> mRdMinWait_static, mWrMinWait_static;
static std::atomic<uint64_t> mRdLockCounterSample_static,
mWrLockCounterSample_static;
static std::atomic<uint64_t> mRdCumulatedWait_static, mWrCumulatedWait_static;

// Actual order checking
// Pointers referring to a memory location not thread specific so that if the
Expand Down

0 comments on commit dcc1625

Please sign in to comment.