Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CCDB/include/CCDB/BasicCCDBManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CCDBManagerInstance
long endvalidity = -1;
long cacheValidFrom = 0; // time for which the object was cached
long cacheValidUntil = -1; // object is guaranteed to be valid till this time (modulo new updates)
size_t size = 0;
size_t minSize = -1ULL;
size_t maxSize = 0;
int queries = 0;
Expand Down Expand Up @@ -229,6 +230,7 @@ class CCDBManagerInstance
long mCreatedNotBefore = 0; // lower limit for object creation timestamp (TimeMachine mode) - If-Not-Before HTTP header
long mTimerMS = 0; // timer for queries
size_t mFetchedSize = 0; // total fetched size
size_t mRequestedSize = 0; // total requested size (fetched + served from cache)
int mQueries = 0; // total number of object queries
int mFetches = 0; // total number of succesful fetches from CCDB
int mFailures = 0; // total number of failed fetches
Expand Down Expand Up @@ -258,6 +260,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
if (sh != mHeaders.end()) {
size_t s = atol(sh->second.c_str());
mFetchedSize += s;
mRequestedSize += s;
}
}

Expand All @@ -272,6 +275,7 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
if (headers) {
*headers = cached.cacheOfHeaders;
}
mRequestedSize += cached.size;
return reinterpret_cast<T*>(cached.noCleanupPtr ? cached.noCleanupPtr : cached.objPtr.get());
}
ptr = mCCDBAccessor.retrieveFromTFileAny<T>(path, mMetaData, timestamp, &mHeaders, cached.uuid,
Expand Down Expand Up @@ -318,6 +322,8 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
if (sh != mHeaders.end()) {
size_t s = atol(sh->second.c_str());
mFetchedSize += s;
mRequestedSize += s;
cached.size = s;
cached.minSize = std::min(s, cached.minSize);
cached.maxSize = std::max(s, cached.minSize);
}
Expand All @@ -342,12 +348,14 @@ T* CCDBManagerInstance::getForTimeStamp(std::string const& path, long timestamp,
}
auto end = std::chrono::system_clock::now();
mTimerMS += std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
auto *ref = o2::framework::ServiceRegistryRef::globalDeviceRef();
auto* ref = o2::framework::ServiceRegistryRef::globalDeviceRef();
if (ref && ref->active<framework::DataProcessingStats>()) {
auto& stats = ref->get<o2::framework::DataProcessingStats>();
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_HIT, o2::framework::DataProcessingStats::Op::Set, (int64_t)mQueries - mFailures - mFetches});
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_MISS, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetches});
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FAILURE, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFailures});
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mFetchedSize});
stats.updateStats({(int)o2::framework::ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, o2::framework::DataProcessingStats::Op::Set, (int64_t)mRequestedSize});
}
return ptr;
}
Expand Down
2 changes: 1 addition & 1 deletion CCDB/src/BasicCCDBManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ std::pair<int64_t, int64_t> CCDBManagerInstance::getRunDuration(int runnumber, b

std::string CCDBManagerInstance::getSummaryString() const
{
std::string res = fmt::format("{} queries, {} bytes", mQueries, fmt::group_digits(mFetchedSize));
std::string res = fmt::format("{} queries, {} fetched / {} requested bytes", mQueries, fmt::group_digits(mFetchedSize), fmt::group_digits(mRequestedSize));
if (mCachingEnabled) {
res += fmt::format(" for {} objects", mCache.size());
}
Expand Down
5 changes: 4 additions & 1 deletion Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "AnalysisCCDBHelpers.h"
#include "CCDBFetcherHelper.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/DeviceSpec.h"
#include "Framework/TimingInfo.h"
#include "Framework/ConfigParamRegistry.h"
Expand Down Expand Up @@ -105,7 +106,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
std::unordered_map<std::string, int> bindings;
fillValidRoutes(*helper, spec.outputs, bindings);

return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
for (auto& schema : schemas) {
Expand Down Expand Up @@ -182,6 +183,8 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
}

stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
});
});
Expand Down
4 changes: 4 additions & 0 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
auto size = v.size();
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
Expand All @@ -271,6 +273,8 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
auto size = v.size();
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
Expand Down
2 changes: 2 additions & 0 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ struct CCDBFetcherHelper {

static ParserResult parseRemappings(char const*);

size_t totalFetchedBytes = 0;
size_t totalRequestedBytes = 0;
std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
std::string createdNotBefore = "0";
Expand Down
29 changes: 25 additions & 4 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "CCDBHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/Logger.h"
#include "Framework/TimingInfo.h"
#include "Framework/ConfigParamRegistry.h"
Expand All @@ -28,14 +29,16 @@ O2_DECLARE_DYNAMIC_LOG(ccdb);
namespace o2::framework
{

namespace {
namespace
{
struct CCDBFetcherHelper {
struct CCDBCacheInfo {
std::string etag;
size_t cacheValidUntil = 0;
size_t cachePopulatedAt = 0;
size_t cacheMiss = 0;
size_t cacheHit = 0;
size_t size = 0;
size_t minSize = -1ULL;
size_t maxSize = 0;
int lastCheckedTF = 0;
Expand All @@ -50,6 +53,8 @@ struct CCDBFetcherHelper {
std::string url;
};

size_t totalFetchedBytes = 0;
size_t totalRequestedBytes = 0;
std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
std::string createdNotBefore = "0";
Expand Down Expand Up @@ -80,7 +85,7 @@ struct CCDBFetcherHelper {
return apis[entry == remappings.end() ? "" : entry->second];
}
};
}
} // namespace

bool isPrefix(std::string_view prefix, std::string_view full)
{
Expand Down Expand Up @@ -336,8 +341,11 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
Expand All @@ -350,8 +358,11 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
Expand All @@ -368,6 +379,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
auto cacheId = helper->mapURL2DPLCache[path];
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
helper->mapURL2UUID[path].cacheHit++;
helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
// the outputBuffer was not used, can we destroy it?
}
Expand All @@ -382,13 +394,13 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
/// Add a callback on stop which dumps the statistics for the caching per
/// path
callbacks.set<CallbackService::Id::Stop>([helper]() {
LOGP(info, "CCDB cache miss/hit ratio:");
LOGP(info, "CCDB cache miss/hit ratio ({} fetched / {} requested bytes):", helper->totalFetchedBytes, helper->totalRequestedBytes);
for (auto& entry : helper->mapURL2UUID) {
LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize);
}
});

return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
static Long64_t orbitResetTime = -1;
Expand Down Expand Up @@ -429,8 +441,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
if (etag.empty()) {
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
Expand All @@ -440,8 +455,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
// somewhere here pruneFromCache should be called
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
Expand All @@ -455,6 +473,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
auto cacheId = helper->mapURL2DPLCache[path];
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
helper->mapURL2UUID[path].cacheHit++;
helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodNone);

if (newOrbitResetTime != orbitResetTime) {
Expand All @@ -480,6 +499,8 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);

populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
}); });
}
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DataProcessingStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ enum struct ProcessingStatsId : short {
CCDB_CACHE_HIT,
CCDB_CACHE_MISS,
CCDB_CACHE_FAILURE,
CCDB_CACHE_FETCHED_BYTES,
CCDB_CACHE_REQUESTED_BYTES,
AVAILABLE_MANAGED_SHM_BASE = 512,
};

Expand Down
16 changes: 16 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,22 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
.scope = Scope::DPL,
.minPublishInterval = 1000,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "ccdb-cache-fetched-bytes",
.enabled = true,
.metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES),
.kind = Kind::UInt64,
.scope = Scope::DPL,
.minPublishInterval = 1000,
.maxRefreshLatency = 10000,
.sendInitialValue = true},
MetricSpec{.name = "ccdb-cache-requested-bytes",
.enabled = true,
.metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES),
.kind = Kind::UInt64,
.scope = Scope::DPL,
.minPublishInterval = 1000,
.maxRefreshLatency = 10000,
.sendInitialValue = true}};

for (auto& metric : metrics) {
Expand Down
Loading