Skip to content

Commit

Permalink
use thread local cache to avoid interprocess lock on shm GetData
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann authored and rbx committed May 7, 2021
1 parent a90dbf6 commit f7ba305
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class Manager

(fEventCounter->fCount)++;
}
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsCV.notify_all();

return result;
Expand All @@ -327,8 +328,28 @@ class Manager

Region* GetRegion(const uint16_t id)
{
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache;
const auto &lTlCacheVec = lTlCache.fRegionsTLCache;
const auto lTlCacheGen = lTlCache.fRegionsTLCacheGen;

// fast path
for (const auto &lRegion : lTlCacheVec) {
if ((lRegion.second == id) && (lTlCacheGen == fRegionsGen)) {
return lRegion.first;
}
}

boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
}

auto *lRegion = GetRegionUnsafe(id);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_pair(lRegion, id));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion;
}

Region* GetRegionUnsafe(const uint16_t id)
Expand Down Expand Up @@ -366,6 +387,7 @@ class Manager
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
}
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsCV.notify_all();
}

Expand Down Expand Up @@ -613,6 +635,7 @@ class Manager
using namespace boost::interprocess;
bool lastRemoved = false;

fRegionsGen += 1; // signal TL cache invalidation
UnsubscribeFromRegionEvents();

{
Expand Down Expand Up @@ -665,6 +688,12 @@ class Manager
Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
// make sure this is alone in the cache line: mostly read
alignas(128) inline static std::atomic<unsigned long> fRegionsGen = 0ul;
inline static thread_local struct ManagerTLCache {
unsigned long fRegionsTLCacheGen;
std::vector<std::pair<Region*, uint16_t>> fRegionsTLCache;
} fTlRegionCache;

std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
Expand Down

0 comments on commit f7ba305

Please sign in to comment.