Skip to content

Commit

Permalink
Merge pull request #28010 from wddgit/concurrentIOVsCondDBESSource
Browse files Browse the repository at this point in the history
Upgrade CondDBESSource to support concurrent IOVs
  • Loading branch information
cmsbuild committed Nov 6, 2019
2 parents 7458c67 + 37151d3 commit 74e3c77
Show file tree
Hide file tree
Showing 18 changed files with 794 additions and 167 deletions.
1 change: 1 addition & 0 deletions CondCore/CondDB/interface/KeyList.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace cond {
class KeyList {
public:
void init(IOVProxy iovProxy);
void init(KeyList const&);

void load(const std::vector<unsigned long long>& keys);

Expand Down
18 changes: 14 additions & 4 deletions CondCore/CondDB/interface/KeyListProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,40 @@

#include "CondCore/CondDB/interface/PayloadProxy.h"
#include "CondCore/CondDB/interface/KeyList.h"
#include <memory>
#include <vector>
#include <string>

namespace cond {

struct Iov_t;

namespace persistency {

class Session;

template <>
class PayloadProxy<cond::persistency::KeyList> : public PayloadProxy<std::vector<cond::Time_t> > {
class PayloadProxy<cond::persistency::KeyList> : public PayloadProxy<std::vector<cond::Time_t>> {
public:
typedef std::vector<cond::Time_t> DataT;
typedef PayloadProxy<DataT> super;

explicit PayloadProxy(const char* source = nullptr) : super(source), m_keyList() {
explicit PayloadProxy(Iov_t const* mostRecentCurrentIov,
Session const* mostRecentSession,
std::shared_ptr<std::vector<Iov_t>> const* mostRecentRequests,
const char* source = nullptr)
: super(mostRecentCurrentIov, mostRecentSession, mostRecentRequests, source), m_keyList() {
if (source)
m_name = source;
}

~PayloadProxy() override {}

void initKeyList(PayloadProxy const& originalPayloadProxy) { m_keyList.init(originalPayloadProxy.m_keyList); }

// dereference (does not load)
const KeyList& operator()() const { return m_keyList; }

void invalidateCache() override { super::invalidateCache(); }

void loadMore(CondGetter const& getter) override { m_keyList.init(getter.get(m_name)); }

protected:
Expand Down
77 changes: 31 additions & 46 deletions CondCore/CondDB/interface/PayloadProxy.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
#ifndef CondCore_CondDB_PayloadProxy_h
#define CondCore_CondDB_PayloadProxy_h

#include "CondCore/CondDB/interface/Exception.h"
#include "CondCore/CondDB/interface/IOVProxy.h"
#include "CondCore/CondDB/interface/Session.h"
#include "CondCore/CondDB/interface/Time.h"
#include "CondCore/CondDB/interface/Types.h"

#include <memory>
#include <string>
#include <vector>

namespace cond {

Expand All @@ -21,58 +27,49 @@ namespace cond {
class BasePayloadProxy {
public:
//
BasePayloadProxy();

void setUp(Session dbSession);

void loadTag(const std::string& tag);

void loadTag(const std::string& tag, const boost::posix_time::ptime& snapshotTime);

void reload();
BasePayloadProxy(Iov_t const* mostRecentCurrentIov,
Session const* mostRecentSession,
std::shared_ptr<std::vector<Iov_t>> const* mostRecentRequests);

virtual ~BasePayloadProxy();

virtual void make() = 0;

virtual void invalidateCache() = 0;

// current cached object token
const Hash& payloadId() const { return m_currentIov.payloadId; }

// this one had the loading in a separate function in the previous impl
ValidityInterval setIntervalFor(Time_t target, bool loadPayload = false);

bool isValid() const;

TimeType timeType() const { return m_iovProxy.timeType(); }

virtual void loadMore(CondGetter const&) {}

IOVProxy iov();

const std::vector<Iov_t>& requests() const { return m_requests; }
void initializeForNewIOV();

private:
virtual void loadPayload() = 0;

protected:
IOVProxy m_iovProxy;
Iov_t m_currentIov;
Iov_t m_iovAtInitialization;
Session m_session;
std::vector<Iov_t> m_requests;
std::shared_ptr<std::vector<Iov_t>> m_requests;

Iov_t const* m_mostRecentCurrentIov;
Session const* m_mostRecentSession;
std::shared_ptr<std::vector<Iov_t>> const* m_mostRecentRequests;
};

/* proxy to the payload valid at a given time...
*/
template <typename DataT>
class PayloadProxy : public BasePayloadProxy {
public:
explicit PayloadProxy(const char* source = nullptr) : BasePayloadProxy() {}
explicit PayloadProxy(Iov_t const* mostRecentCurrentIov,
Session const* mostRecentSession,
std::shared_ptr<std::vector<Iov_t>> const* mostRecentRequests,
const char* source = nullptr)
: BasePayloadProxy(mostRecentCurrentIov, mostRecentSession, mostRecentRequests) {}

~PayloadProxy() override {}

void initKeyList(PayloadProxy const&) {}

// dereference
const DataT& operator()() const {
if (!m_data) {
Expand All @@ -83,38 +80,26 @@ namespace cond {

void make() override {
if (isValid()) {
if (m_currentIov.payloadId == m_currentPayloadId)
if (m_iovAtInitialization.payloadId == m_currentPayloadId)
return;
m_session.transaction().start(true);
loadPayload();
m_session.transaction().commit();
}
}

virtual void invalidateTransientCache() {
m_data.reset();
m_currentPayloadId.clear();
}

void invalidateCache() override {
m_data.reset();
m_currentPayloadId.clear();
m_currentIov.clear();
m_requests.clear();
}

protected:
void loadPayload() override {
if (m_currentIov.payloadId.empty()) {
if (m_iovAtInitialization.payloadId.empty()) {
throwException("Can't load payload: no valid IOV found.", "PayloadProxy::loadPayload");
}
m_data = m_session.fetchPayload<DataT>(m_currentIov.payloadId);
m_currentPayloadId = m_currentIov.payloadId;
m_requests.push_back(m_currentIov);
m_data = m_session.fetchPayload<DataT>(m_iovAtInitialization.payloadId);
m_currentPayloadId = m_iovAtInitialization.payloadId;
m_requests->push_back(m_iovAtInitialization);
}

private:
std::shared_ptr<DataT> m_data;
std::unique_ptr<DataT> m_data;
Hash m_currentPayloadId;
};

Expand Down
2 changes: 2 additions & 0 deletions CondCore/CondDB/src/KeyList.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace cond {
m_objects.clear();
}

void KeyList::init(KeyList const& originalKeyList) { init(originalKeyList.m_proxy); }

void KeyList::load(const std::vector<unsigned long long>& keys) {
std::shared_ptr<SessionImpl> simpl = m_proxy.session();
if (!simpl.get())
Expand Down
53 changes: 11 additions & 42 deletions CondCore/CondDB/src/PayloadProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,22 @@ namespace cond {

namespace persistency {

BasePayloadProxy::BasePayloadProxy() : m_iovProxy(), m_session() {}
BasePayloadProxy::BasePayloadProxy(Iov_t const* mostRecentCurrentIov,
Session const* mostRecentSession,
std::shared_ptr<std::vector<Iov_t>> const* mostRecentRequests)
: m_mostRecentCurrentIov(mostRecentCurrentIov),
m_mostRecentSession(mostRecentSession),
m_mostRecentRequests(mostRecentRequests) {}

BasePayloadProxy::~BasePayloadProxy() {}

void BasePayloadProxy::setUp(Session dbSession) {
m_session = dbSession;
invalidateCache();
}

void BasePayloadProxy::loadTag(const std::string& tag) {
m_session.transaction().start(true);
m_iovProxy = m_session.readIov(tag);
m_session.transaction().commit();
invalidateCache();
}

void BasePayloadProxy::loadTag(const std::string& tag, const boost::posix_time::ptime& snapshotTime) {
m_session.transaction().start(true);
m_iovProxy = m_session.readIov(tag, snapshotTime);
m_session.transaction().commit();
invalidateCache();
}
bool BasePayloadProxy::isValid() const { return m_iovAtInitialization.isValid(); }

void BasePayloadProxy::reload() {
std::string tag = m_iovProxy.tag();
if (!tag.empty())
loadTag(tag);
void BasePayloadProxy::initializeForNewIOV() {
m_iovAtInitialization = *m_mostRecentCurrentIov;
m_session = *m_mostRecentSession;
m_requests = *m_mostRecentRequests;
}

ValidityInterval BasePayloadProxy::setIntervalFor(cond::Time_t time, bool load) {
if (!m_currentIov.isValidFor(time)) {
m_currentIov.clear();
m_session.transaction().start(true);
auto it = m_iovProxy.find(time);
if (it != m_iovProxy.end()) {
m_currentIov = *it;
if (load)
loadPayload();
}
m_session.transaction().commit();
}
return ValidityInterval(m_currentIov.since, m_currentIov.till);
}

bool BasePayloadProxy::isValid() const { return m_currentIov.isValid(); }

IOVProxy BasePayloadProxy::iov() { return m_iovProxy; }

} // namespace persistency
} // namespace cond
22 changes: 18 additions & 4 deletions CondCore/CondDB/test/testConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//Module includes
#include "CondCore/CondDB/interface/ConnectionPool.h"
#include "CondCore/CondDB/interface/PayloadProxy.h"
#include "CondCore/CondDB/interface/Types.h"
//Entity class
#include "CondFormats/RunInfo/interface/RunInfo.h"
//CORAL includes
Expand Down Expand Up @@ -80,11 +81,24 @@ void testCreateCoralSession(cond::persistency::ConnectionPool& connPool,
void testCreateSession(cond::persistency::ConnectionPool& connPool,
std::string const& connectionString,
bool const writeCapable) {
cond::Iov_t iov;
cond::persistency::Session session = connPool.createSession(connectionString, writeCapable);
cond::persistency::PayloadProxy<RunInfo> pp;
pp.setUp(session);
pp.loadTag("RunInfo_v1_mc");
pp.setIntervalFor(1, true);
auto requests = std::make_shared<std::vector<cond::Iov_t>>();
cond::persistency::PayloadProxy<RunInfo> pp(&iov, &session, &requests);

session.transaction().start(true);
cond::persistency::IOVProxy iovProxy = session.readIov("RunInfo_v1_mc");
session.transaction().commit();

session.transaction().start(true);
auto it = iovProxy.find(1);
if (it != iovProxy.end()) {
iov = *it;
}
session.transaction().commit();

pp.initializeForNewIOV();
pp.make();
std::cout << "run number: " << pp().m_run << std::endl;
}

Expand Down
1 change: 0 additions & 1 deletion CondCore/CondDB/test/testFrontier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
//
#include "CondCore/CondDB/interface/ConnectionPool.h"
#include "CondCore/CondDB/interface/PayloadProxy.h"
//
#include "MyTestData.h"
//
Expand Down

0 comments on commit 74e3c77

Please sign in to comment.